You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by wangyum <gi...@git.apache.org> on 2018/08/27 01:46:50 UTC
[GitHub] spark pull request #20020: [SPARK-22834][SQL] Make insertion commands have r...
Github user wangyum commented on a diff in the pull request:
https://github.com/apache/spark/pull/20020#discussion_r212850054
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala ---
@@ -20,30 +20,32 @@ package org.apache.spark.sql.execution.command
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
+import org.apache.spark.sql.execution.datasources.FileFormatWriter
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.util.SerializableConfiguration
-
/**
- * A special `RunnableCommand` which writes data out and updates metrics.
+ * A special `Command` which writes data out and updates metrics.
*/
-trait DataWritingCommand extends RunnableCommand {
-
+trait DataWritingCommand extends Command {
/**
* The input query plan that produces the data to be written.
+ * IMPORTANT: the input query plan MUST be analyzed, so that we can carry its output columns
+ * to [[FileFormatWriter]].
*/
def query: LogicalPlan
- // We make the input `query` an inner child instead of a child in order to hide it from the
- // optimizer. This is because optimizer may not preserve the output schema names' case, and we
- // have to keep the original analyzed plan here so that we can pass the corrected schema to the
- // writer. The schema of analyzed plan is what user expects(or specifies), so we should respect
- // it when writing.
- override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
+ override final def children: Seq[LogicalPlan] = query :: Nil
- override lazy val metrics: Map[String, SQLMetric] = {
+ // Output columns of the analyzed input query plan
+ def outputColumns: Seq[Attribute]
--- End diff --
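With this change, `outputColumns` switches from the analyzed plan's columns to the optimized plan's columns. For example, running the snippet below prints the output that follows it (note the column names changing case from `COL1`/`COL2` to `col1`/`col2`):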
```scala
withTempDir { dir =>
val path = dir.getCanonicalPath
val cnt = 30
val table1Path = s"$path/table1"
val table3Path = s"$path/table3"
spark.range(cnt).selectExpr("cast(id as bigint) as col1", "cast(id % 3 as bigint) as col2")
.write.mode(SaveMode.Overwrite).parquet(table1Path)
withTable("table1", "table3") {
spark.sql(
s"CREATE TABLE table1(col1 bigint, col2 bigint) using parquet location '$table1Path/'")
spark.sql("CREATE TABLE table3(COL1 bigint, COL2 bigint) using parquet " +
"PARTITIONED BY (COL2) " +
s"CLUSTERED BY (COL1) INTO 2 BUCKETS location '$table3Path/'")
withView("view1") {
spark.sql("CREATE VIEW view1 as select col1, col2 from table1 where col1 > -20")
spark.sql("INSERT OVERWRITE TABLE table3 select COL1, COL2 from view1 CLUSTER BY COL1")
spark.table("table3").show
}
}
}
```
```
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(col1#16L, col2#17L)
outputColumns: List(col1#16L, col2#17L)
outputColumns: List(col1#16L, col2#17L)
```
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org