You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2016/09/02 22:10:20 UTC

spark git commit: [SPARK-17230] [SQL] Should not pass optimized query into QueryExecution in DataFrameWriter

Repository: spark
Updated Branches:
  refs/heads/master eac1d0e92 -> ed9c884dc


[SPARK-17230] [SQL] Should not pass optimized query into QueryExecution in DataFrameWriter

## What changes were proposed in this pull request?

Some analyzer rules make assumptions about logical plans, and the optimizer may break these assumptions. We should not pass an optimized query plan into QueryExecution (where it will be analyzed again); otherwise we may see some weird bugs.

For example, we have a rule for decimal calculation that promotes the precision before binary operations and uses PromotePrecision as a placeholder to indicate that the rule should not be applied twice. But an optimizer rule removes this placeholder, which breaks the assumption; the rule is then applied twice, causing a wrong result.

Ideally, we should make all the analyzer rules idempotent, but that may require a lot of effort to double-check them one by one (which may not be easy).

An easier approach is to never feed an optimized plan into the Analyzer. This PR fixes the case for RunnableCommand: these commands are optimized, and during execution the passed `query` is passed into QueryExecution again. This PR makes these `query` plans not part of the children, so they will not be optimized and analyzed again.

Right now, we do not know whether a logical plan is optimized or not. We could introduce a flag for that and make sure an optimized logical plan will not be analyzed again.

## How was this patch tested?

Added regression tests.

Author: Davies Liu <da...@databricks.com>

Closes #14797 from davies/fix_writer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed9c884d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed9c884d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed9c884d

Branch: refs/heads/master
Commit: ed9c884dcf925500ceb388b06b33bd2c95cd2ada
Parents: eac1d0e
Author: Davies Liu <da...@databricks.com>
Authored: Fri Sep 2 15:10:12 2016 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Fri Sep 2 15:10:12 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/command/commands.scala  |  2 +-
 .../sql/execution/command/createDataSourceTables.scala |  2 +-
 .../spark/sql/execution/datasources/DataSource.scala   | 13 ++++++++++++-
 .../sql/execution/datasources/DataSourceStrategy.scala |  2 +-
 .../InsertIntoHadoopFsRelationCommand.scala            |  2 +-
 .../spark/sql/test/DataFrameReaderWriterSuite.scala    |  8 ++++++++
 .../execution/CreateHiveTableAsSelectCommand.scala     |  2 +-
 7 files changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ed9c884d/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index cce1489..424a962 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.types._
  */
 trait RunnableCommand extends LogicalPlan with logical.Command {
   override def output: Seq[Attribute] = Seq.empty
-  override def children: Seq[LogicalPlan] = Seq.empty
+  final override def children: Seq[LogicalPlan] = Seq.empty
   def run(sparkSession: SparkSession): Seq[Row]
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ed9c884d/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index da3f6c6..c7e3279 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -113,7 +113,7 @@ case class CreateDataSourceTableAsSelectCommand(
     query: LogicalPlan)
   extends RunnableCommand {
 
-  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     assert(table.tableType != CatalogTableType.VIEW)

http://git-wip-us.apache.org/repos/asf/spark/blob/ed9c884d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 3485308..5968db8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -479,13 +480,23 @@ case class DataSource(
           }
         }
 
+        // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
+        // not need to have the query as child, to avoid to analyze an optimized query,
+        // because InsertIntoHadoopFsRelationCommand will be optimized first.
+        val columns = partitionColumns.map { name =>
+          val plan = data.logicalPlan
+          plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
+            throw new AnalysisException(
+              s"Unable to resolve ${name} given [${plan.output.map(_.name).mkString(", ")}]")
+          }.asInstanceOf[Attribute]
+        }
         // For partitioned relation r, r.schema's column ordering can be different from the column
         // ordering of data.logicalPlan (partition columns are all moved after data column).  This
         // will be adjusted within InsertIntoHadoopFsRelation.
         val plan =
           InsertIntoHadoopFsRelationCommand(
             outputPath,
-            partitionColumns.map(UnresolvedAttribute.quoted),
+            columns,
             bucketSpec,
             format,
             () => Unit, // No existing table needs to be refreshed.

http://git-wip-us.apache.org/repos/asf/spark/blob/ed9c884d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index a662105..8286467 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -181,7 +181,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
       InsertIntoHadoopFsRelationCommand(
         outputPath,
-        t.partitionSchema.fields.map(_.name).map(UnresolvedAttribute(_)),
+        query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver),
         t.bucketSpec,
         t.fileFormat,
         () => t.refresh(),

http://git-wip-us.apache.org/repos/asf/spark/blob/ed9c884d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index de82218..02ce7fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -66,7 +66,7 @@ case class InsertIntoHadoopFsRelationCommand(
     mode: SaveMode)
   extends RunnableCommand {
 
-  override def children: Seq[LogicalPlan] = query :: Nil
+  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     // Most formats don't do well with duplicate columns, so lets not allow that

http://git-wip-us.apache.org/repos/asf/spark/blob/ed9c884d/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 05935ce..63b0e45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -449,6 +449,14 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     }
   }
 
+  test("SPARK-17230: write out results of decimal calculation") {
+    val df = spark.range(99, 101)
+      .selectExpr("id", "cast(id as long) * cast('1.0' as decimal(38, 18)) as num")
+    df.write.mode(SaveMode.Overwrite).parquet(dir)
+    val df2 = spark.read.parquet(dir)
+    checkAnswer(df2, df)
+  }
+
   private def testRead(
       df: => DataFrame,
       expectedResult: Seq[String],

http://git-wip-us.apache.org/repos/asf/spark/blob/ed9c884d/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 6e6b1c2..ef5a5a0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -42,7 +42,7 @@ case class CreateHiveTableAsSelectCommand(
 
   private val tableIdentifier = tableDesc.identifier
 
-  override def children: Seq[LogicalPlan] = Seq(query)
+  override def innerChildren: Seq[LogicalPlan] = Seq(query)
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     lazy val metastoreRelation: MetastoreRelation = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org