You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/01/04 18:31:51 UTC
[spark] branch master updated: [SPARK-33988][SQL][TEST] Add an option to enable CBO in TPCDSQueryBenchmark
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 414d323 [SPARK-33988][SQL][TEST] Add an option to enable CBO in TPCDSQueryBenchmark
414d323 is described below
commit 414d323d6c92584beb87e1c426e4beab5ddbd452
Author: Takeshi Yamamuro <ya...@apache.org>
AuthorDate: Mon Jan 4 10:31:20 2021 -0800
[SPARK-33988][SQL][TEST] Add an option to enable CBO in TPCDSQueryBenchmark
### What changes were proposed in this pull request?
This PR adds a new option `--cbo` to enable CBO in TPCDSQueryBenchmark. This option is useful for monitoring performance changes with CBO enabled.
### Why are the changes needed?
To monitor performance changes with CBO enabled.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually checked.
Closes #31011 from maropu/AddOptionForCBOInTPCDSBenchmark.
Authored-by: Takeshi Yamamuro <ya...@apache.org>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../execution/benchmark/TPCDSQueryBenchmark.scala | 39 +++++++++++++++++++---
.../benchmark/TPCDSQueryBenchmarkArguments.scala | 6 ++++
2 files changed, 41 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
index f931914..b34eac5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
@@ -19,11 +19,15 @@ package org.apache.spark.sql.execution.benchmark
import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
/**
* Benchmark to measure TPCDS query performance.
@@ -38,7 +42,10 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
* Results will be written to "benchmarks/TPCDSQueryBenchmark-results.txt".
* }}}
*/
-object TPCDSQueryBenchmark extends SqlBasedBenchmark {
+object TPCDSQueryBenchmark extends SqlBasedBenchmark with Logging {
+
+ private lazy val warehousePath =
+ Utils.createTempDir(namePrefix = "spark-warehouse").getAbsolutePath
override def getSparkSession: SparkSession = {
val conf = new SparkConf()
@@ -50,6 +57,7 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark {
.set("spark.executor.memory", "3g")
.set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString)
.set("spark.sql.crossJoin.enabled", "true")
+ .set("spark.sql.warehouse.dir", warehousePath)
SparkSession.builder.config(conf).getOrCreate()
}
@@ -60,9 +68,14 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark {
"web_returns", "web_site", "reason", "call_center", "warehouse", "ship_mode", "income_band",
"time_dim", "web_page")
- def setupTables(dataLocation: String): Map[String, Long] = {
+ def setupTables(dataLocation: String, createTempView: Boolean): Map[String, Long] = {
tables.map { tableName =>
- spark.read.parquet(s"$dataLocation/$tableName").createOrReplaceTempView(tableName)
+ val df = spark.read.parquet(s"$dataLocation/$tableName")
+ if (createTempView) {
+ df.createOrReplaceTempView(tableName)
+ } else {
+ df.write.saveAsTable(tableName)
+ }
tableName -> spark.table(tableName).count()
}.toMap
}
@@ -146,7 +159,25 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark {
s"Empty queries to run. Bad query name filter: ${benchmarkArgs.queryFilter}")
}
- val tableSizes = setupTables(benchmarkArgs.dataLocation)
+ val tableSizes = setupTables(benchmarkArgs.dataLocation,
+ createTempView = !benchmarkArgs.cboEnabled)
+ if (benchmarkArgs.cboEnabled) {
+ spark.sql(s"SET ${SQLConf.CBO_ENABLED.key}=true")
+ spark.sql(s"SET ${SQLConf.PLAN_STATS_ENABLED.key}=true")
+ spark.sql(s"SET ${SQLConf.JOIN_REORDER_ENABLED.key}=true")
+ spark.sql(s"SET ${SQLConf.HISTOGRAM_ENABLED.key}=true")
+
+ // Analyze all the tables before running TPCDS queries
+ val startTime = System.nanoTime()
+ tables.foreach { tableName =>
+ spark.sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR ALL COLUMNS")
+ }
+ logInfo("The elapsed time to analyze all the tables is " +
+ s"${(System.nanoTime() - startTime) / NANOS_PER_SECOND.toDouble} seconds")
+ } else {
+ spark.sql(s"SET ${SQLConf.CBO_ENABLED.key}=false")
+ }
+
runTpcdsQueries(queryLocation = "tpcds", queries = queriesV1_4ToRun, tableSizes)
runTpcdsQueries(queryLocation = "tpcds-v2.7.0", queries = queriesV2_7ToRun, tableSizes,
nameSuffix = nameSuffixForQueriesV2_7)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmarkArguments.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmarkArguments.scala
index 184ffff..80a6bff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmarkArguments.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmarkArguments.scala
@@ -23,6 +23,7 @@ import java.util.Locale
class TPCDSQueryBenchmarkArguments(val args: Array[String]) {
var dataLocation: String = null
var queryFilter: Set[String] = Set.empty
+ var cboEnabled: Boolean = false
parseArgs(args.toList)
validateArguments()
@@ -44,6 +45,10 @@ class TPCDSQueryBenchmarkArguments(val args: Array[String]) {
queryFilter = value.toLowerCase(Locale.ROOT).split(",").map(_.trim).toSet
args = tail
+ case optName :: tail if optionMatch("--cbo", optName) =>
+ cboEnabled = true
+ args = tail
+
case _ =>
// scalastyle:off println
System.err.println("Unknown/unsupported param " + args)
@@ -60,6 +65,7 @@ class TPCDSQueryBenchmarkArguments(val args: Array[String]) {
|Options:
| --data-location Path to TPCDS data
| --query-filter Queries to filter, e.g., q3,q5,q13
+ | --cbo Whether to enable cost-based optimization
|
|------------------------------------------------------------------------------------------------------------------
|In order to run this benchmark, please follow the instructions at
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org