Posted to user@flink.apache.org by Henrique Colao Zanuz <he...@gmail.com> on 2018/05/10 16:50:16 UTC

Iterative Stream won't loop

Hi,

I am trying to implement a connected components algorithm using
DataStream. For this algorithm, I'm separating the data into tumbling
windows and trying to compute each window independently.
This algorithm is iterative because the labels (colors) of the vertices
need to be propagated. Basically, I need to iterate over the following
steps:

Input: vertices = DataStream of <VertexId, [list of neighbor vertices],
label>

Loop:
     labels = vertices.flatMap(emitting a tuple <VertexID, label> for
vertices.f0 and for every element of vertices.f1)
                  .keyBy(VertexID)
                  .window(...)
                  .min(label);

     updatedVertices = vertices.join(labels).where(VertexId).equalTo(VertexId)
                                   .windowAll(...)
                                   .apply(re-emit the original vertices stream
tuples, but keeping the new labels)

End loop
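
For concreteness, the two steps in the loop above can be sketched outside of Flink as plain Python over a small in-memory graph (the data and names are hypothetical and only mirror the pseudocode, not any real Flink operator):

```python
def connected_components(vertices):
    """vertices: dict of vertex_id -> (neighbor list, label)."""
    changed = True
    while changed:
        # Step 1 (the flatMap + keyBy + min): every vertex sends its label
        # to itself and to each neighbor; each target keeps the minimum.
        labels = {}
        for vid, (neighbors, label) in vertices.items():
            for target in [vid, *neighbors]:
                labels[target] = min(labels.get(target, label), label)
        # Step 2 (the join + apply): re-emit the vertex tuples, keeping
        # the new labels.
        updated = {vid: (nbrs, labels[vid]) for vid, (nbrs, _) in vertices.items()}
        changed = updated != vertices
        vertices = updated
    return vertices

# Two components, {1, 2, 3} and {4, 5}; labels start as the vertex ids.
graph = {1: ([2], 1), 2: ([1, 3], 2), 3: ([2], 3), 4: ([5], 4), 5: ([4], 5)}
result = connected_components(graph)
```

The loop terminates once no label changes, which is exactly the fixed point the streaming iteration would need to detect.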

I am trying to use IterativeStreams to do so. However, despite
successfully separating the tuples that need to be fed back to the loop (by
using filters and closeWith), the subsequent iterations are not happening.
So, what I get is only the first iteration.
I suppose this might come from the fact that I'm creating a new stream
(labels) based on the original IterativeStream, joining it with the
original one (vertices) and only then closing the loop with it.
Do you know whether Flink has some limitation in this respect? And if so,
would you have a hint about a different approach I could take for this
algorithm?

Thank you in advance,
Henrique Colao

Re: Iterative Stream won't loop

Posted by Rong Rong <wa...@gmail.com>.
Yeah, that too. I agree with Paris here, especially since you are not only
doing a windowed aggregation but also a join step.

Using a graph processing engine [1] might be the best idea here.

Another possible way is to create a rich aggregate function over a
combination of the <label, vertices, intermediate_results> tuple; this way
you are in complete control of (1) what is stored as intermediate results,
(2) when to emit output, and (3) how to loop.
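
A rough, non-Flink sketch of that idea, with one stateful aggregate owning the whole per-window tuple and deciding itself when to iterate and when to emit (all names here are illustrative only):

```python
class ComponentsAggregate:
    def __init__(self):
        self.vertices = {}  # vertex_id -> set of neighbor ids (intermediate result)
        self.labels = {}    # vertex_id -> current component label

    def add(self, edge):
        # (1) What is stored: accumulate edges into adjacency lists.
        a, b = edge
        self.vertices.setdefault(a, set()).add(b)
        self.vertices.setdefault(b, set()).add(a)
        self.labels.setdefault(a, a)
        self.labels.setdefault(b, b)

    def get_result(self):
        # (2) and (3): loop to a fixed point *inside* the aggregate,
        # then emit exactly once for the window.
        while True:
            new = {v: min([self.labels[v]] + [self.labels[n] for n in nbrs])
                   for v, nbrs in self.vertices.items()}
            if new == self.labels:
                return sorted(new.items())
            self.labels = new

agg = ComponentsAggregate()
for edge in [(1, 2), (2, 3), (4, 5)]:
    agg.add(edge)
result = agg.get_result()
```

Because the iteration happens inside the per-window state rather than in the dataflow graph, no feedback edge (and hence no IterativeStream) is needed.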

Rong

[1] https://flink.apache.org/news/2015/08/24/introducing-flink-gelly.html

On Fri, May 11, 2018 at 3:32 AM, Paris Carbone <pa...@kth.se> wrote:

> Hey!
>
> I would recommend against using iterations with windows for that problem
> at the moment.
> Alongside loop scoping and backpressure that will be addressed by FLIP-15
> [1] I think you also need the notion of stream supersteps, which is
> experimental work in progress for now, from my side at least.
>
> Until these features are added on Flink I would recommend trying out
> gelly-streams [2],  a Flink API for graph stream computation which supports
> connected components in a single pass.
> All you need to do is to convert your stream into edge additions. You can
> try it and let us know what you think [2].
>
> Paris
>
> [1] https://cwiki.apache.org/confluence/pages/viewpage.
> action?pageId=66853132
> [2] https://github.com/vasia/gelly-streaming
>
>
> On 11 May 2018, at 10:08, Henrique Colao Zanuz <he...@gmail.com>
> wrote:
>
> Hi,
> thank you for your reply
>
> Actually, I'm doing something very similar to your code. The problem I'm
> having is that this structure is not generating any loop. For instance, If
> I print *labelsVerticesGroup*, I only see the initial set of tuples, the
> one from *updated**LabelsVerticesGroup* (at the end of the first
> iteration) and nothing more. So, it means that the content of *updated*
> *LabelsVerticesGroup* is indeed being correctly assigned to
> *labelsVerticesGroup*, but the loop itself is not happening.
>
> For simplicity sake, here I'm omitting the logic behind the separation of
> the tuples that need to be fed back to the loop. I do understand that both
> codes we commented here are expected to loop indefinitely. On the complete
> version of mine, I use the JoinFunction, a ProcessAllWindowFunction, a
> Filter Function and a Map to create a flag that indicates if there was a
> change on the label of a vertex during the join function, then the
> ProcessAllWindowFunction to spread this flag to the whole window, in case
> any tuple had a change. Finally I filter the tuples by this flag. This
> whole mechanism is separating the tuples as expected. However, even if I
> remove this logic from the code, in order to get an infinite loop of the
> tuples (as we get on the code we've written in the previous emails), the
> iteration does not work.
>
> PS. I've been using Flink 1.3.3
>
> Best,
> Henrique
>
> Em sex, 11 de mai de 2018 às 00:01, Rong Rong <wa...@gmail.com>
> escreveu:
>
>> Based on the pseudo code. Seems like you are trying to do the loop by
>> yourself and not suing the iterative.map() function[1].
>>
>> I think you would need to specify the "map" function in order to use the
>> iterative stream. and there should be a clear definition on
>> which data is iterative. In this case you have label & vertices
>> interlacing each other but no specific loop back.
>>
>> I would suggest something close to the example in [1], like
>>      *labelsVerticesGroup* = DataStream<initial_label, *vertices*>
>>
>>      *labels* = *labelsVerticesGroup*.map(...)
>>                   .keyBy(VertexID)
>>                   .window(...)
>>                   .min(label);
>>
>>      *vertices* = *labelsVerticesGroup*.map(...)
>>
>>      *updatedLabelsVerticesGroup* = *vertices*.join(*labels*).where(
>> VertexId).equalTo(VertexId)
>>                   .windowAll(...)
>>                   .agg(...)
>>
>>      *labelsVerticesGroup*.closeWith(*updatedLabelsVerticesGroup**)*
>>
>> Is this what you are looking for?
>>
>> --
>> Rong
>>
>> Reference:
>> [1] https://ci.apache.org/projects/flink/flink-docs-
>> master/dev/datastream_api.html#iterations
>>
>> On Thu, May 10, 2018 at 9:50 AM, Henrique Colao Zanuz <
>> henrique.colao@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to implement a connected components algorithm using
>>> DataStream. For this algorithm, I'm separating the data by tumbling
>>> windows. So, for each window, I'm trying to compute it independently.
>>> This algorithm is iterative because the labels (colors) of the vertices
>>> need to be propagated. Basically, I need to iterate over the following
>>> steps:
>>>
>>> Input: *vertices *= Datastream of <VertexId, [list of neighbor
>>> vertices], label>
>>>
>>> Loop:
>>>      *labels *= *vertices*.flatmap (emiting a tupple <VertexID, label>
>>> for every  vertices.f0 and every element on vertices.f1)
>>>                   .keyBy(VertexID)
>>>                   .window(...)
>>>                   .min(label);
>>>
>>>      *updatedVertices *= *vertices*. join(labels).where(VertexId).
>>> equalTo(VertexId)
>>>                                    .windowAll(...)
>>>                                    .apply(re-emit original * vertices *stream
>>> tuples, but keeping the new labels)
>>>
>>> End loop
>>>
>>> I am trying to use IterativeStreams to do so. However, despite
>>> successfully separating the tuples that need to be fed back to the loop (by
>>> using filters and closeWith), the subsequent iterations are not happening.
>>> So, what I get is only the first iteration.
>>> I suppose this might come from the fact that I'm creating a new stream
>>> (labels) based on the original IterativeStream, joining it with the
>>> original one (vertices) and only then closing the loop with it.
>>> Do you know whether Flink has some limitation in this respect? and if
>>> so, would you have a hint about a different approach I could take for this
>>> algorithm to avoid this?
>>>
>>> thank you in advance,
>>> Henrique Colao
>>>
>>>
>>>
>>
>

Re: Iterative Stream won't loop

Posted by Paris Carbone <pa...@kth.se>.
Hey!

I would recommend against using iterations with windows for that problem at the moment.
Alongside loop scoping and backpressure, which will be addressed by FLIP-15 [1], I think you also need the notion of stream supersteps, which is experimental work in progress for now, from my side at least.

Until these features are added to Flink, I would recommend trying out gelly-streams [2], a Flink API for graph stream computation which supports connected components in a single pass.
All you need to do is convert your stream into edge additions. You can try it and let us know what you think [2].
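
The single-pass idea can be illustrated with a plain union-find over a stream of edge additions, which is the essence of how connected components can be maintained without any iteration over the data (illustrative Python, not the actual gelly-streaming API):

```python
def find(parent, x):
    """Follow parent pointers to the component root, with path halving."""
    while parent.setdefault(x, x) != x:
        parent[x] = parent[parent[x]]
        x = parent[x]
    return x

def components(edge_stream):
    parent = {}
    for a, b in edge_stream:  # single pass: each edge is seen exactly once
        ra, rb = find(parent, a), find(parent, b)
        if ra != rb:
            parent[max(ra, rb)] = min(ra, rb)  # merge the two components
    return {v: find(parent, v) for v in parent}

result = components([(1, 2), (3, 4), (2, 3), (5, 6)])
```

Each arriving edge either merges two components or is absorbed, so the structure converges as the stream is consumed rather than by re-feeding tuples through a loop.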

Paris

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132
[2] https://github.com/vasia/gelly-streaming


Re: Iterative Stream won't loop

Posted by Henrique Colao Zanuz <he...@gmail.com>.
Hi,
thank you for your reply

Actually, I'm doing something very similar to your code. The problem I'm
having is that this structure is not generating any loop. For instance, if
I print labelsVerticesGroup, I only see the initial set of tuples, plus the
ones from updatedLabelsVerticesGroup (at the end of the first iteration),
and nothing more. So the content of updatedLabelsVerticesGroup is indeed
being correctly assigned to labelsVerticesGroup, but the loop itself is not
happening.

For simplicity's sake, I'm omitting here the logic behind the separation of
the tuples that need to be fed back to the loop. I do understand that both
pieces of code we discussed here are expected to loop indefinitely. In the
complete version of mine, I use a JoinFunction and a Map to set a flag that
indicates whether the label of a vertex changed during the join, then a
ProcessAllWindowFunction to spread this flag to the whole window in case
any tuple changed, and finally a Filter on this flag. This whole mechanism
is separating the tuples as expected. However, even if I remove this logic
from the code, in order to get an infinite loop of the tuples (as we would
get with the code from the previous emails), the iteration does not work.
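
In plain Python, the separation mechanism described above could look roughly like this (hypothetical shapes standing in for the JoinFunction / ProcessAllWindowFunction / Filter chain, not actual Flink code):

```python
def split_window(old_labels, new_labels):
    """Return (feedback, output): feed the whole window back if any label
    changed during the join, otherwise emit it as final."""
    # "Map" step: flag each vertex whose label changed during the join.
    flagged = [(v, new_labels[v], new_labels[v] != old_labels[v])
               for v in new_labels]
    # "ProcessAllWindowFunction" step: spread the flag to the whole window.
    any_change = any(changed for _, _, changed in flagged)
    tuples = [(v, label) for v, label, _ in flagged]
    # "Filter" step: route every tuple by the window-wide flag.
    return (tuples, []) if any_change else ([], tuples)

# Vertex 2's label dropped from 2 to 1, so the whole window is fed back.
feedback, output = split_window({1: 1, 2: 2}, {1: 1, 2: 1})
```

This only reproduces the routing logic; the reported problem is that the feedback edge itself never delivers these tuples back to the head of the loop.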

PS. I've been using Flink 1.3.3

Best,
Henrique


Re: Iterative Stream won't loop

Posted by Rong Rong <wa...@gmail.com>.
Based on the pseudocode, it seems like you are trying to do the loop
yourself and not using the iterative.map() function [1].

I think you would need to specify the "map" function in order to use the
iterative stream, and there should be a clear definition of which data is
iterative. In this case you have labels & vertices interlacing each other
but no explicit loop back.

I would suggest something close to the example in [1], like:
     labelsVerticesGroup = DataStream<initial_label, vertices>

     labels = labelsVerticesGroup.map(...)
                  .keyBy(VertexID)
                  .window(...)
                  .min(label);

     vertices = labelsVerticesGroup.map(...)

     updatedLabelsVerticesGroup = vertices.join(labels)
                  .where(VertexId).equalTo(VertexId)
                  .windowAll(...)
                  .agg(...)

     labelsVerticesGroup.closeWith(updatedLabelsVerticesGroup)

Is this what you are looking for?
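
As a mental model of what iterate()/closeWith() wire up, the feedback edge can be simulated with a queue: elements the feedback condition selects re-enter the head of the loop, the rest leave it (this is only a toy simulation of the dataflow, not the Flink API):

```python
from collections import deque

def run_iteration(initial, step, needs_another_pass):
    queue = deque(initial)               # the iteration head
    output = []
    while queue:
        element = step(queue.popleft())  # the loop body (map/join/... in Flink)
        if needs_another_pass(element):
            queue.append(element)        # closeWith: fed back to the head
        else:
            output.append(element)       # leaves the loop
    return output

# Example: keep subtracting 2 until each value drops below 5.
result = run_iteration([10, 3, 7], lambda x: x - 2, lambda x: x >= 5)
```

The key point is that the feedback stream must be derived from the same IterativeStream it closes; any transformation in between has to stay inside the loop for the tuples to actually come back around.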

--
Rong

Reference:
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html#iterations
