Posted to dev@flink.apache.org by Aljoscha Krettek <al...@apache.org> on 2015/05/01 17:39:40 UTC

Re: Making state in streaming more explicit

From this discussion I derive that we will have a state abstraction that
everyone who requires state will work with? Or will the state be in object
fields that are saved upon invocation of some doBackup() method?
On Apr 30, 2015 10:31 PM, "Stephan Ewen" <se...@apache.org> wrote:

> That would be one way of doing it, yes...
>
> On Thu, Apr 30, 2015 at 10:23 PM, Gyula Fóra <gy...@gmail.com> wrote:
>
> > Okay, so the commit would be something like:
> >
> > commitState(OperatorState state)
> >
> >
> > On Thu, Apr 30, 2015 at 10:17 PM, Stephan Ewen <se...@apache.org> wrote:
> >
> > > I think your assumption (and the current Kafka source implementation) is
> > > that there is one state object that you update/mutate all the time.
> > >
> > > If you draw a snapshot of the state object at the time of the checkpoint,
> > > the source can continue, and that particular offset is remembered as the
> > > state of this checkpoint and can be committed to Kafka/ZooKeeper later.
> > >
> > > On Thu, Apr 30, 2015 at 10:09 PM, Gyula Fóra <gy...@gmail.com> wrote:
> > >
> > > > Regarding the commits (for instance the Kafka offset):
> > > >
> > > > I don't exactly get how you mean to do this. If the source continues
> > > > processing after the checkpoint and before the commit, it will not know
> > > > exactly what state has been committed, so it would need to know the
> > > > time of the checkpoint and store a local copy.
> > > >
> > > > Gyula
> > > >
> > > >
> > > > On Thu, Apr 30, 2015 at 10:04 PM, Stephan Ewen <se...@apache.org> wrote:
> > > >
> > > > > Thanks for the comments!
> > > > >
> > > > > Concerning acknowledging the checkpoint:
> > > > >
> > > > >    The sinks definitely need to acknowledge it.
> > > > >    If we asynchronously write the state of an operator (and emit
> > > > > downstream barriers before that is complete), then I think that we
> > > > > also need those operators to acknowledge the checkpoint.
> > > > >
> > > > >
> > > > > For the commit messages:
> > > > >
> > > > >    My first thought was to send commit messages simply as actor
> > > > > messages from the JobManager to the vertices that require them. That
> > > > > way, they are not stuck in the data flow with its possible latency.
> > > > >    Also, in the data flow, messages get duplicated (at all-to-all
> > > > > connections).
> > > > >
> > > > >
> > > > > For iterative flows:
> > > > >
> > > > > Does the JobManager need to be aware of this, or can the
> > > > > IterationHead handle that transparently for the JobManager?
> > > > > From our last conversation, I recall:
> > > > >  - receive barriers, push out barriers
> > > > >  - snapshot its state
> > > > >  - wait for the barriers to come back through the backchannel
> > > > >  - write the state snapshot plus the backchannel buffers
> > > > >  - only then acknowledge the checkpoint
> > > > >
> > > > > My first impression is that this way the JobManager would not handle
> > > > > the IterationHead any differently from all other stateful operators.
> > > > >
> > > > > Greetings,
> > > > > Stephan
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Apr 30, 2015 at 9:27 PM, Paris Carbone <pa...@kth.se> wrote:
> > > > >
> > > > > > I agree with all suggestions, thanks for summing it up Stephan.
> > > > > >
> > > > > > A few more points I have in mind at the moment:
> > > > > >
> > > > > > - Regarding the acknowledgements, indeed we don’t need to make all
> > > > > > operators commit back; we just have to make sure that all sinks
> > > > > > have acknowledged a checkpoint to consider it complete back at the
> > > > > > coordinator.
> > > > > >
> > > > > > - Do you think we should broadcast commit responses to the sources
> > > > > > that need them after every successful checkpoint? The checkpoint
> > > > > > interval does not always match the frequency at which we want to
> > > > > > initiate a compaction, for example on Kafka. One alternative would
> > > > > > be to make sources request a successful checkpoint id via a future,
> > > > > > on demand.
> > > > > >
> > > > > > - We have to update the current checkpointing approach to cover
> > > > > > iterative streams. We need to make sure we don’t send checkpoint
> > > > > > requests to iteration heads, and handle the downstream backup of
> > > > > > records in transit during checkpoints accordingly.
> > > > > >
> > > > > > cheers
> > > > > > Paris
> > > > > >
> > > > > > > On 30 Apr 2015, at 20:47, Stephan Ewen <se...@apache.org> wrote:
> > > > > > >
> > > > > > > I was looking into the handling of state in streaming operators,
> > > > > > > and it is a bit hidden from the system.
> > > > > > >
> > > > > > > Right now, functions can (if they want) put some state into their
> > > > > > > context. At runtime, state may or may not occur. Before runtime,
> > > > > > > the system cannot tell which operators are going to be stateful
> > > > > > > and which are going to be stateless.
> > > > > > >
> > > > > > > I think it is a good idea to expose that. We can use it for
> > > > > > > optimizations, and we know which operators need to checkpoint
> > > > > > > state and acknowledge the asynchronous checkpoint.
> > > > > > >
> > > > > > > At this point, we need to assume that all operators need to send
> > > > > > > a confirmation message, which is unnecessary.
> > > > > > >
> > > > > > > Also, I think we should expose which operations want a "commit"
> > > > > > > notification after the checkpoint completes. Good examples are
> > > > > > >
> > > > > > >  - the KafkaConsumer source, which can then commit the offset
> > > > > > > that is safe to ZooKeeper
> > > > > > >
> > > > > > >  - a transactional KafkaProducer sink, which can commit a batch
> > > > > > > of messages to the Kafka partition once the checkpoint is done
> > > > > > > (to get exactly-once guarantees that include the sink)
> > > > > > > Comments welcome!
> > > > > > >
> > > > > > > Greetings,
> > > > > > > Stephan
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
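The snapshot-then-commit pattern discussed in this thread (draw a copy of the state when the checkpoint barrier arrives, so that a later commit is unaffected by continued processing) could look roughly like the following Java sketch. All names here (StatefulOperator, OffsetSource, snapshotState, commitState) are invented for illustration and are not Flink API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Flink API: illustrates keeping a per-checkpoint
// local copy of the state so the commit is decoupled from ongoing processing.
interface StatefulOperator<S> {
    S snapshotState(long checkpointId);  // called when the barrier arrives
    S commitState(long checkpointId);    // called after the checkpoint completes
}

class OffsetSource implements StatefulOperator<Long> {
    long currentOffset;                               // mutated continuously
    final Map<Long, Long> pending = new HashMap<>();  // checkpoint id -> offset

    @Override
    public Long snapshotState(long checkpointId) {
        // remember the offset as of this checkpoint; processing may continue
        pending.put(checkpointId, currentOffset);
        return currentOffset;
    }

    @Override
    public Long commitState(long checkpointId) {
        // the committed value is the snapshot, not the current offset
        return pending.remove(checkpointId);
    }
}
```

A source written this way addresses Gyula's concern directly: the local copy taken in snapshotState is what later gets committed to Kafka/ZooKeeper, regardless of how far currentOffset has advanced in the meantime.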

Re: Making state in streaming more explicit

Posted by Márton Balassi <ba...@gmail.com>.
The current aim is the first option as you have correctly derived. :)
On May 1, 2015 5:39 PM, "Aljoscha Krettek" <al...@apache.org> wrote:

> From this discussion I derive that we will have a state abstraction that
> everyone who requires state will work with? Or will the state be in object
> fields and they will be saved upon invocation of some doBackup() method.
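The iteration-head checkpoint sequence recalled earlier in the thread (emit barriers, snapshot state, wait for the barriers to return through the backchannel, persist the snapshot plus the in-transit records, and only then acknowledge) can be sketched in Java as follows. IterationHead and all of its members are invented names for illustration, not Flink classes:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative sketch only: IterationHead, its fields, and the string-based
// "records" are hypothetical, not Flink classes.
class IterationHead {
    final Deque<String> backChannel = new ArrayDeque<>();
    final List<String> actions = new ArrayList<>();

    void checkpoint(long id) {
        actions.add("emit-barrier-" + id);    // 1. push the barrier downstream
        actions.add("snapshot-state-" + id);  // 2. snapshot own state
        // 3. drain the backchannel until the barrier comes back, backing up
        //    every record still in transit (assumes the barrier does arrive)
        List<String> inTransit = new ArrayList<>();
        String record;
        while (!(record = backChannel.poll()).equals("barrier-" + id)) {
            inTransit.add(record);
        }
        // 4. persist the state snapshot plus the backed-up records
        actions.add("persist-" + inTransit.size() + "-in-transit-" + id);
        actions.add("ack-checkpoint-" + id);  // 5. only then acknowledge
    }
}
```

Because the acknowledgement is simply delayed until step 5, the JobManager can treat an iteration head exactly like any other stateful operator, as Stephan suggests.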