You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/06/27 11:35:51 UTC

[GitHub] [spark] dbaliafroozeh commented on a change in pull request #28885: [SPARK-29375][SPARK-28940][SPARK-32041][SQL] Whole plan exchange and subquery reuse

dbaliafroozeh commented on a change in pull request #28885:
URL: https://github.com/apache/spark/pull/28885#discussion_r446514163



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
##########
@@ -326,7 +327,8 @@ object QueryExecution {
    */
   private[execution] def preparations(
       sparkSession: SparkSession,
-      adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None): Seq[Rule[SparkPlan]] = {
+      adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,
+      subquery: Boolean): Seq[Rule[SparkPlan]] = {

Review comment:
       Why do we need this boolean parameter here? What will happen if we just always run `WholePlanReuse` the rule? 

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
##########
@@ -341,7 +341,7 @@ object QueryPlan extends PredicateHelper {
         // Normalize the outer references in the subquery plan.
         val normalizedPlan = s.plan.transformAllExpressions {
           case OuterReference(r) => OuterReference(QueryPlan.normalizeExpressions(r, input))
-        }
+        }.canonicalized

Review comment:
       I think this is not the right place for canonicalizing the plan in a subquery. I checked the subclasses of `PlanExpression` and it seems like that only `org.apache.spark.sql.execution.ScalarSubquery` doesn't have the canonicalized method implemented, which also makes sense given your example plans, the reuse doesn't happen when there is a scalar subquery in the plan. Could you please remove this call from here and implement the `canoncalized` method for `ScalarSubquery` and verify if it has the same effect?	

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
##########
@@ -474,9 +475,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
       Inner,
       None,
       shuffle,
-      shuffle)
+      shuffle.copy())

Review comment:
       I don't follow this part, could you please let me know how copying here makes it different? 

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/reuse/Reuse.scala
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.reuse
+
+import scala.collection.mutable.Map
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{BaseSubqueryExec, ExecSubqueryExpression, ReusedSubqueryExec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Find out duplicated exchanges and subqueries in the whole spark plan including subqueries, then
+ * use the same exhange or subquery for all the references.
+ */
+case class WholePlanReuse(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (conf.exchangeReuseEnabled || conf.subqueryReuseEnabled) {
+      // To avoid costly canonicalization of an exchange or a subquery:

Review comment:
       Do you have some measurements how much do we save when using `Map[StructType, ...` instead of a map from the canonicalized form? I know in theory it's beneficial when there is no match, but was wondering if it has some tangible effect.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/reuse/Reuse.scala
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.reuse
+
+import scala.collection.mutable.Map
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{BaseSubqueryExec, ExecSubqueryExpression, ReusedSubqueryExec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Find out duplicated exchanges and subqueries in the whole spark plan including subqueries, then
+ * use the same exhange or subquery for all the references.
+ */
+case class WholePlanReuse(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (conf.exchangeReuseEnabled || conf.subqueryReuseEnabled) {

Review comment:
       Should we add a flag for this rule? Like `exchangeAndSubqueryReuseEanbled`? I think this rule should only happen if both the flags are enabled. Do we have cases when we only want to reuse subquery and not exchange? Also, It will simplify the logic in your new rule. We can keep the old rules and then fallback to them if only one of the flags is off or the new flag is off. 

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/reuse/Reuse.scala
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.reuse
+
+import scala.collection.mutable.Map
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{BaseSubqueryExec, ExecSubqueryExpression, ReusedSubqueryExec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Find out duplicated exchanges and subqueries in the whole spark plan including subqueries, then
+ * use the same exhange or subquery for all the references.
+ */
+case class WholePlanReuse(conf: SQLConf) extends Rule[SparkPlan] {

Review comment:
       I know what you mean by whole plan, but I think the name is a bit too generic, since we only can reuse exchanges and subqueries. How about `ReuseExchangeAndSubquery`?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
##########
@@ -1646,4 +1646,25 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     checkAnswer(df, df2)
     checkAnswer(df, Nil)
   }
+
+  test("Subquery reuse across the whole plan") {
+    val df = sql(

Review comment:
       Can you put the physical plan in a comment? It'll help to see what gets reused.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org