Posted to commits@spark.apache.org by we...@apache.org on 2017/04/07 07:58:57 UTC

spark git commit: [SPARK-20245][SQL][MINOR] pass output to LogicalRelation directly

Repository: spark
Updated Branches:
  refs/heads/master 626b4cafc -> ad3cc1312


[SPARK-20245][SQL][MINOR] pass output to LogicalRelation directly

## What changes were proposed in this pull request?

Currently `LogicalRelation` has an `expectedOutputAttributes` parameter, which makes it hard to reason about what the actual output is. Like other leaf nodes, `LogicalRelation` should take `output` as a parameter directly, to simplify the logic.
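
For illustration only, here is a rough before/after of the constructor shape, mirrored from the diff below (this is not additional code in the patch, and the real class also extends LeafNode with MultiInstanceRelation):

    import org.apache.spark.sql.catalyst.catalog.CatalogTable
    import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
    import org.apache.spark.sql.sources.BaseRelation

    // Before (removed in this patch): output was derived inside the node from
    // relation.schema and optionally re-mapped to the expected attributes' exprIds.
    //   case class LogicalRelation(
    //       relation: BaseRelation,
    //       expectedOutputAttributes: Option[Seq[Attribute]] = None,
    //       catalogTable: Option[CatalogTable] = None)

    // After: output is passed in explicitly, like other leaf nodes.
    case class LogicalRelation(
        relation: BaseRelation,
        output: Seq[AttributeReference],
        catalogTable: Option[CatalogTable])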

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <we...@databricks.com>

Closes #17552 from cloud-fan/minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad3cc131
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad3cc131
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad3cc131

Branch: refs/heads/master
Commit: ad3cc1312db3b5667cea134940a09896a4609b74
Parents: 626b4ca
Author: Wenchen Fan <we...@databricks.com>
Authored: Fri Apr 7 15:58:50 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Apr 7 15:58:50 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |  8 ++--
 .../datasources/DataSourceStrategy.scala        | 15 ++++----
 .../execution/datasources/LogicalRelation.scala | 39 +++++++-------------
 .../datasources/PruneFileSourcePartitions.scala |  4 +-
 .../spark/sql/sources/PathOptionSuite.scala     | 19 +++++-----
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 13 +++++--
 .../spark/sql/hive/CachedTableSuite.scala       |  4 +-
 .../PruneFileSourcePartitionsSuite.scala        |  2 +-
 8 files changed, 49 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index dc2e404..360e55d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -27,7 +27,7 @@ import com.google.common.base.Objects
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -403,14 +403,14 @@ object CatalogTypes {
  */
 case class CatalogRelation(
     tableMeta: CatalogTable,
-    dataCols: Seq[Attribute],
-    partitionCols: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
+    dataCols: Seq[AttributeReference],
+    partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {
   assert(tableMeta.identifier.database.isDefined)
   assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType))
   assert(tableMeta.dataSchema.sameType(dataCols.toStructType))
 
   // The partition column should always appear after data columns.
-  override def output: Seq[Attribute] = dataCols ++ partitionCols
+  override def output: Seq[AttributeReference] = dataCols ++ partitionCols
 
   def isPartitioned: Boolean = partitionCols.nonEmpty
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e5c7c38..2d83d51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -231,16 +231,17 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
             options = table.storage.properties ++ pathOption,
             catalogTable = Some(table))
 
-        LogicalRelation(
-          dataSource.resolveRelation(checkFilesExist = false),
-          catalogTable = Some(table))
+        LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
       }
     }).asInstanceOf[LogicalRelation]
 
-    // It's possible that the table schema is empty and need to be inferred at runtime. We should
-    // not specify expected outputs for this case.
-    val expectedOutputs = if (r.output.isEmpty) None else Some(r.output)
-    plan.copy(expectedOutputAttributes = expectedOutputs)
+    if (r.output.isEmpty) {
+      // It's possible that the table schema is empty and needs to be inferred at runtime. For this
+      // case, we don't need to change the output of the cached plan.
+      plan
+    } else {
+      plan.copy(output = r.output)
+    }
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {

http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 3b14b79..4215203 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.BaseRelation
@@ -26,31 +26,13 @@ import org.apache.spark.util.Utils
 
 /**
  * Used to link a [[BaseRelation]] in to a logical query plan.
- *
- * Note that sometimes we need to use `LogicalRelation` to replace an existing leaf node without
- * changing the output attributes' IDs.  The `expectedOutputAttributes` parameter is used for
- * this purpose.  See https://issues.apache.org/jira/browse/SPARK-10741 for more details.
  */
 case class LogicalRelation(
     relation: BaseRelation,
-    expectedOutputAttributes: Option[Seq[Attribute]] = None,
-    catalogTable: Option[CatalogTable] = None)
+    output: Seq[AttributeReference],
+    catalogTable: Option[CatalogTable])
   extends LeafNode with MultiInstanceRelation {
 
-  override val output: Seq[AttributeReference] = {
-    val attrs = relation.schema.toAttributes
-    expectedOutputAttributes.map { expectedAttrs =>
-      assert(expectedAttrs.length == attrs.length)
-      attrs.zip(expectedAttrs).map {
-        // We should respect the attribute names provided by base relation and only use the
-        // exprId in `expectedOutputAttributes`.
-        // The reason is that, some relations(like parquet) will reconcile attribute names to
-        // workaround case insensitivity issue.
-        case (attr, expected) => attr.withExprId(expected.exprId)
-      }
-    }.getOrElse(attrs)
-  }
-
   // Logical Relations are distinct if they have different output for the sake of transformations.
   override def equals(other: Any): Boolean = other match {
     case l @ LogicalRelation(otherRelation, _, _) => relation == otherRelation && output == l.output
@@ -87,11 +69,8 @@ case class LogicalRelation(
    * unique expression ids. We respect the `expectedOutputAttributes` and create
    * new instances of attributes in it.
    */
-  override def newInstance(): this.type = {
-    LogicalRelation(
-      relation,
-      expectedOutputAttributes.map(_.map(_.newInstance())),
-      catalogTable).asInstanceOf[this.type]
+  override def newInstance(): LogicalRelation = {
+    this.copy(output = output.map(_.newInstance()))
   }
 
   override def refresh(): Unit = relation match {
@@ -101,3 +80,11 @@ case class LogicalRelation(
 
   override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation"
 }
+
+object LogicalRelation {
+  def apply(relation: BaseRelation): LogicalRelation =
+    LogicalRelation(relation, relation.schema.toAttributes, None)
+
+  def apply(relation: BaseRelation, table: CatalogTable): LogicalRelation =
+    LogicalRelation(relation, relation.schema.toAttributes, Some(table))
+}
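
As a minimal usage sketch of the new companion apply overloads (assuming an existing BaseRelation and its CatalogTable; LogicalRelation is an internal class, so the object and method names here are purely illustrative):

    import org.apache.spark.sql.catalyst.catalog.CatalogTable
    import org.apache.spark.sql.execution.datasources.LogicalRelation
    import org.apache.spark.sql.sources.BaseRelation

    object LogicalRelationUsageSketch {
      // Both overloads derive `output` from relation.schema.toAttributes,
      // replacing the old expectedOutputAttributes/catalogTable keyword arguments.
      def examplePlans(baseRelation: BaseRelation, tableMeta: CatalogTable): Seq[LogicalRelation] = {
        val withoutTable = LogicalRelation(baseRelation)
        val withTable = LogicalRelation(baseRelation, tableMeta)
        Seq(withoutTable, withTable)
      }
    }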

http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 8566a80..905b868 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -59,9 +59,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
         val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
         val prunedFsRelation =
           fsRelation.copy(location = prunedFileIndex)(sparkSession)
-        val prunedLogicalRelation = logicalRelation.copy(
-          relation = prunedFsRelation,
-          expectedOutputAttributes = Some(logicalRelation.output))
+        val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
 
         // Keep partition-pruning predicates so that they are visible in physical planning
         val filterExpression = filters.reduceLeft(And)

http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
index 60adee4..6dd4847 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
@@ -75,13 +75,13 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
            |USING ${classOf[TestOptionsSource].getCanonicalName}
            |OPTIONS (PATH '/tmp/path')
         """.stripMargin)
-      assert(getPathOption("src") == Some("file:/tmp/path"))
+      assert(getPathOption("src").map(makeQualifiedPath) == Some(makeQualifiedPath("/tmp/path")))
     }
 
     // should exist even path option is not specified when creating table
     withTable("src") {
       sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}")
-      assert(getPathOption("src") == Some(CatalogUtils.URIToString(defaultTablePath("src"))))
+      assert(getPathOption("src").map(makeQualifiedPath) == Some(defaultTablePath("src")))
     }
   }
 
@@ -95,9 +95,9 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
             |OPTIONS (PATH '$p')
             |AS SELECT 1
           """.stripMargin)
-        assert(CatalogUtils.stringToURI(
-          spark.table("src").schema.head.metadata.getString("path")) ==
-          makeQualifiedPath(p.getAbsolutePath))
+        assert(
+          spark.table("src").schema.head.metadata.getString("path") ==
+          p.getAbsolutePath)
       }
     }
 
@@ -109,8 +109,9 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
            |USING ${classOf[TestOptionsSource].getCanonicalName}
            |AS SELECT 1
           """.stripMargin)
-      assert(spark.table("src").schema.head.metadata.getString("path") ==
-        CatalogUtils.URIToString(defaultTablePath("src")))
+      assert(
+        makeQualifiedPath(spark.table("src").schema.head.metadata.getString("path")) ==
+        defaultTablePath("src"))
     }
   }
 
@@ -122,13 +123,13 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
            |USING ${classOf[TestOptionsSource].getCanonicalName}
            |OPTIONS (PATH '/tmp/path')""".stripMargin)
       sql("ALTER TABLE src SET LOCATION '/tmp/path2'")
-      assert(getPathOption("src") == Some("/tmp/path2"))
+      assert(getPathOption("src").map(makeQualifiedPath) == Some(makeQualifiedPath("/tmp/path2")))
     }
 
     withTable("src", "src2") {
       sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}")
       sql("ALTER TABLE src RENAME TO src2")
-      assert(getPathOption("src2") == Some(CatalogUtils.URIToString(defaultTablePath("src2"))))
+      assert(getPathOption("src2").map(makeQualifiedPath) == Some(defaultTablePath("src2")))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 10f4325..6b98066 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -175,7 +175,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             bucketSpec = None,
             fileFormat = fileFormat,
             options = options)(sparkSession = sparkSession)
-          val created = LogicalRelation(fsRelation, catalogTable = Some(updatedTable))
+          val created = LogicalRelation(fsRelation, updatedTable)
           tableRelationCache.put(tableIdentifier, created)
           created
         }
@@ -203,7 +203,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
                 bucketSpec = None,
                 options = options,
                 className = fileType).resolveRelation(),
-              catalogTable = Some(updatedTable))
+              table = updatedTable)
 
           tableRelationCache.put(tableIdentifier, created)
           created
@@ -212,7 +212,14 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         logicalRelation
       })
     }
-    result.copy(expectedOutputAttributes = Some(relation.output))
+    // The inferred schema may have different field names from the table schema; we should respect
+    // it, but also respect the exprId in the table relation output.
+    assert(result.output.length == relation.output.length &&
+      result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType })
+    val newOutput = result.output.zip(relation.output).map {
+      case (a1, a2) => a1.withExprId(a2.exprId)
+    }
+    result.copy(output = newOutput)
   }
 
   private def inferIfNeeded(

http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 2b3f360..d3cbf89 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -329,7 +329,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
         fileFormat = new ParquetFileFormat(),
         options = Map.empty)(sparkSession = spark)
 
-      val plan = LogicalRelation(relation, catalogTable = Some(tableMeta))
+      val plan = LogicalRelation(relation, tableMeta)
       spark.sharedState.cacheManager.cacheQuery(Dataset.ofRows(spark, plan))
 
       assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined)
@@ -342,7 +342,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
         bucketSpec = None,
         fileFormat = new ParquetFileFormat(),
         options = Map.empty)(sparkSession = spark)
-      val samePlan = LogicalRelation(sameRelation, catalogTable = Some(tableMeta))
+      val samePlan = LogicalRelation(sameRelation, tableMeta)
 
       assert(spark.sharedState.cacheManager.lookupCachedData(samePlan).isDefined)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
index cd8f94b..f818e29 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -58,7 +58,7 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
           fileFormat = new ParquetFileFormat(),
           options = Map.empty)(sparkSession = spark)
 
-        val logicalRelation = LogicalRelation(relation, catalogTable = Some(tableMeta))
+        val logicalRelation = LogicalRelation(relation, tableMeta)
         val query = Project(Seq('i, 'p), Filter('p === 1, logicalRelation)).analyze
 
         val optimized = Optimize.execute(query)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org