Posted to user@flink.apache.org by defstat <de...@gmail.com> on 2015/08/23 21:36:02 UTC

Statefull computation

Hi. I have been struggling for the past few days to find a solution to the
following problem, using Apache Flink:

I have a stream of vectors, represented by files in a local folder. After a
new text file is located using DataStream<String> text =
env.readFileStream(...), I transform (flatMap) the input into a
SingleOutputStreamOperator<Tuple2<String, Integer>, ?>, with the Integer
being the score coming from a scoring function.

I want to persist a global HashMap containing the top-k vectors, ranked by
their scores. I approached the problem using a stateful transformation.
1. The first problem I have is that the HashMap retains per-sink data (so
there is one HashMap per parallel worker thread). How can I make that a
global collection?

2. Using Apache Spark, I made that possible with
JavaPairDStream<String, Integer> stateDstream =
tuples.updateStateByKey(updateFunction);

and then applying transformations to the stateDstream. Is there a way I can
get the same functionality using Flink?

Thanks in advance! 



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Statefull-computation-tp2494.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: Statefull computation

Posted by Gyula Fóra <gy...@gmail.com>.
Hi,

Okay, then I understood correctly.

My point was something different. I never said that the approach I
suggested will produce results identical to the continuous DOP 1 top-k,
because that is impossible to parallelize.

What I suggested is to apply batch (or window) updates, which would
periodically give you the "current" top-k (so some updates will be
overwritten before being sent to the output). Whether this is feasible
depends on the application, but it should probably be fine.
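
The periodic-update idea above can be sketched in plain Java, without any
Flink API. This is only an illustration under assumptions: a count-based
batch stands in for a time window, scores for the same key are summed, and
the class PeriodicTopK with its methods is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: merges score updates and
// publishes a top-k snapshot only once per batch.
class PeriodicTopK {
    private final int k;
    private final int batchSize;
    private final Map<String, Integer> sums = new HashMap<>();
    private final List<List<String>> emitted = new ArrayList<>();
    private int pending = 0;

    PeriodicTopK(int k, int batchSize) {
        this.k = k;
        this.batchSize = batchSize;
    }

    // Apply one update; record a snapshot when the batch is full.
    void add(String key, int score) {
        sums.merge(key, score, Integer::sum);
        if (++pending == batchSize) {
            emitted.add(currentTopK());
            pending = 0;
        }
    }

    // Keys of the k highest sums, best first; ties are broken by key.
    List<String> currentTopK() {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(sums.entrySet());
        entries.sort((x, y) -> {
            int byScore = Integer.compare(y.getValue(), x.getValue());
            return byScore != 0 ? byScore : x.getKey().compareTo(y.getKey());
        });
        List<String> top = new ArrayList<>();
        for (Map.Entry<String, Integer> e : entries.subList(0, Math.min(k, entries.size()))) {
            top.add(e.getKey());
        }
        return top;
    }

    // All snapshots recorded so far, oldest first.
    List<List<String>> emitted() {
        return emitted;
    }
}
```

With k = 2 and batchSize = 3, a snapshot is recorded only after the third
and the sixth update; whatever the ranking looked like in between is never
sent out, which is the trade-off described above.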

Cheers,
Gyula

On Mon, Aug 24, 2015 at 8:46 AM Aljoscha Krettek <al...@apache.org>
wrote:

>
> Hi,
> In the example the result is not correct because the values for a, b, c and
> d are never forwarded from instance 2, even though they would modify the
> global top-k result. It works, though, if you partition by the key field
> (tuple field 0, in this case) before doing the summation and local top-k, I
> think.
>
> Best,
> Aljoscha

Re: Statefull computation

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
In the example the result is not correct because the values for a, b, c and
d are never forwarded from instance 2, even though they would modify the
global top-k result. It works, though, if you partition by the key field
(tuple field 0, in this case) before doing the summation and local top-k, I
think.
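
A rough plain-Java sketch of that partitioning step, not Flink code: records
are routed by the hash of their key, so every key is summed on exactly one
instance before the local top-k is taken. The class KeyPartitionedTopK, the
hash-modulo routing and the tie-breaking by key are assumptions made for
illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of "partition by key, then sum, then local top-k".
class KeyPartitionedTopK {

    // Keys of the k highest scores, best first; ties are broken by key.
    static List<String> topK(Map<String, Integer> scores, int k) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(scores.entrySet());
        entries.sort((x, y) -> {
            int byScore = Integer.compare(y.getValue(), x.getValue());
            return byScore != 0 ? byScore : x.getKey().compareTo(y.getKey());
        });
        List<String> top = new ArrayList<>();
        for (Map.Entry<String, Integer> e : entries.subList(0, Math.min(k, entries.size()))) {
            top.add(e.getKey());
        }
        return top;
    }

    static List<String> topKWithKeyPartitioning(String[] keys, int[] scores,
                                                int parallelism, int k) {
        // One sum map per simulated parallel instance.
        List<Map<String, Integer>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new HashMap<>());
        }
        // Route each record by key hash: all records of a key meet on one instance.
        for (int i = 0; i < keys.length; i++) {
            int target = Math.floorMod(keys[i].hashCode(), parallelism);
            instances.get(target).merge(keys[i], scores[i], Integer::sum);
        }
        // Each instance forwards its local top-k; no key appears on two instances,
        // so the merger can simply collect the forwarded sums.
        Map<String, Integer> merged = new HashMap<>();
        for (Map<String, Integer> local : instances) {
            for (String key : topK(local, k)) {
                merged.put(key, local.get(key));
            }
        }
        return topK(merged, k);
    }
}
```

Fed with the twelve records of the example (a to d twice, e to h once), a
parallelism of 2 and k = 4, the sketch returns a, b, e, f, the same answer a
single instance would give.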

Best,
Aljoscha

On Sun, 23 Aug 2015 at 23:07 Gyula Fóra <gy...@gmail.com> wrote:

> Hey,
>
> I am not sure if I get it, why aren't the results correct?
>
> You don't instantly get the global top-k, but you are always updating it
> with the new local results.
>
> Gyula

Re: Statefull computation

Posted by Gyula Fóra <gy...@gmail.com>.
Hey,

I am not sure I get it. Why aren't the results correct?

You don't instantly get the global top-k, but you are always updating it
with the new local results.

Gyula

Aljoscha Krettek <al...@apache.org> wrote (on Sun, 23 Aug 2015, 22:58):

> Hi,
> I wanted to post something along the same lines but now I don't think the
> approach with local top-ks and merging works. For example, if you want to
> get top-4 and you do the pre-processing in two parallel instances. This
> input data would lead to incorrect results:
>
> 1. Instance:
> a 6
> b 5
> c 4
> d 3
>
> 2. Instance:
> e 10
> f 9
> g 8
> h 7
> a 6
> b 5
> c 4
> d 3
>
> So each parallel instance would forward its local top-4, which would lead
> to the end result:
> e 10
> f 9
> g 8
> h 7
>
> Which is wrong. I think no matter how many elements you forward you can
> construct cases that lead to wrong results. (The problem seems to be that
> top-k is inherently global.)
>
> Might also be that I'm tired and not seeing this right... :D
>
> For the case where your elements are partitioned by some key you should be
> fine, though, as Gyula mentioned.
>
> I'm not familiar with the Spark API, maybe you can help me out. What does
> the updateStateByKey() do if your state is not actually partitioned by a
> key. Plus, I'm curious in general what Spark does with this call.
>
> Cheers,
> Aljoscha

Re: Statefull computation

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I wanted to post something along the same lines, but now I don't think the
approach with local top-ks and merging works. For example, suppose you want
to get the top-4 and you do the pre-processing in two parallel instances.
This input data would lead to incorrect results:

Instance 1:
a 6
b 5
c 4
d 3

Instance 2:
e 10
f 9
g 8
h 7
a 6
b 5
c 4
d 3

So each parallel instance would forward its local top-4, which would lead
to the end result:
e 10
f 9
g 8
h 7

Which is wrong. I think no matter how many elements you forward you can
construct cases that lead to wrong results. (The problem seems to be that
top-k is inherently global.)
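
The example can be replayed with a small plain-Java sketch. It assumes, as
the follow-up in this thread does, that scores for the same key are summed;
the class LocalTopKCounterexample and its helpers are hypothetical and use
no Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo class; the data is the example from this message.
class LocalTopKCounterexample {
    static final String[] KEYS_1 = {"a", "b", "c", "d"};
    static final int[] SCORES_1 = {6, 5, 4, 3};
    static final String[] KEYS_2 = {"e", "f", "g", "h", "a", "b", "c", "d"};
    static final int[] SCORES_2 = {10, 9, 8, 7, 6, 5, 4, 3};

    // Sum the scores per key, as one parallel instance would for its input.
    static Map<String, Integer> sumByKey(String[] keys, int[] scores) {
        Map<String, Integer> sums = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            sums.merge(keys[i], scores[i], Integer::sum);
        }
        return sums;
    }

    // The k entries with the highest score, best first; ties are broken by key.
    static Map<String, Integer> topK(Map<String, Integer> scores, int k) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(scores.entrySet());
        entries.sort((x, y) -> {
            int byScore = Integer.compare(y.getValue(), x.getValue());
            return byScore != 0 ? byScore : x.getKey().compareTo(y.getKey());
        });
        Map<String, Integer> top = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : entries.subList(0, Math.min(k, entries.size()))) {
            top.put(e.getKey(), e.getValue());
        }
        return top;
    }

    // Each instance forwards only its local top-k; the merger sums what it receives.
    static List<String> mergedLocalTopK(int k) {
        Map<String, Integer> merged = new HashMap<>(topK(sumByKey(KEYS_1, SCORES_1), k));
        topK(sumByKey(KEYS_2, SCORES_2), k)
                .forEach((key, score) -> merged.merge(key, score, Integer::sum));
        return new ArrayList<>(topK(merged, k).keySet());
    }

    // Reference result: a single instance sees all records.
    static List<String> globalTopK(int k) {
        Map<String, Integer> all = sumByKey(KEYS_1, SCORES_1);
        sumByKey(KEYS_2, SCORES_2)
                .forEach((key, score) -> all.merge(key, score, Integer::sum));
        return new ArrayList<>(topK(all, k).keySet());
    }
}
```

Here mergedLocalTopK(4) yields e, f, g, h, while globalTopK(4) yields
a, b, e, f (a sums to 12 and b to 10), because instance 2 never forwards its
share of a to d.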

Might also be that I'm tired and not seeing this right... :D

For the case where your elements are partitioned by some key you should be
fine, though, as Gyula mentioned.

I'm not familiar with the Spark API, so maybe you can help me out. What does
updateStateByKey() do if your state is not actually partitioned by a
key? Plus, I'm curious in general what Spark does with this call.

Cheers,
Aljoscha

On Sun, 23 Aug 2015 at 22:28 Gyula Fóra <gy...@apache.org> wrote:

> Hey!
>
> What you are trying to do here is a global rolling aggregation, which is
> inherently a DOP 1 operation. Your observation is correct that if you want
> to use a simple stateful sink, you need to make sure that you set the
> parallelism to 1 in order to get correct results.
>
> What you can do is to keep local top-ks in a parallel operator (let's say
> a flatmap) and periodically output the local top-k elements and merge them
> in a sink with parallelism=1 to produce a global top-k.
>
> I am not 100% sure how you implemented the same functionality in spark but
> there you probably achieved the semantics I described above.
>
> The whole problem is much easier if you are interested in the top-k
> elements grouped by some key, as then you can use partitioned operator
> states which will give you the correct results with arbitrary parallelism.
>
> Cheers,
> Gyula

Re: Statefull computation

Posted by Gyula Fóra <gy...@apache.org>.
Hey!

What you are trying to do here is a global rolling aggregation, which is
inherently a DOP 1 (degree of parallelism 1) operation. Your observation is
correct that if you want to use a simple stateful sink, you need to make
sure that you set the parallelism to 1 in order to get correct results.
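
As a minimal sketch of what such a single-instance state could look like, in
plain Java rather than the Flink API: a min-heap bounded to k entries keeps
the current top-k as elements arrive one by one. It assumes every vector
arrives once with its final score; the class SingleInstanceTopK and the
Scored holder are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical single-instance (parallelism 1) rolling top-k.
class SingleInstanceTopK {

    // Hypothetical holder for one scored vector; not a Flink type.
    static final class Scored {
        final String id;
        final int score;

        Scored(String id, int score) {
            this.id = id;
            this.score = score;
        }
    }

    private final int k;
    // Min-heap on score: the head is always the weakest of the current top-k.
    private final PriorityQueue<Scored> heap =
            new PriorityQueue<>(Comparator.comparingInt((Scored s) -> s.score));

    SingleInstanceTopK(int k) {
        this.k = k;
    }

    // Add one element and evict the lowest score once more than k are held.
    void add(String id, int score) {
        heap.offer(new Scored(id, score));
        if (heap.size() > k) {
            heap.poll();
        }
    }

    // Ids of the current top-k, highest score first.
    List<String> currentTopK() {
        List<Scored> copy = new ArrayList<>(heap);
        copy.sort((x, y) -> Integer.compare(y.score, x.score));
        List<String> ids = new ArrayList<>();
        for (Scored s : copy) {
            ids.add(s.id);
        }
        return ids;
    }
}
```

Because all elements pass through this one object, the result is always the
exact global top-k; the price is that the step cannot be spread over several
workers.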

What you can do is to keep local top-ks in a parallel operator (let's say a
flatmap) and periodically output the local top-k elements and merge them in
a sink with parallelism=1 to produce a global top-k.

I am not 100% sure how you implemented the same functionality in Spark, but
there you probably achieved the semantics I described above.

The whole problem is much easier if you are interested in the top-k
elements grouped by some key, as then you can use partitioned operator
states which will give you the correct results with arbitrary parallelism.
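
To illustrate why the grouped case parallelizes, here is a plain-Java
simulation, not Flink's partitioned state API: each group's top-k heap lives
on exactly one simulated instance, chosen by key hash, so the per-group
results do not depend on the parallelism. The class PerKeyTopK and the
hash-modulo routing are assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Hypothetical simulation of top-k per group with state partitioned by key.
class PerKeyTopK {

    static Map<String, List<Integer>> topKPerKey(String[] groups, int[] scores,
                                                 int parallelism, int k) {
        // One state map per simulated instance: group -> min-heap of its best scores.
        List<Map<String, PriorityQueue<Integer>>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new HashMap<>());
        }
        for (int i = 0; i < groups.length; i++) {
            // All records of a group are routed to the same instance.
            int target = Math.floorMod(groups[i].hashCode(), parallelism);
            PriorityQueue<Integer> heap = instances.get(target)
                    .computeIfAbsent(groups[i], g -> new PriorityQueue<>());
            heap.offer(scores[i]);
            if (heap.size() > k) {
                heap.poll(); // drop the lowest score of this group
            }
        }
        // Collect the per-group results; no group is split across instances.
        Map<String, List<Integer>> result = new TreeMap<>();
        for (Map<String, PriorityQueue<Integer>> state : instances) {
            for (Map.Entry<String, PriorityQueue<Integer>> e : state.entrySet()) {
                List<Integer> top = new ArrayList<>(e.getValue());
                top.sort(Comparator.reverseOrder());
                result.put(e.getKey(), top);
            }
        }
        return result;
    }
}
```

Calling topKPerKey with the same input and a parallelism of 1 or 3 gives the
same map, since no group ever needs data held by another instance.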

Cheers,
Gyula

defstat <de...@gmail.com> wrote (on Sun, 23 Aug 2015, 21:40):

> Hi. I am struggling the past few days to find a solution on the following
> problem, using Apache Flink:
>
> I have a stream of vectors, represented by files in a local folder. After a
> new text file is located using DataStream<String> text =
> env.readFileStream(...), I transform (flatMap), the Input into a
> SingleOutputStreamOperator<Tuple2<String, Integer>, ?>, with the Integer
> being the score coming from a scoring function.
>
> I want to persist a global HashMap containing the top-k vectors, using
> their
> scores. I approached the problem using a statefull transformation.
> 1. The first problem I have is that the HashMap retains per-sink data (so
> for each thread of workers, one HashMap of data). How can I make that a
> Global collection
>
> 2. Using Apache Spark, I made that possible by
> JavaPairDStream<String, Integer> stateDstream =
> tuples.updateStateByKey(updateFunction);
>
> and then making transformations on the stateDstream. Is there a way I can
> get the same functionality using FLink?
>
> Thanks in advance!