You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by cloud-fan <gi...@git.apache.org> on 2018/12/04 15:20:31 UTC
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22514#discussion_r238706120
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---
@@ -95,9 +77,127 @@ case class CreateHiveTableAsSelectCommand(
Seq.empty[Row]
}
/**
 * Returns the `DataWritingCommand` used to write data when the table already exists.
 *
 * @param catalog the session catalog of the current session
 * @param tableDesc the description of the target table
 */
def writingCommandForExistingTable(
catalog: SessionCatalog,
tableDesc: CatalogTable): DataWritingCommand

/**
 * Returns the `DataWritingCommand` used to write data when the table doesn't exist yet.
 *
 * @param catalog the session catalog of the current session
 * @param tableDesc the description of the table to be created
 */
def writingCommandForNewTable(
catalog: SessionCatalog,
tableDesc: CatalogTable): DataWritingCommand
+
/** Compact, single-line description of this command for plan/tree rendering. */
override def argString: String =
  s"[Database:${tableDesc.database}, TableName: ${tableDesc.identifier.table}, InsertIntoHiveTable]"
}
+
/**
 * Creates a table and inserts the query result into it, writing the data through
 * Hive's own write path (`InsertIntoHiveTable`).
 *
 * @param tableDesc the table description, which may contain serde, storage handler etc.
 * @param query the query whose result will be inserted into the new relation
 * @param mode SaveMode
 */
case class CreateHiveTableAsSelectCommand(
    tableDesc: CatalogTable,
    query: LogicalPlan,
    outputColumnNames: Seq[String],
    mode: SaveMode)
  extends CreateHiveTableAsSelectBase {

  override def writingCommandForExistingTable(
      catalog: SessionCatalog,
      tableDesc: CatalogTable): DataWritingCommand = {
    // Appending into an existing table: no static partition spec and no overwrite.
    InsertIntoHiveTable(
      tableDesc,
      Map.empty,
      query,
      overwrite = false,
      ifPartitionNotExists = false,
      outputColumnNames = outputColumnNames)
  }

  override def writingCommandForNewTable(
      catalog: SessionCatalog,
      tableDesc: CatalogTable): DataWritingCommand = {
    // CTAS carries no static partition values, so every partition column is
    // mapped to a dynamic (None) value.
    val dynamicPartitionSpec = tableDesc.partitionColumnNames.map(col => col -> None).toMap
    InsertIntoHiveTable(
      tableDesc,
      dynamicPartitionSpec,
      query,
      overwrite = true,
      ifPartitionNotExists = false,
      outputColumnNames = outputColumnNames)
  }
}
+
+/**
+ * Create a table and insert the query result into it. This creates the Hive table, but
+ * inserts the query result into it by using the data source write path instead of Hive serde.
+ *
+ * @param tableDesc the table description, which may contain serde, storage handler etc.
+ * @param query the query whose result will be inserted into the new relation
+ * @param mode SaveMode
+ */
+case class OptimizedCreateHiveTableAsSelectCommand(
+ tableDesc: CatalogTable,
+ query: LogicalPlan,
+ outputColumnNames: Seq[String],
+ mode: SaveMode)
+ extends CreateHiveTableAsSelectBase {
+
/**
 * Resolves the data-source [[HadoopFsRelation]] for the given Hive table by running it
 * through the metastore catalog's conversion.
 *
 * @throws AnalysisException if the conversion does not produce a `HadoopFsRelation`
 */
private def getHadoopRelation(
    catalog: SessionCatalog,
    tableDesc: CatalogTable): HadoopFsRelation = {
  // The Hive-to-data-source conversion logic lives on HiveSessionCatalog's
  // metastore catalog, hence the cast.
  val convertedPlan = catalog.asInstanceOf[HiveSessionCatalog]
    .metastoreCatalog
    .convert(DDLUtils.readHiveTable(tableDesc))

  convertedPlan match {
    case LogicalRelation(relation: HadoopFsRelation, _, _, _) => relation
    case _ =>
      throw new AnalysisException(
        s"$tableIdentifier should be converted to HadoopFsRelation.")
  }
}
+
/**
 * Writes into the existing table through the data-source path resolved from the Hive
 * metadata, honoring the user-specified save `mode`.
 */
override def writingCommandForExistingTable(
    catalog: SessionCatalog,
    tableDesc: CatalogTable): DataWritingCommand = {
  val relation = getHadoopRelation(catalog, tableDesc)
  // Converting partitioned tables is unsupported, so both the static partition
  // spec and the partition column list passed below are empty.
  InsertIntoHadoopFsRelationCommand(
    relation.location.rootPaths.head,
    Map.empty,
    false,
    Seq.empty,
    relation.bucketSpec,
    relation.fileFormat,
    relation.options,
    query,
    mode,
    Some(tableDesc),
    Some(relation.location),
    query.output.map(_.name))
}
+
+ override def writingCommandForNewTable(
+ catalog: SessionCatalog,
+ tableDesc: CatalogTable): DataWritingCommand = {
+ val hadoopRelation = getHadoopRelation(catalog, tableDesc)
+ InsertIntoHadoopFsRelationCommand(
+ hadoopRelation.location.rootPaths.head,
+ Map.empty, // We don't support to convert partitioned table.
+ false,
+ Seq.empty, // We don't support to convert partitioned table.
+ hadoopRelation.bucketSpec,
+ hadoopRelation.fileFormat,
+ hadoopRelation.options,
+ query,
+ SaveMode.Overwrite,
--- End diff --
if the only difference is this `mode`, maybe we can further deduplicate the code.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org