Posted to user@flink.apache.org by Bart van Deenen <ba...@fastmail.fm> on 2016/04/07 15:41:06 UTC

mutable hashmap outside of stream, does it get snapshotted ?

Hi all

I have a DataStream transformation that updates a mutable
HashMap that exists outside of the stream.

It's something like this:

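import org.apache.flink.streaming.api.scala._
import scala.collection.mutable

object FlinkJob {
  // Updated from inside the stream transformations below
  val uriLookup = mutable.HashMap.empty[String, Int]

  def main(args: Array[String]): Unit = {

    val stream: DataStream = ...

    stream.keyBy(1).timeWindow(..).fold(..)
      .window(..)
      .map(..).fold(..)
      .addSink(..)
  }
}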

where the uriLookup HashMap is updated inside the stream transformations
and serialized in the step before the addSink.

It works fine. However:

Does the snapshotting mechanism actually serialize this map in case of a
node failure?

And out of curiosity, can I actually see what data exists inside the
snapshot data?

Thanks.

Bart


Re: mutable hashmap outside of stream, does it get snapshotted ?

Posted by Bart van Deenen <ba...@fastmail.fm>.
Thanks all!
I was under the mistaken impression that Flink automagically did the
snapshotting for me.
The info is really clear; I'll have no trouble implementing it.
 
Bart
 
 
On Thu, Apr 7, 2016, at 18:40, Aljoscha Krettek wrote:
> Hi,
> good explanation and pointers!
>
> I just want to add that the uriLookup table in your example is not
> really shared between your operator instances in a distributed
> setting. When your transformations are serialized, the current state of
> the HashMap is serialized with them because it is in the closure of
> the transformations. Then, on the cluster, the HashMap is deserialized
> and all parallel instances basically work on their own local copy of
> the empty HashMap.
>
> Cheers,
> Aljoscha
>
> On Thu, 7 Apr 2016 at 18:30 Stefano Baghino
> <st...@radicalbit.io> wrote:
>> Hi Bart,
>>
>> to make sure that the state is checkpointed you have to:
>>  1. configure your Flink installation with a reliable state backend
>>     (optional for development; you can read more about it here[1])
>>  2. explicitly enable checkpointing in your program (see how here[2];
>>     it's just a couple of lines of code)
>>  3. extend your operators so that they checkpoint data, by
>>     implementing the `Checkpointed` interface or using an instance
>>     field (the semantics of the two approaches are slightly
>>     different; you can read more about it here[3])
>> When your data is checkpointed you can access the state if your
>> operator implements the `RichFunction` interface (via an abstract
>> class that wraps the operator you need to implement, like
>> `RichMapFunction`).
>>
>> For your need in particular, I don't know a way to checkpoint state
>> shared between different operators; perhaps you can refactor your
>> code so that the state is encapsulated in an operator implementation
>> and then moved through your pipeline as a parameter of the following
>> operators. Would that work?
>>
>> I apologize for just providing pointers to the docs in my reply, but
>> checkpointing deserves a good explanation and I feel the docs get the
>> job done pretty well. I will gladly help you if you have any doubts.
>>
>> Hope I've been of some help.
>>
>> --
>> BR,
>> Stefano Baghino
>> Software Engineer @ Radicalbit
 

Links:

  1. https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/jobmanager_high_availability.html#configuration
  2. https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/fault_tolerance.html
  3. https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html

Re: mutable hashmap outside of stream, does it get snapshotted ?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
good explanation and pointers!

I just want to add that the uriLookup table in your example is not really
shared between your operator instances in a distributed setting. When your
transformations are serialized, the current state of the HashMap is
serialized with them because it is in the closure of the transformations.
Then, on the cluster, the HashMap is deserialized and all parallel instances
basically work on their own local copy of the empty HashMap.

Cheers,
Aljoscha
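The copying Aljoscha describes can be observed without a cluster. The sketch below is a simplification under stated assumptions: it uses plain Java serialization as a stand-in for Flink shipping a user function (and the map it captures) to the workers, and `ClosureCopyDemo`, `roundTrip` and `record` are hypothetical names invented for this illustration. Each round trip plays the role of one parallel instance receiving its own deserialized copy.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

object ClosureCopyDemo {
  // Serialize and immediately deserialize an object. This is a simplified
  // stand-in for shipping a captured value from the client to a worker.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Hypothetical per-instance update, like the one inside the transformation.
  def record(lookup: mutable.HashMap[String, Int], uri: String): Unit =
    lookup(uri) = lookup.getOrElse(uri, 0) + 1
}
```

Deserializing the same client-side map twice yields two independent maps: updates made through one "instance" are invisible to the other and to the original, which stays empty.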


Re: mutable hashmap outside of stream, does it get snapshotted ?

Posted by Stefano Baghino <st...@radicalbit.io>.
Hi Bart,

to make sure that the state is checkpointed you have to:

   1. configure your Flink installation with a reliable state backend
   (optional for development; you can read more about it here:
   <https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/jobmanager_high_availability.html#configuration>)
   2. explicitly enable checkpointing in your program (see how here:
   <https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/fault_tolerance.html>;
   it's just a couple of lines of code)
   3. extend your operators so that they checkpoint data, by implementing
   the `Checkpointed` interface or using an instance field (the semantics of
   the two approaches are slightly different; you can read more about it
   here: <https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html>)
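Step 3 can be sketched in plain Scala so that it runs without Flink. In Flink 1.0 the real interface is `org.apache.flink.streaming.api.checkpoint.Checkpointed[T]`, with `snapshotState(checkpointId, checkpointTimestamp)` and `restoreState(state)`; the trait below is a hypothetical stand-in with the same shape, and `UriLookupOperator` is an invented name. Step 2 is a call such as `env.enableCheckpointing(5000)` on the `StreamExecutionEnvironment` (the 5000 ms interval is an arbitrary choice). This is a sketch of the pattern, not the Flink runtime's behaviour.

```scala
import scala.collection.mutable

// Hypothetical stand-in shaped like Flink 1.0's Checkpointed[T] (where T must
// also be java.io.Serializable). It is NOT the Flink interface itself.
trait CheckpointedLike[T] {
  def snapshotState(checkpointId: Long, checkpointTimestamp: Long): T
  def restoreState(state: T): Unit
}

// Hypothetical operator: the lookup table is an instance field, so every
// parallel instance owns a private copy. snapshotState/restoreState are the
// hooks a runtime would call to persist it at a checkpoint and reload it
// after a failure.
final class UriLookupOperator extends CheckpointedLike[mutable.HashMap[String, Int]] {
  private var uriLookup = mutable.HashMap.empty[String, Int]

  // Assigns each previously unseen URI the next free integer id.
  def map(uri: String): (String, Int) =
    (uri, uriLookup.getOrElseUpdate(uri, uriLookup.size))

  // Hand out a copy so updates after the checkpoint don't alter the snapshot.
  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): mutable.HashMap[String, Int] =
    uriLookup.clone()

  // Replace the current table with a copy of the restored one.
  override def restoreState(state: mutable.HashMap[String, Int]): Unit =
    uriLookup = state.clone()
}
```

A fresh instance restored from a snapshot taken after two URIs continues numbering from where that snapshot left off, regardless of what the old instance did after the checkpoint.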

When your data is checkpointed you can access the state if your operator
implements the `RichFunction` interface (via an abstract class that wraps
the operator you need to implement, like `RichMapFunction`).

For your need in particular, I don't know a way to checkpoint state shared
between different operators; perhaps you can refactor your code so that
the state is encapsulated in an operator implementation and then moved
through your pipeline as a parameter of the following operators. Would that
work?
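One possible reading of this suggestion, sketched in plain Scala without Flink (all names are hypothetical): a single stage owns the lookup table and attaches the value that later stages need to each record, so downstream operators receive it as data instead of reading a global map. In a real job the enricher would be a checkpointed operator, and with parallelism greater than one each instance would hold only its own part of the table.

```scala
import scala.collection.mutable

// Hypothetical record types for illustration.
final case class Event(uri: String)
final case class Enriched(uri: String, uriId: Int)

// The only stage that owns the lookup table (an operator instance in Flink,
// rather than a field of a global object).
final class UriEnricher {
  private val uriLookup = mutable.HashMap.empty[String, Int]

  // Attach the id to the record so later stages never need the map itself.
  def apply(e: Event): Enriched =
    Enriched(e.uri, uriLookup.getOrElseUpdate(e.uri, uriLookup.size))
}

object Downstream {
  // A later stage reads the id from the record it receives.
  def describe(r: Enriched): String = s"${r.uriId}:${r.uri}"
}
```

For example, feeding `/a`, `/b`, `/a` through the enricher and then `Downstream.describe` yields `0:/a`, `1:/b`, `0:/a`.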

I apologize for just providing pointers to the docs in my reply, but
checkpointing deserves a good explanation and I feel the docs get the job
done pretty well. I will gladly help you if you have any doubts.

Hope I've been of some help.



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit