Posted to user@flink.apache.org by faaron zheng <fa...@gmail.com> on 2020/03/02 06:55:34 UTC

Question about runtime filter

Hi everyone,

I am trying to implement a runtime filter in Flink 1.10 with
flink-sql-benchmark, following the Blink implementation. I changed three
parts of the Flink code: I added a runtime filter rule, modified the code
generation and the bloom filter, and added some aggregated-accumulator
methods modeled on the existing accumulator methods. The runtime filter now
appears in the execution graph, as follows:
Source: HiveTableSource(i_item_sk, i_item_id, i_rec_start_date,
i_rec_end_date, i_item_desc, i_current_price, i_wholesale_cost, i_brand_id,
i_brand, i_class_id, i_class, i_category_id, i_category, i_manufact_id,
i_manufact, i_size, i_formulation, i_color, i_units, i_container,
i_manager_id, i_product_name) TablePath: tpcds_bin_orc_2.item,
PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 10, 12]
-> Calc(select=[i_item_sk], where=[((i_category =
_UTF-16LE'Jewelry':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
(i_class = _UTF-16LE'consignment':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE") AND RUNTIME_FILTER_BUILDER_0(i_item_sk))])

and

Source: HiveTableSource(d_date_sk, d_date_id, d_date, d_month_seq,
d_week_seq, d_quarter_seq, d_year, d_dow, d_moy, d_dom, d_qoy, d_fy_year,
d_fy_quarter_seq, d_fy_week_seq, d_day_name, d_quarter_name, d_holiday,
d_weekend, d_following_holiday, d_first_dom, d_last_dom, d_same_day_ly,
d_same_day_lq, d_current_day, d_current_week, d_current_month,
d_current_quarter, d_current_year) TablePath: tpcds_bin_orc_2.date_dim,
PartitionPruned: false, PartitionNums: null, ProjectedFields: [0, 3] ->
Calc(select=[d_date_sk, d_month_seq], where=[RUNTIME_FILTER_2(d_date_sk)])


However, the number of records sent is the same as without the runtime
filter. Can anyone give me some advice?



Thanks

Re: Question about runtime filter

Posted by Jingsong Li <ji...@gmail.com>.
Great exploration. And thanks for your information.
I believe you have a deep understanding of Flink's internal mechanism.

Best,
Jingsong Lee

On Thu, Mar 5, 2020 at 12:09 PM faaron zheng <fa...@gmail.com> wrote:

> I finally got the runtime filter working in 1.10. The reason it never
> called the commit method lies in OperatorCodeGenerator: the code generated
> by generateOneInputStreamOperator must call endInput() correctly. The
> complete runtime filter process is:
> 1. Add the insert and remove rules to the batch rules.
> 2. The build side constructs a bloom filter and commits it.
> 3. The JobManager merges the bloom filters into a global one.
> 4. The probe side gets the global bloom filter and filters data.
> Although the runtime filter is already implemented in Blink, it does not
> live in an independent commit, so it is hard to merge all of the code at
> once. I hope this helps anyone trying to do the same thing.

-- 
Best, Jingsong Lee

Re: Question about runtime filter

Posted by faaron zheng <fa...@gmail.com>.
I finally got the runtime filter working in 1.10. The reason it never
called the commit method lies in OperatorCodeGenerator: the code generated
by generateOneInputStreamOperator must call endInput() correctly. The
complete runtime filter process is:
1. Add the insert and remove rules to the batch rules.
2. The build side constructs a bloom filter and commits it.
3. The JobManager merges the bloom filters into a global one.
4. The probe side gets the global bloom filter and filters data.
Although the runtime filter is already implemented in Blink, it does not
live in an independent commit, so it is hard to merge all of the code at
once. I hope this helps anyone trying to do the same thing.
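
A minimal sketch of steps 2 to 4, assuming a plain BitSet-backed bloom
filter and long join keys. The class and method names (RuntimeFilterSketch,
BloomFilter, buildLocal, mergeGlobal, probe) and the filter size are
hypothetical and are not the Blink or Flink classes; the sketch only shows
that merging local filters is a bitwise OR and that the probe side may let
false positives through but never drops a matching key.

```java
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

final class RuntimeFilterSketch {

    /** Illustrative bloom filter over long keys (not Flink's implementation). */
    static final class BloomFilter {
        private final BitSet bits;
        private final int numBits;
        private final int numHashes;

        BloomFilter(int numBits, int numHashes) {
            this.bits = new BitSet(numBits);
            this.numBits = numBits;
            this.numHashes = numHashes;
        }

        /** 64-bit finalizer from MurmurHash3, used only to scatter the keys. */
        private static long mix(long x) {
            x ^= x >>> 33;
            x *= 0xff51afd7ed558ccdL;
            x ^= x >>> 33;
            x *= 0xc4ceb9fe1a85ec53L;
            x ^= x >>> 33;
            return x;
        }

        /** Bit position of the i-th hash, derived by double hashing. */
        private int position(long key, int i) {
            long h1 = mix(key);
            long h2 = mix(h1 ^ 0x9E3779B97F4A7C15L) | 1L;
            return (int) Math.floorMod(h1 + i * h2, (long) numBits);
        }

        void add(long key) {
            for (int i = 0; i < numHashes; i++) {
                bits.set(position(key, i));
            }
        }

        boolean mightContain(long key) {
            for (int i = 0; i < numHashes; i++) {
                if (!bits.get(position(key, i))) {
                    return false;
                }
            }
            return true;
        }

        /** Merging two filters of the same shape is a bitwise OR. */
        void merge(BloomFilter other) {
            if (other.numBits != numBits || other.numHashes != numHashes) {
                throw new IllegalArgumentException("incompatible filters");
            }
            bits.or(other.bits);
        }
    }

    /** Step 2: one build-side subtask builds a local filter from its keys. */
    static BloomFilter buildLocal(long[] keys) {
        BloomFilter local = new BloomFilter(8192, 3);
        for (long key : keys) {
            local.add(key);
        }
        return local;
    }

    /** Step 3: the coordinator merges all committed local filters. */
    static BloomFilter mergeGlobal(List<BloomFilter> locals) {
        BloomFilter global = new BloomFilter(8192, 3);
        for (BloomFilter local : locals) {
            global.merge(local);
        }
        return global;
    }

    /** Step 4: the probe side keeps only keys the global filter may contain. */
    static long[] probe(BloomFilter global, long[] probeKeys) {
        return Arrays.stream(probeKeys).filter(global::mightContain).toArray();
    }

    public static void main(String[] args) {
        BloomFilter global = mergeGlobal(Arrays.asList(
                buildLocal(new long[] {1, 2, 3}),
                buildLocal(new long[] {3, 4, 5})));
        long[] survivors = probe(global, new long[] {1, 5, 42, 77, 1000});
        System.out.println("records sent downstream: " + survivors.length);
    }
}
```

If step 3 never runs because no build-side subtask commits, the probe side
has no global filter to apply, which matches the unchanged record counts
reported at the start of this thread.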

On Mon, Mar 2, 2020 at 7:52 PM faaron zheng <fa...@gmail.com> wrote:

> I set sql.exec.runtime-filter.wait to true. HiveTableSource takes much
> longer but produces the same result. I think the reason is that the
> preAggregateAccumulator is never committed, but I don't know why that
> happens.

Re: Question about runtime filter

Posted by faaron zheng <fa...@gmail.com>.
I set sql.exec.runtime-filter.wait to true. HiveTableSource takes much
longer but produces the same result. I think the reason is that the
preAggregateAccumulator is never committed, but I don't know why that
happens.
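
One possible reading of this symptom, sketched under assumptions: if the
probe side blocks for a bounded time on a future that the build side never
completes, and then falls back to passing every record, the source runs
longer while the output stays the same. The fallback behavior, the timeout,
and all names here (ProbeWaitSketch, awaitFilter) are hypothetical and are
not confirmed Blink semantics for sql.exec.runtime-filter.wait; a Set of
keys stands in for the bloom filter.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.LongPredicate;

final class ProbeWaitSketch {

    /**
     * Waits up to timeoutMillis for the global filter. If it never arrives,
     * returns a predicate that accepts every key (assumed fallback).
     */
    static LongPredicate awaitFilter(CompletableFuture<Set<Long>> future, long timeoutMillis) {
        try {
            Set<Long> keys = future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return key -> keys.contains(key);
        } catch (TimeoutException | ExecutionException e) {
            // No filter was committed in time: pass all records through.
            return key -> true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return key -> true;
        }
    }

    public static void main(String[] args) {
        // Build side never commits: the wait costs time but filters nothing.
        CompletableFuture<Set<Long>> neverCommitted = new CompletableFuture<>();
        LongPredicate open = awaitFilter(neverCommitted, 100);
        System.out.println("key 999 passes without a filter: " + open.test(999L));

        // Build side committed: the probe side can drop non-matching keys.
        Set<Long> buildKeys = Collections.singleton(1L);
        LongPredicate strict = awaitFilter(CompletableFuture.completedFuture(buildKeys), 100);
        System.out.println("key 999 passes with a filter: " + strict.test(999L));
    }
}
```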

On Mon, Mar 2, 2020 at 3:22 PM JingsongLee <lz...@aliyun.com> wrote:

> Hi,
>
> Does the probe side of the runtime filter wait for the runtime filter to
> be built? Can you check the start times of the build side and the probe
> side?
>
> Best,
> Jingsong Lee

Re: Question about runtime filter

Posted by faaron zheng <fa...@gmail.com>.
Thanks for replying, Lee. I followed your suggestion to debug the code and
found that the build side only calls addPreAggregatedAccumulator and never
calls the commit method. I also set a breakpoint at future.handleAsync in
the asyncGetBroadcastBloomFilter method, but when the program stops at
if(e==null && accumulator != null), it finishes with the result
immediately. Any suggestions?
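
A minimal sketch of the add-then-commit lifecycle described above, assuming
a coordinator that publishes the merged result only after every build-side
subtask has committed. Coordinator, BuildOperator, add, commit, and query
are hypothetical names and are not the Blink accumulator API; a Set of keys
stands in for the bloom filter. The point is that if endInput() is never
invoked, commit never happens and the future stays incomplete.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

final class CommitOnEndInputSketch {

    /** Hypothetical stand-in for the JobManager-side accumulator registry. */
    static final class Coordinator {
        private final int expectedCommits;
        private final Set<Integer> committed = new HashSet<>();
        private final Set<Long> merged = new HashSet<>();
        private final CompletableFuture<Set<Long>> global = new CompletableFuture<>();

        Coordinator(int expectedCommits) {
            this.expectedCommits = expectedCommits;
        }

        /** Adding a value alone does not publish anything. */
        synchronized void add(long key) {
            merged.add(key);
        }

        /** The global result is published once all subtasks have committed. */
        synchronized void commit(int subtask) {
            committed.add(subtask);
            if (committed.size() == expectedCommits) {
                global.complete(new HashSet<>(merged));
            }
        }

        CompletableFuture<Set<Long>> query() {
            return global;
        }
    }

    /** Build-side operator: adds per record, commits only in endInput(). */
    static final class BuildOperator {
        private final Coordinator coordinator;
        private final int subtask;

        BuildOperator(Coordinator coordinator, int subtask) {
            this.coordinator = coordinator;
            this.subtask = subtask;
        }

        void processElement(long key) {
            coordinator.add(key);
        }

        /** Must be invoked by the runtime when the bounded input ends. */
        void endInput() {
            coordinator.commit(subtask);
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator(2);
        BuildOperator op0 = new BuildOperator(coordinator, 0);
        BuildOperator op1 = new BuildOperator(coordinator, 1);
        op0.processElement(1L);
        op1.processElement(2L);

        op0.endInput();
        System.out.println("done after one commit: " + coordinator.query().isDone());

        op1.endInput();
        int size = coordinator.query()
                .handleAsync((acc, e) -> (e == null && acc != null) ? acc.size() : -1)
                .join();
        System.out.println("keys visible to the probe side: " + size);
    }
}
```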

On Mon, Mar 2, 2020 at 3:22 PM JingsongLee <lz...@aliyun.com> wrote:

> Hi,
>
> Does the probe side of the runtime filter wait for the runtime filter to
> be built? Can you check the start times of the build side and the probe
> side?
>
> Best,
> Jingsong Lee

Re: Question about runtime filter

Posted by JingsongLee <lz...@aliyun.com>.
Hi,

Does the probe side of the runtime filter wait for the runtime filter to
be built? Can you check the start times of the build side and the probe
side?

Best,
Jingsong Lee

