Posted to issues@impala.apache.org by "Alexander Behm (JIRA)" <ji...@apache.org> on 2017/08/26 00:45:00 UTC

[jira] [Created] (IMPALA-5850) Partitioned hash join inside union may return wrong results

Alexander Behm created IMPALA-5850:
--------------------------------------

             Summary: Partitioned hash join inside union may return wrong results
                 Key: IMPALA-5850
                 URL: https://issues.apache.org/jira/browse/IMPALA-5850
             Project: IMPALA
          Issue Type: Bug
          Components: Frontend
    Affects Versions: Impala 2.3.0, Impala 2.4.0, Impala 2.5.0, Impala 2.6.0, Impala 2.7.0, Impala 2.8.0, Impala 2.9.0, Impala 2.10.0
            Reporter: Alexander Behm
            Assignee: Alexander Behm
            Priority: Blocker


Impala may return wrong results for plans that have a partitioned join inside a union.

*Affected queries*
* plan has a partitioned join inside a union
* tables must have stats - otherwise a partitioned join would not be chosen
* for at least one equi-join condition, the left-hand side and right-hand side join keys have different types

*Reproduction*
Setup:
{code}
create table a (id int);
insert into a values (1),(2),(3),(4);
insert into a values (5),(6),(7),(8);
compute stats a;

create table b (id bigint);
insert into b values (1),(2),(3),(4);
insert into b values (5),(6),(7),(8);
compute stats b;
{code}

Query that returns correct results:
{code}
select v.id from
(select distinct id from a) v join b
 on v.id = b.id

+----+
| id |
+----+
| 1  |
| 2  |
| 3  |
| 4  |
| 5  |
| 6  |
| 7  |
| 8  |
+----+
Fetched 8 row(s) in 0.20s
{code}

Query that returns wrong results:
{code}
select null from a limit 0
union all
select v.id from
(select distinct id from a) v join b
 on v.id = b.id

+------+
| null |
+------+
| 3    |
| 5    |
| 6    |
| 7    |
| 8    |
+------+
Fetched 5 row(s) in 0.12s

Plan:
+--------------------------------------------------+
| Explain String                                   |
+--------------------------------------------------+
| Max Per-Host Resource Reservation: Memory=3.88MB |
| Per-Host Resource Estimates: Memory=85.94MB      |
| Codegen disabled by planner                      |
|                                                  |
| PLAN-ROOT SINK                                   |
| |                                                |
| 08:EXCHANGE [UNPARTITIONED]                      |
| |                                                |
| 00:UNION                                         |
| |                                                |
| 04:HASH JOIN [INNER JOIN, PARTITIONED]           |  <--- Partitioned join inside union
| |  hash predicates: b.id = id                    |
| |  runtime filters: RF000 <- id                  |
| |                                                |
| |--06:AGGREGATE [FINALIZE]                       |
| |  |  group by: id                               |
| |  |                                             |
| |  05:EXCHANGE [HASH(id)]                        |
| |  |                                             |
| |  02:AGGREGATE [STREAMING]                      |
| |  |  group by: id                               |
| |  |                                             |
| |  01:SCAN HDFS [default.a]                      |
| |     partitions=1/1 files=2 size=16B            |
| |                                                |
| 07:EXCHANGE [HASH(b.id)]                         |
| |                                                |
| 03:SCAN HDFS [default.b]                         |
|    partitions=1/1 files=2 size=16B               |
|    runtime filters: RF000 -> b.id                |
+--------------------------------------------------+
{code}

*Analysis*
The bug is a missing implicit cast in EXCHANGE 05. The id should be cast to BIGINT so that it is hashed consistently with the left input of the join.
We already have code to properly cast partition expressions in exchanges, but the code incorrectly assumes that we only need to do so for hash-partitioned fragments. The problem is that the UNION makes the fragment RANDOM partitioned: because the union children could be arbitrarily partitioned, there is no guarantee about which partitioning the fragment produces.
The buggy code is in PlanFragment#finalizeExchanges():
{code}
public void finalizeExchanges(Analyzer analyzer)
      throws InternalException, NotImplementedException {
    if (destNode_ != null) {
      Preconditions.checkState(sink_ == null);
      // we're streaming to an exchange node
      DataStreamSink streamSink = new DataStreamSink(destNode_, outputPartition_);
      streamSink.setFragment(this);
      sink_ = streamSink;
    }

    if (!dataPartition_.isHashPartitioned()) return; // <--- Problem here
    ...
    // The following code adds casts to exchanges
{code}
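
To illustrate why the missing cast matters, here is a minimal toy model, not Impala code. It assumes a hypothetical hash over the raw bytes of a value and a hypothetical count of 7 hash partitions; Impala's real hash function and partition count differ. The class and method names are made up for this sketch. The point is only that an INT key and a BIGINT key with the same numeric value can be routed to different partitions, so matching rows never meet in the same join instance.

```java
import java.nio.ByteBuffer;

// Toy model only: not Impala code. All names and the hash function are hypothetical.
final class ExchangeCastSketch {
    // Hypothetical number of hash partitions (for example, join instances).
    static final int NUM_PARTITIONS = 7;

    // Toy hash over the raw bytes of a value; Impala's real hash function differs.
    static long hashBytes(byte[] bytes) {
        long h = 17;
        for (byte b : bytes) {
            h = h * 31 + (b & 0xff);
        }
        return h;
    }

    // Partition chosen when the key is hashed as a 4-byte INT.
    static int partitionForInt(int key) {
        byte[] bytes = ByteBuffer.allocate(Integer.BYTES).putInt(key).array();
        return (int) Math.floorMod(hashBytes(bytes), (long) NUM_PARTITIONS);
    }

    // Partition chosen when the key is hashed as an 8-byte BIGINT.
    static int partitionForBigint(long key) {
        byte[] bytes = ByteBuffer.allocate(Long.BYTES).putLong(key).array();
        return (int) Math.floorMod(hashBytes(bytes), (long) NUM_PARTITIONS);
    }

    // Counts the ids 1..8 whose row from a (INT side, EXCHANGE 05) and row from b
    // (BIGINT side, EXCHANGE 07) are routed to the same partition.
    static int matchingRows(boolean castIntSideToBigint) {
        int matches = 0;
        for (int id = 1; id <= 8; id++) {
            int partitionOfA = castIntSideToBigint
                    ? partitionForBigint((long) id)  // with the implicit cast
                    : partitionForInt(id);           // without the cast (the bug)
            int partitionOfB = partitionForBigint(id);
            if (partitionOfA == partitionOfB) {
                matches++;
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        System.out.println("rows joined without cast: " + matchingRows(false));
        System.out.println("rows joined with cast:    " + matchingRows(true));
    }
}
```

With the cast applied, both sides hash identical bytes and all 8 ids meet in the same partition. Without it, how many rows are lost depends on the hash function and the number of partitions, which is why the real query above returns some rows but not all.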

*Workaround*
* Use the /* +broadcast */ and /* +straight_join */ hints to force the join to use a broadcast distribution strategy.
* Reformulate the query to avoid the join inside a union
* Write the join result into a separate table and use that table in the original query instead of the join




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)