You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/03/28 19:43:58 UTC

spark git commit: [SPARK-14156][SQL] Use executedPlan in HiveComparisonTest for the messages of computed tables

Repository: spark
Updated Branches:
  refs/heads/master 4a7636f2d -> 1528ff4c9


[SPARK-14156][SQL] Use executedPlan in HiveComparisonTest for the messages of computed tables

## What changes were proposed in this pull request?
JIRA: https://issues.apache.org/jira/browse/SPARK-14156

In HiveComparisonTest, when catalyst results are different to hive results, we will collect the messages for computed tables during the test. During creating the message, we use sparkPlan. But we actually run the query with executedPlan. So the error message is sometimes confusing.

For example, as wholestage codegen is enabled by default now. The shown spark plan for computed tables is the plan before wholestage codegen.

A concrete is the following error message shown before this patch. It is the error shown when running `HiveCompatibilityTest` `auto_join26`.

auto_join26 has one SQL to create table:

    INSERT OVERWRITE TABLE dest_j1
    SELECT  x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key;   (1)

Then a SQL to retrieve the result:

    select * from dest_j1 x order by x.key;   (2)

When the above SQL (2) to retrieve the result fails, In `HiveComparisonTest` we will try to collect and show the generated data from table `dest_j1` using the SQL (1)'s spark plan. The you will see this error:

    TungstenAggregate(key=[key#8804], functions=[(count(1),mode=Partial,isDistinct=false)], output=[key#8804,count#8834L])
    +- Project [key#8804]
       +- BroadcastHashJoin [key#8804], [key#8806], Inner, BuildRight, None
          :- Filter isnotnull(key#8804)
          :  +- InMemoryColumnarTableScan [key#8804], [isnotnull(key#8804)], InMemoryRelation [key#8804,value#8805], true, 5, StorageLevel(true, true, false, true, 1), HiveTableScan [key#8717,value#8718], MetastoreRelation default, src1, None, Some(src1)
          +- Filter isnotnull(key#8806)
             +- InMemoryColumnarTableScan [key#8806], [isnotnull(key#8806)], InMemoryRelation [key#8806,value#8807], true, 5, StorageLevel(true, true, false, true, 1), HiveTableScan [key#8760,value#8761], MetastoreRelation default, src, None, Some(src)

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:82)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:121)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:121)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:140)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:137)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:120)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:87)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:82)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
	... 70 more
    Caused by: java.lang.UnsupportedOperationException: Filter does not implement doExecuteBroadcast
	at org.apache.spark.sql.execution.SparkPlan.doExecuteBroadcast(SparkPlan.scala:221)

The message is confusing because it is not the plan actually run by SparkSQL engine to create the generated table. The plan actually run is no problem. But as before this patch, we run `e.sparkPlan.collect` to retrieve and show the generated data, spark plan is not the plan we can run. So the above error will be shown.

After this patch, we won't see the error because the executed plan is no problem and works.

## How was this patch tested?
Existing tests.

Author: Liang-Chi Hsieh <si...@tw.ibm.com>

Closes #11957 from viirya/use-executedplan.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1528ff4c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1528ff4c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1528ff4c

Branch: refs/heads/master
Commit: 1528ff4c9affe1df103c4b3abd56a86c71d8b753
Parents: 4a7636f
Author: Liang-Chi Hsieh <si...@tw.ibm.com>
Authored: Mon Mar 28 10:43:54 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Mar 28 10:43:54 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/hive/execution/HiveComparisonTest.scala   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1528ff4c/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index cfca93b..4c1b425 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -480,7 +480,11 @@ abstract class HiveComparisonTest
                 val executions = queryList.map(new TestHive.QueryExecution(_))
                 executions.foreach(_.toRdd)
                 val tablesGenerated = queryList.zip(executions).flatMap {
-                  case (q, e) => e.sparkPlan.collect {
+                  // We should take executedPlan instead of sparkPlan, because in following codes we
+                  // will run the collected plans. As we will do extra processing for sparkPlan such
+                  // as adding exchage, collapsing codegen stages, etc., collecing sparkPlan here
+                  // will cause some errors when running these plans later.
+                  case (q, e) => e.executedPlan.collect {
                     case i: InsertIntoHiveTable if tablesRead contains i.table.tableName =>
                       (q, e, i)
                   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org