Posted to dev@calcite.apache.org by Vladimir Sitnikov <si...@gmail.com> on 2020/01/03 18:19:32 UTC

[DISCUSS] Stream tables vs hash joins

Hi,

Stream tables do not play well with hash joins.
In other words, if a hash join tries to build a lookup table out of a
stream, it can simply run out of memory.

Is there metadata or something similar that identifies stream-like inputs,
so that a hash join can make sure it does not try to build a lookup table
out of the stream?
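
As a rough illustration of what such a metadata-driven check could look like,
here is a minimal, self-contained Java sketch. The names JoinInput, isStream
and chooseBuildSide are hypothetical and are not part of Calcite; the sketch
only assumes that each join input can report whether it is a stream.

```java
import java.util.Optional;

/** Toy model: choose the hash-join build side from a hypothetical "is stream" flag. */
final class StreamAwareBuildSide {

  /** Hypothetical description of a join input; this is not a Calcite class. */
  static final class JoinInput {
    final String name;
    final boolean isStream;   // assumed metadata: true for unbounded inputs
    final double rowCount;    // estimated row count, used only as a tie-breaker

    JoinInput(String name, boolean isStream, double rowCount) {
      this.name = name;
      this.isStream = isStream;
      this.rowCount = rowCount;
    }
  }

  /**
   * Returns the input that is safe to load into a lookup table, or empty if
   * both inputs are streams (a plain hash join cannot be used in that case).
   */
  static Optional<JoinInput> chooseBuildSide(JoinInput left, JoinInput right) {
    if (left.isStream && right.isStream) {
      return Optional.empty();
    }
    if (left.isStream) {
      return Optional.of(right);
    }
    if (right.isStream) {
      return Optional.of(left);
    }
    // Both inputs are bounded: fall back to the smaller estimated row count.
    return Optional.of(left.rowCount <= right.rowCount ? left : right);
  }

  public static void main(String[] args) {
    JoinInput products = new JoinInput("PRODUCTS", false, 200);
    JoinInput orders = new JoinInput("ORDERS", true, 100);
    // The stream has the smaller estimate, but the bounded table is still chosen.
    System.out.println(
        chooseBuildSide(products, orders).map(i -> i.name).orElse("none"));
  }
}
```

With the estimates used here, the sketch prints PRODUCTS even though ORDERS
has the smaller row count, because the stream flag takes priority over size.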

The case is org.apache.calcite.test.StreamTest#testStreamToRelationJoin,
which is transformed to the plan below.
The plan is wrong because it builds the hash lookup table out of the second
input, which happens to be an (infinite?) stream, marked as (STREAM).

As a temporary workaround, I will increase the estimated row count for the
ORDERS table to 100'000, but it would be nice to make those decisions
metadata-driven.

EnumerableProject(ROWTIME=[$2], ORDERID=[$3], SUPPLIERID=[$1]): rowcount = 3000.0, cumulative cost = {6950.0 rows, 9650.0 cpu, 0.0 io}, id = 603
  EnumerableHashJoin(condition=[=($0, $6)], joinType=[inner]): rowcount = 3000.0, cumulative cost = {3950.0 rows, 650.0 cpu, 0.0 io}, id = 602
    EnumerableInterpreter: rowcount = 200.0, cumulative cost = {100.0 rows, 100.0 cpu, 0.0 io}, id = 599
      BindableTableScan(table=[[STREAM_JOINS, PRODUCTS]]): rowcount = 200.0, cumulative cost = {2.0 rows, 2.0100000000000002 cpu, 0.0 io}, id = 122
    EnumerableProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], PRODUCT0=[CAST($2):VARCHAR(32) NOT NULL]): rowcount = 100.0, cumulative cost = {150.0 rows, 550.0 cpu, 0.0 io}, id = 601
      EnumerableInterpreter: rowcount = 100.0, cumulative cost = {50.0 rows, 50.0 cpu, 0.0 io}, id = 600
        BindableTableScan(table=[[STREAM_JOINS, ORDERS, (STREAM)]]): rowcount = 100.0, cumulative cost = {1.0 rows, 1.01 cpu, 0.0 io}, id = 182
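
To see why raising the estimated row count works as a stopgap, consider a toy
rule that always builds the lookup table from the input with the smaller
estimate. This is an assumed simplification and not Calcite's actual cost
model; buildSideByRowCount is a hypothetical helper. The row counts 200 and
100 are taken from the plan, and 100'000 is the workaround value.

```java
/** Toy model of a row-count-only build-side choice (an assumption, not Calcite's cost model). */
final class RowCountBuildSide {

  /** Returns the name of the input with the smaller estimated row count. */
  static String buildSideByRowCount(String leftName, double leftRows,
      String rightName, double rightRows) {
    return leftRows <= rightRows ? leftName : rightName;
  }

  public static void main(String[] args) {
    // Estimates from the plan: the stream looks smaller, so it becomes the build side.
    System.out.println(buildSideByRowCount("PRODUCTS", 200, "ORDERS", 100));
    // With the workaround estimate, the bounded table becomes the build side instead.
    System.out.println(buildSideByRowCount("PRODUCTS", 200, "ORDERS", 100_000));
  }
}
```

Under this toy rule the first call picks ORDERS and the second picks PRODUCTS.
The larger estimate only hides the problem: a row count says nothing about
whether an input is bounded.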

Vladimir

Re: [DISCUSS] Stream tables vs hash joins

Posted by Stamatis Zampetakis <za...@gmail.com>.
Possibly we could also rely on RelOptCostFactory#makeInfiniteCost and
RelOptCost#isInfinite.
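
A minimal sketch of that idea, in plain Java rather than against the Calcite
API: a candidate join reports an infinite cost when its build side is a
stream, so a cost-based choice does not select it while a finite alternative
exists. Candidate, buildCost and cheapest are hypothetical names, and
Double.POSITIVE_INFINITY merely stands in for an infinite RelOptCost.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Toy model: an unbounded build side is priced as infinitely expensive. */
final class InfiniteCostSketch {

  /** Hypothetical candidate: a join that builds its lookup table from one input. */
  static final class Candidate {
    final String buildSide;
    final boolean buildSideIsStream;
    final double buildSideRows;

    Candidate(String buildSide, boolean buildSideIsStream, double buildSideRows) {
      this.buildSide = buildSide;
      this.buildSideIsStream = buildSideIsStream;
      this.buildSideRows = buildSideRows;
    }
  }

  /** Cost of building the lookup table; infinite when the build side is a stream. */
  static double buildCost(Candidate c) {
    return c.buildSideIsStream ? Double.POSITIVE_INFINITY : c.buildSideRows;
  }

  /**
   * Picks the candidate with the lowest build cost. If every candidate is
   * infinite, one of them is still returned, so callers should check the cost.
   */
  static Candidate cheapest(List<Candidate> candidates) {
    return candidates.stream()
        .min(Comparator.comparingDouble(InfiniteCostSketch::buildCost))
        .orElseThrow(() -> new IllegalArgumentException("no candidates"));
  }

  public static void main(String[] args) {
    List<Candidate> candidates = Arrays.asList(
        new Candidate("ORDERS", true, 100),
        new Candidate("PRODUCTS", false, 200));
    Candidate best = cheapest(candidates);
    System.out.println(best.buildSide + " cost=" + buildCost(best));
  }
}
```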

Best,
Stamatis

On Fri, Jan 3, 2020 at 8:04 PM Rui Wang <am...@apache.org> wrote:

> +1 on having some kind of property on RelNode that makes it known whether a
> node is stream or non-stream. In Apache Beam SQL's practice, stream joins
> are already metadata-driven: if one side is a stream and the other side is
> non-stream, we use a hash-join-like implementation but build the table on
> the non-stream side.
>
>
> Technically, it's feasible to build the hash table on the stream side even
> if it is infinite (I guess the rationale is that the stream is very small
> and it could be joined with TB-level non-stream data). New stream data will
> update the hash table. In this case, the implementation has to update data
> accordingly based on the newly arriving data, which turned out to be
> difficult to implement.
>
>
> -Rui

Re: [DISCUSS] Stream tables vs hash joins

Posted by Rui Wang <am...@apache.org>.
+1 on having some kind of property on RelNode that makes it known whether a
node is stream or non-stream. In Apache Beam SQL's practice, stream joins are
already metadata-driven: if one side is a stream and the other side is
non-stream, we use a hash-join-like implementation but build the table on
the non-stream side.
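
The general shape of such a join can be sketched in plain Java as follows.
This is an illustration only, not Beam SQL's implementation: the join method
and its parameters are hypothetical, and a finite iterator stands in for the
stream. Only the non-stream side is materialized; stream rows are probed one
at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Sketch of an inner hash join that builds its table on the non-stream side. */
final class StreamToTableHashJoin {

  static <K, S, T, R> Iterator<R> join(
      Iterator<S> stream, Function<S, K> streamKey,
      Iterable<T> table, Function<T, K> tableKey,
      BiFunction<S, T, R> combine) {
    // Build phase: materialize only the bounded (non-stream) side.
    Map<K, List<T>> lookup = new HashMap<>();
    for (T row : table) {
      lookup.computeIfAbsent(tableKey.apply(row), k -> new ArrayList<>()).add(row);
    }
    // Probe phase: stream rows are pulled lazily, so memory is bounded by the table.
    return new Iterator<R>() {
      private Iterator<T> matches = Collections.emptyIterator();
      private S current;

      @Override public boolean hasNext() {
        // Advance the stream until a row with at least one match is found.
        while (!matches.hasNext() && stream.hasNext()) {
          current = stream.next();
          matches = lookup
              .getOrDefault(streamKey.apply(current), Collections.emptyList())
              .iterator();
        }
        return matches.hasNext();
      }

      @Override public R next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return combine.apply(current, matches.next());
      }
    };
  }

  public static void main(String[] args) {
    // Bounded side: {product, supplier} rows. Stream side: ordered product names.
    List<String[]> products = Arrays.asList(
        new String[] {"paint", "supplier-1"},
        new String[] {"paper", "supplier-2"});
    Iterator<String> orders = Arrays.asList("paint", "brush", "paper").iterator();
    Iterator<String> joined = join(
        orders, order -> order,
        products, product -> product[0],
        (order, product) -> order + " -> " + product[1]);
    while (joined.hasNext()) {
      System.out.println(joined.next());
    }
  }
}
```

In this example the "brush" order has no matching product and is dropped, as
expected for an inner join.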


Technically, it's feasible to build the hash table on the stream side even
if it is infinite (I guess the rationale is that the stream is very small and
it could be joined with TB-level non-stream data). New stream data will
update the hash table. In this case, the implementation has to update data
accordingly based on the newly arriving data, which turned out to be
difficult to implement.
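
One way to picture the difficulty is the toy Java sketch below. It is an
interpretation under stated assumptions, not the Beam SQL code: the class
StreamSideHashTable and its methods are hypothetical, and a small in-memory
list stands in for the large non-stream input. The point is that a stream row
arriving late has to be joined with non-stream rows that were already probed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a hash join whose lookup table lives on the stream side. */
final class StreamSideHashTable {
  /** Lookup table built from the stream: join key -> stream rows seen so far. */
  private final Map<String, List<String>> lookup = new HashMap<>();
  /** Non-stream side as {key, payload} rows; stand-in for a very large table. */
  private final List<String[]> table;

  StreamSideHashTable(List<String[]> table) {
    this.table = table;
  }

  /** Regular probe: joins one non-stream row with the stream rows seen so far. */
  List<String> probe(String[] tableRow) {
    List<String> results = new ArrayList<>();
    for (String streamRow : lookup.getOrDefault(tableRow[0], Collections.emptyList())) {
      results.add(streamRow + " | " + tableRow[1]);
    }
    return results;
  }

  /** Adds a newly arrived stream row and returns the late results it causes. */
  List<String> onStreamRow(String key, String streamRow) {
    lookup.computeIfAbsent(key, k -> new ArrayList<>()).add(streamRow);
    // The new entry may match rows that were probed before it arrived. This
    // sketch rescans the whole non-stream side, which is the expensive part.
    List<String> results = new ArrayList<>();
    for (String[] row : table) {
      if (row[0].equals(key)) {
        results.add(streamRow + " | " + row[1]);
      }
    }
    return results;
  }

  public static void main(String[] args) {
    StreamSideHashTable join = new StreamSideHashTable(Arrays.asList(
        new String[] {"paint", "supplier-1"},
        new String[] {"paper", "supplier-2"}));
    // Nothing has arrived on the stream yet, so this probe finds no match.
    System.out.println(join.probe(new String[] {"paint", "supplier-1"}));
    // A late stream row must be joined with non-stream rows after the fact.
    System.out.println(join.onStreamRow("paint", "order-1"));
  }
}
```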


-Rui
