You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/01/04 18:31:51 UTC

[spark] branch master updated: [SPARK-33988][SQL][TEST] Add an option to enable CBO in TPCDSQueryBenchmark

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 414d323  [SPARK-33988][SQL][TEST] Add an option to enable CBO in TPCDSQueryBenchmark
414d323 is described below

commit 414d323d6c92584beb87e1c426e4beab5ddbd452
Author: Takeshi Yamamuro <ya...@apache.org>
AuthorDate: Mon Jan 4 10:31:20 2021 -0800

    [SPARK-33988][SQL][TEST] Add an option to enable CBO in TPCDSQueryBenchmark
    
    ### What changes were proposed in this pull request?
    
    This PR intends to add a new option `--cbo` to enable CBO in TPCDSQueryBenchmark. I think this option is useful so as to monitor performance changes with CBO enabled.
    
    ### Why are the changes needed?
    
    To monitor performance chaneges with CBO enabled.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manually checked.
    
    Closes #31011 from maropu/AddOptionForCBOInTPCDSBenchmark.
    
    Authored-by: Takeshi Yamamuro <ya...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../execution/benchmark/TPCDSQueryBenchmark.scala  | 39 +++++++++++++++++++---
 .../benchmark/TPCDSQueryBenchmarkArguments.scala   |  6 ++++
 2 files changed, 41 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
index f931914..b34eac5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
@@ -19,11 +19,15 @@ package org.apache.spark.sql.execution.benchmark
 
 import org.apache.spark.SparkConf
 import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
 
 /**
  * Benchmark to measure TPCDS query performance.
@@ -38,7 +42,10 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
  *      Results will be written to "benchmarks/TPCDSQueryBenchmark-results.txt".
  * }}}
  */
-object TPCDSQueryBenchmark extends SqlBasedBenchmark {
+object TPCDSQueryBenchmark extends SqlBasedBenchmark with Logging {
+
+  private lazy val warehousePath =
+    Utils.createTempDir(namePrefix = "spark-warehouse").getAbsolutePath
 
   override def getSparkSession: SparkSession = {
     val conf = new SparkConf()
@@ -50,6 +57,7 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark {
       .set("spark.executor.memory", "3g")
       .set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString)
       .set("spark.sql.crossJoin.enabled", "true")
+      .set("spark.sql.warehouse.dir", warehousePath)
 
     SparkSession.builder.config(conf).getOrCreate()
   }
@@ -60,9 +68,14 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark {
     "web_returns", "web_site", "reason", "call_center", "warehouse", "ship_mode", "income_band",
     "time_dim", "web_page")
 
-  def setupTables(dataLocation: String): Map[String, Long] = {
+  def setupTables(dataLocation: String, createTempView: Boolean): Map[String, Long] = {
     tables.map { tableName =>
-      spark.read.parquet(s"$dataLocation/$tableName").createOrReplaceTempView(tableName)
+      val df = spark.read.parquet(s"$dataLocation/$tableName")
+      if (createTempView) {
+        df.createOrReplaceTempView(tableName)
+      } else {
+        df.write.saveAsTable(tableName)
+      }
       tableName -> spark.table(tableName).count()
     }.toMap
   }
@@ -146,7 +159,25 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark {
         s"Empty queries to run. Bad query name filter: ${benchmarkArgs.queryFilter}")
     }
 
-    val tableSizes = setupTables(benchmarkArgs.dataLocation)
+    val tableSizes = setupTables(benchmarkArgs.dataLocation,
+      createTempView = !benchmarkArgs.cboEnabled)
+    if (benchmarkArgs.cboEnabled) {
+      spark.sql(s"SET ${SQLConf.CBO_ENABLED.key}=true")
+      spark.sql(s"SET ${SQLConf.PLAN_STATS_ENABLED.key}=true")
+      spark.sql(s"SET ${SQLConf.JOIN_REORDER_ENABLED.key}=true")
+      spark.sql(s"SET ${SQLConf.HISTOGRAM_ENABLED.key}=true")
+
+      // Analyze all the tables before running TPCDS queries
+      val startTime = System.nanoTime()
+      tables.foreach { tableName =>
+        spark.sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR ALL COLUMNS")
+      }
+      logInfo("The elapsed time to analyze all the tables is " +
+        s"${(System.nanoTime() - startTime) / NANOS_PER_SECOND.toDouble} seconds")
+    } else {
+      spark.sql(s"SET ${SQLConf.CBO_ENABLED.key}=false")
+    }
+
     runTpcdsQueries(queryLocation = "tpcds", queries = queriesV1_4ToRun, tableSizes)
     runTpcdsQueries(queryLocation = "tpcds-v2.7.0", queries = queriesV2_7ToRun, tableSizes,
       nameSuffix = nameSuffixForQueriesV2_7)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmarkArguments.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmarkArguments.scala
index 184ffff..80a6bff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmarkArguments.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmarkArguments.scala
@@ -23,6 +23,7 @@ import java.util.Locale
 class TPCDSQueryBenchmarkArguments(val args: Array[String]) {
   var dataLocation: String = null
   var queryFilter: Set[String] = Set.empty
+  var cboEnabled: Boolean = false
 
   parseArgs(args.toList)
   validateArguments()
@@ -44,6 +45,10 @@ class TPCDSQueryBenchmarkArguments(val args: Array[String]) {
           queryFilter = value.toLowerCase(Locale.ROOT).split(",").map(_.trim).toSet
           args = tail
 
+        case optName :: tail if optionMatch("--cbo", optName) =>
+          cboEnabled = true
+          args = tail
+
         case _ =>
           // scalastyle:off println
           System.err.println("Unknown/unsupported param " + args)
@@ -60,6 +65,7 @@ class TPCDSQueryBenchmarkArguments(val args: Array[String]) {
       |Options:
       |  --data-location      Path to TPCDS data
       |  --query-filter       Queries to filter, e.g., q3,q5,q13
+      |  --cbo                Whether to enable cost-based optimization
       |
       |------------------------------------------------------------------------------------------------------------------
       |In order to run this benchmark, please follow the instructions at


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org