You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/01/29 04:38:14 UTC
spark git commit: [SPARK-19152][SQL][FOLLOWUP] simplify
CreateHiveTableAsSelectCommand
Repository: spark
Updated Branches:
refs/heads/master cfcfc92f7 -> f7c07db85
[SPARK-19152][SQL][FOLLOWUP] simplify CreateHiveTableAsSelectCommand
## What changes were proposed in this pull request?
After https://github.com/apache/spark/pull/16552, `CreateHiveTableAsSelectCommand` became very similar to `CreateDataSourceTableAsSelectCommand`, and we can further simplify it by creating the table only in the branch where the table does not yet exist.
This PR also adds the Hive provider check to the DataStream reader/writer, which was missed in #16552.
## How was this patch tested?
N/A
Author: Wenchen Fan <we...@databricks.com>
Closes #16693 from cloud-fan/minor.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7c07db8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7c07db8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7c07db8
Branch: refs/heads/master
Commit: f7c07db852f22d694ca49792e4ceae04d45b71ef
Parents: cfcfc92
Author: Wenchen Fan <we...@databricks.com>
Authored: Sat Jan 28 20:38:03 2017 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Sat Jan 28 20:38:03 2017 -0800
----------------------------------------------------------------------
.../spark/sql/streaming/DataStreamReader.scala | 6 ++
.../spark/sql/streaming/DataStreamWriter.scala | 6 ++
.../CreateHiveTableAsSelectCommand.scala | 66 ++++++++------------
.../spark/sql/hive/execution/HiveDDLSuite.scala | 13 +++-
4 files changed, 49 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f7c07db8/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 7db9d92..6d2cede 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.types.StructType
@@ -116,6 +117,11 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* @since 2.0.0
*/
def load(): DataFrame = {
+ if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+ throw new AnalysisException("Hive data source can only be used with tables, you can not " +
+ "read files of Hive data source directly.")
+ }
+
val dataSource =
DataSource(
sparkSession,
http://git-wip-us.apache.org/repos/asf/spark/blob/f7c07db8/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 7e7a1ba..0f7a337 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, ForeachWriter}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
+import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, MemorySink}
@@ -221,6 +222,11 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
* @since 2.0.0
*/
def start(): StreamingQuery = {
+ if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+ throw new AnalysisException("Hive data source can only be used with tables, you can not " +
+ "write files of Hive data source directly.")
+ }
+
if (source == "memory") {
assertNotPartitioned("memory")
if (extraOptions.get("queryName").isEmpty) {
http://git-wip-us.apache.org/repos/asf/spark/blob/f7c07db8/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 2c754d7..41c6b18 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql.hive.execution
import scala.util.control.NonFatal
import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.hive.MetastoreRelation
/**
@@ -44,40 +44,6 @@ case class CreateHiveTableAsSelectCommand(
override def innerChildren: Seq[LogicalPlan] = Seq(query)
override def run(sparkSession: SparkSession): Seq[Row] = {
- lazy val metastoreRelation: MetastoreRelation = {
- import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
- import org.apache.hadoop.io.Text
- import org.apache.hadoop.mapred.TextInputFormat
-
- val withFormat =
- tableDesc.withNewStorage(
- inputFormat =
- tableDesc.storage.inputFormat.orElse(Some(classOf[TextInputFormat].getName)),
- outputFormat =
- tableDesc.storage.outputFormat
- .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)),
- serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName)),
- compressed = tableDesc.storage.compressed)
-
- val withSchema = if (withFormat.schema.isEmpty) {
- tableDesc.copy(schema = query.schema)
- } else {
- withFormat
- }
-
- sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = true)
-
- // Get the Metastore Relation
- sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
- case SubqueryAlias(_, r: SimpleCatalogRelation, _) =>
- val tableMeta = r.metadata
- MetastoreRelation(tableMeta.database, tableMeta.identifier.table)(tableMeta, sparkSession)
- }
- }
- // TODO ideally, we should get the output data ready first and then
- // add the relation into catalog, just in case of failure occurs while data
- // processing.
if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
assert(mode != SaveMode.Overwrite,
s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")
@@ -89,12 +55,30 @@ case class CreateHiveTableAsSelectCommand(
// Since the table already exists and the save mode is Ignore, we will just return.
return Seq.empty
}
- sparkSession.sessionState.executePlan(InsertIntoTable(
- metastoreRelation, Map(), query, overwrite = false, ifNotExists = false)).toRdd
+
+ sparkSession.sessionState.executePlan(
+ InsertIntoTable(
+ UnresolvedRelation(tableIdentifier),
+ Map(),
+ query,
+ overwrite = false,
+ ifNotExists = false)).toRdd
} else {
+ // TODO ideally, we should get the output data ready first and then
+ // add the relation into catalog, just in case of failure occurs while data
+ // processing.
+ assert(tableDesc.schema.isEmpty)
+ sparkSession.sessionState.catalog.createTable(
+ tableDesc.copy(schema = query.schema), ignoreIfExists = false)
+
try {
- sparkSession.sessionState.executePlan(InsertIntoTable(
- metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
+ sparkSession.sessionState.executePlan(
+ InsertIntoTable(
+ UnresolvedRelation(tableIdentifier),
+ Map(),
+ query,
+ overwrite = true,
+ ifNotExists = false)).toRdd
} catch {
case NonFatal(e) =>
// drop the created table.
http://git-wip-us.apache.org/repos/asf/spark/blob/f7c07db8/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 2827183..58be079 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType
class HiveDDLSuite
extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
- import spark.implicits._
+ import testImplicits._
override def afterEach(): Unit = {
try {
@@ -1425,6 +1425,17 @@ class HiveDDLSuite
Seq(1 -> "a").toDF("i", "j").write.format("hive").save(dir.getAbsolutePath)
}
assert(e2.message.contains("Hive data source can only be used with tables"))
+
+ val e3 = intercept[AnalysisException] {
+ spark.readStream.format("hive").load(dir.getAbsolutePath)
+ }
+ assert(e3.message.contains("Hive data source can only be used with tables"))
+
+ val e4 = intercept[AnalysisException] {
+ spark.readStream.schema(new StructType()).parquet(dir.getAbsolutePath)
+ .writeStream.format("hive").start(dir.getAbsolutePath)
+ }
+ assert(e4.message.contains("Hive data source can only be used with tables"))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org