Posted to user@flink.apache.org by nsengupta <se...@gmail.com> on 2016/05/13 17:42:47 UTC

Sharing State between Operators

Hello Flinksters


Alright. So, I had a fruitful exchange of messages with Balaji earlier
today on this topic. I moved ahead with the understanding I derived from
that exchange (thanks, Balaji). But now I am back, because I think my
approach is unclean, if not incorrect. There is probably a smarter way to
achieve the same result, but I can't figure it out.

Here's the problem:

A building has 4 walls (0,1,2,3). On each wall, a number of devices have
been planted to capture some physical attribute: let's say the temperature
at that spot. Every device has a unique ID.

A typical tuple looks like this (Reading ==> Temperature as an Integer):
(TupleType,Time,WallID,DeviceID,Reading) 
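
For illustration, such a tuple could be modelled as a small immutable Java
class. This is only a sketch: the class name SensorEvent, the field names,
the String type for DeviceID and the comma-separated text format are all
assumptions made for the example, not something the application prescribes.

```java
// Hypothetical model of (TupleType,Time,WallID,DeviceID,Reading).
final class SensorEvent {
    final int tupleType;   // 0 = update, 1 = max query, 2 = count query
    final long time;       // increases monotonically
    final int wallId;      // 0..3
    final String deviceId; // unique per device (String is an assumption)
    final int reading;     // temperature as an integer

    SensorEvent(int tupleType, long time, int wallId, String deviceId, int reading) {
        this.tupleType = tupleType;
        this.time = time;
        this.wallId = wallId;
        this.deviceId = deviceId;
        this.reading = reading;
    }

    // Parses a line such as "0,1000,2,dev-7,31" (assumed wire format).
    static SensorEvent parse(String line) {
        String[] f = line.trim().split(",");
        if (f.length != 5) {
            throw new IllegalArgumentException("expected 5 fields: " + line);
        }
        return new SensorEvent(
                Integer.parseInt(f[0].trim()),
                Long.parseLong(f[1].trim()),
                Integer.parseInt(f[2].trim()),
                f[3].trim(),
                Integer.parseInt(f[4].trim()));
    }
}
```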

The system works on the basis of records arriving in a time-window of 60
seconds. We can consider this to be a Tumbling Window. The time (and Window
assignment etc.) is not the issue here. The 'Time' field increases
monotonically.

If TupleType == 0, I need to compute and update my data structures from the
stream.

If TupleType == 1, I need to emit the maximum temperature recorded by the
DeviceID out of its last 5 readings.

If TupleType == 2, I need to emit the number of readings that have arrived
so far from the particular wall. Obviously, in this case, we will ignore the
values of the fields 'DeviceID' and 'Reading' in the tuple.

The Application generates output for TupleType 1 and TupleType 2. 

The TupleTypes can arrive in any order. For example, TupleType 1 may arrive
with a DeviceID which the application hasn't seen before (no corresponding
TupleType 0 has arrived earlier with that DeviceID). Let us assume that we
have a fallback value to be emitted for such cases, to keep things simple.
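
The "maximum out of the last 5 readings, or a fallback" rule for a single
device can be sketched in plain Java as below. The class name
RecentReadings and the method names are hypothetical, and the fallback
value is whatever the caller passes in; nothing here is Flink-specific.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Keeps only the most recent `capacity` readings of one device.
final class RecentReadings {
    private final int capacity;
    private final Deque<Integer> window = new ArrayDeque<>();

    RecentReadings(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Adds a reading, evicting the oldest one once the window is full.
    void add(int reading) {
        if (window.size() == capacity) {
            window.removeFirst();
        }
        window.addLast(reading);
    }

    // Returns the maximum of the retained readings, or `fallback`
    // if no reading has been seen yet for this device.
    int maxOrElse(int fallback) {
        if (window.isEmpty()) {
            return fallback;
        }
        int max = Integer.MIN_VALUE;
        for (int r : window) {
            max = Math.max(max, r);
        }
        return max;
    }
}
```

Note that the maximum has to be recomputed over the retained readings: a
single running "max so far" would not forget a high reading once it falls
out of the last 5.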

In my mind, the implementation should be along these lines:

- Split the incoming Stream into three separate substreams using
  SplitStream, based upon TupleType
- For StreamOFTupleType0
  - KeyBy(DeviceID)
  - Apply a Mapper
     - Update a Map[DeviceID, Tuple2(MaxReadingSoFar,
       FixedSizeList[Reading])] somewhere
  - Apply (next) Mapper
     - Calculate the total count of readings from the Wall so far
     - Update a Map[WallID, Count]

- For StreamOFTupleType1
  - Access the Map created/updated through the first Mapper above
  - Emit

- For StreamOFTupleType2
  - Access the Map created/updated through the second Mapper above
  - Emit
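
The logic of the outline above, ignoring distribution entirely, can be
sketched as a single-JVM Java class that holds both maps and dispatches on
TupleType. The class name BuildingState, the method onEvent, the fallback
value -1 and the choice to answer 0 for a wall with no readings are all
assumptions made for this sketch. It does not use any Flink API and says
nothing yet about how the maps would be partitioned across nodes.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class BuildingState {
    static final int WINDOW = 5;    // "last 5 readings"
    static final int FALLBACK = -1; // assumed fallback for unseen devices

    private final Map<String, Deque<Integer>> readingsByDevice = new HashMap<>();
    private final Map<Integer, Integer> countByWall = new HashMap<>();

    // Returns a value to emit for TupleType 1 and 2, nothing for TupleType 0.
    Optional<Integer> onEvent(int tupleType, int wallId, String deviceId, int reading) {
        switch (tupleType) {
            case 0: {
                // Update both structures from the reading.
                Deque<Integer> recent =
                        readingsByDevice.computeIfAbsent(deviceId, k -> new ArrayDeque<>());
                if (recent.size() == WINDOW) {
                    recent.removeFirst();
                }
                recent.addLast(reading);
                countByWall.merge(wallId, 1, Integer::sum);
                return Optional.empty();
            }
            case 1: {
                // Max of the device's retained readings, or the fallback.
                Deque<Integer> recent = readingsByDevice.get(deviceId);
                if (recent == null) {
                    return Optional.of(FALLBACK);
                }
                return Optional.of(Collections.max(recent));
            }
            case 2:
                // Number of readings seen so far for the wall.
                return Optional.of(countByWall.getOrDefault(wallId, 0));
            default:
                throw new IllegalArgumentException("unknown TupleType: " + tupleType);
        }
    }
}
```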

I have hit a wall deciding how the live data structures should be created,
updated and accessed, correctly and efficiently, in a situation like the one
above. More importantly, how will they be shared between operators and
across partitions (nodes)?

I can't broadcast the Maps because they are not read-only (a.k.a. lookup
only).

I can't use data structures local to a RichMapFunction because they are not
shared between partitions (my understanding). Each instance will begin with
an empty Map and will be blind to what the others have accumulated.
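
One observation that may help here: when a stream is partitioned by key (as
keyBy does in Flink), every record with the same key is routed to the same
parallel instance, so per-key data never has to be visible to the other
instances. The plain-Java model below illustrates only that idea. The class
KeyPartitionedCounts and the modulo-of-hash routing are assumptions for the
sketch; Flink's actual key assignment is more involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Models N parallel instances, each owning a private Map[WallID, Count].
final class KeyPartitionedCounts {
    private final List<Map<Integer, Integer>> partitions = new ArrayList<>();

    KeyPartitionedCounts(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new HashMap<>());
        }
    }

    // The same key always maps to the same partition index.
    int partitionFor(int wallId) {
        return Math.floorMod(Integer.hashCode(wallId), partitions.size());
    }

    // An update (TupleType 0) touches only the partition that owns the key.
    void recordReading(int wallId) {
        partitions.get(partitionFor(wallId)).merge(wallId, 1, Integer::sum);
    }

    // A query (TupleType 2) is routed to that same partition.
    int countFor(int wallId) {
        return partitions.get(partitionFor(wallId)).getOrDefault(wallId, 0);
    }
}
```

For this to carry over, updates and queries for a key would have to pass
through the same keyed operator rather than through separate substreams.
Since the device maximum is keyed by DeviceID and the count by WallID, that
would presumably mean two such operators, one per key.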

I have done a bit of exploration and found this thread in the forum:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/mutable-hashmap-outside-of-stream-does-it-get-snapshotted-td6002.html#a6013>
I have understood what Stephano
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=user_nodes&user=266>
is suggesting ('..State is moved along pipeline ..') but then failed to
figure out how to apply it in my case, if that is possible at all.

I have been thinking about using an external DB-like datastore, but I want
to be sure that this decision is unavoidable. If I use a DB, then the focus
may shift to INSERT/SELECT-like queries. My application then becomes more of
a distributed DB application than a lean Streaming application. That
thought doesn't make me happy! :-)

Please make me wiser (by pointing out gaps in my understanding where they
exist). If any more specific information would help, please ask me.

My primary aim is to be clear about the recipe for a use case like this.

-- Nirmalya

Re: Sharing State between Operators

Posted by Aljoscha Krettek <al...@apache.org>.
I prepared a small example that outlines how something like this could be
implemented:
https://gist.github.com/aljoscha/36afedce40abf8ae92b92d4355809ff1

It doesn't include all your requirements, such as count per wall, etc. But
this should get you started on the right path.

I hope this helps!
