Posted to user@flink.apache.org by Chengzhi Zhao <w....@gmail.com> on 2018/05/11 16:48:30 UTC

Better way to clean up state when connect

Hi there,

I have a use case that checks for active IDs. There are two streams, and I
connect them: one carries the actual data (Stream A) and the other is used
for lookup (Stream B). Stream B arrives as a file that lists all active IDs,
so inactive IDs do not show up in it. I tried to use watermarks to clean up
the state of inactive IDs, but updates to Stream B are unpredictable, so I
want to keep everything in state until I find that an item is no longer in
the file.

Please suggest the best way to implement this in Flink. Thanks in advance
for your help.

Regards,
Chengzhi

Re: Better way to clean up state when connect

Posted by Chengzhi Zhao <w....@gmail.com>.
Thanks again Xingcan! Appreciate your help!

Re: Better way to clean up state when connect

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Chengzhi,

More details about partitioning mechanisms can be found at https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#physical-partitioning
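
As a rough illustration of the difference between key-based partitioning and broadcasting, here is a plain-Java sketch. It does not use Flink at all; `PartitioningSketch`, `partitionByKey` and `broadcast` are made-up names for this example, and the hashing is only conceptually similar to what `keyBy()` does. In the DataStream API, the broadcast case corresponds to calling `broadcast()` on Stream B before connecting it to Stream A.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of two partitioning strategies; not Flink code. */
final class PartitioningSketch {

    // Creates one empty record list per parallel subtask.
    private static List<List<String>> emptySubtasks(int parallelism) {
        List<List<String>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        return subtasks;
    }

    /** Key-based partitioning: each record is routed to exactly one subtask. */
    static List<List<String>> partitionByKey(List<String> records, int parallelism) {
        List<List<String>> subtasks = emptySubtasks(parallelism);
        for (String record : records) {
            // Assumption: a simple hash stands in for the real key-group assignment.
            int index = Math.floorMod(record.hashCode(), parallelism);
            subtasks.get(index).add(record);
        }
        return subtasks;
    }

    /** Broadcasting: every subtask receives a copy of every record. */
    static List<List<String>> broadcast(List<String> records, int parallelism) {
        List<List<String>> subtasks = emptySubtasks(parallelism);
        for (String record : records) {
            for (List<String> subtask : subtasks) {
                subtask.add(record);
            }
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<String> activeIds = Arrays.asList("test-1", "test-2", "test-3");
        System.out.println("keyed:     " + partitionByKey(activeIds, 2));
        System.out.println("broadcast: " + broadcast(activeIds, 2));
    }
}
```

With broadcasting, every parallel instance sees the complete list of active IDs, so Stream A does not need to be partitioned in any particular way for the lookup to work.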

Best,
Xingcan

Re: Better way to clean up state when connect

Posted by Chengzhi Zhao <w....@gmail.com>.
Hi Xingcan,

Thanks a lot for your input on the possible solutions here. Could you
please clarify how to broadcast a stream in Flink?
I appreciate your help!

Best,
Chengzhi

Re: Better way to clean up state when connect

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Chengzhi,

Currently, it's impossible to process both a stream and a (dynamically updated) dataset in a single job. I'll suggest some workarounds, all of which assume that the file of active test names is not too large.

(1) You may define your own stream source[1] that is aware of file updates and keeps feeding the input file as a stream (Stream B, as you described). Special records can be inserted to mark the start and end of an update. Note that instead of using the `keyBy()` method, Stream B should be broadcast, while Stream A can be partitioned arbitrarily. With this method, you can clear and rebuild the state according to the start/end indicators.

(2) You may also treat the file of active test names as external state and set processing-time timers[2] to reload it regularly (e.g., at 1-minute intervals) in a ProcessFunction[3].

IMO, watermarks may not work as expected for your use case. Besides, since the file is updated unpredictably, it's hard to guarantee the precision of the results.
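
To make workaround (1) a bit more concrete, here is a minimal plain-Java sketch of the "clear and rebuild on start/end indicators" logic. It is only an illustration and does not use the Flink API: the class `ActiveIdLookup`, its methods, and the marker strings `__UPDATE_START__` / `__UPDATE_END__` are all hypothetical names. In a real job this logic would presumably live in the function that processes the two connected streams, and the set would need to be kept in whatever state mechanism fits your setup.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Plain-Java model of rebuilding a lookup set from start/end markers; not Flink code. */
final class ActiveIdLookup {
    // Hypothetical marker records that the custom source would emit around each file snapshot.
    static final String UPDATE_START = "__UPDATE_START__";
    static final String UPDATE_END = "__UPDATE_END__";

    // Latest complete snapshot, used to answer lookups from Stream A.
    private Set<String> active = new HashSet<>();
    // Snapshot currently being rebuilt; null while no update is in progress.
    private Set<String> pending = null;

    /** Handles one record from Stream B (the broadcast lookup stream). */
    void onLookupRecord(String record) {
        if (UPDATE_START.equals(record)) {
            pending = new HashSet<>();          // begin collecting a fresh snapshot
        } else if (UPDATE_END.equals(record)) {
            if (pending != null) {
                active = pending;               // swap in the new snapshot, dropping stale IDs
                pending = null;
            }
        } else if (pending != null) {
            pending.add(record);                // an active ID inside the current update
        }
        // Records outside a start/end pair are ignored in this sketch.
    }

    /** Handles one record from Stream A: true if its ID is in the latest complete snapshot. */
    boolean isActive(String id) {
        return active.contains(id);
    }

    public static void main(String[] args) {
        ActiveIdLookup lookup = new ActiveIdLookup();
        for (String r : Arrays.asList(UPDATE_START, "test-1", "test-2", UPDATE_END)) {
            lookup.onLookupRecord(r);
        }
        System.out.println("test-2 active after first file:  " + lookup.isActive("test-2"));
        for (String r : Arrays.asList(UPDATE_START, "test-1", UPDATE_END)) {
            lookup.onLookupRecord(r);
        }
        System.out.println("test-2 active after second file: " + lookup.isActive("test-2"));
    }
}
```

The old snapshot keeps answering lookups until the end marker arrives, so a half-read file never makes an active ID look inactive. IDs that are missing from the new file disappear as soon as the swap happens, which is the cleanup you asked about.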

Hope that helps,
Xingcan

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html#data-sources
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#timers
[3] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#the-processfunction

Re: Better way to clean up state when connect

Posted by Chengzhi Zhao <w....@gmail.com>.
Hi Xingcan,

Thanks for your response. To give you more background on my use case:
Stream B carries the split test names, and Stream A carries the actual
tests. I want to connect Stream A to Stream B to figure out whether a test
is still active. I am not sure this is the right approach: my watermark is
based on event time with a 15-minute delay, and OnTimer emits the records
after 15 minutes. I was wondering if there is a way to purge the entire
state of Stream B so that I can get all the active tests, since the file
includes all the updated split test names and I could refresh the lookup
from it.

Also, I am not sure if I am using the right operator here, or if there is a
way to share a variable globally so that I can just apply a filter on
Stream A. Please let me know your thoughts, and thanks again for your
suggestions.

Regards,
Chengzhi

Re: Better way to clean up state when connect

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Chengzhi,

You said that Stream B, which comes from a file, is updated unpredictably. Could you share more about how you judge that an item (from Stream A, I suppose) is not in the file, and which watermark generation strategy you chose?

Best,
Xingcan
