You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/10/25 05:33:47 UTC

spark git commit: [SPARK-22348][SQL] The table cache providing ColumnarBatch should also do partition batch pruning

Repository: spark
Updated Branches:
  refs/heads/master 3f5ba968c -> bc1e76632


[SPARK-22348][SQL] The table cache providing ColumnarBatch should also do partition batch pruning

## What changes were proposed in this pull request?

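The table cache scan, `InMemoryTableScanExec`, can now provide `ColumnarBatch` output. However, on that code path the cached batches are retrieved without any pruning. Partition batch pruning still needs to be applied in this case, so this patch makes the `ColumnarBatch` path use the same pruned cached batches as the row-based path.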

## How was this patch tested?

Existing tests.

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #19569 from viirya/SPARK-22348.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc1e7663
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc1e7663
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc1e7663

Branch: refs/heads/master
Commit: bc1e76632ddec8fc64726086905183d1f312bca4
Parents: 3f5ba96
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Wed Oct 25 06:33:44 2017 +0100
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Oct 25 06:33:44 2017 +0100

----------------------------------------------------------------------
 .../columnar/InMemoryTableScanExec.scala        | 70 +++++++++++---------
 .../columnar/InMemoryColumnarQuerySuite.scala   | 27 +++++++-
 2 files changed, 64 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bc1e7663/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 43386e7..2ae3f35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -78,7 +78,7 @@ case class InMemoryTableScanExec(
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
     assert(supportCodegen)
-    val buffers = relation.cachedColumnBuffers
+    val buffers = filteredCachedBatches()
     // HACK ALERT: This is actually an RDD[ColumnarBatch].
     // We're taking advantage of Scala's type erasure here to pass these batches along.
     Seq(buffers.map(createAndDecompressColumn(_)).asInstanceOf[RDD[InternalRow]])
@@ -180,19 +180,11 @@ case class InMemoryTableScanExec(
 
   private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
 
-  protected override def doExecute(): RDD[InternalRow] = {
-    val numOutputRows = longMetric("numOutputRows")
-
-    if (enableAccumulators) {
-      readPartitions.setValue(0)
-      readBatches.setValue(0)
-    }
-
+  private def filteredCachedBatches(): RDD[CachedBatch] = {
     // Using these variables here to avoid serialization of entire objects (if referenced directly)
     // within the map Partitions closure.
     val schema = relation.partitionStatistics.schema
     val schemaIndex = schema.zipWithIndex
-    val relOutput: AttributeSeq = relation.output
     val buffers = relation.cachedColumnBuffers
 
     buffers.mapPartitionsWithIndexInternal { (index, cachedBatchIterator) =>
@@ -201,35 +193,49 @@ case class InMemoryTableScanExec(
         schema)
       partitionFilter.initialize(index)
 
+      // Do partition batch pruning if enabled
+      if (inMemoryPartitionPruningEnabled) {
+        cachedBatchIterator.filter { cachedBatch =>
+          if (!partitionFilter.eval(cachedBatch.stats)) {
+            logDebug {
+              val statsString = schemaIndex.map { case (a, i) =>
+                val value = cachedBatch.stats.get(i, a.dataType)
+                s"${a.name}: $value"
+              }.mkString(", ")
+              s"Skipping partition based on stats $statsString"
+            }
+            false
+          } else {
+            true
+          }
+        }
+      } else {
+        cachedBatchIterator
+      }
+    }
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+
+    if (enableAccumulators) {
+      readPartitions.setValue(0)
+      readBatches.setValue(0)
+    }
+
+    // Using these variables here to avoid serialization of entire objects (if referenced directly)
+    // within the map Partitions closure.
+    val relOutput: AttributeSeq = relation.output
+
+    filteredCachedBatches().mapPartitionsInternal { cachedBatchIterator =>
       // Find the ordinals and data types of the requested columns.
       val (requestedColumnIndices, requestedColumnDataTypes) =
         attributes.map { a =>
           relOutput.indexOf(a.exprId) -> a.dataType
         }.unzip
 
-      // Do partition batch pruning if enabled
-      val cachedBatchesToScan =
-        if (inMemoryPartitionPruningEnabled) {
-          cachedBatchIterator.filter { cachedBatch =>
-            if (!partitionFilter.eval(cachedBatch.stats)) {
-              logDebug {
-                val statsString = schemaIndex.map { case (a, i) =>
-                  val value = cachedBatch.stats.get(i, a.dataType)
-                  s"${a.name}: $value"
-                }.mkString(", ")
-                s"Skipping partition based on stats $statsString"
-              }
-              false
-            } else {
-              true
-            }
-          }
-        } else {
-          cachedBatchIterator
-        }
-
       // update SQL metrics
-      val withMetrics = cachedBatchesToScan.map { batch =>
+      val withMetrics = cachedBatchIterator.map { batch =>
         if (enableAccumulators) {
           readBatches.add(1)
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1e7663/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 2f249c8..e662e29 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -23,7 +23,7 @@ import java.sql.{Date, Timestamp}
 import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, In}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.LocalTableScanExec
+import org.apache.spark.sql.execution.{FilterExec, LocalTableScanExec, WholeStageCodegenExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -454,4 +454,29 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
       Seq(In(attribute, Nil)), testRelation)
     assert(tableScanExec.partitionFilters.isEmpty)
   }
+
+  test("SPARK-22348: table cache should do partition batch pruning") {
+    Seq("true", "false").foreach { enabled =>
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> enabled) {
+        val df1 = Seq((1, 1), (1, 1), (2, 2)).toDF("x", "y")
+        df1.unpersist()
+        df1.cache()
+
+        // Push predicate to the cached table.
+        val df2 = df1.where("y = 3")
+
+        val planBeforeFilter = df2.queryExecution.executedPlan.collect {
+          case f: FilterExec => f.child
+        }
+        assert(planBeforeFilter.head.isInstanceOf[InMemoryTableScanExec])
+
+        val execPlan = if (enabled == "true") {
+          WholeStageCodegenExec(planBeforeFilter.head)
+        } else {
+          planBeforeFilter.head
+        }
+        assert(execPlan.executeCollectPublic().length == 0)
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org