Posted to issues@spark.apache.org by "Herman van Hovell (JIRA)" <ji...@apache.org> on 2016/08/17 12:03:20 UTC

[jira] [Comment Edited] (SPARK-17099) Incorrect result when HAVING clause is added to group by query

    [ https://issues.apache.org/jira/browse/SPARK-17099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15424356#comment-15424356 ] 

Herman van Hovell edited comment on SPARK-17099 at 8/17/16 12:02 PM:
---------------------------------------------------------------------

TL;DR: This is caused by an interaction between the optimizer's {{InferFiltersFromConstraints}} and {{EliminateOuterJoin}} rules.

If you look at the analyzed and optimized query plans:
{noformat}
== Analyzed Logical Plan ==
sum(coalesce(int_col_5, int_col_2)): bigint, (coalesce(int_col_5, int_col_2) * 2): int
Project [sum(coalesce(int_col_5, int_col_2))#34L, (coalesce(int_col_5, int_col_2) * 2)#32]
+- Filter (sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint))#37L > cast((coalesce(int_col_5#4, int_col_2#13)#38 * 2) as bigint))
   +- Aggregate [greatest(coalesce(int_col_5#14, 109), coalesce(int_col_5#4, -449)), coalesce(int_col_5#4, int_col_2#13)], [sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint)) AS sum(coalesce(int_col_5, int_col_2))#34L, (coalesce(int_col_5#4, int_col_2#13) * 2) AS (coalesce(int_col_5, int_col_2) * 2)#32, sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint)) AS sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint))#37L, coalesce(int_col_5#4, int_col_2#13) AS coalesce(int_col_5#4, int_col_2#13)#38]
      +- Join RightOuter, (int_col_2#13 = int_col_5#4)
         :- SubqueryAlias t1
         :  +- Project [value#2 AS int_col_5#4]
         :     +- SerializeFromObject [input[0, int, true] AS value#2]
         :        +- ExternalRDD [obj#1]
         +- SubqueryAlias t2
            +- Project [_1#10 AS int_col_2#13, _2#11 AS int_col_5#14]
               +- SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._1 AS _1#10, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._2 AS _2#11]
                  +- ExternalRDD [obj#9]

== Optimized Logical Plan ==
Project [sum(coalesce(int_col_5, int_col_2))#34L, (coalesce(int_col_5, int_col_2) * 2)#32]
+- Filter (isnotnull(sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint))#37L) && (sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint))#37L > cast((coalesce(int_col_5#4, int_col_2#13)#38 * 2) as bigint)))
   +- Aggregate [greatest(coalesce(int_col_5#14, 109), coalesce(int_col_5#4, -449)), coalesce(int_col_5#4, int_col_2#13)], [sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint)) AS sum(coalesce(int_col_5, int_col_2))#34L, (coalesce(int_col_5#4, int_col_2#13) * 2) AS (coalesce(int_col_5, int_col_2) * 2)#32, sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint)) AS sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint))#37L, coalesce(int_col_5#4, int_col_2#13) AS coalesce(int_col_5#4, int_col_2#13)#38]
      +- Join Inner, (isnotnull(coalesce(int_col_5#4, int_col_2#13)) && (int_col_2#13 = int_col_5#4))
         :- Project [value#2 AS int_col_5#4]
         :  +- Filter (isnotnull(value#2) && isnotnull(coalesce(value#2, value#2)))
         :     +- SerializeFromObject [input[0, int, true] AS value#2]
         :        +- ExternalRDD [obj#1]
         +- Project [_1#10 AS int_col_2#13, _2#11 AS int_col_5#14]
            +- Filter (isnotnull(coalesce(_1#10, _1#10)) && isnotnull(_1#10))
               +- SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._1 AS _1#10, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._2 AS _2#11]
                  +- ExternalRDD [obj#9]
{noformat}

{{InferFiltersFromConstraints}} infers {{is not null}} constraints, which are pushed down into the plan. The {{EliminateOuterJoin}} rule then eliminates the right outer join because the join condition can no longer be null. Disabling either of the rules gives the correct answer.
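
For reference, a minimal sketch of disabling one of the rules, assuming a Spark 2.4+ session ({{spark.sql.optimizer.excludedRules}} does not exist in the 2.1.0-SNAPSHOT used above, where the rule has to be removed from the optimizer batches instead):
{code}
// Sketch only: spark.sql.optimizer.excludedRules was added in Spark 2.4.
// Excluding either rule keeps the right outer join intact and yields the
// four expected rows.
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints")
{code}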

This is not trivial to fix. We probably should not change the nullability of a column because of constraint propagation.
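
To illustrate the direction a fix could take, here is a rough sketch (hypothetical, not actual Spark code) of the guard {{InferFiltersFromConstraints}} would need: an inferred {{IsNotNull}} may only be pushed into a join child whose output the join cannot null-extend.
{code}
import org.apache.spark.sql.catalyst.plans.{FullOuter, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}

// Hypothetical helper, not part of Spark: in a right outer join the right
// child's rows are all preserved as-is, so a constraint may be pushed into
// it; the left child's columns can be null-extended by the join, so
// pushing an inferred IsNotNull there is unsound.
def safeToInferNotNull(join: Join, child: LogicalPlan): Boolean =
  join.joinType match {
    case RightOuter => child eq join.right
    case LeftOuter  => child eq join.left
    case FullOuter  => false
    case _          => true // inner joins filter out nulls anyway
  }
{code}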


was (Author: hvanhovell):
TL;DR: This is caused by a bug in the optimizer's {{InferFiltersFromConstraints}} rule.

If you look at the analyzed and optimized query plans:
{noformat}
== Analyzed Logical Plan ==
sum(coalesce(int_col_5, int_col_2)): bigint, (coalesce(int_col_5, int_col_2) * 2): int
Project [sum(coalesce(int_col_5, int_col_2))#34L, (coalesce(int_col_5, int_col_2) * 2)#32]
+- Filter (sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint))#37L > cast((coalesce(int_col_5#4, int_col_2#13)#38 * 2) as bigint))
   +- Aggregate [greatest(coalesce(int_col_5#14, 109), coalesce(int_col_5#4, -449)), coalesce(int_col_5#4, int_col_2#13)], [sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint)) AS sum(coalesce(int_col_5, int_col_2))#34L, (coalesce(int_col_5#4, int_col_2#13) * 2) AS (coalesce(int_col_5, int_col_2) * 2)#32, sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint)) AS sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint))#37L, coalesce(int_col_5#4, int_col_2#13) AS coalesce(int_col_5#4, int_col_2#13)#38]
      +- Join RightOuter, (int_col_2#13 = int_col_5#4)
         :- SubqueryAlias t1
         :  +- Project [value#2 AS int_col_5#4]
         :     +- SerializeFromObject [input[0, int, true] AS value#2]
         :        +- ExternalRDD [obj#1]
         +- SubqueryAlias t2
            +- Project [_1#10 AS int_col_2#13, _2#11 AS int_col_5#14]
               +- SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._1 AS _1#10, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._2 AS _2#11]
                  +- ExternalRDD [obj#9]

== Optimized Logical Plan ==
Project [sum(coalesce(int_col_5, int_col_2))#34L, (coalesce(int_col_5, int_col_2) * 2)#32]
+- Filter (isnotnull(sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint))#37L) && (sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint))#37L > cast((coalesce(int_col_5#4, int_col_2#13)#38 * 2) as bigint)))
   +- Aggregate [greatest(coalesce(int_col_5#14, 109), coalesce(int_col_5#4, -449)), coalesce(int_col_5#4, int_col_2#13)], [sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint)) AS sum(coalesce(int_col_5, int_col_2))#34L, (coalesce(int_col_5#4, int_col_2#13) * 2) AS (coalesce(int_col_5, int_col_2) * 2)#32, sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint)) AS sum(cast(coalesce(int_col_5#4, int_col_2#13) as bigint))#37L, coalesce(int_col_5#4, int_col_2#13) AS coalesce(int_col_5#4, int_col_2#13)#38]
      +- Join Inner, (isnotnull(coalesce(int_col_5#4, int_col_2#13)) && (int_col_2#13 = int_col_5#4))
         :- Project [value#2 AS int_col_5#4]
         :  +- Filter (isnotnull(value#2) && isnotnull(coalesce(value#2, value#2)))
         :     +- SerializeFromObject [input[0, int, true] AS value#2]
         :        +- ExternalRDD [obj#1]
         +- Project [_1#10 AS int_col_2#13, _2#11 AS int_col_5#14]
            +- Filter (isnotnull(coalesce(_1#10, _1#10)) && isnotnull(_1#10))
               +- SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._1 AS _1#10, assertnotnull(input[0, scala.Tuple2, true], top level non-flat input object)._2 AS _2#11]
                  +- ExternalRDD [obj#9]
{noformat}

It seems that the optimizer is pushing a not-null predicate down through an outer join (which rather defeats the whole idea of an outer join). Disabling the {{InferFiltersFromConstraints}} rule gives the correct answer.

> Incorrect result when HAVING clause is added to group by query
> --------------------------------------------------------------
>
>                 Key: SPARK-17099
>                 URL: https://issues.apache.org/jira/browse/SPARK-17099
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 2.1.0
>            Reporter: Josh Rosen
>            Priority: Critical
>             Fix For: 2.1.0
>
>
> Random query generation uncovered the following query which returns incorrect results when run on Spark SQL. This wasn't the original query uncovered by the generator, since I performed a bit of minimization to try to make it more understandable.
> With the following tables:
> {code}
> val t1 = sc.parallelize(Seq(-234, 145, 367, 975, 298)).toDF("int_col_5")
> val t2 = sc.parallelize(
>   Seq(
>     (-769, -244),
>     (-800, -409),
>     (940, 86),
>     (-507, 304),
>     (-367, 158))
> ).toDF("int_col_2", "int_col_5")
> t1.registerTempTable("t1")
> t2.registerTempTable("t2")
> {code}
> Run
> {code}
> SELECT
>   (SUM(COALESCE(t1.int_col_5, t2.int_col_2))),
>      ((COALESCE(t1.int_col_5, t2.int_col_2)) * 2)
> FROM t1
> RIGHT JOIN t2
>   ON (t2.int_col_2) = (t1.int_col_5)
> GROUP BY GREATEST(COALESCE(t2.int_col_5, 109), COALESCE(t1.int_col_5, -449)),
>          COALESCE(t1.int_col_5, t2.int_col_2)
> HAVING (SUM(COALESCE(t1.int_col_5, t2.int_col_2))) > ((COALESCE(t1.int_col_5, t2.int_col_2)) * 2)
> {code}
> In Spark SQL, this returns an empty result set, whereas Postgres returns four rows. However, if I omit the {{HAVING}} clause I can see the groups that the {{HAVING}} clause is incorrectly filtering out:
> {code}
> +--------------------------------------+---------------------------------------+--+
> | sum(coalesce(int_col_5, int_col_2))  | (coalesce(int_col_5, int_col_2) * 2)  |
> +--------------------------------------+---------------------------------------+--+
> | -507                                 | -1014                                 |
> | 940                                  | 1880                                  |
> | -769                                 | -1538                                 |
> | -367                                 | -734                                  |
> | -800                                 | -1600                                 |
> +--------------------------------------+---------------------------------------+--+
> {code}
> Based on this, the output after adding the {{HAVING}} should contain four rows, not zero.
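> A quick sanity check of that claim in plain Scala (outside Spark), using the five (sum, product) pairs from the table above:
> {code}
> // Apply the HAVING predicate sum > coalesce * 2 by hand.
> val groups = Seq((-507, -1014), (940, 1880), (-769, -1538), (-367, -734), (-800, -1600))
> val kept = groups.filter { case (sum, twice) => sum > twice }
> // kept contains four pairs; only (940, 1880) fails the predicate.
> {code}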
> I'm not sure how to further shrink this in a straightforward way, so I'm opening this bug to get help in triaging further.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org