You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@kudu.apache.org by "ASF subversion and git services (Jira)" <ji...@apache.org> on 2020/02/19 17:26:00 UTC

[jira] [Commented] (KUDU-2483) Scan tablets with bloom filter

    [ https://issues.apache.org/jira/browse/KUDU-2483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17040260#comment-17040260 ] 

ASF subversion and git services commented on KUDU-2483:
-------------------------------------------------------

Commit 961888dd14e5bd306d3e5b741bc4443620617664 in kudu's branch refs/heads/master from Bankim Bhavsar
[ https://gitbox.apache.org/repos/asf?p=kudu.git;h=961888d ]

KUDU-2483 Integrate BlockBloomFilter with ColumnPredicate on server side

This change switches the implementation of the ColumnPredicate to use the
BlockBloomFilter for the BloomFilter predicate on the server side.

Earlier implementation was still experimental and didn't provide public client
APIs that actually use this BloomFilter predicate so taken the liberty to make
incompatible wire protocol changes.

Updated BlockBloomFilter to take hash_algorithm and hash_seed.
This make serializing and deserializing the BlockBloomFilter convenient and
removes the need of BloomFilterInner in ColumnPredicate.
Added overloaded Insert()/Find() functions to BlockBloomFilter that take Slice
parameter and hashes the key before insertion/lookup.

Most of the change involves refactoring the implementation including the
unit tests.

Currently only FAST_HASH algorithm is supported since 32-bit versions of
MURMUR2 and CITY_HASH are not currently implemented.

Change-Id: I7ecfd67e9c5fbe459c5b4aed91e0be2a194d433a
Reviewed-on: http://gerrit.cloudera.org:8080/15034
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: helifu <hz...@corp.netease.com>


> Scan tablets with bloom filter
> ------------------------------
>
>                 Key: KUDU-2483
>                 URL: https://issues.apache.org/jira/browse/KUDU-2483
>             Project: Kudu
>          Issue Type: New Feature
>          Components: client
>            Reporter: Jin Xing
>            Assignee: Bankim Bhavsar
>            Priority: Major
>              Labels: roadmap-candidate
>         Attachments: BloomFilter+Design+Doc.pdf, KUDU-2483, image-2018-07-01-23-29-05-517.png
>
>
> Join is really common/popular in Spark SQL, in this JIRA I take broadcast join as an example and describe how Kudu's bloom filter can help accelerate distributed computing.
> Spark runs broadcast join with below steps:
>  1. When do broadcast join, we have a small table and a big table; Spark will read all data from small table to one worker and build a hash table;
>  2. The generated hash table from step 1 is broadcasted to all the workers, which will read the splits from big table;
>  3. Workers start fetching and iterating all the splits of big table and see if the joining keys exists in the hash table; Only matched joining keys is retained.
> From above, step 3 is the heaviest, especially when the worker and split storage is not on the same host and bandwith is limited. Actually the cost brought by step 3 is not always necessary. Think about below scenario:
> {code:none}
> Small table A
> id      name
> 1      Jin
> 6      Xing
> Big table B
> id     age
> 1      10
> 2      21
> 3      33
> 4      65
> 5      32
> 6      23
> 7      18
> 8      20
> 9      22
> {code}
> Run query with SQL: *select * from A inner join B on A.id=B.id*
> It's pretty straight that we don't need to fetch all the data from Table B, because the number of matched keys is really small;
> I propose to use small table to build a bloom filter(BF) and use the generated BF as a predicate/filter to fetch data from big table, thus:
>  1. Much traffic/bandwith is saved.
>  2. Less data to processe by worker
> Broadcast join is just an example, other types of join will also benefit if we scan with a BF
> In a nutshell, I think Kudu can provide an iterface, by which user can scan data with bloom filters
>  
> Here I want add some statistics for Spark-Kudu integration with/without BloomBloomFilter.
> In our product environment the bandwidth of each executor is 50M bps.
> We do inner join with two tables – – one is large and another one is comparatively small.
> In Spark, inner join can be implemented as SortMergeJoin or BroadcastHashJoin, we implemented the corresponding operators with BloomFilter as SortMergeBloomFilterJoin and BroadcastBloomFilterJoin.
> The hash table of BloomFilter is configured as 32M. 
> I show statistics as below:
> ||Records of Table A||Records of Table B||Join Operator||Executor Time||
> |400 thousand|14 billion|SortMergeJoin|760 seconds|
> |400 thousand|14 billion|BroadcastHashJoin|376s|
> |400 thousand|14 billion|BroadcastBloomFilterJoin|21s|
> |2 million|14 billion|SortMergeJoin|707s|
> |2 million|14 billion|BroadcastHashJoin|329s|
> |2 million|14 billion|SortMergeBloomFilterJoin|75s|
> |2 million|14 billion|BroadcastBloomFilterJoin|35s|
> As we can see, it benefit a lot from BloomFilter-PushDown. 
> I want to take this jira  as a umbrella and my workmates will submit following sub-task/pr.
> It will be great if some can take more look at this and share some comments. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)