You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2019/07/11 14:03:55 UTC
[spark] branch master updated: [SPARK-28213][SQL] Replace
ColumnarBatchScan with equivalent from Columnar
This is an automated email from the ASF dual-hosted git repository.
tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8dff711 [SPARK-28213][SQL] Replace ColumnarBatchScan with equivalent from Columnar
8dff711 is described below
commit 8dff711ce732d476593a4e235d68e5e1728046cb
Author: Robert (Bobby) Evans <bo...@apache.org>
AuthorDate: Thu Jul 11 09:03:30 2019 -0500
[SPARK-28213][SQL] Replace ColumnarBatchScan with equivalent from Columnar
## What changes were proposed in this pull request?
This is the second part of https://issues.apache.org/jira/browse/SPARK-27396 and a follow-on to #24795.
## How was this patch tested?
I ran some manual tests and ran/updated the automated tests.
I did some simple performance tests on a single node to try to verify that there is no performance impact, and I was not able to measure anything beyond noise.
Closes #25008 from revans2/columnar-remove-batch-scan.
Authored-by: Robert (Bobby) Evans <bo...@apache.org>
Signed-off-by: Thomas Graves <tg...@apache.org>
---
.../org/apache/spark/sql/execution/Columnar.scala | 10 +-
.../spark/sql/execution/ColumnarBatchScan.scala | 167 ---------------------
.../spark/sql/execution/DataSourceScanExec.scala | 60 ++++----
.../sql/execution/WholeStageCodegenExec.scala | 6 +-
.../execution/adaptive/AdaptiveSparkPlanExec.scala | 2 +
.../execution/columnar/InMemoryTableScanExec.scala | 109 +++++++-------
.../execution/datasources/v2/BatchScanExec.scala | 2 +-
.../datasources/v2/DataSourceV2ScanExecBase.scala | 34 +++--
.../datasources/v2/MicroBatchScanExec.scala | 2 +-
.../org/apache/spark/sql/CachedTableSuite.scala | 2 +-
.../scala/org/apache/spark/sql/SubquerySuite.scala | 5 +-
.../execution/LogicalPlanTagInSparkPlanSuite.scala | 13 +-
.../sql/execution/WholeStageCodegenSuite.scala | 42 ------
.../columnar/InMemoryColumnarQuerySuite.scala | 11 +-
.../datasources/parquet/ParquetQuerySuite.scala | 4 +-
.../sql/execution/metric/SQLMetricsSuite.scala | 6 +-
.../org/apache/spark/sql/test/SQLTestUtils.scala | 2 +-
17 files changed, 142 insertions(+), 335 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index 315eba6..4385843 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -53,8 +53,8 @@ class ColumnarRule {
* Provides a common executor to translate an [[RDD]] of [[ColumnarBatch]] into an [[RDD]] of
* [[InternalRow]]. This is inserted whenever such a transition is determined to be needed.
*
- * The implementation is based off of similar implementations in [[ColumnarBatchScan]],
- * [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]], and
+ * The implementation is based off of similar implementations in
+ * [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]] and
* [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
*/
case class ColumnarToRowExec(child: SparkPlan)
@@ -96,9 +96,6 @@ case class ColumnarToRowExec(child: SparkPlan)
/**
* Generate [[ColumnVector]] expressions for our parent to consume as rows.
* This is called once per [[ColumnVector]] in the batch.
- *
- * This code came unchanged from [[ColumnarBatchScan]] and will hopefully replace it
- * at some point.
*/
private def genCodeColumnVector(
ctx: CodegenContext,
@@ -130,9 +127,6 @@ case class ColumnarToRowExec(child: SparkPlan)
* Produce code to process the input iterator as [[ColumnarBatch]]es.
* This produces an [[org.apache.spark.sql.catalyst.expressions.UnsafeRow]] for each row in
* each batch.
- *
- * This code came almost completely unchanged from [[ColumnarBatchScan]] and will
- * hopefully replace it at some point.
*/
override protected def doProduce(ctx: CodegenContext): String = {
// PhysicalRDD always just has one input
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
deleted file mode 100644
index b2e9f76..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeRow}
-import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.expressions.codegen.Block._
-import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.DataType
-import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
-
-
-/**
- * Helper trait for abstracting scan functionality using [[ColumnarBatch]]es.
- */
-private[sql] trait ColumnarBatchScan extends CodegenSupport {
-
- protected def supportsBatch: Boolean = true
-
- override lazy val metrics = Map(
- "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
- "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
-
- /**
- * Generate [[ColumnVector]] expressions for our parent to consume as rows.
- * This is called once per [[ColumnarBatch]].
- */
- private def genCodeColumnVector(
- ctx: CodegenContext,
- columnVar: String,
- ordinal: String,
- dataType: DataType,
- nullable: Boolean): ExprCode = {
- val javaType = CodeGenerator.javaType(dataType)
- val value = CodeGenerator.getValueFromVector(columnVar, dataType, ordinal)
- val isNullVar = if (nullable) {
- JavaCode.isNullVariable(ctx.freshName("isNull"))
- } else {
- FalseLiteral
- }
- val valueVar = ctx.freshName("value")
- val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
- val code = code"${ctx.registerComment(str)}" + (if (nullable) {
- code"""
- boolean $isNullVar = $columnVar.isNullAt($ordinal);
- $javaType $valueVar = $isNullVar ? ${CodeGenerator.defaultValue(dataType)} : ($value);
- """
- } else {
- code"$javaType $valueVar = $value;"
- })
- ExprCode(code, isNullVar, JavaCode.variable(valueVar, dataType))
- }
-
- /**
- * Produce code to process the input iterator as [[ColumnarBatch]]es.
- * This produces an [[UnsafeRow]] for each row in each batch.
- */
- // TODO: return ColumnarBatch.Rows instead
- override protected def doProduce(ctx: CodegenContext): String = {
- // PhysicalRDD always just has one input
- val input = ctx.addMutableState("scala.collection.Iterator", "input",
- v => s"$v = inputs[0];")
- if (supportsBatch) {
- produceBatches(ctx, input)
- } else {
- produceRows(ctx, input)
- }
- }
-
- private def produceBatches(ctx: CodegenContext, input: String): String = {
- // metrics
- val numOutputRows = metricTerm(ctx, "numOutputRows")
- val scanTimeMetric = metricTerm(ctx, "scanTime")
- val scanTimeTotalNs =
- ctx.addMutableState(CodeGenerator.JAVA_LONG, "scanTime") // init as scanTime = 0
-
- val columnarBatchClz = classOf[ColumnarBatch].getName
- val batch = ctx.addMutableState(columnarBatchClz, "batch")
-
- val idx = ctx.addMutableState(CodeGenerator.JAVA_INT, "batchIdx") // init as batchIdx = 0
- val columnVectorClzs = vectorTypes.getOrElse(
- Seq.fill(output.indices.size)(classOf[ColumnVector].getName))
- val (colVars, columnAssigns) = columnVectorClzs.zipWithIndex.map {
- case (columnVectorClz, i) =>
- val name = ctx.addMutableState(columnVectorClz, s"colInstance$i")
- (name, s"$name = ($columnVectorClz) $batch.column($i);")
- }.unzip
-
- val nextBatch = ctx.freshName("nextBatch")
- val nextBatchFuncName = ctx.addNewFunction(nextBatch,
- s"""
- |private void $nextBatch() throws java.io.IOException {
- | long getBatchStart = System.nanoTime();
- | if ($input.hasNext()) {
- | $batch = ($columnarBatchClz)$input.next();
- | $numOutputRows.add($batch.numRows());
- | $idx = 0;
- | ${columnAssigns.mkString("", "\n", "\n")}
- | }
- | $scanTimeTotalNs += System.nanoTime() - getBatchStart;
- |}""".stripMargin)
-
- ctx.currentVars = null
- val rowidx = ctx.freshName("rowIdx")
- val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
- genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
- }
- val localIdx = ctx.freshName("localIdx")
- val localEnd = ctx.freshName("localEnd")
- val numRows = ctx.freshName("numRows")
- val shouldStop = if (parent.needStopCheck) {
- s"if (shouldStop()) { $idx = $rowidx + 1; return; }"
- } else {
- "// shouldStop check is eliminated"
- }
- s"""
- |if ($batch == null) {
- | $nextBatchFuncName();
- |}
- |while ($limitNotReachedCond $batch != null) {
- | int $numRows = $batch.numRows();
- | int $localEnd = $numRows - $idx;
- | for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
- | int $rowidx = $idx + $localIdx;
- | ${consume(ctx, columnsBatchInput).trim}
- | $shouldStop
- | }
- | $idx = $numRows;
- | $batch = null;
- | $nextBatchFuncName();
- |}
- |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
- |$scanTimeTotalNs = 0;
- """.stripMargin
- }
-
- private def produceRows(ctx: CodegenContext, input: String): String = {
- val numOutputRows = metricTerm(ctx, "numOutputRows")
- val row = ctx.freshName("row")
-
- ctx.INPUT_ROW = row
- ctx.currentVars = null
- s"""
- |while ($limitNotReachedCond $input.hasNext()) {
- | InternalRow $row = (InternalRow) $input.next();
- | $numOutputRows.add(1);
- | ${consume(ctx, null, row).trim}
- | if (shouldStop()) return;
- |}
- """.stripMargin
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 518460d..728ac3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -37,10 +37,11 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat =>
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.BitSet
-trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
+trait DataSourceScanExec extends LeafExecNode {
val relation: BaseRelation
val tableIdentifier: Option[TableIdentifier]
@@ -69,6 +70,12 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
private def redact(text: String): String = {
Utils.redact(sqlContext.sessionState.conf.stringRedactionPattern, text)
}
+
+ /**
+ * The data being read in. This is to provide input to the tests in a way compatible with
+ * [[InputRDDCodegen]] which all implementations used to extend.
+ */
+ def inputRDDs(): Seq[RDD[InternalRow]]
}
/** Physical plan node for scanning data from a relation. */
@@ -141,11 +148,11 @@ case class FileSourceScanExec(
optionalBucketSet: Option[BitSet],
dataFilters: Seq[Expression],
override val tableIdentifier: Option[TableIdentifier])
- extends DataSourceScanExec with ColumnarBatchScan {
+ extends DataSourceScanExec {
// Note that some vals referring the file-based relation are lazy intentionally
// so that this plan can be canonicalized on executor side too. See SPARK-23731.
- override lazy val supportsBatch: Boolean = {
+ override lazy val supportsColumnar: Boolean = {
relation.fileFormat.supportBatch(relation.sparkSession, schema)
}
@@ -275,7 +282,7 @@ case class FileSourceScanExec(
Map(
"Format" -> relation.fileFormat.toString,
"ReadSchema" -> requiredSchema.catalogString,
- "Batched" -> supportsBatch.toString,
+ "Batched" -> supportsColumnar.toString,
"PartitionFilters" -> seqToString(partitionFilters),
"PushedFilters" -> seqToString(pushedDownFilters),
"DataFilters" -> seqToString(dataFilters),
@@ -302,7 +309,7 @@ case class FileSourceScanExec(
withSelectedBucketsCount
}
- private lazy val inputRDD: RDD[InternalRow] = {
+ lazy val inputRDD: RDD[InternalRow] = {
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
@@ -334,29 +341,30 @@ case class FileSourceScanExec(
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
protected override def doExecute(): RDD[InternalRow] = {
- if (supportsBatch) {
- // in the case of fallback, this batched scan should never fail because of:
- // 1) only primitive types are supported
- // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
- WholeStageCodegenExec(this)(codegenStageId = 0).execute()
- } else {
- val numOutputRows = longMetric("numOutputRows")
-
- if (needsUnsafeRowConversion) {
- inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
- val proj = UnsafeProjection.create(schema)
- proj.initialize(index)
- iter.map( r => {
- numOutputRows += 1
- proj(r)
- })
- }
- } else {
- inputRDD.map { r =>
+ val numOutputRows = longMetric("numOutputRows")
+
+ if (needsUnsafeRowConversion) {
+ inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
+ val proj = UnsafeProjection.create(schema)
+ proj.initialize(index)
+ iter.map( r => {
numOutputRows += 1
- r
- }
+ proj(r)
+ })
}
+ } else {
+ inputRDD.map { r =>
+ numOutputRows += 1
+ r
+ }
+ }
+ }
+
+ protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ val numOutputRows = longMetric("numOutputRows")
+ inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { batch =>
+ numOutputRows += batch.numRows()
+ batch
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 94a5ede..a0afa9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -709,11 +709,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
s"for this plan (id=$codegenStageId). To avoid this, you can raise the limit " +
s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
- child match {
- // The fallback solution of batch file source scan still uses WholeStageCodegenExec
- case f: FileSourceScanExec if f.supportsBatch => // do nothing
- case _ => return child.execute()
- }
+ return child.execute()
}
val references = ctx.references.toArray
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 0708878..61dbc58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -84,6 +84,8 @@ case class AdaptiveSparkPlanExec(
// optimizations should be stage-independent.
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
ReduceNumShufflePartitions(conf),
+ ApplyColumnarRulesAndInsertTransitions(session.sessionState.conf,
+ session.sessionState.columnarRules),
CollapseCodegenStages(conf)
)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 06634c1..7a8c6d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -24,7 +24,8 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.vectorized._
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
@@ -34,7 +35,10 @@ case class InMemoryTableScanExec(
attributes: Seq[Attribute],
predicates: Seq[Expression],
@transient relation: InMemoryRelation)
- extends LeafExecNode with ColumnarBatchScan {
+ extends LeafExecNode {
+
+ override lazy val metrics = Map(
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
override val nodeName: String = {
relation.cacheBuilder.tableName match {
@@ -65,7 +69,7 @@ case class InMemoryTableScanExec(
* If true, get data from ColumnVector in ColumnarBatch, which are generally faster.
* If false, get data from UnsafeRow build from CachedBatch
*/
- override val supportsBatch: Boolean = {
+ override val supportsColumnar: Boolean = {
// In the initial implementation, for ease of review
// support only primitive data types and # of fields is less than wholeStageMaxNumFields
conf.cacheVectorizedReaderEnabled && relation.schema.fields.forall(f => f.dataType match {
@@ -75,9 +79,6 @@ case class InMemoryTableScanExec(
}) && !WholeStageCodegenExec.isTooManyFields(conf, relation.schema)
}
- // TODO: revisit this. Shall we always turn off whole stage codegen if the output data are rows?
- override def supportCodegen: Boolean = supportsBatch
-
private val columnIndices =
attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray
@@ -108,59 +109,61 @@ case class InMemoryTableScanExec(
columnarBatch
}
+ private lazy val columnarInputRDD: RDD[ColumnarBatch] = {
+ val numOutputRows = longMetric("numOutputRows")
+ val buffers = filteredCachedBatches()
+ val offHeapColumnVectorEnabled = conf.offHeapColumnVectorEnabled
+ buffers
+ .map(createAndDecompressColumn(_, offHeapColumnVectorEnabled))
+ .map(b => {
+ numOutputRows += b.numRows()
+ b
+ })
+ }
+
private lazy val inputRDD: RDD[InternalRow] = {
val buffers = filteredCachedBatches()
val offHeapColumnVectorEnabled = conf.offHeapColumnVectorEnabled
- if (supportsBatch) {
- // HACK ALERT: This is actually an RDD[ColumnarBatch].
- // We're taking advantage of Scala's type erasure here to pass these batches along.
- buffers
- .map(createAndDecompressColumn(_, offHeapColumnVectorEnabled))
- .asInstanceOf[RDD[InternalRow]]
- } else {
- val numOutputRows = longMetric("numOutputRows")
+ val numOutputRows = longMetric("numOutputRows")
- if (enableAccumulatorsForTest) {
- readPartitions.setValue(0)
- readBatches.setValue(0)
- }
+ if (enableAccumulatorsForTest) {
+ readPartitions.setValue(0)
+ readBatches.setValue(0)
+ }
- // Using these variables here to avoid serialization of entire objects (if referenced
- // directly) within the map Partitions closure.
- val relOutput: AttributeSeq = relation.output
-
- filteredCachedBatches().mapPartitionsInternal { cachedBatchIterator =>
- // Find the ordinals and data types of the requested columns.
- val (requestedColumnIndices, requestedColumnDataTypes) =
- attributes.map { a =>
- relOutput.indexOf(a.exprId) -> a.dataType
- }.unzip
-
- // update SQL metrics
- val withMetrics = cachedBatchIterator.map { batch =>
- if (enableAccumulatorsForTest) {
- readBatches.add(1)
- }
- numOutputRows += batch.numRows
- batch
+ // Using these variables here to avoid serialization of entire objects (if referenced
+ // directly) within the map Partitions closure.
+ val relOutput: AttributeSeq = relation.output
+
+ filteredCachedBatches().mapPartitionsInternal { cachedBatchIterator =>
+ // Find the ordinals and data types of the requested columns.
+ val (requestedColumnIndices, requestedColumnDataTypes) =
+ attributes.map { a =>
+ relOutput.indexOf(a.exprId) -> a.dataType
+ }.unzip
+
+ // update SQL metrics
+ val withMetrics = cachedBatchIterator.map { batch =>
+ if (enableAccumulatorsForTest) {
+ readBatches.add(1)
}
+ numOutputRows += batch.numRows
+ batch
+ }
- val columnTypes = requestedColumnDataTypes.map {
- case udt: UserDefinedType[_] => udt.sqlType
- case other => other
- }.toArray
- val columnarIterator = GenerateColumnAccessor.generate(columnTypes)
- columnarIterator.initialize(withMetrics, columnTypes, requestedColumnIndices.toArray)
- if (enableAccumulatorsForTest && columnarIterator.hasNext) {
- readPartitions.add(1)
- }
- columnarIterator
+ val columnTypes = requestedColumnDataTypes.map {
+ case udt: UserDefinedType[_] => udt.sqlType
+ case other => other
+ }.toArray
+ val columnarIterator = GenerateColumnAccessor.generate(columnTypes)
+ columnarIterator.initialize(withMetrics, columnTypes, requestedColumnIndices.toArray)
+ if (enableAccumulatorsForTest && columnarIterator.hasNext) {
+ readPartitions.add(1)
}
+ columnarIterator
}
}
- override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
-
override def output: Seq[Attribute] = attributes
private def updateAttribute(expr: Expression): Expression = {
@@ -339,10 +342,10 @@ case class InMemoryTableScanExec(
}
protected override def doExecute(): RDD[InternalRow] = {
- if (supportsBatch) {
- WholeStageCodegenExec(this)(codegenStageId = 0).execute()
- } else {
- inputRDD
- }
+ inputRDD
+ }
+
+ protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ columnarInputRDD
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
index 3276ab5..c3cbb9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -45,7 +45,7 @@ case class BatchScanExec(
override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()
override lazy val inputRDD: RDD[InternalRow] = {
- new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch)
+ new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar)
}
override def doCanonicalize(): BatchScanExec = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index 9ad683f..c5c902f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -23,11 +23,16 @@ import org.apache.spark.sql.catalyst.expressions.AttributeMap
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.LeafExecNode
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, Scan, SupportsReportPartitioning}
+import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils
-trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan {
+trait DataSourceV2ScanExecBase extends LeafExecNode {
+
+ override lazy val metrics = Map(
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
def scan: Scan
@@ -52,7 +57,7 @@ trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan {
case _ => super.outputPartitioning
}
- override def supportsBatch: Boolean = {
+ override def supportsColumnar: Boolean = {
require(partitions.forall(readerFactory.supportColumnarReads) ||
!partitions.exists(readerFactory.supportColumnarReads),
"Cannot mix row-based and columnar input partitions.")
@@ -62,17 +67,22 @@ trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan {
def inputRDD: RDD[InternalRow]
- override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
+ def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
override def doExecute(): RDD[InternalRow] = {
- if (supportsBatch) {
- WholeStageCodegenExec(this)(codegenStageId = 0).execute()
- } else {
- val numOutputRows = longMetric("numOutputRows")
- inputRDD.map { r =>
- numOutputRows += 1
- r
- }
+ val numOutputRows = longMetric("numOutputRows")
+ inputRDD.map { r =>
+ numOutputRows += 1
+ r
+ }
+ }
+
+ override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ val numOutputRows = longMetric("numOutputRows")
+ inputRDD.asInstanceOf[RDD[ColumnarBatch]].map {
+ b =>
+ numOutputRows += b.numRows()
+ b
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
index d2e33d4..a9b0f5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
@@ -46,6 +46,6 @@ case class MicroBatchScanExec(
override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory()
override lazy val inputRDD: RDD[InternalRow] = {
- new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch)
+ new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 6049e89..267f255 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -832,7 +832,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
val df = spark.range(10).cache()
df.queryExecution.executedPlan.foreach {
case i: InMemoryTableScanExec =>
- assert(i.supportsBatch == vectorized && i.supportCodegen == vectorized)
+ assert(i.supportsColumnar == vectorized)
case _ =>
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index fddc4f6..b2c3868 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort}
-import org.apache.spark.sql.execution.{ExecSubqueryExpression, FileSourceScanExec, ReusedSubqueryExec, ScalarSubquery, SubqueryExec, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, FileSourceScanExec, InputAdapter, ReusedSubqueryExec, ScalarSubquery, SubqueryExec, WholeStageCodegenExec}
import org.apache.spark.sql.execution.datasources.FileScanRDD
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -1293,7 +1293,8 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
checkAnswer(df, Seq(Row(0, 0), Row(2, 0)))
// need to execute the query before we can examine fs.inputRDDs()
assert(df.queryExecution.executedPlan match {
- case WholeStageCodegenExec(fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _)) =>
+ case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
+ fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _), _))) =>
partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
fs.inputRDDs().forall(
_.asInstanceOf[FileScanRDD].filePartitions.forall(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
index b35348b..b114348 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
@@ -44,9 +44,14 @@ class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite {
}
// A scan plan tree is a plan tree that has a leaf node under zero or more Project/Filter nodes.
- private def isScanPlanTree(plan: SparkPlan): Boolean = plan match {
- case p: ProjectExec => isScanPlanTree(p.child)
- case f: FilterExec => isScanPlanTree(f.child)
+ // Because of how codegen and columnar to row transitions work, we may have InputAdapters
+ // and ColumnarToRow transformations in the middle of it, but they will not have the tag
+ // we want, so skip them if they are the first thing we see
+ private def isScanPlanTree(plan: SparkPlan, first: Boolean): Boolean = plan match {
+ case i: InputAdapter if !first => isScanPlanTree(i.child, false)
+ case c: ColumnarToRowExec if !first => isScanPlanTree(c.child, false)
+ case p: ProjectExec => isScanPlanTree(p.child, false)
+ case f: FilterExec => isScanPlanTree(f.child, false)
case _: LeafExecNode => true
case _ => false
}
@@ -87,7 +92,7 @@ class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite {
case _: SubqueryExec | _: ReusedSubqueryExec =>
assert(plan.getTagValue(SparkPlan.LOGICAL_PLAN_TAG).isEmpty)
- case _ if isScanPlanTree(plan) =>
+ case _ if isScanPlanTree(plan, true) =>
// The strategies for planning scan can remove or add FilterExec/ProjectExec nodes,
// so it's not simple to check. Instead, we only check that the origin LogicalPlan
// contains the corresponding leaf node of the SparkPlan.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 9462ee1..a276e47 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -121,29 +121,6 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0)))
}
- test("cache for primitive type should be in WholeStageCodegen with InMemoryTableScanExec") {
- import testImplicits._
-
- val dsInt = spark.range(3).cache()
- dsInt.count()
- val dsIntFilter = dsInt.filter(_ > 0)
- val planInt = dsIntFilter.queryExecution.executedPlan
- assert(planInt.collect {
- case WholeStageCodegenExec(FilterExec(_, i: InMemoryTableScanExec)) if i.supportsBatch => ()
- }.length == 1)
- assert(dsIntFilter.collect() === Array(1, 2))
-
- // cache for string type is not supported for InMemoryTableScanExec
- val dsString = spark.range(3).map(_.toString).cache()
- dsString.count()
- val dsStringFilter = dsString.filter(_ == "1")
- val planString = dsStringFilter.queryExecution.executedPlan
- assert(planString.collect {
- case i: InMemoryTableScanExec if !i.supportsBatch => ()
- }.length == 1)
- assert(dsStringFilter.collect() === Array("1"))
- }
-
test("SPARK-19512 codegen for comparing structs is incorrect") {
// this would raise CompileException before the fix
spark.range(10)
@@ -213,25 +190,6 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
}
- ignore("bytecode of batch file scan exceeds the limit of WHOLESTAGE_HUGE_METHOD_LIMIT") {
- import testImplicits._
- withTempPath { dir =>
- val path = dir.getCanonicalPath
- val df = spark.range(10).select(Seq.tabulate(201) {i => ('id + i).as(s"c$i")} : _*)
- df.write.mode(SaveMode.Overwrite).parquet(path)
-
- withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "202",
- SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key -> "2000") {
- // wide table batch scan causes the byte code of codegen exceeds the limit of
- // WHOLESTAGE_HUGE_METHOD_LIMIT
- val df2 = spark.read.parquet(path)
- val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
- assert(fileScan2.asInstanceOf[FileSourceScanExec].supportsBatch)
- checkAnswer(df2, df)
- }
- }
- }
-
test("Control splitting consume function by operators with config") {
import testImplicits._
val df = spark.range(10).select(Seq.tabulate(2) {i => ('id + i).as(s"c$i")} : _*)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index d31e49c..466baf2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, In}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.{FilterExec, LocalTableScanExec, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.{ColumnarToRowExec, FilterExec, InputAdapter, LocalTableScanExec, WholeStageCodegenExec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -486,15 +486,12 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
val df2 = df1.where("y = 3")
val planBeforeFilter = df2.queryExecution.executedPlan.collect {
- case f: FilterExec => f.child
+ case FilterExec(_, c: ColumnarToRowExec) => c.child
+ case WholeStageCodegenExec(FilterExec(_, ColumnarToRowExec(i: InputAdapter))) => i.child
}
assert(planBeforeFilter.head.isInstanceOf[InMemoryTableScanExec])
- val execPlan = if (codegenEnabled == "true") {
- WholeStageCodegenExec(planBeforeFilter.head)(codegenStageId = 0)
- } else {
- planBeforeFilter.head
- }
+ val execPlan = planBeforeFilter.head
assert(execPlan.executeCollectPublic().length == 0)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 7aa0ba7..a6429bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -924,14 +924,14 @@ class ParquetV1QuerySuite extends ParquetQuerySuite {
// donot return batch, because whole stage codegen is disabled for wide table (>200 columns)
val df2 = spark.read.parquet(path)
val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
- assert(!fileScan2.asInstanceOf[FileSourceScanExec].supportsBatch)
+ assert(!fileScan2.asInstanceOf[FileSourceScanExec].supportsColumnar)
checkAnswer(df2, df)
// return batch
val columns = Seq.tabulate(9) {i => s"c$i"}
val df3 = df2.selectExpr(columns : _*)
val fileScan3 = df3.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
- assert(fileScan3.asInstanceOf[FileSourceScanExec].supportsBatch)
+ assert(fileScan3.asInstanceOf[FileSourceScanExec].supportsColumnar)
checkAnswer(df3, df.selectExpr(columns : _*))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index a8d2308..b260f5d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -584,19 +584,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
sql("CREATE TEMPORARY VIEW inMemoryTable AS SELECT 1 AS c1")
sql("CACHE TABLE inMemoryTable")
testSparkPlanMetrics(spark.table("inMemoryTable"), 1,
- Map(0L -> (("Scan In-memory table `inMemoryTable`", Map.empty)))
+ Map(1L -> (("Scan In-memory table `inMemoryTable`", Map.empty)))
)
sql("CREATE TEMPORARY VIEW ```a``b``` AS SELECT 2 AS c1")
sql("CACHE TABLE ```a``b```")
testSparkPlanMetrics(spark.table("```a``b```"), 1,
- Map(0L -> (("Scan In-memory table ```a``b```", Map.empty)))
+ Map(1L -> (("Scan In-memory table ```a``b```", Map.empty)))
)
}
// Show InMemoryTableScan on UI
testSparkPlanMetrics(spark.range(1).cache().select("id"), 1,
- Map(0L -> (("InMemoryTableScan", Map.empty)))
+ Map(1L -> (("InMemoryTableScan", Map.empty)))
)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index da0e553..115536d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -392,7 +392,7 @@ private[sql] trait SQLTestUtilsBase
*/
protected def stripSparkFilter(df: DataFrame): DataFrame = {
val schema = df.schema
- val withoutFilters = df.queryExecution.sparkPlan.transform {
+ val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org