You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "beliefer (via GitHub)" <gi...@apache.org> on 2023/07/04 04:41:58 UTC

[GitHub] [spark] beliefer commented on a diff in pull request #41839: [SPARK-44287][SQL] Use PartitionEvaluator API for RowToColumnarExec & ColumnarToRowExec SQL operators.

beliefer commented on code in PR #41839:
URL: https://github.com/apache/spark/pull/41839#discussion_r1251471826


##########
sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala:
##########
@@ -278,37 +278,42 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper {
         MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))
     }
     withSession(extensions) { session =>
-      session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE)
-      assert(session.sessionState.columnarRules.contains(
-        MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
-      import session.sqlContext.implicits._
-      // perform a join to inject a broadcast exchange
-      val left = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("l1", "l2")
-      val right = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("r1", "r2")
-      val data = left.join(right, $"l1" === $"r1")
-        // repartitioning avoids having the add operation pushed up into the LocalTableScan
-        .repartition(1)
-      val df = data.selectExpr("l2 + r2")
-      // execute the plan so that the final adaptive plan is available when AQE is on
-      df.collect()
-      val found = collectPlanSteps(df.queryExecution.executedPlan).sum
-      // 1 MyBroadcastExchangeExec
-      // 1 MyShuffleExchangeExec
-      // 1 ColumnarToRowExec
-      // 2 ColumnarProjectExec
-      // 1 ReplacedRowToColumnarExec
-      // so 11121 is expected.
-      assert(found == 11121)
-
-      // Verify that we get back the expected, wrong, result
-      val result = df.collect()
-      assert(result(0).getLong(0) == 101L) // Check that broken columnar Add was used.
-      assert(result(1).getLong(0) == 201L)
-      assert(result(2).getLong(0) == 301L)
-
-      withTempPath { path =>
-        val e = intercept[Exception](df.write.parquet(path.getCanonicalPath))
-        assert(e.getMessage == "columnar write")
+      Seq(true, false).foreach { enableEvaluator =>
+        withSQLConf(
+          SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,

Review Comment:
   Shall we move this config outside the `foreach`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala:
##########
@@ -453,51 +454,25 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
   )
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    val enableOffHeapColumnVector = conf.offHeapColumnVectorEnabled
     val numInputRows = longMetric("numInputRows")
     val numOutputBatches = longMetric("numOutputBatches")
     // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
     // combine with some of the Arrow conversion tools we will need to unify some of the configs.
     val numRows = conf.columnBatchSize
-    // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
-    // plan (this) in the closure.

Review Comment:
   ditto



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala:
##########
@@ -126,19 +126,22 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
   }
 
   test("SPARK-37779: ColumnarToRowExec should be canonicalizable after being (de)serialized") {
-    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
-      withTempPath { path =>
-        spark.range(1).write.parquet(path.getAbsolutePath)
-        val df = spark.read.parquet(path.getAbsolutePath)
-        val columnarToRowExec =
-          df.queryExecution.executedPlan.collectFirst { case p: ColumnarToRowExec => p }.get
-        try {
-          spark.range(1).foreach { _ =>
-            columnarToRowExec.canonicalized
-            ()
+    Seq(true, false).foreach { enable =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala:
##########
@@ -89,15 +87,18 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
   override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
     val numInputBatches = longMetric("numInputBatches")
-    // This avoids calling `output` in the RDD closure, so that we don't need to include the entire
-    // plan (this) in the closure.

Review Comment:
   It seems we need to move this comment into `ColumnarToRowEvaluatorFactory` too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org