You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by wangyum <gi...@git.apache.org> on 2018/08/27 01:46:50 UTC

[GitHub] spark pull request #20020: [SPARK-22834][SQL] Make insertion commands have r...

Github user wangyum commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20020#discussion_r212850054
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala ---
    @@ -20,30 +20,32 @@ package org.apache.spark.sql.execution.command
     import org.apache.hadoop.conf.Configuration
     
     import org.apache.spark.SparkContext
    -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
    +import org.apache.spark.sql.execution.SparkPlan
     import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
    +import org.apache.spark.sql.execution.datasources.FileFormatWriter
     import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
     import org.apache.spark.util.SerializableConfiguration
     
    -
     /**
    - * A special `RunnableCommand` which writes data out and updates metrics.
    + * A special `Command` which writes data out and updates metrics.
      */
    -trait DataWritingCommand extends RunnableCommand {
    -
    +trait DataWritingCommand extends Command {
       /**
        * The input query plan that produces the data to be written.
    +   * IMPORTANT: the input query plan MUST be analyzed, so that we can carry its output columns
    +   *            to [[FileFormatWriter]].
        */
       def query: LogicalPlan
     
    -  // We make the input `query` an inner child instead of a child in order to hide it from the
    -  // optimizer. This is because optimizer may not preserve the output schema names' case, and we
    -  // have to keep the original analyzed plan here so that we can pass the corrected schema to the
    -  // writer. The schema of analyzed plan is what user expects(or specifies), so we should respect
    -  // it when writing.
    -  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
    +  override final def children: Seq[LogicalPlan] = query :: Nil
     
    -  override lazy val metrics: Map[String, SQLMetric] = {
    +  // Output columns of the analyzed input query plan
    +  def outputColumns: Seq[Attribute]
    --- End diff --
    
    `outputColumns` changed from analyzed to optimized. For example:
    ```scala
    withTempDir { dir =>
      val path = dir.getCanonicalPath
      val cnt = 30
      val table1Path = s"$path/table1"
      val table3Path = s"$path/table3"
      spark.range(cnt).selectExpr("cast(id as bigint) as col1", "cast(id % 3 as bigint) as col2")
        .write.mode(SaveMode.Overwrite).parquet(table1Path)
      withTable("table1", "table3") {
        spark.sql(
          s"CREATE TABLE table1(col1 bigint, col2 bigint) using parquet location '$table1Path/'")
        spark.sql("CREATE TABLE table3(COL1 bigint, COL2 bigint) using parquet " +
          "PARTITIONED BY (COL2) " +
          s"CLUSTERED BY (COL1) INTO 2 BUCKETS location '$table3Path/'")
    
        withView("view1") {
          spark.sql("CREATE VIEW view1 as select col1, col2 from table1 where col1 > -20")
          spark.sql("INSERT OVERWRITE TABLE table3 select COL1, COL2 from view1 CLUSTER BY COL1")
          spark.table("table3").show
        }
      }
    }
    ```
    ```
    outputColumns: List(COL1#19L, COL2#20L)
    outputColumns: List(COL1#19L, COL2#20L)
    outputColumns: List(COL1#19L, COL2#20L)
    outputColumns: List(COL1#19L, COL2#20L)
    outputColumns: List(COL1#19L, COL2#20L)
    outputColumns: List(COL1#19L, COL2#20L)
    outputColumns: List(COL1#19L, COL2#20L)
    outputColumns: List(COL1#19L, COL2#20L)
    outputColumns: List(col1#16L, col2#17L)
    outputColumns: List(col1#16L, col2#17L)
    outputColumns: List(col1#16L, col2#17L)
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org