Posted to user@flink.apache.org by Matt <dr...@gmail.com> on 2017/02/05 15:42:39 UTC

Re: Cyclic ConnectedStream

I really don't understand what you mean. I've been reading the documentation
and the examples showing iterations, but I believe they just won't work for me.
Maybe you could write a quick example? The details don't matter, only the
topology.

If anyone else has an idea, it's very welcome!

Matt

On Tue, Jan 31, 2017 at 3:07 PM, Gábor Gévay <gg...@gmail.com> wrote:

> I somehow still suspect that iterations might work for your use case.
> Note that in the streaming API, iterations are currently nothing more than
> a back-edge in the topology, i.e. a low-level tool to create a cyclic
> topology, much like your hypothetical setter syntax. (This is quite
> different from the iterations of the batch API.)
>
> The tricky part for your use case is that you would want a ConnectedStream
> as your iteration head, which should receive the elements from the back-edge
> separately from the normal input. You could simulate this by using not
> ConnectedStream.flatMap but just a plain DataStream.flatMap whose input
> element type is an Either type, whose two components would be the normal
> input and the back-edge input. (You would also add a map before the
> closeWith and one on your input1, which wrap the elements into the
> appropriate alternative of the Either type.)
>
> Best,
> Gábor
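To make the Either suggestion concrete, here is a minimal plain-Java sketch that runs without Flink and models only the topology. A single head operator receives an Either: Left carries the normal input (the old flatMap1) and Right carries the stats coming back over the back-edge (the old flatMap2), so both branches share one piece of per-key state, just like the two methods of a CoFlatMapFunction. Everything here is hypothetical: the local Either class, Keyed, PredictionHead, StatsOp, run, and the toy "model" that simply adds a per-key bias. In a real job the wrapping would presumably be done with map calls before iterate() and closeWith(...), using Flink's own Either type, and the head would be keyed by a selector that reads the key from whichever side is set.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java sketch (no Flink dependency): one operator whose input is an Either,
// standing in for a ConnectedStream at the iteration head.
public class EitherHeadSketch {

    // Minimal Either: exactly one side is non-null. Flink ships its own
    // org.apache.flink.types.Either; this local class only keeps the sketch self-contained.
    static final class Either<L, R> {
        final L left;
        final R right;
        private Either(L left, R right) { this.left = left; this.right = right; }
        static <L, R> Either<L, R> left(L value) { return new Either<>(value, null); }
        static <L, R> Either<L, R> right(R value) { return new Either<>(null, value); }
        boolean isLeft() { return left != null; }
    }

    // Hypothetical element type used for inputs, predictions and stats alike.
    static final class Keyed {
        final String key;
        final int value;
        Keyed(String key, int value) { this.key = key; this.value = value; }
    }

    // Iteration head: Left = normal input (old flatMap1), Right = back-edge (old flatMap2).
    // Both branches see the same per-key "model" state.
    static final class PredictionHead {
        private final Map<String, Integer> bias = new HashMap<>(); // hypothetical model

        void flatMap(Either<Keyed, Keyed> in, Consumer<Keyed> out) {
            if (in.isLeft()) {
                Keyed obj = in.left;                                   // prediction(obj)
                out.accept(new Keyed(obj.key, obj.value + bias.getOrDefault(obj.key, 0)));
            } else {
                Keyed stat = in.right;                                 // updateModel(stat)
                bias.merge(stat.key, stat.value, Integer::sum);
            }
        }
    }

    // Downstream operator playing the role of statsStream's CoFlatMapFunction.
    static final class StatsOp {
        private final Map<String, Integer> lastPrediction = new HashMap<>();

        void onPrediction(Keyed p) { lastPrediction.put(p.key, p.value); }    // flatMap2
        void onInput2(Keyed actual, Consumer<Keyed> out) {                      // flatMap1
            Integer p = lastPrediction.get(actual.key);
            if (p != null) out.accept(new Keyed(actual.key, actual.value - p)); // getStats: error
        }
    }

    // Wires the cycle in one fixed order; in Flink the interleaving is not deterministic.
    static List<Integer> run(String key, int[] input1, int[] input2) {
        PredictionHead head = new PredictionHead();
        StatsOp stats = new StatsOp();
        List<Integer> predictions = new ArrayList<>();
        for (int i = 0; i < input1.length; i++) {
            // map on input1: wrap the normal input as Left before it reaches the head
            head.flatMap(Either.left(new Keyed(key, input1[i])), p -> {
                predictions.add(p.value);
                stats.onPrediction(p);
            });
            // map before closeWith: wrap each stat as Right and send it back to the head
            stats.onInput2(new Keyed(key, input2[i]), s -> head.flatMap(Either.right(s), p -> { }));
        }
        return predictions;
    }

    public static void main(String[] args) {
        System.out.println(run("a", new int[] {10, 20, 30}, new int[] {13, 23, 33}));
    }
}
```

Running main prints [10, 23, 33]: the first prediction uses an empty model, and once the stat for key "a" (an error of 3) has come back over the back-edge, later predictions include it. The design point is that the two "inputs" of the head are no longer separate streams, only two cases of one element type.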
>
>
>
> 2017-01-29 15:39 GMT+01:00 Matt <dr...@gmail.com>:
>
>> Check this image for clarification; this is what I'm trying to do:
>> http://i.imgur.com/iZxPv04.png
>>
>> The rectangles are the two CoFlatMapFunctions, each of which shares state
>> between process and update (map1 and map2). It's clear from the image that
>> I need input1 and the green box to create the blue box, and input2 and the
>> blue box to create the green one.
>>
>> ---
>> blue  = input1.connect(green).keyBy(...).flatMap(...);
>> green = input2.connect(blue).keyBy(...).flatMap(...);
>> ---
>>
>> As you can see, there's no cycle in the flow of data, so I guess this
>> topology is valid. The problem is that there is no way to define such a flow.
>>
>> For instance, with the appropriate setters we would be able to do this:
>>
>> ---
>> blue  = input1.connect();
>> green = input2.connect();
>>
>> blue.setConnection(green);
>> green.setConnection(blue);
>>
>> blue.keyBy(...).flatMap(...);
>> green.keyBy(...).flatMap(...);
>> ---
>>
>> Any idea is welcome.
>>
>> Matt
>>
>> On Sat, Jan 28, 2017 at 5:31 PM, Matt <dr...@gmail.com> wrote:
>>
>>> I'm aware of IterativeStream, but I don't think it's useful in this case.
>>>
>>> As shown in the example above, my use case is "cyclic" in that the same
>>> object goes from input to predictionStream (flatMap1), then to
>>> statsStream (flatMap2, where it's updated with an object from input2)
>>> and finally to predictionStream (flatMap2).
>>>
>>> The same operator is never applied twice to the object, so I would say
>>> this dataflow is cyclic only in the dependencies between the streams
>>> (predictionStream depends on statsStream, which in turn depends on
>>> predictionStream).
>>>
>>> I hope it is clear now.
>>>
>>> Matt
>>>
>>> On Sat, Jan 28, 2017 at 3:17 PM, Gábor Gévay <gg...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> Cyclic dataflows can be built using iterations:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#iterations
>>>>
>>>> Best,
>>>> Gábor
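The linked section builds a loop by calling iterate() on a DataStream and later passing the feedback stream to closeWith(...) on the resulting IterativeStream. As a rough, Flink-free model of what that back-edge does, the sketch below loosely follows the subtract-one loop used to illustrate Flink iterations: an element that is still positive after the body is routed back to the head, otherwise it leaves the loop. The class name, the run method and the output format are hypothetical, and draining the feedback queue before new input is only there to make the run deterministic; Flink makes no such ordering promise.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Plain-Java model (no Flink) of a streaming iteration: the loop head reads from the
// union of the normal input and a feedback queue, i.e. a back-edge in the topology.
public class BackEdgeSketch {

    // Each element is {original value, current value, passes through the body}.
    static List<String> run(List<Long> source) {
        Deque<long[]> input = new ArrayDeque<>();
        for (long v : source) input.add(new long[] {v, v, 0});
        Deque<long[]> feedback = new ArrayDeque<>();   // the back-edge
        List<String> output = new ArrayList<>();

        while (!input.isEmpty() || !feedback.isEmpty()) {
            // Fixed choice for determinism: take from the feedback queue first.
            long[] e = !feedback.isEmpty() ? feedback.poll() : input.poll();
            long[] next = {e[0], e[1] - 1, e[2] + 1};   // iteration body: subtract one
            if (next[1] > 0) {
                feedback.add(next);                     // what closeWith(...) would route back
            } else {
                output.add(next[0] + " looped " + next[2] + "x");  // leaves the loop
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(3L, 1L)));
    }
}
```

With the input [3, 1], main prints [3 looped 3x, 1 looped 1x]: the same body runs once per pass, and only the back-edge makes the topology cyclic.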
>>>>
>>>>
>>>>
>>>>
>>>> 2017-01-28 18:39 GMT+01:00 Matt <dr...@gmail.com>:
>>>> > I have a ConnectedStream (A) that depends on another ConnectedStream (B),
>>>> > which depends on the first one (A).
>>>> >
>>>> > Simplified code:
>>>> >
>>>> > predictionStream = input
>>>> >   .connect(statsStream)
>>>> >   .keyBy(...)
>>>> >   .flatMap(CoFlatMapFunction {
>>>> >      flatMap1(obj, output) {
>>>> >          p = prediction(obj)
>>>> >          output.collect(p)
>>>> >      }
>>>> >      flatMap2(stat, output) {
>>>> >          updateModel(stat)
>>>> >      }
>>>> >   })
>>>> >
>>>> > statsStream = input2
>>>> >   .connect(predictionStream)
>>>> >   .keyBy(...)
>>>> >   .flatMap(CoFlatMapFunction {
>>>> >      flatMap1(obj2, output) {
>>>> >         s = getStats(obj2, p)
>>>> >         output.collect(s)
>>>> >      }
>>>> >      flatMap2(prediction, output) {
>>>> >         p = prediction
>>>> >      }
>>>> >   })
>>>> >
>>>> > I'm guessing this should be possible to achieve. One way would be to add
>>>> > a sink on statsStream that writes the elements into Kafka, and to read
>>>> > from that topic in predictionStream instead of initializing it with a
>>>> > reference to statsStream. But I would rather avoid writing unnecessarily
>>>> > into Kafka.
>>>> >
>>>> > Is there any other way to achieve this?
>>>> >
>>>> > Thanks,
>>>> > Matt
>>>>
>>>
>>>
>>
>