Posted to reviews@spark.apache.org by cloud-fan <gi...@git.apache.org> on 2018/12/04 15:20:31 UTC

[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22514#discussion_r238706120
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---
    @@ -95,9 +77,127 @@ case class CreateHiveTableAsSelectCommand(
         Seq.empty[Row]
       }
     
    +  // Returns the `DataWritingCommand` used to write data when the table already exists.
    +  def writingCommandForExistingTable(
    +    catalog: SessionCatalog,
    +    tableDesc: CatalogTable): DataWritingCommand
    +
    +  // Returns the `DataWritingCommand` used to write data when the table doesn't exist yet.
    +  def writingCommandForNewTable(
    +    catalog: SessionCatalog,
    +    tableDesc: CatalogTable): DataWritingCommand
    +
       override def argString: String = {
         s"[Database:${tableDesc.database}, " +
         s"TableName: ${tableDesc.identifier.table}, " +
         s"InsertIntoHiveTable]"
       }
     }
    +
    +/**
    + * Create a table and insert the query result into it.
    + *
    + * @param tableDesc the table description, which may contain serde, storage handler etc.
    + * @param query the query whose result will be inserted into the new relation
    + * @param mode SaveMode
    + */
    +case class CreateHiveTableAsSelectCommand(
    +    tableDesc: CatalogTable,
    +    query: LogicalPlan,
    +    outputColumnNames: Seq[String],
    +    mode: SaveMode)
    +  extends CreateHiveTableAsSelectBase {
    +
    +  override def writingCommandForExistingTable(
    +      catalog: SessionCatalog,
    +      tableDesc: CatalogTable): DataWritingCommand = {
    +    InsertIntoHiveTable(
    +      tableDesc,
    +      Map.empty,
    +      query,
    +      overwrite = false,
    +      ifPartitionNotExists = false,
    +      outputColumnNames = outputColumnNames)
    +  }
    +
    +  override def writingCommandForNewTable(
    +      catalog: SessionCatalog,
    +      tableDesc: CatalogTable): DataWritingCommand = {
    +    // For CTAS, there are no static partition values to insert.
    +    val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
    +    InsertIntoHiveTable(
    +      tableDesc,
    +      partition,
    +      query,
    +      overwrite = true,
    +      ifPartitionNotExists = false,
    +      outputColumnNames = outputColumnNames)
    +  }
    +}
    +
    +/**
    + * Create a table and insert the query result into it. This command creates a Hive table,
    + * but inserts the query result into it using the data source write path.
    + *
    + * @param tableDesc the table description, which may contain serde, storage handler etc.
    + * @param query the query whose result will be inserted into the new relation
    + * @param mode SaveMode
    + */
    +case class OptimizedCreateHiveTableAsSelectCommand(
    +    tableDesc: CatalogTable,
    +    query: LogicalPlan,
    +    outputColumnNames: Seq[String],
    +    mode: SaveMode)
    +  extends CreateHiveTableAsSelectBase {
    +
    +  private def getHadoopRelation(
    +      catalog: SessionCatalog,
    +      tableDesc: CatalogTable): HadoopFsRelation = {
    +    val metastoreCatalog = catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
    +    val hiveTable = DDLUtils.readHiveTable(tableDesc)
    +
    +    metastoreCatalog.convert(hiveTable) match {
    +      case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
    +      case _ => throw new AnalysisException(s"$tableIdentifier should be converted to " +
    +        "HadoopFsRelation.")
    +    }
    +  }
    +
    +  override def writingCommandForExistingTable(
    +      catalog: SessionCatalog,
    +      tableDesc: CatalogTable): DataWritingCommand = {
    +    val hadoopRelation = getHadoopRelation(catalog, tableDesc)
    +    InsertIntoHadoopFsRelationCommand(
    +      hadoopRelation.location.rootPaths.head,
    +      Map.empty, // We don't support converting partitioned tables.
    +      false,
    +      Seq.empty, // We don't support converting partitioned tables.
    +      hadoopRelation.bucketSpec,
    +      hadoopRelation.fileFormat,
    +      hadoopRelation.options,
    +      query,
    +      mode,
    +      Some(tableDesc),
    +      Some(hadoopRelation.location),
    +      query.output.map(_.name))
    +  }
    +
    +  override def writingCommandForNewTable(
    +      catalog: SessionCatalog,
    +      tableDesc: CatalogTable): DataWritingCommand = {
    +    val hadoopRelation = getHadoopRelation(catalog, tableDesc)
    +    InsertIntoHadoopFsRelationCommand(
    +      hadoopRelation.location.rootPaths.head,
    +      Map.empty, // We don't support converting partitioned tables.
    +      false,
    +      Seq.empty, // We don't support converting partitioned tables.
    +      hadoopRelation.bucketSpec,
    +      hadoopRelation.fileFormat,
    +      hadoopRelation.options,
    +      query,
    +      SaveMode.Overwrite,
    --- End diff ---
    
    If the only difference between the two overrides is this `mode`, maybe we can deduplicate the code further.
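    
    A minimal sketch of that deduplication, assuming the constructor arguments stay exactly as in the diff above (the helper name `writingCommand` is hypothetical, not from the PR): both overrides delegate to one private method, and the `SaveMode` becomes the only varying argument.
    
        private def writingCommand(
            catalog: SessionCatalog,
            tableDesc: CatalogTable,
            saveMode: SaveMode): DataWritingCommand = {
          val hadoopRelation = getHadoopRelation(catalog, tableDesc)
          InsertIntoHadoopFsRelationCommand(
            hadoopRelation.location.rootPaths.head,
            Map.empty, // We don't support converting partitioned tables.
            false,
            Seq.empty, // We don't support converting partitioned tables.
            hadoopRelation.bucketSpec,
            hadoopRelation.fileFormat,
            hadoopRelation.options,
            query,
            saveMode, // the only value that differs between the two call sites
            Some(tableDesc),
            Some(hadoopRelation.location),
            query.output.map(_.name))
        }
    
        override def writingCommandForExistingTable(
            catalog: SessionCatalog,
            tableDesc: CatalogTable): DataWritingCommand =
          writingCommand(catalog, tableDesc, mode)
    
        override def writingCommandForNewTable(
            catalog: SessionCatalog,
            tableDesc: CatalogTable): DataWritingCommand =
          writingCommand(catalog, tableDesc, SaveMode.Overwrite)
    
    This keeps the two-method contract of `CreateHiveTableAsSelectBase` intact while removing the duplicated `InsertIntoHadoopFsRelationCommand` construction.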


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org