You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/02/02 14:43:38 UTC

spark git commit: [SPARK-23312][SQL] add a config to turn off vectorized cache reader

Repository: spark
Updated Branches:
  refs/heads/master 19c7c7ebd -> b9503fcbb


[SPARK-23312][SQL] add a config to turn off vectorized cache reader

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-23309 reported a performance regression about cached table in Spark 2.3. While the investigating is still going on, this PR adds a conf to turn off the vectorized cache reader, to unblock the 2.3 release.

## How was this patch tested?

a new test

Author: Wenchen Fan <we...@databricks.com>

Closes #20483 from cloud-fan/cache.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9503fcb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9503fcb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9503fcb

Branch: refs/heads/master
Commit: b9503fcbb3f4a3ce263164d1f11a8e99b9ca5710
Parents: 19c7c7e
Author: Wenchen Fan <we...@databricks.com>
Authored: Fri Feb 2 22:43:28 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Feb 2 22:43:28 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala      |  8 ++++++++
 .../execution/columnar/InMemoryTableScanExec.scala   |  2 +-
 .../org/apache/spark/sql/CachedTableSuite.scala      | 15 +++++++++++++--
 3 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b9503fcb/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 90654e6..1e2501e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -141,6 +141,12 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val CACHE_VECTORIZED_READER_ENABLED =
+    buildConf("spark.sql.inMemoryColumnarStorage.enableVectorizedReader")
+      .doc("Enables vectorized reader for columnar caching.")
+      .booleanConf
+      .createWithDefault(true)
+
   val COLUMN_VECTOR_OFFHEAP_ENABLED =
     buildConf("spark.sql.columnVector.offheap.enabled")
       .internal()
@@ -1272,6 +1278,8 @@ class SQLConf extends Serializable with Logging {
 
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
+  def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)
+
   def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
 
   def targetPostShuffleInputSize: Long =

http://git-wip-us.apache.org/repos/asf/spark/blob/b9503fcb/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index c167f1e..e972f8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -54,7 +54,7 @@ case class InMemoryTableScanExec(
   override val supportsBatch: Boolean = {
     // In the initial implementation, for ease of review
     // support only primitive data types and # of fields is less than wholeStageMaxNumFields
-    relation.schema.fields.forall(f => f.dataType match {
+    conf.cacheVectorizedReaderEnabled && relation.schema.fields.forall(f => f.dataType match {
       case BooleanType | ByteType | ShortType | IntegerType | LongType |
            FloatType | DoubleType => true
       case _ => false

http://git-wip-us.apache.org/repos/asf/spark/blob/b9503fcb/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 72fe0f4..9f27fa0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -21,8 +21,6 @@ import scala.collection.mutable.HashSet
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
-import org.scalatest.concurrent.Eventually._
-
 import org.apache.spark.CleanerListener
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
@@ -30,6 +28,7 @@ import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 import org.apache.spark.util.{AccumulatorContext, Utils}
@@ -782,4 +781,16 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       assert(getNumInMemoryRelations(cachedDs2) == 1)
     }
   }
+
+  test("SPARK-23312: vectorized cache reader can be disabled") {
+    Seq(true, false).foreach { vectorized =>
+      withSQLConf(SQLConf.CACHE_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+        val df = spark.range(10).cache()
+        df.queryExecution.executedPlan.foreach {
+          case i: InMemoryTableScanExec => assert(i.supportsBatch == vectorized)
+          case _ =>
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org