You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by yucai <gi...@git.apache.org> on 2018/06/14 02:31:59 UTC

[GitHub] spark pull request #18121: [SPARK-20897][SQL] cached self-join should not fa...

Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18121#discussion_r195288117
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -58,6 +59,24 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
       override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
         child.executeBroadcast()
       }
    +
    +  // `ReusedExchangeExec` can have distinct set of output attribute ids from its child, we need
    +  // to update the attribute ids in `outputPartitioning` and `outputOrdering`.
    +  private lazy val updateAttr: Expression => Expression = {
    +    val originalAttrToNewAttr = AttributeMap(child.output.zip(output))
    +    e => e.transform {
    +      case attr: Attribute => originalAttrToNewAttr.getOrElse(attr, attr)
    +    }
    +  }
    +
    +  override def outputPartitioning: Partitioning = child.outputPartitioning match {
    +    case h: HashPartitioning => h.copy(expressions = h.expressions.map(updateAttr))
    +    case other => other
    +  }
    +
    --- End diff --
    
    @cloud-fan @viirya Could you explain why only `HashPartitioning` is handled here?
    What about `RangePartitioning`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org