You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by weijie tong <to...@gmail.com> on 2017/09/19 15:45:36 UTC

Propose about join push down

All:
   This is a propose about join query tuning by pushing down the join
condition. Welcome suggestion ,discussion,objection .

   Suppose we have a join query "select t1.a,t1.s,t3.d (select a, sum(b) as
s from t1 where a='1' group by a ) t2 join t3 on t2.a = t3.a"  .  This
query will be transferred to a hashjoin or boradcast hashjoin (if metadata
is accurate ). But the t3's rows will all be pulled out from the storage.
If the t3 is a large table,the performance will be unacceptable.
If we can first get the 'a' result set of the inner query,then we pushed
down the result set to the right table t3's scan node. The right table's
scan will be quickly.

     possible solutions :
     1. A new physical operator or  broadcast join ,hash join enhancements
, which need to first query the left table's data, then push down the
filtered left join condition column set to the right table stream, once
confirmed the pushed down , works as normal join query logic.
     2. The pushed down join condition set maybe two possible formats bloom
filters bytes  or list of strings.
     3. RecordBatch needs to support to push down 2's data down stream.
     4. SubScan needs to hold the 2's data,and wait for next real call to
push down to the storage level query.
     5. Storage level should have an interface to indicate whether it
supports to solve the pushed down bloom filter or list of strings.

     Since this violates drill's data flow direction,it seems a lot of work
to do ,to change to implement this feature.

Re: Propose about join push down

Posted by weijie tong <to...@gmail.com>.
Hi Boaz:

Sorry for the wrong example,it should be
 "select t2.a,t2.s,t3.d (select a, sum(b) as s from t1 where c='1' group by
a ) t2 join t3 on t2.a = t3.a" which would make sense.

The prerequisite for pushing down join is the storage plugin support filter
push down. The storage plugin should add a interface to indicate it
supports join push down. The corresponding rule will care about this.

I think this strategy also applies to hashjoin. The build side table's join
keys construct the bloom filter firstly. Then it pushs down the bloom
filter down (next call with data parameters).All other things left are the
same process logic as the current hashjoin implementation.


On Wed, 20 Sep 2017 at 5:25 AM Boaz Ben-Zvi <bb...@mapr.com> wrote:

> Hi Weijie,
>
>     Are there some typos in the sample query ?  Looks like the projection
> should be t2.a,t2.s,t3.d (i.e., t2 instead of t1). Also the predicate “
> where a='1' ” makes the inner query return only a single row, which is
> pretty trivial.
>
>     Assuming these changes are made, then there could be many t2 “a”
> values to be equi-joined to t3’s “a” values.
>
> With Bloom filters, the rows from t3 would only be “mostly filtered”;
> there still needs to be a join above to produce the final result.
>
> If wanting to push the “whole join” down, then _either_ need to have some
> index mechanism on “t3.a” – which would work as a nested loop join (NLJ),
> _or_ need to perform another type of join down below (with all related
> issues, like memory control, spill etc).  For the NLJ, indeed the current
> Drill does not support “down flow” of data (and most storage does not have
> indexes), and it’ll take some work to implement (e.g., all operators would
> need to accept a next() call with some “data” parameter).
>
>          Boaz
> --------------------------------
>
> On 9/19/17, 8:45 AM, "weijie tong" <to...@gmail.com> wrote:
>
>     All:
>        This is a propose about join query tuning by pushing down the join
>     condition. Welcome suggestion ,discussion,objection .
>
>        Suppose we have a join query "select t1.a,t1.s,t3.d (select a,
> sum(b) as
>     s from t1 where a='1' group by a ) t2 join t3 on t2.a = t3.a"  .  This
>     query will be transferred to a hashjoin or boradcast hashjoin (if
> metadata
>     is accurate ). But the t3's rows will all be pulled out from the
> storage.
>     If the t3 is a large table,the performance will be unacceptable.
>     If we can first get the 'a' result set of the inner query,then we
> pushed
>     down the result set to the right table t3's scan node. The right
> table's
>     scan will be quickly.
>
>          possible solutions :
>          1. A new physical operator or  broadcast join ,hash join
> enhancements
>     , which need to first query the left table's data, then push down the
>     filtered left join condition column set to the right table stream, once
>     confirmed the pushed down , works as normal join query logic.
>          2. The pushed down join condition set maybe two possible formats
> bloom
>     filters bytes  or list of strings.
>          3. RecordBatch needs to support to push down 2's data down stream.
>          4. SubScan needs to hold the 2's data,and wait for next real call
> to
>     push down to the storage level query.
>          5. Storage level should have an interface to indicate whether it
>     supports to solve the pushed down bloom filter or list of strings.
>
>          Since this violates drill's data flow direction,it seems a lot of
> work
>     to do ,to change to implement this feature.
>
>
>

Re: Propose about join push down

Posted by weijie tong <to...@gmail.com>.
Hi Boaz:

   Sorry for the wrong example. "select t2.a,t2.s,t3.d (select a, sum(b) as
    s from t1 where c='1' group by a ) t2 join t3 on t2.a = t3.a"  this sql
would make sense.

The prerequisite for join push down is the storage plugin supports filter
push down. The corresponding rule should learn about this message to decide
whether to do the join push down (storage plugin like elastic search will
benefit from this).

I think there's little change to current hashjoin process logic except the
data pushing down work. 1st the build side table constructs the bloom
filter. 2. The hashjoin batch pushes down the bloom filter. 3 The things
left behaves the same as current implementation to do the join work between
filtered probing data and the build side ones.

One thing explicitly is to implement next call with data parameters . I
will think about this.

On Wed, 20 Sep 2017 at 5:25 AM Boaz Ben-Zvi <bb...@mapr.com> wrote:

> Hi Weijie,
>
>     Are there some typos in the sample query ?  Looks like the projection
> should be t2.a,t2.s,t3.d (i.e., t2 instead of t1). Also the predicate “
> where a='1' ” makes the inner query return only a single row, which is
> pretty trivial.
>
>     Assuming these changes are made, then there could be many t2 “a”
> values to be equi-joined to t3’s “a” values.
>
> With Bloom filters, the rows from t3 would only be “mostly filtered”;
> there still needs to be a join above to produce the final result.
>
> If wanting to push the “whole join” down, then _either_ need to have some
> index mechanism on “t3.a” – which would work as a nested loop join (NLJ),
> _or_ need to perform another type of join down below (with all related
> issues, like memory control, spill etc).  For the NLJ, indeed the current
> Drill does not support “down flow” of data (and most storage does not have
> indexes), and it’ll take some work to implement (e.g., all operators would
> need to accept a next() call with some “data” parameter).
>
>          Boaz
> --------------------------------
>
> On 9/19/17, 8:45 AM, "weijie tong" <to...@gmail.com> wrote:
>
>     All:
>        This is a propose about join query tuning by pushing down the join
>     condition. Welcome suggestion ,discussion,objection .
>
>        Suppose we have a join query "select t1.a,t1.s,t3.d (select a,
> sum(b) as
>     s from t1 where a='1' group by a ) t2 join t3 on t2.a = t3.a"  .  This
>     query will be transferred to a hashjoin or boradcast hashjoin (if
> metadata
>     is accurate ). But the t3's rows will all be pulled out from the
> storage.
>     If the t3 is a large table,the performance will be unacceptable.
>     If we can first get the 'a' result set of the inner query,then we
> pushed
>     down the result set to the right table t3's scan node. The right
> table's
>     scan will be quickly.
>
>          possible solutions :
>          1. A new physical operator or  broadcast join ,hash join
> enhancements
>     , which need to first query the left table's data, then push down the
>     filtered left join condition column set to the right table stream, once
>     confirmed the pushed down , works as normal join query logic.
>          2. The pushed down join condition set maybe two possible formats
> bloom
>     filters bytes  or list of strings.
>          3. RecordBatch needs to support to push down 2's data down stream.
>          4. SubScan needs to hold the 2's data,and wait for next real call
> to
>     push down to the storage level query.
>          5. Storage level should have an interface to indicate whether it
>     supports to solve the pushed down bloom filter or list of strings.
>
>          Since this violates drill's data flow direction,it seems a lot of
> work
>     to do ,to change to implement this feature.
>
>
>

Re: Propose about join push down

Posted by Boaz Ben-Zvi <bb...@mapr.com>.
Hi Weijie,

    Are there some typos in the sample query ?  Looks like the projection should be t2.a,t2.s,t3.d (i.e., t2 instead of t1). Also the predicate “ where a='1' ” makes the inner query return only a single row, which is pretty trivial.

    Assuming these changes are made, then there could be many t2 “a” values to be equi-joined to t3’s “a” values. 

With Bloom filters, the rows from t3 would only be “mostly filtered”; there still needs to be a join above to produce the final result.

If wanting to push the “whole join” down, then _either_ need to have some index mechanism on “t3.a” – which would work as a nested loop join (NLJ), _or_ need to perform another type of join down below (with all related issues, like memory control, spill etc).  For the NLJ, indeed the current Drill does not support “down flow” of data (and most storage does not have indexes), and it’ll take some work to implement (e.g., all operators would need to accept a next() call with some “data” parameter).

         Boaz 
--------------------------------

On 9/19/17, 8:45 AM, "weijie tong" <to...@gmail.com> wrote:

    All:
       This is a propose about join query tuning by pushing down the join
    condition. Welcome suggestion ,discussion,objection .
    
       Suppose we have a join query "select t1.a,t1.s,t3.d (select a, sum(b) as
    s from t1 where a='1' group by a ) t2 join t3 on t2.a = t3.a"  .  This
    query will be transferred to a hashjoin or boradcast hashjoin (if metadata
    is accurate ). But the t3's rows will all be pulled out from the storage.
    If the t3 is a large table,the performance will be unacceptable.
    If we can first get the 'a' result set of the inner query,then we pushed
    down the result set to the right table t3's scan node. The right table's
    scan will be quickly.
    
         possible solutions :
         1. A new physical operator or  broadcast join ,hash join enhancements
    , which need to first query the left table's data, then push down the
    filtered left join condition column set to the right table stream, once
    confirmed the pushed down , works as normal join query logic.
         2. The pushed down join condition set maybe two possible formats bloom
    filters bytes  or list of strings.
         3. RecordBatch needs to support to push down 2's data down stream.
         4. SubScan needs to hold the 2's data,and wait for next real call to
    push down to the storage level query.
         5. Storage level should have an interface to indicate whether it
    supports to solve the pushed down bloom filter or list of strings.
    
         Since this violates drill's data flow direction,it seems a lot of work
    to do ,to change to implement this feature.