Posted to issues@kudu.apache.org by "Mauricio Aristizabal (Jira)" <ji...@apache.org> on 2019/10/21 01:24:00 UTC

[jira] [Commented] (KUDU-2483) Scan tablets with bloom filter

    [ https://issues.apache.org/jira/browse/KUDU-2483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16955679#comment-16955679 ] 

Mauricio Aristizabal commented on KUDU-2483:
--------------------------------------------

This just became a huge blocker for Kudu adoption for us, and I'm worried that this ticket hasn't had any movement in over a year.  Our use case is not even Spark, but just doing simple ETL via Impala:


We just wanted to move our aggregation/cube tables to Kudu and have them be true aggregations: one record per combination of dimension columns that gets updated in place as the measures increase (as opposed to additive aggregations in Parquet, which we routinely re-aggregate/compact, a process that is very resource-intensive and hard to manage).

The problem is that when updating a 1-billion-record table with just 10K arriving changes, the min-max filter in the join between the target agg table and the table with the updates is pretty useless: the updates typically touch only a handful of dimension values, but those values are not nicely consecutive; they are scattered all over the id space, so the min/max range spans nearly the whole table. The join ends up scanning most of the big table, and it therefore gets slower and slower as the table grows.
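To make that concrete, here is a minimal sketch (plain Java; the numbers are illustrative values I chose for this example, not measurements from our system) of why a min/max range over scattered ids prunes almost nothing:

{code:java}
import java.util.Random;
import java.util.stream.LongStream;

public class MinMaxDemo {
    public static void main(String[] args) {
        long idSpace = 1_000_000_000L;  // ~1B-row agg table (illustrative)
        int updates = 10_000;           // arriving changes (illustrative)
        Random rnd = new Random(42);

        // 10K updated ids, scattered uniformly over the id space
        long[] ids = LongStream.generate(() -> (long) (rnd.nextDouble() * idSpace))
                               .limit(updates).toArray();

        long min = LongStream.of(ids).min().getAsLong();
        long max = LongStream.of(ids).max().getAsLong();

        // Fraction of the big table a min-max filter still has to scan
        double coverage = (double) (max - min) / idSpace;
        System.out.printf("min=%d max=%d -> range covers %.2f%% of the table%n",
                          min, max, coverage * 100);
        // Prints ~99.9x%: the min/max predicate prunes almost nothing, while a
        // Bloom filter over the 10K ids would let the scan skip nearly every row.
    }
}
{code}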

So we'll have to hold off on adopting Kudu until this (and support in Impala) is added, or until we switch ETL to programmatically mutate the records individually with the Java client (perhaps in a Spark RDD).

Please prioritize this; otherwise Kudu is only good for end-user queries with highly selective filters and joins, and doesn't really support ETL or large-scale analysis via SQL.

> Scan tablets with bloom filter
> ------------------------------
>
>                 Key: KUDU-2483
>                 URL: https://issues.apache.org/jira/browse/KUDU-2483
>             Project: Kudu
>          Issue Type: New Feature
>          Components: client
>            Reporter: jin xing
>            Assignee: Xu Yao
>            Priority: Major
>              Labels: roadmap-candidate
>         Attachments: BloomFilter+Design+Doc.pdf, KUDU-2483, image-2018-07-01-23-29-05-517.png
>
>
> Joins are really common/popular in Spark SQL. In this JIRA I take broadcast join as an example and describe how a Bloom filter in Kudu can help accelerate distributed computing.
> Spark runs a broadcast join with the steps below:
>  1. In a broadcast join there is a small table and a big table; Spark reads all the data from the small table onto one worker and builds a hash table;
>  2. The hash table generated in step 1 is broadcast to all the workers, which will read the splits of the big table;
>  3. Workers fetch and iterate over all the splits of the big table, checking whether each join key exists in the hash table; only rows with matching join keys are retained (the build/probe pattern is sketched below).
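> To ground step 3, here is a minimal build/probe sketch (plain Java, with toy data mirroring the tables shown further below; this illustrates the pattern, not Spark's actual implementation):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
>
> public class HashJoinSketch {
>     public static void main(String[] args) {
>         // Step 1: build a hash table from the small table (id -> name)
>         Map<Integer, String> buildSide = new HashMap<>();
>         buildSide.put(1, "Jin");
>         buildSide.put(6, "Xing");
>
>         // Step 3: probe with every row of the big table -- every row is
>         // fetched and tested, even though only two ids can ever match
>         int[][] bigTable = {{1, 10}, {2, 21}, {3, 33}, {4, 65}, {5, 32},
>                             {6, 23}, {7, 18}, {8, 20}, {9, 22}};
>         for (int[] row : bigTable) {
>             String name = buildSide.get(row[0]);
>             if (name != null) {
>                 System.out.println(row[0] + "\t" + name + "\t" + row[1]);
>             }
>         }
>     }
> }
> {code}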
> From the above, step 3 is the heaviest, especially when the worker and the splits' storage are not on the same host and bandwidth is limited. But the cost incurred by step 3 is not always necessary. Consider the scenario below:
> {code:none}
> Small table A
> id      name
> 1      Jin
> 6      Xing
> Big table B
> id     age
> 1      10
> 2      21
> 3      33
> 4      65
> 5      32
> 6      23
> 7      18
> 8      20
> 9      22
> {code}
> Run the query: *select * from A inner join B on A.id=B.id*
> It's pretty clear that we don't need to fetch all the data from table B, because the number of matching keys is really small.
> I propose to build a Bloom filter (BF) from the small table and use it as a predicate/filter when fetching data from the big table (see the sketch after this list), so that:
>  1. Much traffic/bandwidth is saved.
>  2. The worker has less data to process.
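> As an illustration of the two points above, here is a minimal, self-contained sketch of the filtering idea using Guava's BloomFilter and the toy tables from the example (this shows only the client-side concept, not the proposed Kudu API):
> {code:java}
> import com.google.common.hash.BloomFilter;
> import com.google.common.hash.Funnels;
>
> public class BloomJoinSketch {
>     public static void main(String[] args) {
>         // Build a Bloom filter from the small table's join keys (table A)
>         BloomFilter<Integer> bf =
>                 BloomFilter.create(Funnels.integerFunnel(), 2, 0.01);
>         bf.put(1);
>         bf.put(6);
>
>         // Probe with the big table's keys (table B); only possible matches
>         // need to be fetched over the network
>         int fetched = 0;
>         for (int id = 1; id <= 9; id++) {
>             if (bf.mightContain(id)) {  // false positives possible, false negatives never
>                 fetched++;
>             }
>         }
>         System.out.println("rows fetched: " + fetched);  // ~2 instead of 9
>     }
> }
> {code}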
> Broadcast join is just an example; other types of join will also benefit if we can scan with a BF.
> In a nutshell, I think Kudu can provide an interface by which users can scan data with Bloom filters.
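> A rough sketch of what such an interface could look like in the Java client follows. The method name and signature here are hypothetical, invented for illustration; this is not an existing Kudu API:
> {code:java}
> // Hypothetical interface sketch -- the method name and signature are
> // assumptions for illustration, not an existing Kudu API.
> import org.apache.kudu.ColumnSchema;
> import org.apache.kudu.client.KuduPredicate;
>
> public interface BloomFilterScanSupport {
>     /**
>      * Build a scan predicate from a serialized Bloom filter over {@code column}.
>      * Tablet servers would test each row's key against the filter and skip
>      * rows that cannot match, so only candidate rows leave the server.
>      */
>     KuduPredicate newInBloomFilterPredicate(ColumnSchema column, byte[] serializedBloomFilter);
> }
>
> // Assumed usage, mirroring how existing comparison predicates are attached:
> //   scannerBuilder.addPredicate(newInBloomFilterPredicate(idColumn, bfBytes));
> {code}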
>  
> Here I want to add some statistics for the Spark-Kudu integration with and without the BloomFilter.
> In our production environment the bandwidth of each executor is 50 Mbps.
> We do an inner join between two tables: one is large and the other is comparatively small.
> In Spark, an inner join can be implemented as a SortMergeJoin or a BroadcastHashJoin; we implemented the corresponding operators with a BloomFilter as SortMergeBloomFilterJoin and BroadcastBloomFilterJoin.
> The BloomFilter's hash table is configured as 32 MB.
> The statistics are shown below:
> ||Records of Table A||Records of Table B||Join Operator||Executor Time||
> |400 thousand|14 billion|SortMergeJoin|760 s|
> |400 thousand|14 billion|BroadcastHashJoin|376 s|
> |400 thousand|14 billion|BroadcastBloomFilterJoin|21 s|
> |2 million|14 billion|SortMergeJoin|707 s|
> |2 million|14 billion|BroadcastHashJoin|329 s|
> |2 million|14 billion|SortMergeBloomFilterJoin|75 s|
> |2 million|14 billion|BroadcastBloomFilterJoin|35 s|
> As we can see, the join benefits a lot from BloomFilter pushdown.
> I want to use this JIRA as an umbrella; my workmates will submit the following sub-tasks/PRs.
> It would be great if someone could take a closer look at this and share some comments.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)