You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2017/08/17 20:47:18 UTC
spark git commit: [SPARK-18394][SQL] Make an AttributeSet.toSeq
output order consistent
Repository: spark
Updated Branches:
refs/heads/master ae9e42479 -> 6aad02d03
[SPARK-18394][SQL] Make an AttributeSet.toSeq output order consistent
## What changes were proposed in this pull request?
This PR sorts output attributes by their name and exprId in `AttributeSet.toSeq` to make the order consistent. If the order differs between runs, Spark can generate different code and then miss the cache in `CodeGenerator`; e.g., `GenerateColumnAccessor` generates code that depends on the input attribute order.
## How was this patch tested?
Added tests in `AttributeSetSuite` and manually verified that the cache worked correctly for the query given in the JIRA.
Author: Takeshi Yamamuro <ya...@apache.org>
Closes #18959 from maropu/SPARK-18394.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6aad02d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6aad02d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6aad02d0
Branch: refs/heads/master
Commit: 6aad02d03632df964363a144c96371e86f7b207e
Parents: ae9e424
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Thu Aug 17 22:47:14 2017 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Aug 17 22:47:14 2017 +0200
----------------------------------------------------------------------
.../sql/catalyst/expressions/AttributeSet.scala | 7 +++-
.../expressions/AttributeSetSuite.scala | 40 ++++++++++++++++++++
.../spark/sql/hive/execution/PruningSuite.scala | 7 +++-
3 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6aad02d0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
index b77f933..7420b6b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
@@ -121,7 +121,12 @@ class AttributeSet private (val baseSet: Set[AttributeEquals])
// We must force toSeq to not be strict otherwise we end up with a [[Stream]] that captures all
// sorts of things in its closure.
- override def toSeq: Seq[Attribute] = baseSet.map(_.a).toArray.toSeq
+ override def toSeq: Seq[Attribute] = {
+ // We need to keep a deterministic output order for `baseSet` because this affects a variable
+ // order in generated code (e.g., `GenerateColumnAccessor`).
+ // See SPARK-18394 for details.
+ baseSet.map(_.a).toSeq.sortBy { a => (a.name, a.exprId.id) }
+ }
override def toString: String = "{" + baseSet.map(_.a).mkString(", ") + "}"
http://git-wip-us.apache.org/repos/asf/spark/blob/6aad02d0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala
index 273f95f..b6e8b66 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala
@@ -78,4 +78,44 @@ class AttributeSetSuite extends SparkFunSuite {
assert(aSet == aSet)
assert(aSet == AttributeSet(aUpper :: Nil))
}
+
+ test("SPARK-18394 keep a deterministic output order along with attribute names and exprIds") {
+ // Checks a simple case
+ val attrSeqA = {
+ val attr1 = AttributeReference("c1", IntegerType)(exprId = ExprId(1098))
+ val attr2 = AttributeReference("c2", IntegerType)(exprId = ExprId(107))
+ val attr3 = AttributeReference("c3", IntegerType)(exprId = ExprId(838))
+ val attrSetA = AttributeSet(attr1 :: attr2 :: attr3 :: Nil)
+
+ val attr4 = AttributeReference("c4", IntegerType)(exprId = ExprId(389))
+ val attr5 = AttributeReference("c5", IntegerType)(exprId = ExprId(89329))
+
+ val attrSetB = AttributeSet(attr4 :: attr5 :: Nil)
+ (attrSetA ++ attrSetB).toSeq.map(_.name)
+ }
+
+ val attrSeqB = {
+ val attr1 = AttributeReference("c1", IntegerType)(exprId = ExprId(392))
+ val attr2 = AttributeReference("c2", IntegerType)(exprId = ExprId(92))
+ val attr3 = AttributeReference("c3", IntegerType)(exprId = ExprId(87))
+ val attrSetA = AttributeSet(attr1 :: attr2 :: attr3 :: Nil)
+
+ val attr4 = AttributeReference("c4", IntegerType)(exprId = ExprId(9023920))
+ val attr5 = AttributeReference("c5", IntegerType)(exprId = ExprId(522))
+ val attrSetB = AttributeSet(attr4 :: attr5 :: Nil)
+
+ (attrSetA ++ attrSetB).toSeq.map(_.name)
+ }
+
+ assert(attrSeqA === attrSeqB)
+
+ // Checks the same column names having different exprIds
+ val attr1 = AttributeReference("c", IntegerType)(exprId = ExprId(1098))
+ val attr2 = AttributeReference("c", IntegerType)(exprId = ExprId(107))
+ val attrSetA = AttributeSet(attr1 :: attr2 :: Nil)
+ val attr3 = AttributeReference("c", IntegerType)(exprId = ExprId(389))
+ val attrSetB = AttributeSet(attr3 :: Nil)
+
+ assert((attrSetA ++ attrSetB).toSeq === attr2 :: attr3 :: attr1 :: Nil)
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/6aad02d0/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index d535bef..cc592cf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -162,7 +162,12 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
}.head
assert(actualOutputColumns === expectedOutputColumns, "Output columns mismatch")
- assert(actualScannedColumns === expectedScannedColumns, "Scanned columns mismatch")
+
+ // Scanned columns in `HiveTableScanExec` are generated by the `pruneFilterProject` method
+ // in `SparkPlanner`. This method internally uses `AttributeSet.toSeq`, in which
+ // the returned output columns are sorted by the names and expression ids.
+ assert(actualScannedColumns.sorted === expectedScannedColumns.sorted,
+ "Scanned columns mismatch")
val actualPartitions = actualPartValues.map(_.asScala.mkString(",")).sorted
val expectedPartitions = expectedPartValues.map(_.mkString(",")).sorted
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org