Posted to issues@kudu.apache.org by "Grant Henke (Jira)" <ji...@apache.org> on 2020/06/05 23:25:00 UTC
[jira] [Resolved] (KUDU-2483) Scan tablets with bloom filter
[ https://issues.apache.org/jira/browse/KUDU-2483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Grant Henke resolved KUDU-2483.
-------------------------------
Fix Version/s: 1.12.0
Resolution: Fixed
> Scan tablets with bloom filter
> ------------------------------
>
> Key: KUDU-2483
> URL: https://issues.apache.org/jira/browse/KUDU-2483
> Project: Kudu
> Issue Type: New Feature
> Components: client
> Reporter: Jin Xing
> Assignee: Bankim Bhavsar
> Priority: Major
> Labels: roadmap-candidate
> Fix For: 1.12.0
>
> Attachments: BloomFilter+Design+Doc.pdf, KUDU-2483, image-2018-07-01-23-29-05-517.png
>
>
> Joins are very common in Spark SQL. In this JIRA I take broadcast join as an example and describe how a bloom filter in Kudu can help accelerate distributed computing.
> Spark runs a broadcast join with the steps below:
> 1. A broadcast join involves a small table and a big table; Spark reads all data from the small table into one worker and builds a hash table;
> 2. The hash table generated in step 1 is broadcast to all the workers, which will read the splits of the big table;
> 3. Workers start fetching and iterating over all the splits of the big table and check whether the joining keys exist in the hash table; only rows with matching joining keys are retained.
> Of these, step 3 is the heaviest, especially when the worker and the split storage are not on the same host and bandwidth is limited. The cost incurred by step 3 is not always necessary. Consider the scenario below:
> {code:none}
> Small table A
> id name
> 1 Jin
> 6 Xing
> Big table B
> id age
> 1 10
> 2 21
> 3 33
> 4 65
> 5 32
> 6 23
> 7 18
> 8 20
> 9 22
> {code}
> Run the query with SQL: *select * from A inner join B on A.id=B.id*
> It is clear that we don't need to fetch all the data from table B, because the number of matched keys is really small.
> I propose to use the small table to build a bloom filter (BF) and use the generated BF as a predicate/filter when fetching data from the big table, thus:
> 1. Much traffic/bandwidth is saved.
> 2. Workers have less data to process.
> Broadcast join is just an example; other types of join will also benefit if we scan with a BF.
> In a nutshell, I think Kudu can provide an interface by which users can scan data with bloom filters.
>
> Here I want to add some statistics for the Spark-Kudu integration with and without BloomFilter.
> In our production environment the bandwidth of each executor is 50M bps.
> We do an inner join on two tables: one is large and the other is comparatively small.
> In Spark, an inner join can be implemented as SortMergeJoin or BroadcastHashJoin; we implemented the corresponding operators with BloomFilter as SortMergeBloomFilterJoin and BroadcastBloomFilterJoin.
> The hash table of the BloomFilter is configured as 32M.
> The statistics are shown below:
> ||Records of Table A||Records of Table B||Join Operator||Executor Time||
> |400 thousand|14 billion|SortMergeJoin|760s|
> |400 thousand|14 billion|BroadcastHashJoin|376s|
> |400 thousand|14 billion|BroadcastBloomFilterJoin|21s|
> |2 million|14 billion|SortMergeJoin|707s|
> |2 million|14 billion|BroadcastHashJoin|329s|
> |2 million|14 billion|SortMergeBloomFilterJoin|75s|
> |2 million|14 billion|BroadcastBloomFilterJoin|35s|
> As we can see, the joins benefit a lot from BloomFilter pushdown.
> I want to use this JIRA as an umbrella, and my colleagues will submit the follow-up sub-tasks/PRs.
> It would be great if someone could take a closer look at this and share some comments.
>
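The proposal in the quoted description (build a bloom filter from the small table, then use it as a scan predicate on the big table) can be sketched roughly as follows, using tables A and B from the example. This is only an illustration under stated assumptions: the `BloomFilter` class, its sizing, and `scan_with_bloom_filter` are hypothetical helpers written for this sketch, not Kudu's or Spark's actual API or implementation.

```python
import hashlib


class BloomFilter:
    """Tiny illustrative bloom filter (hypothetical; not Kudu's implementation)."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        # Derive num_hashes bit positions by salting a SHA-256 hash of the key.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        # False means "definitely absent"; True means "possibly present".
        return all(
            self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(key)
        )


# Tables from the example in the description.
table_a = [(1, "Jin"), (6, "Xing")]
table_b = [(1, 10), (2, 21), (3, 33), (4, 65), (5, 32),
           (6, 23), (7, 18), (8, 20), (9, 22)]

# Build the bloom filter from the join keys of the small table.
bf = BloomFilter()
for row_id, _name in table_a:
    bf.add(row_id)


def scan_with_bloom_filter(rows, bloom):
    """Stand-in for a storage-side scan that applies the filter before
    sending rows over the network (hypothetical helper)."""
    return [row for row in rows if bloom.might_contain(row[0])]


# Only rows that pass the filter are "fetched" by the worker.
fetched = scan_with_bloom_filter(table_b, bf)

# The exact hash join still runs afterwards, so any false positives
# let through by the bloom filter are discarded here.
names = dict(table_a)
joined = [(row_id, names[row_id], age) for row_id, age in fetched if row_id in names]
print(joined)  # [(1, 'Jin', 10), (6, 'Xing', 23)]
```

Because a bloom filter can return false positives but never false negatives, the filtered scan may fetch a few extra rows from B, yet it never drops a matching row; the final join result is the same as with a full scan.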
--
This message was sent by Atlassian Jira
(v8.3.4#803005)