Posted to commits@spark.apache.org by we...@apache.org on 2017/04/07 07:58:57 UTC
spark git commit: [SPARK-20245][SQL][MINOR] pass output to LogicalRelation directly
Repository: spark
Updated Branches:
refs/heads/master 626b4cafc -> ad3cc1312
[SPARK-20245][SQL][MINOR] pass output to LogicalRelation directly
## What changes were proposed in this pull request?
Currently `LogicalRelation` has an `expectedOutputAttributes` parameter, which makes it hard to reason about what the actual output is. Like other leaf nodes, `LogicalRelation` should take `output` as a parameter directly, to simplify the logic.
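In other words, the constructor changes roughly as follows (a simplified sketch of the two signatures, taken from the diff below):

    // Before: output was derived from relation.schema, optionally overridden
    // by expectedOutputAttributes.
    case class LogicalRelation(
        relation: BaseRelation,
        expectedOutputAttributes: Option[Seq[Attribute]] = None,
        catalogTable: Option[CatalogTable] = None)

    // After: output is an explicit parameter, like other leaf nodes.
    case class LogicalRelation(
        relation: BaseRelation,
        output: Seq[AttributeReference],
        catalogTable: Option[CatalogTable])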
## How was this patch tested?
Existing tests.
Author: Wenchen Fan <we...@databricks.com>
Closes #17552 from cloud-fan/minor.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad3cc131
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad3cc131
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad3cc131
Branch: refs/heads/master
Commit: ad3cc1312db3b5667cea134940a09896a4609b74
Parents: 626b4ca
Author: Wenchen Fan <we...@databricks.com>
Authored: Fri Apr 7 15:58:50 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Apr 7 15:58:50 2017 +0800
----------------------------------------------------------------------
.../spark/sql/catalyst/catalog/interface.scala | 8 ++--
.../datasources/DataSourceStrategy.scala | 15 ++++----
.../execution/datasources/LogicalRelation.scala | 39 +++++++-------------
.../datasources/PruneFileSourcePartitions.scala | 4 +-
.../spark/sql/sources/PathOptionSuite.scala | 19 +++++-----
.../spark/sql/hive/HiveMetastoreCatalog.scala | 13 +++++--
.../spark/sql/hive/CachedTableSuite.scala | 4 +-
.../PruneFileSourcePartitionsSuite.scala | 2 +-
8 files changed, 49 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index dc2e404..360e55d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -27,7 +27,7 @@ import com.google.common.base.Objects
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -403,14 +403,14 @@ object CatalogTypes {
*/
case class CatalogRelation(
tableMeta: CatalogTable,
- dataCols: Seq[Attribute],
- partitionCols: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
+ dataCols: Seq[AttributeReference],
+ partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {
assert(tableMeta.identifier.database.isDefined)
assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType))
assert(tableMeta.dataSchema.sameType(dataCols.toStructType))
// The partition column should always appear after data columns.
- override def output: Seq[Attribute] = dataCols ++ partitionCols
+ override def output: Seq[AttributeReference] = dataCols ++ partitionCols
def isPartitioned: Boolean = partitionCols.nonEmpty
http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e5c7c38..2d83d51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -231,16 +231,17 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
options = table.storage.properties ++ pathOption,
catalogTable = Some(table))
- LogicalRelation(
- dataSource.resolveRelation(checkFilesExist = false),
- catalogTable = Some(table))
+ LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
}
}).asInstanceOf[LogicalRelation]
- // It's possible that the table schema is empty and need to be inferred at runtime. We should
- // not specify expected outputs for this case.
- val expectedOutputs = if (r.output.isEmpty) None else Some(r.output)
- plan.copy(expectedOutputAttributes = expectedOutputs)
+ if (r.output.isEmpty) {
+ // It's possible that the table schema is empty and needs to be inferred at runtime. For this
+ // case, we don't need to change the output of the cached plan.
+ plan
+ } else {
+ plan.copy(output = r.output)
+ }
}
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 3b14b79..4215203 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.BaseRelation
@@ -26,31 +26,13 @@ import org.apache.spark.util.Utils
/**
* Used to link a [[BaseRelation]] into a logical query plan.
- *
- * Note that sometimes we need to use `LogicalRelation` to replace an existing leaf node without
- * changing the output attributes' IDs. The `expectedOutputAttributes` parameter is used for
- * this purpose. See https://issues.apache.org/jira/browse/SPARK-10741 for more details.
*/
case class LogicalRelation(
relation: BaseRelation,
- expectedOutputAttributes: Option[Seq[Attribute]] = None,
- catalogTable: Option[CatalogTable] = None)
+ output: Seq[AttributeReference],
+ catalogTable: Option[CatalogTable])
extends LeafNode with MultiInstanceRelation {
- override val output: Seq[AttributeReference] = {
- val attrs = relation.schema.toAttributes
- expectedOutputAttributes.map { expectedAttrs =>
- assert(expectedAttrs.length == attrs.length)
- attrs.zip(expectedAttrs).map {
- // We should respect the attribute names provided by base relation and only use the
- // exprId in `expectedOutputAttributes`.
- // The reason is that, some relations(like parquet) will reconcile attribute names to
- // workaround case insensitivity issue.
- case (attr, expected) => attr.withExprId(expected.exprId)
- }
- }.getOrElse(attrs)
- }
-
// Logical Relations are distinct if they have different output for the sake of transformations.
override def equals(other: Any): Boolean = other match {
case l @ LogicalRelation(otherRelation, _, _) => relation == otherRelation && output == l.output
@@ -87,11 +69,8 @@ case class LogicalRelation(
* unique expression ids. We respect the `expectedOutputAttributes` and create
* new instances of attributes in it.
*/
- override def newInstance(): this.type = {
- LogicalRelation(
- relation,
- expectedOutputAttributes.map(_.map(_.newInstance())),
- catalogTable).asInstanceOf[this.type]
+ override def newInstance(): LogicalRelation = {
+ this.copy(output = output.map(_.newInstance()))
}
override def refresh(): Unit = relation match {
@@ -101,3 +80,11 @@ case class LogicalRelation(
override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation"
}
+
+object LogicalRelation {
+ def apply(relation: BaseRelation): LogicalRelation =
+ LogicalRelation(relation, relation.schema.toAttributes, None)
+
+ def apply(relation: BaseRelation, table: CatalogTable): LogicalRelation =
+ LogicalRelation(relation, relation.schema.toAttributes, Some(table))
+}
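For illustration, a minimal usage sketch of the new companion-object overloads (assuming some `relation: BaseRelation` and `table: CatalogTable` are in scope; the value names here are hypothetical):

    // Both overloads derive the output attributes from the relation's schema,
    // so callers no longer pass expectedOutputAttributes.
    val withoutTable = LogicalRelation(relation)
    val withTable = LogicalRelation(relation, table)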
http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 8566a80..905b868 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -59,9 +59,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
val prunedFsRelation =
fsRelation.copy(location = prunedFileIndex)(sparkSession)
- val prunedLogicalRelation = logicalRelation.copy(
- relation = prunedFsRelation,
- expectedOutputAttributes = Some(logicalRelation.output))
+ val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
// Keep partition-pruning predicates so that they are visible in physical planning
val filterExpression = filters.reduceLeft(And)
http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
index 60adee4..6dd4847 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
@@ -75,13 +75,13 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
|USING ${classOf[TestOptionsSource].getCanonicalName}
|OPTIONS (PATH '/tmp/path')
""".stripMargin)
- assert(getPathOption("src") == Some("file:/tmp/path"))
+ assert(getPathOption("src").map(makeQualifiedPath) == Some(makeQualifiedPath("/tmp/path")))
}
// should exist even if the path option is not specified when creating the table
withTable("src") {
sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}")
- assert(getPathOption("src") == Some(CatalogUtils.URIToString(defaultTablePath("src"))))
+ assert(getPathOption("src").map(makeQualifiedPath) == Some(defaultTablePath("src")))
}
}
@@ -95,9 +95,9 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
|OPTIONS (PATH '$p')
|AS SELECT 1
""".stripMargin)
- assert(CatalogUtils.stringToURI(
- spark.table("src").schema.head.metadata.getString("path")) ==
- makeQualifiedPath(p.getAbsolutePath))
+ assert(
+ spark.table("src").schema.head.metadata.getString("path") ==
+ p.getAbsolutePath)
}
}
@@ -109,8 +109,9 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
|USING ${classOf[TestOptionsSource].getCanonicalName}
|AS SELECT 1
""".stripMargin)
- assert(spark.table("src").schema.head.metadata.getString("path") ==
- CatalogUtils.URIToString(defaultTablePath("src")))
+ assert(
+ makeQualifiedPath(spark.table("src").schema.head.metadata.getString("path")) ==
+ defaultTablePath("src"))
}
}
@@ -122,13 +123,13 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
|USING ${classOf[TestOptionsSource].getCanonicalName}
|OPTIONS (PATH '/tmp/path')""".stripMargin)
sql("ALTER TABLE src SET LOCATION '/tmp/path2'")
- assert(getPathOption("src") == Some("/tmp/path2"))
+ assert(getPathOption("src").map(makeQualifiedPath) == Some(makeQualifiedPath("/tmp/path2")))
}
withTable("src", "src2") {
sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}")
sql("ALTER TABLE src RENAME TO src2")
- assert(getPathOption("src2") == Some(CatalogUtils.URIToString(defaultTablePath("src2"))))
+ assert(getPathOption("src2").map(makeQualifiedPath) == Some(defaultTablePath("src2")))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 10f4325..6b98066 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -175,7 +175,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
bucketSpec = None,
fileFormat = fileFormat,
options = options)(sparkSession = sparkSession)
- val created = LogicalRelation(fsRelation, catalogTable = Some(updatedTable))
+ val created = LogicalRelation(fsRelation, updatedTable)
tableRelationCache.put(tableIdentifier, created)
created
}
@@ -203,7 +203,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
bucketSpec = None,
options = options,
className = fileType).resolveRelation(),
- catalogTable = Some(updatedTable))
+ table = updatedTable)
tableRelationCache.put(tableIdentifier, created)
created
@@ -212,7 +212,14 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
logicalRelation
})
}
- result.copy(expectedOutputAttributes = Some(relation.output))
+ // The inferred schema may have different field names from the table schema. We should respect
+ // the inferred names, but keep the exprIds from the table relation's output.
+ assert(result.output.length == relation.output.length &&
+ result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType })
+ val newOutput = result.output.zip(relation.output).map {
+ case (a1, a2) => a1.withExprId(a2.exprId)
+ }
+ result.copy(output = newOutput)
}
private def inferIfNeeded(
http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 2b3f360..d3cbf89 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -329,7 +329,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
fileFormat = new ParquetFileFormat(),
options = Map.empty)(sparkSession = spark)
- val plan = LogicalRelation(relation, catalogTable = Some(tableMeta))
+ val plan = LogicalRelation(relation, tableMeta)
spark.sharedState.cacheManager.cacheQuery(Dataset.ofRows(spark, plan))
assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined)
@@ -342,7 +342,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
bucketSpec = None,
fileFormat = new ParquetFileFormat(),
options = Map.empty)(sparkSession = spark)
- val samePlan = LogicalRelation(sameRelation, catalogTable = Some(tableMeta))
+ val samePlan = LogicalRelation(sameRelation, tableMeta)
assert(spark.sharedState.cacheManager.lookupCachedData(samePlan).isDefined)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
index cd8f94b..f818e29 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -58,7 +58,7 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
fileFormat = new ParquetFileFormat(),
options = Map.empty)(sparkSession = spark)
- val logicalRelation = LogicalRelation(relation, catalogTable = Some(tableMeta))
+ val logicalRelation = LogicalRelation(relation, tableMeta)
val query = Project(Seq('i, 'p), Filter('p === 1, logicalRelation)).analyze
val optimized = Optimize.execute(query)