Posted to commits@spark.apache.org by we...@apache.org on 2016/09/22 06:48:59 UTC

spark git commit: [SPARK-17625][SQL] set expectedOutputAttributes when converting SimpleCatalogRelation to LogicalRelation

Repository: spark
Updated Branches:
  refs/heads/master 3a80f92f8 -> de7df7def


[SPARK-17625][SQL] set expectedOutputAttributes when converting SimpleCatalogRelation to LogicalRelation

## What changes were proposed in this pull request?

We should set expectedOutputAttributes when converting a SimpleCatalogRelation to a LogicalRelation; otherwise the outputs of the LogicalRelation differ from the outputs of the SimpleCatalogRelation: they have different exprIds, so attributes resolved against the original relation no longer match the converted plan.
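
For context, a minimal sketch of the rebinding that `expectedOutputAttributes` enables (paraphrased, with a hypothetical helper name `rebindOutput`; not the exact `LogicalRelation` source): the freshly created attributes are re-keyed to the exprIds the rest of the plan already references.

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference

// Sketch: reuse the expected exprIds so references bound to the old
// relation still resolve against the converted relation's output.
def rebindOutput(
    freshAttrs: Seq[AttributeReference],
    expected: Option[Seq[AttributeReference]]): Seq[AttributeReference] = {
  expected.map { expectedAttrs =>
    assert(expectedAttrs.length == freshAttrs.length)
    freshAttrs.zip(expectedAttrs).map {
      case (attr, exp) => attr.withExprId(exp.exprId)
    }
  }.getOrElse(freshAttrs)
}
```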

## How was this patch tested?

Added a test case.

Author: Zhenhua Wang <wz...@163.com>

Closes #15182 from wzhfy/expectedAttributes.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de7df7de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de7df7de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de7df7de

Branch: refs/heads/master
Commit: de7df7defc99e04fefd990974151a701f64b75b4
Parents: 3a80f92
Author: Zhenhua Wang <wz...@163.com>
Authored: Thu Sep 22 14:48:49 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Sep 22 14:48:49 2016 +0800

----------------------------------------------------------------------
 .../execution/datasources/DataSourceStrategy.scala    | 10 +++++++---
 .../scala/org/apache/spark/sql/DataFrameSuite.scala   | 14 +++++++++++++-
 2 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/de7df7de/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index c8ad5b3..63f01c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -197,7 +197,10 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
  * source information.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = {
+  private def readDataSourceTable(
+      sparkSession: SparkSession,
+      simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = {
+    val table = simpleCatalogRelation.catalogTable
     val dataSource =
       DataSource(
         sparkSession,
@@ -209,16 +212,17 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
 
     LogicalRelation(
       dataSource.resolveRelation(),
+      expectedOutputAttributes = Some(simpleCatalogRelation.output),
       catalogTable = Some(table))
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case i @ logical.InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
         if DDLUtils.isDatasourceTable(s.metadata) =>
-      i.copy(table = readDataSourceTable(sparkSession, s.metadata))
+      i.copy(table = readDataSourceTable(sparkSession, s))
 
     case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
-      readDataSourceTable(sparkSession, s.metadata)
+      readDataSourceTable(sparkSession, s)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de7df7de/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index c2d256b..2c60a7d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -26,7 +26,8 @@ import scala.util.Random
 import org.scalatest.Matchers._
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Union}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project, Union}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchange}
@@ -1585,4 +1586,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val d = sampleDf.withColumn("c", monotonically_increasing_id).select($"c").collect
     assert(d.size == d.distinct.size)
   }
+
+  test("SPARK-17625: data source table in InMemoryCatalog should guarantee output consistency") {
+    val tableName = "tbl"
+    withTable(tableName) {
+      spark.range(10).select('id as 'i, 'id as 'j).write.saveAsTable(tableName)
+      val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName))
+      val expr = relation.resolve("i")
+      val qe = spark.sessionState.executePlan(Project(Seq(expr), relation))
+      qe.assertAnalyzed()
+    }
+  }
 }

