Posted to commits@spark.apache.org by tg...@apache.org on 2019/07/11 14:03:55 UTC

[spark] branch master updated: [SPARK-28213][SQL] Replace ColumnarBatchScan with equivalent from Columnar

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dff711  [SPARK-28213][SQL] Replace ColumnarBatchScan with equivalent from Columnar
8dff711 is described below

commit 8dff711ce732d476593a4e235d68e5e1728046cb
Author: Robert (Bobby) Evans <bo...@apache.org>
AuthorDate: Thu Jul 11 09:03:30 2019 -0500

    [SPARK-28213][SQL] Replace ColumnarBatchScan with equivalent from Columnar
    
    ## What changes were proposed in this pull request?
    
    This is the second part of https://issues.apache.org/jira/browse/SPARK-27396 and a follow-on to #24795.
    
    ## How was this patch tested?
    
    I did some manual tests and ran/updated the automated tests.
    
    I also ran some simple performance tests on a single node to verify that there is no performance impact, and I was not able to measure anything beyond noise.
    
    Closes #25008 from revans2/columnar-remove-batch-scan.
    
    Authored-by: Robert (Bobby) Evans <bo...@apache.org>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../org/apache/spark/sql/execution/Columnar.scala  |  10 +-
 .../spark/sql/execution/ColumnarBatchScan.scala    | 167 ---------------------
 .../spark/sql/execution/DataSourceScanExec.scala   |  60 ++++----
 .../sql/execution/WholeStageCodegenExec.scala      |   6 +-
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |   2 +
 .../execution/columnar/InMemoryTableScanExec.scala | 109 +++++++-------
 .../execution/datasources/v2/BatchScanExec.scala   |   2 +-
 .../datasources/v2/DataSourceV2ScanExecBase.scala  |  34 +++--
 .../datasources/v2/MicroBatchScanExec.scala        |   2 +-
 .../org/apache/spark/sql/CachedTableSuite.scala    |   2 +-
 .../scala/org/apache/spark/sql/SubquerySuite.scala |   5 +-
 .../execution/LogicalPlanTagInSparkPlanSuite.scala |  13 +-
 .../sql/execution/WholeStageCodegenSuite.scala     |  42 ------
 .../columnar/InMemoryColumnarQuerySuite.scala      |  11 +-
 .../datasources/parquet/ParquetQuerySuite.scala    |   4 +-
 .../sql/execution/metric/SQLMetricsSuite.scala     |   6 +-
 .../org/apache/spark/sql/test/SQLTestUtils.scala   |   2 +-
 17 files changed, 142 insertions(+), 335 deletions(-)
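
The net effect is that scan nodes no longer mix in ColumnarBatchScan or wrap themselves in WholeStageCodegenExec as a fallback; they advertise columnar support through the SparkPlan hooks from the first part of SPARK-27396 and let the planner insert the columnar-to-row transition. A rough sketch of that shape (ExampleScanExec is hypothetical, not part of this patch):

    // A hypothetical scan node illustrating the pattern the touched scan nodes now follow:
    // advertise columnar support via supportsColumnar and provide both a row-based and a
    // columnar execution path, letting the planner insert ColumnarToRowExec where a
    // row-based parent needs rows.
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.execution.LeafExecNode
    import org.apache.spark.sql.execution.metric.SQLMetrics
    import org.apache.spark.sql.vectorized.ColumnarBatch

    case class ExampleScanExec(
        output: Seq[Attribute],
        rowRDD: RDD[InternalRow],
        batchRDD: RDD[ColumnarBatch]) extends LeafExecNode {

      override lazy val metrics = Map(
        "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

      // When true, the planner uses doExecuteColumnar() and inserts the transition above.
      override val supportsColumnar: Boolean = true

      protected override def doExecute(): RDD[InternalRow] = {
        // bind the metric to a local val so the closure does not capture the whole node
        val numOutputRows = longMetric("numOutputRows")
        rowRDD.map { r => numOutputRows += 1; r }
      }

      protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
        val numOutputRows = longMetric("numOutputRows")
        batchRDD.map { b => numOutputRows += b.numRows(); b }
      }
    }

FileSourceScanExec, InMemoryTableScanExec, and the DataSource V2 scans in the diff below all follow this same split between doExecute and doExecuteColumnar.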

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index 315eba6..4385843 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -53,8 +53,8 @@ class ColumnarRule {
  * Provides a common executor to translate an [[RDD]] of [[ColumnarBatch]] into an [[RDD]] of
  * [[InternalRow]]. This is inserted whenever such a transition is determined to be needed.
  *
- * The implementation is based off of similar implementations in [[ColumnarBatchScan]],
- * [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]], and
+ * The implementation is based off of similar implementations in
+ * [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]] and
  * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
  */
 case class ColumnarToRowExec(child: SparkPlan)
@@ -96,9 +96,6 @@ case class ColumnarToRowExec(child: SparkPlan)
   /**
    * Generate [[ColumnVector]] expressions for our parent to consume as rows.
    * This is called once per [[ColumnVector]] in the batch.
-   *
-   * This code came unchanged from [[ColumnarBatchScan]] and will hopefully replace it
-   * at some point.
    */
   private def genCodeColumnVector(
       ctx: CodegenContext,
@@ -130,9 +127,6 @@ case class ColumnarToRowExec(child: SparkPlan)
    * Produce code to process the input iterator as [[ColumnarBatch]]es.
    * This produces an [[org.apache.spark.sql.catalyst.expressions.UnsafeRow]] for each row in
    * each batch.
-   *
-   * This code came almost completely unchanged from [[ColumnarBatchScan]] and will
-   * hopefully replace it at some point.
    */
   override protected def doProduce(ctx: CodegenContext): String = {
     // PhysicalRDD always just has one input
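
The generated code in ColumnarToRowExec walks each ColumnarBatch row by row; a rough interpreted analogue of that transition (not the codegen path, just an illustration) using ColumnarBatch.rowIterator():

    // Pull ColumnarBatches from the child RDD and flatten each batch into its rows.
    // Illustrative only; the real operator emits specialized code per ColumnVector.
    import scala.collection.JavaConverters._

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.vectorized.ColumnarBatch

    def batchesToRows(batches: RDD[ColumnarBatch]): RDD[InternalRow] = {
      batches.mapPartitions { batchIter =>
        batchIter.flatMap(batch => batch.rowIterator().asScala)
      }
    }
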
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
deleted file mode 100644
index b2e9f76..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeRow}
-import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.expressions.codegen.Block._
-import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.DataType
-import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
-
-
-/**
- * Helper trait for abstracting scan functionality using [[ColumnarBatch]]es.
- */
-private[sql] trait ColumnarBatchScan extends CodegenSupport {
-
-  protected def supportsBatch: Boolean = true
-
-  override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
-
-  /**
-   * Generate [[ColumnVector]] expressions for our parent to consume as rows.
-   * This is called once per [[ColumnarBatch]].
-   */
-  private def genCodeColumnVector(
-      ctx: CodegenContext,
-      columnVar: String,
-      ordinal: String,
-      dataType: DataType,
-      nullable: Boolean): ExprCode = {
-    val javaType = CodeGenerator.javaType(dataType)
-    val value = CodeGenerator.getValueFromVector(columnVar, dataType, ordinal)
-    val isNullVar = if (nullable) {
-      JavaCode.isNullVariable(ctx.freshName("isNull"))
-    } else {
-      FalseLiteral
-    }
-    val valueVar = ctx.freshName("value")
-    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
-    val code = code"${ctx.registerComment(str)}" + (if (nullable) {
-      code"""
-        boolean $isNullVar = $columnVar.isNullAt($ordinal);
-        $javaType $valueVar = $isNullVar ? ${CodeGenerator.defaultValue(dataType)} : ($value);
-      """
-    } else {
-      code"$javaType $valueVar = $value;"
-    })
-    ExprCode(code, isNullVar, JavaCode.variable(valueVar, dataType))
-  }
-
-  /**
-   * Produce code to process the input iterator as [[ColumnarBatch]]es.
-   * This produces an [[UnsafeRow]] for each row in each batch.
-   */
-  // TODO: return ColumnarBatch.Rows instead
-  override protected def doProduce(ctx: CodegenContext): String = {
-    // PhysicalRDD always just has one input
-    val input = ctx.addMutableState("scala.collection.Iterator", "input",
-      v => s"$v = inputs[0];")
-    if (supportsBatch) {
-      produceBatches(ctx, input)
-    } else {
-      produceRows(ctx, input)
-    }
-  }
-
-  private def produceBatches(ctx: CodegenContext, input: String): String = {
-    // metrics
-    val numOutputRows = metricTerm(ctx, "numOutputRows")
-    val scanTimeMetric = metricTerm(ctx, "scanTime")
-    val scanTimeTotalNs =
-      ctx.addMutableState(CodeGenerator.JAVA_LONG, "scanTime") // init as scanTime = 0
-
-    val columnarBatchClz = classOf[ColumnarBatch].getName
-    val batch = ctx.addMutableState(columnarBatchClz, "batch")
-
-    val idx = ctx.addMutableState(CodeGenerator.JAVA_INT, "batchIdx") // init as batchIdx = 0
-    val columnVectorClzs = vectorTypes.getOrElse(
-      Seq.fill(output.indices.size)(classOf[ColumnVector].getName))
-    val (colVars, columnAssigns) = columnVectorClzs.zipWithIndex.map {
-      case (columnVectorClz, i) =>
-        val name = ctx.addMutableState(columnVectorClz, s"colInstance$i")
-        (name, s"$name = ($columnVectorClz) $batch.column($i);")
-    }.unzip
-
-    val nextBatch = ctx.freshName("nextBatch")
-    val nextBatchFuncName = ctx.addNewFunction(nextBatch,
-      s"""
-         |private void $nextBatch() throws java.io.IOException {
-         |  long getBatchStart = System.nanoTime();
-         |  if ($input.hasNext()) {
-         |    $batch = ($columnarBatchClz)$input.next();
-         |    $numOutputRows.add($batch.numRows());
-         |    $idx = 0;
-         |    ${columnAssigns.mkString("", "\n", "\n")}
-         |  }
-         |  $scanTimeTotalNs += System.nanoTime() - getBatchStart;
-         |}""".stripMargin)
-
-    ctx.currentVars = null
-    val rowidx = ctx.freshName("rowIdx")
-    val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
-      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
-    }
-    val localIdx = ctx.freshName("localIdx")
-    val localEnd = ctx.freshName("localEnd")
-    val numRows = ctx.freshName("numRows")
-    val shouldStop = if (parent.needStopCheck) {
-      s"if (shouldStop()) { $idx = $rowidx + 1; return; }"
-    } else {
-      "// shouldStop check is eliminated"
-    }
-    s"""
-       |if ($batch == null) {
-       |  $nextBatchFuncName();
-       |}
-       |while ($limitNotReachedCond $batch != null) {
-       |  int $numRows = $batch.numRows();
-       |  int $localEnd = $numRows - $idx;
-       |  for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
-       |    int $rowidx = $idx + $localIdx;
-       |    ${consume(ctx, columnsBatchInput).trim}
-       |    $shouldStop
-       |  }
-       |  $idx = $numRows;
-       |  $batch = null;
-       |  $nextBatchFuncName();
-       |}
-       |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
-       |$scanTimeTotalNs = 0;
-     """.stripMargin
-  }
-
-  private def produceRows(ctx: CodegenContext, input: String): String = {
-    val numOutputRows = metricTerm(ctx, "numOutputRows")
-    val row = ctx.freshName("row")
-
-    ctx.INPUT_ROW = row
-    ctx.currentVars = null
-    s"""
-       |while ($limitNotReachedCond $input.hasNext()) {
-       |  InternalRow $row = (InternalRow) $input.next();
-       |  $numOutputRows.add(1);
-       |  ${consume(ctx, null, row).trim}
-       |  if (shouldStop()) return;
-       |}
-     """.stripMargin
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 518460d..728ac3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -37,10 +37,11 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat =>
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.BitSet
 
-trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
+trait DataSourceScanExec extends LeafExecNode {
   val relation: BaseRelation
   val tableIdentifier: Option[TableIdentifier]
 
@@ -69,6 +70,12 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
   private def redact(text: String): String = {
     Utils.redact(sqlContext.sessionState.conf.stringRedactionPattern, text)
   }
+
+  /**
+   * The data being read in.  This is to provide input to the tests in a way compatible with
+   * [[InputRDDCodegen]], which all implementations used to extend.
+   */
+  def inputRDDs(): Seq[RDD[InternalRow]]
 }
 
 /** Physical plan node for scanning data from a relation. */
@@ -141,11 +148,11 @@ case class FileSourceScanExec(
     optionalBucketSet: Option[BitSet],
     dataFilters: Seq[Expression],
     override val tableIdentifier: Option[TableIdentifier])
-  extends DataSourceScanExec with ColumnarBatchScan  {
+  extends DataSourceScanExec {
 
   // Note that some vals referring the file-based relation are lazy intentionally
   // so that this plan can be canonicalized on executor side too. See SPARK-23731.
-  override lazy val supportsBatch: Boolean = {
+  override lazy val supportsColumnar: Boolean = {
     relation.fileFormat.supportBatch(relation.sparkSession, schema)
   }
 
@@ -275,7 +282,7 @@ case class FileSourceScanExec(
       Map(
         "Format" -> relation.fileFormat.toString,
         "ReadSchema" -> requiredSchema.catalogString,
-        "Batched" -> supportsBatch.toString,
+        "Batched" -> supportsColumnar.toString,
         "PartitionFilters" -> seqToString(partitionFilters),
         "PushedFilters" -> seqToString(pushedDownFilters),
         "DataFilters" -> seqToString(dataFilters),
@@ -302,7 +309,7 @@ case class FileSourceScanExec(
     withSelectedBucketsCount
   }
 
-  private lazy val inputRDD: RDD[InternalRow] = {
+  lazy val inputRDD: RDD[InternalRow] = {
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
@@ -334,29 +341,30 @@ case class FileSourceScanExec(
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
-    if (supportsBatch) {
-      // in the case of fallback, this batched scan should never fail because of:
-      // 1) only primitive types are supported
-      // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
-      WholeStageCodegenExec(this)(codegenStageId = 0).execute()
-    } else {
-      val numOutputRows = longMetric("numOutputRows")
-
-      if (needsUnsafeRowConversion) {
-        inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
-          val proj = UnsafeProjection.create(schema)
-          proj.initialize(index)
-          iter.map( r => {
-            numOutputRows += 1
-            proj(r)
-          })
-        }
-      } else {
-        inputRDD.map { r =>
+    val numOutputRows = longMetric("numOutputRows")
+
+    if (needsUnsafeRowConversion) {
+      inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
+        val proj = UnsafeProjection.create(schema)
+        proj.initialize(index)
+        iter.map( r => {
           numOutputRows += 1
-          r
-        }
+          proj(r)
+        })
       }
+    } else {
+      inputRDD.map { r =>
+        numOutputRows += 1
+        r
+      }
+    }
+  }
+
+  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numOutputRows = longMetric("numOutputRows")
+    inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { batch =>
+      numOutputRows += batch.numRows()
+      batch
     }
   }
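
With the WholeStageCodegenExec fallback removed from doExecute, a batched file scan now relies on the planner to insert ColumnarToRowExec above it, which is also why the plan-matching tests later in this diff changed. A hedged way to observe this (assumes a SparkSession named spark, a hypothetical parquet path, and a schema for which the scan is batched; exact plan shapes vary with configuration):

    import org.apache.spark.sql.execution.{ColumnarToRowExec, FileSourceScanExec}

    val df = spark.read.parquet("/tmp/example")  // hypothetical path
    val plan = df.queryExecution.executedPlan
    // The scan itself reports columnar support...
    assert(plan.collectFirst { case f: FileSourceScanExec => f.supportsColumnar } == Some(true))
    // ...and the planner, not the scan node, inserted the columnar-to-row transition above it.
    assert(plan.collectFirst { case c: ColumnarToRowExec => c }.isDefined)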
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 94a5ede..a0afa9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -709,11 +709,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
         s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
         s"for this plan (id=$codegenStageId). To avoid this, you can raise the limit " +
         s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
-      child match {
-        // The fallback solution of batch file source scan still uses WholeStageCodegenExec
-        case f: FileSourceScanExec if f.supportsBatch => // do nothing
-        case _ => return child.execute()
-      }
+      return child.execute()
     }
 
     val references = ctx.references.toArray
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 0708878..61dbc58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -84,6 +84,8 @@ case class AdaptiveSparkPlanExec(
   // optimizations should be stage-independent.
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
     ReduceNumShufflePartitions(conf),
+    ApplyColumnarRulesAndInsertTransitions(session.sessionState.conf,
+      session.sessionState.columnarRules),
     CollapseCodegenStages(conf)
   )
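
Adding ApplyColumnarRulesAndInsertTransitions here means each adaptive query stage gets the columnar transitions, and any session-registered ColumnarRules, applied before codegen collapsing. A minimal sketch of that hook, assuming the ColumnarRule class added in #24795 (the rule below is a hypothetical no-op):

    // A hypothetical no-op ColumnarRule, showing the extension point that
    // ApplyColumnarRulesAndInsertTransitions applies (now also inside AQE query stages).
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}

    class NoopColumnarRule extends ColumnarRule {
      // runs before the row-to-columnar / columnar-to-row transitions are inserted
      override def preColumnarTransitions: Rule[SparkPlan] = new Rule[SparkPlan] {
        override def apply(plan: SparkPlan): SparkPlan = plan
      }
      // runs after the transitions have been inserted
      override def postColumnarTransitions: Rule[SparkPlan] = new Rule[SparkPlan] {
        override def apply(plan: SparkPlan): SparkPlan = plan
      }
    }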
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 06634c1..7a8c6d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -24,7 +24,8 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.execution.vectorized._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
@@ -34,7 +35,10 @@ case class InMemoryTableScanExec(
     attributes: Seq[Attribute],
     predicates: Seq[Expression],
     @transient relation: InMemoryRelation)
-  extends LeafExecNode with ColumnarBatchScan {
+  extends LeafExecNode {
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override val nodeName: String = {
     relation.cacheBuilder.tableName match {
@@ -65,7 +69,7 @@ case class InMemoryTableScanExec(
    * If true, get data from ColumnVector in ColumnarBatch, which are generally faster.
    * If false, get data from UnsafeRow built from CachedBatch
    */
-  override val supportsBatch: Boolean = {
+  override val supportsColumnar: Boolean = {
     // In the initial implementation, for ease of review
     // support only primitive data types and # of fields is less than wholeStageMaxNumFields
     conf.cacheVectorizedReaderEnabled && relation.schema.fields.forall(f => f.dataType match {
@@ -75,9 +79,6 @@ case class InMemoryTableScanExec(
     }) && !WholeStageCodegenExec.isTooManyFields(conf, relation.schema)
   }
 
-  // TODO: revisit this. Shall we always turn off whole stage codegen if the output data are rows?
-  override def supportCodegen: Boolean = supportsBatch
-
   private val columnIndices =
     attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray
 
@@ -108,59 +109,61 @@ case class InMemoryTableScanExec(
     columnarBatch
   }
 
+  private lazy val columnarInputRDD: RDD[ColumnarBatch] = {
+    val numOutputRows = longMetric("numOutputRows")
+    val buffers = filteredCachedBatches()
+    val offHeapColumnVectorEnabled = conf.offHeapColumnVectorEnabled
+    buffers
+      .map(createAndDecompressColumn(_, offHeapColumnVectorEnabled))
+      .map(b => {
+        numOutputRows += b.numRows()
+        b
+      })
+  }
+
   private lazy val inputRDD: RDD[InternalRow] = {
     val buffers = filteredCachedBatches()
     val offHeapColumnVectorEnabled = conf.offHeapColumnVectorEnabled
-    if (supportsBatch) {
-      // HACK ALERT: This is actually an RDD[ColumnarBatch].
-      // We're taking advantage of Scala's type erasure here to pass these batches along.
-      buffers
-        .map(createAndDecompressColumn(_, offHeapColumnVectorEnabled))
-        .asInstanceOf[RDD[InternalRow]]
-    } else {
-      val numOutputRows = longMetric("numOutputRows")
+    val numOutputRows = longMetric("numOutputRows")
 
-      if (enableAccumulatorsForTest) {
-        readPartitions.setValue(0)
-        readBatches.setValue(0)
-      }
+    if (enableAccumulatorsForTest) {
+      readPartitions.setValue(0)
+      readBatches.setValue(0)
+    }
 
-      // Using these variables here to avoid serialization of entire objects (if referenced
-      // directly) within the map Partitions closure.
-      val relOutput: AttributeSeq = relation.output
-
-      filteredCachedBatches().mapPartitionsInternal { cachedBatchIterator =>
-        // Find the ordinals and data types of the requested columns.
-        val (requestedColumnIndices, requestedColumnDataTypes) =
-          attributes.map { a =>
-            relOutput.indexOf(a.exprId) -> a.dataType
-          }.unzip
-
-        // update SQL metrics
-        val withMetrics = cachedBatchIterator.map { batch =>
-          if (enableAccumulatorsForTest) {
-            readBatches.add(1)
-          }
-          numOutputRows += batch.numRows
-          batch
+    // Using these variables here to avoid serialization of entire objects (if referenced
+    // directly) within the map Partitions closure.
+    val relOutput: AttributeSeq = relation.output
+
+    filteredCachedBatches().mapPartitionsInternal { cachedBatchIterator =>
+      // Find the ordinals and data types of the requested columns.
+      val (requestedColumnIndices, requestedColumnDataTypes) =
+        attributes.map { a =>
+          relOutput.indexOf(a.exprId) -> a.dataType
+        }.unzip
+
+      // update SQL metrics
+      val withMetrics = cachedBatchIterator.map { batch =>
+        if (enableAccumulatorsForTest) {
+          readBatches.add(1)
         }
+        numOutputRows += batch.numRows
+        batch
+      }
 
-        val columnTypes = requestedColumnDataTypes.map {
-          case udt: UserDefinedType[_] => udt.sqlType
-          case other => other
-        }.toArray
-        val columnarIterator = GenerateColumnAccessor.generate(columnTypes)
-        columnarIterator.initialize(withMetrics, columnTypes, requestedColumnIndices.toArray)
-        if (enableAccumulatorsForTest && columnarIterator.hasNext) {
-          readPartitions.add(1)
-        }
-        columnarIterator
+      val columnTypes = requestedColumnDataTypes.map {
+        case udt: UserDefinedType[_] => udt.sqlType
+        case other => other
+      }.toArray
+      val columnarIterator = GenerateColumnAccessor.generate(columnTypes)
+      columnarIterator.initialize(withMetrics, columnTypes, requestedColumnIndices.toArray)
+      if (enableAccumulatorsForTest && columnarIterator.hasNext) {
+        readPartitions.add(1)
       }
+      columnarIterator
     }
   }
 
-  override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
-
   override def output: Seq[Attribute] = attributes
 
   private def updateAttribute(expr: Expression): Expression = {
@@ -339,10 +342,10 @@ case class InMemoryTableScanExec(
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-    if (supportsBatch) {
-      WholeStageCodegenExec(this)(codegenStageId = 0).execute()
-    } else {
-      inputRDD
-    }
+    inputRDD
+  }
+
+  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    columnarInputRDD
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
index 3276ab5..c3cbb9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -45,7 +45,7 @@ case class BatchScanExec(
   override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()
 
   override lazy val inputRDD: RDD[InternalRow] = {
-    new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch)
+    new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar)
   }
 
   override def doCanonicalize(): BatchScanExec = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index 9ad683f..c5c902f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -23,11 +23,16 @@ import org.apache.spark.sql.catalyst.expressions.AttributeMap
 import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.LeafExecNode
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, Scan, SupportsReportPartitioning}
+import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.Utils
 
-trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan {
+trait DataSourceV2ScanExecBase extends LeafExecNode {
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   def scan: Scan
 
@@ -52,7 +57,7 @@ trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan {
     case _ => super.outputPartitioning
   }
 
-  override def supportsBatch: Boolean = {
+  override def supportsColumnar: Boolean = {
     require(partitions.forall(readerFactory.supportColumnarReads) ||
       !partitions.exists(readerFactory.supportColumnarReads),
       "Cannot mix row-based and columnar input partitions.")
@@ -62,17 +67,22 @@ trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan {
 
   def inputRDD: RDD[InternalRow]
 
-  override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
+  def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
 
   override def doExecute(): RDD[InternalRow] = {
-    if (supportsBatch) {
-      WholeStageCodegenExec(this)(codegenStageId = 0).execute()
-    } else {
-      val numOutputRows = longMetric("numOutputRows")
-      inputRDD.map { r =>
-        numOutputRows += 1
-        r
-      }
+    val numOutputRows = longMetric("numOutputRows")
+    inputRDD.map { r =>
+      numOutputRows += 1
+      r
+    }
+  }
+
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numOutputRows = longMetric("numOutputRows")
+    inputRDD.asInstanceOf[RDD[ColumnarBatch]].map {
+      b =>
+        numOutputRows += b.numRows()
+        b
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
index d2e33d4..a9b0f5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
@@ -46,6 +46,6 @@ case class MicroBatchScanExec(
   override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory()
 
   override lazy val inputRDD: RDD[InternalRow] = {
-    new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch)
+    new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 6049e89..267f255 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -832,7 +832,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
         val df = spark.range(10).cache()
         df.queryExecution.executedPlan.foreach {
           case i: InMemoryTableScanExec =>
-            assert(i.supportsBatch == vectorized && i.supportCodegen == vectorized)
+            assert(i.supportsColumnar == vectorized)
           case _ =>
         }
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index fddc4f6..b2c3868 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort}
-import org.apache.spark.sql.execution.{ExecSubqueryExpression, FileSourceScanExec, ReusedSubqueryExec, ScalarSubquery, SubqueryExec, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, FileSourceScanExec, InputAdapter, ReusedSubqueryExec, ScalarSubquery, SubqueryExec, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.datasources.FileScanRDD
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -1293,7 +1293,8 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
       checkAnswer(df, Seq(Row(0, 0), Row(2, 0)))
       // need to execute the query before we can examine fs.inputRDDs()
       assert(df.queryExecution.executedPlan match {
-        case WholeStageCodegenExec(fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _)) =>
+        case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
+            fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _), _))) =>
           partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
             fs.inputRDDs().forall(
               _.asInstanceOf[FileScanRDD].filePartitions.forall(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
index b35348b..b114348 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
@@ -44,9 +44,14 @@ class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite {
   }
 
   // A scan plan tree is a plan tree that has a leaf node under zero or more Project/Filter nodes.
-  private def isScanPlanTree(plan: SparkPlan): Boolean = plan match {
-    case p: ProjectExec => isScanPlanTree(p.child)
-    case f: FilterExec => isScanPlanTree(f.child)
+  // Because of how codegen and columnar-to-row transitions work, we may have InputAdapters
+  // and ColumnarToRowExec nodes in the middle of the tree, but they will not have the tag
+  // we want, so skip over them as long as they are not the first node we see
+  private def isScanPlanTree(plan: SparkPlan, first: Boolean): Boolean = plan match {
+    case i: InputAdapter if !first => isScanPlanTree(i.child, false)
+    case c: ColumnarToRowExec if !first => isScanPlanTree(c.child, false)
+    case p: ProjectExec => isScanPlanTree(p.child, false)
+    case f: FilterExec => isScanPlanTree(f.child, false)
     case _: LeafExecNode => true
     case _ => false
   }
@@ -87,7 +92,7 @@ class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite {
       case _: SubqueryExec | _: ReusedSubqueryExec =>
         assert(plan.getTagValue(SparkPlan.LOGICAL_PLAN_TAG).isEmpty)
 
-      case _ if isScanPlanTree(plan) =>
+      case _ if isScanPlanTree(plan, true) =>
         // The strategies for planning scan can remove or add FilterExec/ProjectExec nodes,
         // so it's not simple to check. Instead, we only check that the origin LogicalPlan
         // contains the corresponding leaf node of the SparkPlan.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 9462ee1..a276e47 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -121,29 +121,6 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
     assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0)))
   }
 
-  test("cache for primitive type should be in WholeStageCodegen with InMemoryTableScanExec") {
-    import testImplicits._
-
-    val dsInt = spark.range(3).cache()
-    dsInt.count()
-    val dsIntFilter = dsInt.filter(_ > 0)
-    val planInt = dsIntFilter.queryExecution.executedPlan
-    assert(planInt.collect {
-      case WholeStageCodegenExec(FilterExec(_, i: InMemoryTableScanExec)) if i.supportsBatch => ()
-    }.length == 1)
-    assert(dsIntFilter.collect() === Array(1, 2))
-
-    // cache for string type is not supported for InMemoryTableScanExec
-    val dsString = spark.range(3).map(_.toString).cache()
-    dsString.count()
-    val dsStringFilter = dsString.filter(_ == "1")
-    val planString = dsStringFilter.queryExecution.executedPlan
-    assert(planString.collect {
-      case i: InMemoryTableScanExec if !i.supportsBatch => ()
-    }.length == 1)
-    assert(dsStringFilter.collect() === Array("1"))
-  }
-
   test("SPARK-19512 codegen for comparing structs is incorrect") {
     // this would raise CompileException before the fix
     spark.range(10)
@@ -213,25 +190,6 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
     assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
   }
 
-  ignore("bytecode of batch file scan exceeds the limit of WHOLESTAGE_HUGE_METHOD_LIMIT") {
-    import testImplicits._
-    withTempPath { dir =>
-      val path = dir.getCanonicalPath
-      val df = spark.range(10).select(Seq.tabulate(201) {i => ('id + i).as(s"c$i")} : _*)
-      df.write.mode(SaveMode.Overwrite).parquet(path)
-
-      withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "202",
-        SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key -> "2000") {
-        // wide table batch scan causes the byte code of codegen exceeds the limit of
-        // WHOLESTAGE_HUGE_METHOD_LIMIT
-        val df2 = spark.read.parquet(path)
-        val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
-        assert(fileScan2.asInstanceOf[FileSourceScanExec].supportsBatch)
-        checkAnswer(df2, df)
-      }
-    }
-  }
-
   test("Control splitting consume function by operators with config") {
     import testImplicits._
     val df = spark.range(10).select(Seq.tabulate(2) {i => ('id + i).as(s"c$i")} : _*)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index d31e49c..466baf2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, In}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.{FilterExec, LocalTableScanExec, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.{ColumnarToRowExec, FilterExec, InputAdapter, LocalTableScanExec, WholeStageCodegenExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -486,15 +486,12 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     val df2 = df1.where("y = 3")
 
     val planBeforeFilter = df2.queryExecution.executedPlan.collect {
-      case f: FilterExec => f.child
+      case FilterExec(_, c: ColumnarToRowExec) => c.child
+      case WholeStageCodegenExec(FilterExec(_, ColumnarToRowExec(i: InputAdapter))) => i.child
     }
     assert(planBeforeFilter.head.isInstanceOf[InMemoryTableScanExec])
 
-    val execPlan = if (codegenEnabled == "true") {
-      WholeStageCodegenExec(planBeforeFilter.head)(codegenStageId = 0)
-    } else {
-      planBeforeFilter.head
-    }
+    val execPlan = planBeforeFilter.head
     assert(execPlan.executeCollectPublic().length == 0)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 7aa0ba7..a6429bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -924,14 +924,14 @@ class ParquetV1QuerySuite extends ParquetQuerySuite {
         // do not return batch, because whole stage codegen is disabled for wide table (>200 columns)
         val df2 = spark.read.parquet(path)
         val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
-        assert(!fileScan2.asInstanceOf[FileSourceScanExec].supportsBatch)
+        assert(!fileScan2.asInstanceOf[FileSourceScanExec].supportsColumnar)
         checkAnswer(df2, df)
 
         // return batch
         val columns = Seq.tabulate(9) {i => s"c$i"}
         val df3 = df2.selectExpr(columns : _*)
         val fileScan3 = df3.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
-        assert(fileScan3.asInstanceOf[FileSourceScanExec].supportsBatch)
+        assert(fileScan3.asInstanceOf[FileSourceScanExec].supportsColumnar)
         checkAnswer(df3, df.selectExpr(columns : _*))
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index a8d2308..b260f5d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -584,19 +584,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       sql("CREATE TEMPORARY VIEW inMemoryTable AS SELECT 1 AS c1")
       sql("CACHE TABLE inMemoryTable")
       testSparkPlanMetrics(spark.table("inMemoryTable"), 1,
-        Map(0L -> (("Scan In-memory table `inMemoryTable`", Map.empty)))
+        Map(1L -> (("Scan In-memory table `inMemoryTable`", Map.empty)))
       )
 
       sql("CREATE TEMPORARY VIEW ```a``b``` AS SELECT 2 AS c1")
       sql("CACHE TABLE ```a``b```")
       testSparkPlanMetrics(spark.table("```a``b```"), 1,
-        Map(0L -> (("Scan In-memory table ```a``b```", Map.empty)))
+        Map(1L -> (("Scan In-memory table ```a``b```", Map.empty)))
       )
     }
 
     // Show InMemoryTableScan on UI
     testSparkPlanMetrics(spark.range(1).cache().select("id"), 1,
-      Map(0L -> (("InMemoryTableScan", Map.empty)))
+      Map(1L -> (("InMemoryTableScan", Map.empty)))
     )
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index da0e553..115536d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -392,7 +392,7 @@ private[sql] trait SQLTestUtilsBase
    */
   protected def stripSparkFilter(df: DataFrame): DataFrame = {
     val schema = df.schema
-    val withoutFilters = df.queryExecution.sparkPlan.transform {
+    val withoutFilters = df.queryExecution.executedPlan.transform {
       case FilterExec(_, child) => child
     }
 

