Posted to commits@spark.apache.org by li...@apache.org on 2017/09/30 04:36:56 UTC

spark git commit: [SPARK-22122][SQL] Use analyzed logical plans to count input rows in TPCDSQueryBenchmark

Repository: spark
Updated Branches:
  refs/heads/master 530fe6832 -> c6610a997


[SPARK-22122][SQL] Use analyzed logical plans to count input rows in TPCDSQueryBenchmark

## What changes were proposed in this pull request?
The current code ignores WITH clauses when collecting the input relations of TPCDS queries, which leads to inaccurate per-row processing times in the benchmark results. For example, in `q2`, this fix catches all three input relations: `web_sales`, `date_dim`, and `catalog_sales` (the current code catches `date_dim` only). About one-third of the TPCDS queries use WITH clauses, so I think this is worth fixing.
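
A minimal sketch of why the parsed plan undercounts (hypothetical scaffolding; it assumes a `SparkSession` with `web_sales`, `catalog_sales`, and `date_dim` already registered):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

val spark = SparkSession.builder().master("local[1]").getOrCreate()

// A CTE query shaped like q2: two of the three input tables appear only
// inside the WITH clause.
val query = """
  WITH wscs AS (
    SELECT ws_sold_date_sk AS sold_date_sk FROM web_sales
    UNION ALL
    SELECT cs_sold_date_sk AS sold_date_sk FROM catalog_sales)
  SELECT * FROM wscs, date_dim WHERE sold_date_sk = d_date_sk"""

// In the parsed plan the CTE definitions hang off a `With` node that a
// plain child traversal never descends into, so the only relations found
// are those of the outer query: the CTE alias `wscs` and `date_dim`.
val fromParsedPlan = spark.sql(query).queryExecution.logical.collect {
  case UnresolvedRelation(t) => t.table
}

// The analyzer inlines the CTE, so the same traversal over
// `queryExecution.analyzed` (the approach taken by this patch) also
// reaches the base tables `web_sales` and `catalog_sales`.
```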

## How was this patch tested?
Manually checked.

Author: Takeshi Yamamuro <ya...@apache.org>

Closes #19344 from maropu/RespectWithInTPCDSBench.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6610a99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6610a99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6610a99

Branch: refs/heads/master
Commit: c6610a997f69148a1f1bbf69360e8f39e24cb70a
Parents: 530fe68
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Fri Sep 29 21:36:52 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri Sep 29 21:36:52 2017 -0700

----------------------------------------------------------------------
 .../benchmark/TPCDSQueryBenchmark.scala         | 32 +++++++-------------
 1 file changed, 11 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c6610a99/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
index 99c6df7..69247d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
@@ -20,11 +20,10 @@ package org.apache.spark.sql.execution.benchmark
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.util.Benchmark
 
 /**
@@ -66,24 +65,15 @@ object TPCDSQueryBenchmark extends Logging {
         classLoader = Thread.currentThread().getContextClassLoader)
 
       // This is an indirect hack to estimate the size of each query's input by traversing the
-      // logical plan and adding up the sizes of all tables that appear in the plan. Note that this
-      // currently doesn't take WITH subqueries into account which might lead to fairly inaccurate
-      // per-row processing time for those cases.
+      // logical plan and adding up the sizes of all tables that appear in the plan.
       val queryRelations = scala.collection.mutable.HashSet[String]()
-      spark.sql(queryString).queryExecution.logical.map {
-        case UnresolvedRelation(t: TableIdentifier) =>
-          queryRelations.add(t.table)
-        case lp: LogicalPlan =>
-          lp.expressions.foreach { _ foreach {
-            case subquery: SubqueryExpression =>
-              subquery.plan.foreach {
-                case UnresolvedRelation(t: TableIdentifier) =>
-                  queryRelations.add(t.table)
-                case _ =>
-              }
-            case _ =>
-          }
-        }
+      spark.sql(queryString).queryExecution.analyzed.foreach {
+        case SubqueryAlias(alias, _: LogicalRelation) =>
+          queryRelations.add(alias)
+        case LogicalRelation(_, _, Some(catalogTable), _) =>
+          queryRelations.add(catalogTable.identifier.table)
+        case HiveTableRelation(tableMeta, _, _) =>
+          queryRelations.add(tableMeta.identifier.table)
         case _ =>
       }
       val numRows = queryRelations.map(tableSizes.getOrElse(_, 0L)).sum
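
For context on why the collected relation set matters, here is a paraphrased sketch of the surrounding benchmark code (names as in the file above; not the verbatim file contents):

```scala
// `tableSizes` maps each TPCDS table name to its row count, so summing
// over the collected relations approximates the query's total input rows.
val numRows = queryRelations.map(tableSizes.getOrElse(_, 0L)).sum

// Benchmark divides measured wall-clock time by this count to report
// per-row time, so relations missed in a WITH clause inflate the
// reported ns/row numbers.
val benchmark = new Benchmark("TPCDS Snappy", numRows, 5)
benchmark.addCase(name) { _ =>
  spark.sql(queryString).collect()
}
benchmark.run()
```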

