Posted to reviews@spark.apache.org by yucai <gi...@git.apache.org> on 2018/06/14 02:31:59 UTC
[GitHub] spark pull request #18121: [SPARK-20897][SQL] cached self-join should not fa...
Github user yucai commented on a diff in the pull request:
https://github.com/apache/spark/pull/18121#discussion_r195288117
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -58,6 +59,24 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
  override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
    child.executeBroadcast()
  }
+
+  // `ReusedExchangeExec` can have a distinct set of output attribute ids from its child, so we
+  // need to update the attribute ids in `outputPartitioning` and `outputOrdering`.
+  private lazy val updateAttr: Expression => Expression = {
+    val originalAttrToNewAttr = AttributeMap(child.output.zip(output))
+    e => e.transform {
+      case attr: Attribute => originalAttrToNewAttr.getOrElse(attr, attr)
+    }
+  }
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning match {
+    case h: HashPartitioning => h.copy(expressions = h.expressions.map(updateAttr))
+    case other => other
+  }
+
--- End diff --
@cloud-fan @viirya Could you help explain why only `HashPartitioning` is handled here?
What about `RangePartitioning`? Its sort expressions also reference the child's attribute ids, so they would seem to need the same rewrite.
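For illustration, a hypothetical extension of the match above (not part of this PR) could rewrite `RangePartitioning`'s ordering with the same `updateAttr`. This is a sketch assuming Spark's Catalyst `RangePartitioning(ordering: Seq[SortOrder], numPartitions)` shape and that rewriting a `SortOrder` via `transform` yields a `SortOrder` back:

```scala
+  override def outputPartitioning: Partitioning = child.outputPartitioning match {
+    case h: HashPartitioning => h.copy(expressions = h.expressions.map(updateAttr))
+    // Hypothetical: remap attribute ids inside the range partitioning's sort
+    // expressions as well, so downstream plans see this node's output ids.
+    case r: RangePartitioning =>
+      r.copy(ordering = r.ordering.map(s => updateAttr(s).asInstanceOf[SortOrder]))
+    case other => other
+  }
```

The `asInstanceOf[SortOrder]` cast reflects the assumption that `updateAttr` only substitutes attributes and therefore preserves the expression's outer `SortOrder` node.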
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org