You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by gatorsmile <gi...@git.apache.org> on 2017/05/26 18:19:37 UTC

[GitHub] spark pull request #14548: [SPARK-16958] [SQL] Reuse subqueries within the s...

Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14548#discussion_r118761757
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ---
    @@ -68,14 +103,90 @@ case class ScalarSubquery(
     }
     
     /**
    + * A subquery that will check the value of `child` whether is in the result of a query or not.
    + */
    +case class InSubquery(
    +    child: Expression,
    +    executedPlan: SubqueryExec,
    +    exprId: ExprId,
    +    private var result: Array[Any] = null,
    +    private var updated: Boolean = false) extends ExecSubqueryExpression {
    +
    +  override def dataType: DataType = BooleanType
    +  override def children: Seq[Expression] = child :: Nil
    +  override def nullable: Boolean = child.nullable
    +  override def toString: String = s"$child IN ${executedPlan.name}"
    +
    +  def withExecutedPlan(plan: SubqueryExec): ExecSubqueryExpression = copy(executedPlan = plan)
    +
    +  override def semanticEquals(other: Expression): Boolean = other match {
    +    case in: InSubquery => child.semanticEquals(in.child) &&
    +      executedPlan.sameResult(in.executedPlan)
    +    case _ => false
    +  }
    +
    +  def updateResult(rows: Array[InternalRow]): Unit = {
    +    result = rows.map(_.get(0, child.dataType)).asInstanceOf[Array[Any]]
    +    updated = true
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    require(updated, s"$this has not finished")
    +    val v = child.eval(input)
    +    if (v == null) {
    +      null
    +    } else {
    +      result.contains(v)
    +    }
    +  }
    +
    +  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    +    require(updated, s"$this has not finished")
    +    InSet(child, result.toSet).doGenCode(ctx, ev)
    +  }
    +}
    +
    +/**
      * Plans scalar subqueries from that are present in the given [[SparkPlan]].
      */
     case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
       def apply(plan: SparkPlan): SparkPlan = {
         plan.transformAllExpressions {
           case subquery: expressions.ScalarSubquery =>
             val executedPlan = new QueryExecution(sparkSession, subquery.plan).executedPlan
    -        ScalarSubquery(executedPlan, subquery.exprId)
    +        ScalarSubquery(
    +          SubqueryExec(s"subquery${subquery.exprId.id}", executedPlan),
    +          subquery.exprId)
    +      case expressions.PredicateSubquery(plan, Seq(e: Expression), _, exprId) =>
    +        val executedPlan = new QueryExecution(sparkSession, plan).executedPlan
    +        InSubquery(e, SubqueryExec(s"subquery${exprId.id}", executedPlan), exprId)
    +    }
    +  }
    +}
    +
    +
    +/**
    + * Find out duplicated exchanges in the spark plan, then use the same exchange for all the
    + * references.
    + */
    +case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
    +
    +  def apply(plan: SparkPlan): SparkPlan = {
    +    if (!conf.exchangeReuseEnabled) {
    --- End diff --
    
    Why does this share the same conf `spark.sql.exchange.reuse` as ReuseExchange?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org