You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/05/02 23:12:24 UTC

spark git commit: [SPARK-24111][SQL] Add the TPCDS v2.7 (latest) queries in TPCDSQueryBenchmark

Repository: spark
Updated Branches:
  refs/heads/master 5be8aab14 -> e4c91c089


[SPARK-24111][SQL] Add the TPCDS v2.7 (latest) queries in TPCDSQueryBenchmark

## What changes were proposed in this pull request?
This PR adds the TPCDS v2.7 (latest) queries to `TPCDSQueryBenchmark`.
These query files were added in `SPARK-23167`.

## How was this patch tested?
Manually checked.

Author: Takeshi Yamamuro <ya...@apache.org>

Closes #21177 from maropu/AddTpcdsV2_7InBenchmark.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4c91c08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4c91c08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4c91c08

Branch: refs/heads/master
Commit: e4c91c089a701117af82f585d14d8afc5245fc64
Parents: 5be8aab
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Wed May 2 16:12:21 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed May 2 16:12:21 2018 -0700

----------------------------------------------------------------------
 .../benchmark/TPCDSQueryBenchmark.scala         | 52 +++++++++++++-------
 1 file changed, 35 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e4c91c08/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
index 69247d7..abe61a2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
@@ -58,10 +58,13 @@ object TPCDSQueryBenchmark extends Logging {
     }.toMap
   }
 
-  def tpcdsAll(dataLocation: String, queries: Seq[String]): Unit = {
-    val tableSizes = setupTables(dataLocation)
+  def runTpcdsQueries(
+      queryLocation: String,
+      queries: Seq[String],
+      tableSizes: Map[String, Long],
+      nameSuffix: String = ""): Unit = {
     queries.foreach { name =>
-      val queryString = resourceToString(s"tpcds/$name.sql",
+      val queryString = resourceToString(s"$queryLocation/$name.sql",
         classLoader = Thread.currentThread().getContextClassLoader)
 
       // This is an indirect hack to estimate the size of each query's input by traversing the
@@ -78,7 +81,7 @@ object TPCDSQueryBenchmark extends Logging {
       }
       val numRows = queryRelations.map(tableSizes.getOrElse(_, 0L)).sum
       val benchmark = new Benchmark(s"TPCDS Snappy", numRows, 5)
-      benchmark.addCase(name) { i =>
+      benchmark.addCase(s"$name$nameSuffix") { _ =>
         spark.sql(queryString).collect()
       }
       logInfo(s"\n\n===== TPCDS QUERY BENCHMARK OUTPUT FOR $name =====\n")
@@ -87,10 +90,20 @@ object TPCDSQueryBenchmark extends Logging {
     }
   }
 
+  def filterQueries(
+      origQueries: Seq[String],
+      args: TPCDSQueryBenchmarkArguments): Seq[String] = {
+    if (args.queryFilter.nonEmpty) {
+      origQueries.filter(args.queryFilter.contains)
+    } else {
+      origQueries
+    }
+  }
+
   def main(args: Array[String]): Unit = {
     val benchmarkArgs = new TPCDSQueryBenchmarkArguments(args)
 
-    // List of all TPC-DS queries
+    // List of all TPC-DS v1.4 queries
     val tpcdsQueries = Seq(
       "q1", "q2", "q3", "q4", "q5", "q6", "q7", "q8", "q9", "q10", "q11",
       "q12", "q13", "q14a", "q14b", "q15", "q16", "q17", "q18", "q19", "q20",
@@ -103,20 +116,25 @@ object TPCDSQueryBenchmark extends Logging {
       "q81", "q82", "q83", "q84", "q85", "q86", "q87", "q88", "q89", "q90",
       "q91", "q92", "q93", "q94", "q95", "q96", "q97", "q98", "q99")
 
+    // This list only includes TPC-DS v2.7 queries that are different from v1.4 ones
+    val tpcdsQueriesV2_7 = Seq(
+      "q5a", "q6", "q10a", "q11", "q12", "q14", "q14a", "q18a",
+      "q20", "q22", "q22a", "q24", "q27a", "q34", "q35", "q35a", "q36a", "q47", "q49",
+      "q51a", "q57", "q64", "q67a", "q70a", "q72", "q74", "q75", "q77a", "q78",
+      "q80a", "q86a", "q98")
+
     // If `--query-filter` defined, filters the queries that this option selects
-    val queriesToRun = if (benchmarkArgs.queryFilter.nonEmpty) {
-      val queries = tpcdsQueries.filter { case queryName =>
-        benchmarkArgs.queryFilter.contains(queryName)
-      }
-      if (queries.isEmpty) {
-        throw new RuntimeException(
-          s"Empty queries to run. Bad query name filter: ${benchmarkArgs.queryFilter}")
-      }
-      queries
-    } else {
-      tpcdsQueries
+    val queriesV1_4ToRun = filterQueries(tpcdsQueries, benchmarkArgs)
+    val queriesV2_7ToRun = filterQueries(tpcdsQueriesV2_7, benchmarkArgs)
+
+    if ((queriesV1_4ToRun ++ queriesV2_7ToRun).isEmpty) {
+      throw new RuntimeException(
+        s"Empty queries to run. Bad query name filter: ${benchmarkArgs.queryFilter}")
     }
 
-    tpcdsAll(benchmarkArgs.dataLocation, queries = queriesToRun)
+    val tableSizes = setupTables(benchmarkArgs.dataLocation)
+    runTpcdsQueries(queryLocation = "tpcds", queries = queriesV1_4ToRun, tableSizes)
+    runTpcdsQueries(queryLocation = "tpcds-v2.7.0", queries = queriesV2_7ToRun, tableSizes,
+      nameSuffix = "-v2.7")
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org