You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Renato Marroquín Mogrovejo <re...@gmail.com> on 2016/02/14 22:22:49 UTC

Newbie question

Hi all,

I have two streams in which I need to keep counts of different metrics that
will have to be shared by both of the streams. So they will be sharing some
state once they have finished processing the stream. My question is if I
should do this as a sink aggregating what I need at the end or by doing
them as stateful operators.
I tried putting two hashmaps at the sinks of my streams, but one stream
fills the hashmap and the other one can not update it. I guess they are not
shared or I am doing something wrong.
Any pointers or suggestions would be great! Thanks!


Renato M.

Re: Newbie question

Posted by Gyula Fóra <gy...@gmail.com>.
Hi Renato,

First of all to do anything together on the two streams you probably want
to union them. This means that you need to have a common type. If this is
the case you are lucky and you don't need anything else. Otherwise I
suggest using the Either type provided by Flink as a simple wrapper.

If you just instantiate hashmaps in your sinks, you will end up with as
many hashmaps as the parallelism of your sink. This means that some part of
the stream will see one hashmap, some part the other. This is probably not
good for you because you want to aggregate statistics over the "whole"
stream and also hashmaps are not fault tolerant.

Because of this you want to use some sort of operator state, either
partitioned (KvState) or non-partitioned (Checkpointed interface) depending
on the application logic.

If the statistics that you are aggregating are tied to some keys
(substreams) of the incoming stream, you should probably use a ValueState
in your sink which can be instantiated from the RuntimeContext of the
RichFunctions, in this case a RichSinkFunction. For more details see here:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#using-the-keyvalue-state-interface

If you want to aggregate over the whole incoming stream not by key, then
you have to use a non-parallel sink as this is an inherently non-parallel
operation. In this case you should set the sink parallelism explicitly to
1: stream.addSink(...).setParallelism(1); Now your sink can implement the
Checkpointed interface which you can use to persist your HashMaps. More
info can be found here:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#checkpointing-local-variables

I hope this helped.

Cheers,
Gyula

Renato Marroquín Mogrovejo <re...@gmail.com> ezt írta (időpont:
2016. febr. 14., V, 22:22):

> Hi all,
>
> I have two streams in which I need to keep counts of different metrics
> that will have to be shared by both of the streams. So they will be sharing
> some state once they have finished processing the stream. My question is if
> I should do this as a sink aggregating what I need at the end or by doing
> them as stateful operators.
> I tried putting two hashmaps at the sinks of my streams, but one stream
> fills the hashmap and the other one can not update it. I guess they are not
> shared or I am doing something wrong.
> Any pointers or suggestions would be great! Thanks!
>
>
> Renato M.
>