You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/08/01 19:10:39 UTC

[GitHub] [spark] hvanhovell commented on a change in pull request #25264: [SPARK-28213][SQL][followup] code cleanup and bug fix for columnar execution framework

hvanhovell commented on a change in pull request #25264: [SPARK-28213][SQL][followup] code cleanup and bug fix for columnar execution framework
URL: https://github.com/apache/spark/pull/25264#discussion_r309854374
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
 ##########
 @@ -57,40 +57,38 @@ class ColumnarRule {
  * [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]] and
  * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
  */
-case class ColumnarToRowExec(child: SparkPlan)
-  extends UnaryExecNode with CodegenSupport {
+case class ColumnarToRowExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
+  assert(child.supportsColumnar)
 
   override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
+  // `ColumnarToRowExec` processes the input RDD directly, which is kind of a leaf node in the
+  // codegen stage and needs to do the limit check.
+  protected override def canCheckLimitNotReached: Boolean = true
+
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
-    "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")
+    "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches")
   )
 
   override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
     val numInputBatches = longMetric("numInputBatches")
-    val scanTime = longMetric("scanTime")
-    // UnsafeProjection is not serializable so do it on the executor side, which is why it is lazy
-    @transient lazy val outputProject = UnsafeProjection.create(output, output)
-    val batches = child.executeColumnar()
-    batches.flatMap(batch => {
-      val batchStartNs = System.nanoTime()
-      numInputBatches += 1
-      // In order to match the numOutputRows metric in the generated code we update
-      // numOutputRows for each batch. This is less accurate than doing it at output
-      // because it will over count the number of rows output in the case of a limit,
-      // but it is more efficient.
-      numOutputRows += batch.numRows()
-      val ret = batch.rowIterator().asScala
-      scanTime += ((System.nanoTime() - batchStartNs) / (1000 * 1000))
-      ret.map(outputProject)
-    })
+    // This avoids calling `output` in the RDD closure, so that we don't need to include the entire
+    // plan (this) in the closure.
+    val localOutput = this.output
+    child.executeColumnar().mapPartitionsInternal { batches =>
+      val outputProject = UnsafeProjection.create(localOutput, localOutput)
 
 Review comment:
  Nit: consider renaming `outputProject` to `toUnsafe` to better convey the intent of the projection.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org