Posted to commits@spark.apache.org by we...@apache.org on 2017/10/25 05:33:47 UTC
spark git commit: [SPARK-22348][SQL] The table cache providing ColumnarBatch should also do partition batch pruning
Repository: spark
Updated Branches:
refs/heads/master 3f5ba968c -> bc1e76632
[SPARK-22348][SQL] The table cache providing ColumnarBatch should also do partition batch pruning
## What changes were proposed in this pull request?
The table cache scan, `InMemoryTableScanExec`, can now provide `ColumnarBatch` output. On that code path, however, the cached batches are retrieved without pruning, so partition batch pruning still needs to be applied before the batches are scanned. A simplified sketch of the idea follows.
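The sketch below is illustrative only; `CachedBatch`, `minY`/`maxY`, and `prune` are hypothetical stand-ins, not the exact Spark internals (Spark evaluates a compiled `partitionFilter` against an `InternalRow` of per-batch statistics, as the diff shows):

```scala
// Simplified, hypothetical model of stats-based batch pruning.
// Each cached batch carries per-column statistics; min/max for a
// single column stand in for them here.
case class CachedBatch(numRows: Int, minY: Int, maxY: Int)

// For a predicate like `y = 3`, any batch whose [minY, maxY] range
// cannot contain 3 is skipped without being decompressed or scanned.
def prune(batches: Iterator[CachedBatch], v: Int): Iterator[CachedBatch] =
  batches.filter(b => b.minY <= v && v <= b.maxY)
```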
## How was this patch tested?
Existing tests, plus a new test added in this patch to `InMemoryColumnarQuerySuite` ("SPARK-22348: table cache should do partition batch pruning").
Author: Liang-Chi Hsieh <vi...@gmail.com>
Closes #19569 from viirya/SPARK-22348.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc1e7663
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc1e7663
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc1e7663
Branch: refs/heads/master
Commit: bc1e76632ddec8fc64726086905183d1f312bca4
Parents: 3f5ba96
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Wed Oct 25 06:33:44 2017 +0100
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Oct 25 06:33:44 2017 +0100
----------------------------------------------------------------------
.../columnar/InMemoryTableScanExec.scala | 70 +++++++++++---------
.../columnar/InMemoryColumnarQuerySuite.scala | 27 +++++++-
2 files changed, 64 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/bc1e7663/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 43386e7..2ae3f35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -78,7 +78,7 @@ case class InMemoryTableScanExec(
override def inputRDDs(): Seq[RDD[InternalRow]] = {
assert(supportCodegen)
- val buffers = relation.cachedColumnBuffers
+ val buffers = filteredCachedBatches()
// HACK ALERT: This is actually an RDD[ColumnarBatch].
// We're taking advantage of Scala's type erasure here to pass these batches along.
Seq(buffers.map(createAndDecompressColumn(_)).asInstanceOf[RDD[InternalRow]])
@@ -180,19 +180,11 @@ case class InMemoryTableScanExec(
private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
- protected override def doExecute(): RDD[InternalRow] = {
- val numOutputRows = longMetric("numOutputRows")
-
- if (enableAccumulators) {
- readPartitions.setValue(0)
- readBatches.setValue(0)
- }
-
+ private def filteredCachedBatches(): RDD[CachedBatch] = {
// Using these variables here to avoid serialization of entire objects (if referenced directly)
// within the map Partitions closure.
val schema = relation.partitionStatistics.schema
val schemaIndex = schema.zipWithIndex
- val relOutput: AttributeSeq = relation.output
val buffers = relation.cachedColumnBuffers
buffers.mapPartitionsWithIndexInternal { (index, cachedBatchIterator) =>
@@ -201,35 +193,49 @@ case class InMemoryTableScanExec(
schema)
partitionFilter.initialize(index)
+ // Do partition batch pruning if enabled
+ if (inMemoryPartitionPruningEnabled) {
+ cachedBatchIterator.filter { cachedBatch =>
+ if (!partitionFilter.eval(cachedBatch.stats)) {
+ logDebug {
+ val statsString = schemaIndex.map { case (a, i) =>
+ val value = cachedBatch.stats.get(i, a.dataType)
+ s"${a.name}: $value"
+ }.mkString(", ")
+ s"Skipping partition based on stats $statsString"
+ }
+ false
+ } else {
+ true
+ }
+ }
+ } else {
+ cachedBatchIterator
+ }
+ }
+ }
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ val numOutputRows = longMetric("numOutputRows")
+
+ if (enableAccumulators) {
+ readPartitions.setValue(0)
+ readBatches.setValue(0)
+ }
+
+ // Using these variables here to avoid serialization of entire objects (if referenced directly)
+ // within the map Partitions closure.
+ val relOutput: AttributeSeq = relation.output
+
+ filteredCachedBatches().mapPartitionsInternal { cachedBatchIterator =>
// Find the ordinals and data types of the requested columns.
val (requestedColumnIndices, requestedColumnDataTypes) =
attributes.map { a =>
relOutput.indexOf(a.exprId) -> a.dataType
}.unzip
- // Do partition batch pruning if enabled
- val cachedBatchesToScan =
- if (inMemoryPartitionPruningEnabled) {
- cachedBatchIterator.filter { cachedBatch =>
- if (!partitionFilter.eval(cachedBatch.stats)) {
- logDebug {
- val statsString = schemaIndex.map { case (a, i) =>
- val value = cachedBatch.stats.get(i, a.dataType)
- s"${a.name}: $value"
- }.mkString(", ")
- s"Skipping partition based on stats $statsString"
- }
- false
- } else {
- true
- }
- }
- } else {
- cachedBatchIterator
- }
-
// update SQL metrics
- val withMetrics = cachedBatchesToScan.map { batch =>
+ val withMetrics = cachedBatchIterator.map { batch =>
if (enableAccumulators) {
readBatches.add(1)
}
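The net effect of the hunks above is a refactor: the stats-based pruning that previously lived only inside `doExecute()` is extracted into `filteredCachedBatches()`, which both the columnar/codegen path (`inputRDDs()`) and the row-based path (`doExecute()`) now consume. A rough sketch of that shape, with types simplified and `rawBatches`, `pruningEnabled`, and `survivesStats` as stand-ins rather than Spark APIs:

```scala
// Rough sketch of the refactor's shape: one shared pruning stage feeding
// both execution paths, instead of pruning living in only one of them.
trait Scan {
  def rawBatches: Iterator[Int]           // stands in for relation.cachedColumnBuffers
  def pruningEnabled: Boolean             // stands in for inMemoryPartitionPruningEnabled
  def survivesStats(batch: Int): Boolean  // stands in for partitionFilter.eval(stats)

  // Analogue of the new filteredCachedBatches(): pruning happens here, once.
  def filteredBatches: Iterator[Int] =
    if (pruningEnabled) rawBatches.filter(survivesStats) else rawBatches

  def codegenPath: Iterator[Int] = filteredBatches      // like inputRDDs()
  def interpretedPath: Iterator[Int] = filteredBatches  // like doExecute()
}
```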
http://git-wip-us.apache.org/repos/asf/spark/blob/bc1e7663/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 2f249c8..e662e29 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -23,7 +23,7 @@ import java.sql.{Date, Timestamp}
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, In}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.LocalTableScanExec
+import org.apache.spark.sql.execution.{FilterExec, LocalTableScanExec, WholeStageCodegenExec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -454,4 +454,29 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
Seq(In(attribute, Nil)), testRelation)
assert(tableScanExec.partitionFilters.isEmpty)
}
+
+ test("SPARK-22348: table cache should do partition batch pruning") {
+ Seq("true", "false").foreach { enabled =>
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> enabled) {
+ val df1 = Seq((1, 1), (1, 1), (2, 2)).toDF("x", "y")
+ df1.unpersist()
+ df1.cache()
+
+ // Push predicate to the cached table.
+ val df2 = df1.where("y = 3")
+
+ val planBeforeFilter = df2.queryExecution.executedPlan.collect {
+ case f: FilterExec => f.child
+ }
+ assert(planBeforeFilter.head.isInstanceOf[InMemoryTableScanExec])
+
+ val execPlan = if (enabled == "true") {
+ WholeStageCodegenExec(planBeforeFilter.head)
+ } else {
+ planBeforeFilter.head
+ }
+ assert(execPlan.executeCollectPublic().length == 0)
+ }
+ }
+ }
}
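For reference, the behavior the new test asserts can also be reproduced end to end. A minimal standalone sketch, assuming a Spark 2.3-era local session; the config key `spark.sql.inMemoryColumnarStorage.partitionPruning` is the one read as `inMemoryPartitionPruningEnabled` in the scan above:

```scala
import org.apache.spark.sql.SparkSession

object Spark22348Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-22348 demo")
      .getOrCreate()
    import spark.implicits._

    // Pruning is on by default; set explicitly for clarity.
    spark.conf.set("spark.sql.inMemoryColumnarStorage.partitionPruning", "true")

    val df = Seq((1, 1), (1, 1), (2, 2)).toDF("x", "y")
    df.cache()
    df.count()  // materialize the in-memory cache

    // No cached batch can contain y == 3, so with this patch the
    // ColumnarBatch path skips every batch instead of scanning them all.
    assert(df.where("y = 3").count() == 0)

    spark.stop()
  }
}
```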