You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/09/30 04:36:56 UTC
spark git commit: [SPARK-22122][SQL] Use analyzed logical plans to
count input rows in TPCDSQueryBenchmark
Repository: spark
Updated Branches:
refs/heads/master 530fe6832 -> c6610a997
[SPARK-22122][SQL] Use analyzed logical plans to count input rows in TPCDSQueryBenchmark
## What changes were proposed in this pull request?
Because the current code ignores WITH clauses when checking the input relations of TPCDS queries, the benchmark reports inaccurate per-row processing times. For example, for `q2` this fix catches all of the input relations: `web_sales`, `date_dim`, and `catalog_sales` (the current code catches only `date_dim`). About one-third of the TPCDS queries use WITH clauses, so I think this is worth fixing.
## How was this patch tested?
Manually checked.
Author: Takeshi Yamamuro <ya...@apache.org>
Closes #19344 from maropu/RespectWithInTPCDSBench.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6610a99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6610a99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6610a99
Branch: refs/heads/master
Commit: c6610a997f69148a1f1bbf69360e8f39e24cb70a
Parents: 530fe68
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Fri Sep 29 21:36:52 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri Sep 29 21:36:52 2017 -0700
----------------------------------------------------------------------
.../benchmark/TPCDSQueryBenchmark.scala | 32 +++++++-------------
1 file changed, 11 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c6610a99/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
index 99c6df7..69247d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
@@ -20,11 +20,10 @@ package org.apache.spark.sql.execution.benchmark
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.util.Benchmark
/**
@@ -66,24 +65,15 @@ object TPCDSQueryBenchmark extends Logging {
classLoader = Thread.currentThread().getContextClassLoader)
// This is an indirect hack to estimate the size of each query's input by traversing the
- // logical plan and adding up the sizes of all tables that appear in the plan. Note that this
- // currently doesn't take WITH subqueries into account which might lead to fairly inaccurate
- // per-row processing time for those cases.
+ // logical plan and adding up the sizes of all tables that appear in the plan.
val queryRelations = scala.collection.mutable.HashSet[String]()
- spark.sql(queryString).queryExecution.logical.map {
- case UnresolvedRelation(t: TableIdentifier) =>
- queryRelations.add(t.table)
- case lp: LogicalPlan =>
- lp.expressions.foreach { _ foreach {
- case subquery: SubqueryExpression =>
- subquery.plan.foreach {
- case UnresolvedRelation(t: TableIdentifier) =>
- queryRelations.add(t.table)
- case _ =>
- }
- case _ =>
- }
- }
+ spark.sql(queryString).queryExecution.analyzed.foreach {
+ case SubqueryAlias(alias, _: LogicalRelation) =>
+ queryRelations.add(alias)
+ case LogicalRelation(_, _, Some(catalogTable), _) =>
+ queryRelations.add(catalogTable.identifier.table)
+ case HiveTableRelation(tableMeta, _, _) =>
+ queryRelations.add(tableMeta.identifier.table)
case _ =>
}
val numRows = queryRelations.map(tableSizes.getOrElse(_, 0L)).sum
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org