Posted to dev@flink.apache.org by Hermann Gábor <re...@gmail.com> on 2015/02/06 17:13:58 UTC

Streaming fault tolerance with event sending

Hey,

We've been implementing a simple Storm-like fault tolerance system
that persists source records, keeps track of all records (whether
they've been processed), and replays them if they fail. The
straightforward way to do this was to create a special AbstractJobVertex
(and AbstractInvokable) that deals with replaying and keeps track of the
records.

So there is a so-called FTLayerVertex that needs two-way
communication with the tasks:

   1. it needs to be notified about a processed record or a newly created
   record
   2. it needs to resend failed records to the appropriate tasks

We've decided to use events for 1 and regular message sending (i.e.
BufferReader/Writer) for 2. So we need to send events backward on a
BufferReader.
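To make the bookkeeping described above more concrete, here is a minimal sketch of what the FTLayerVertex could track, assuming each notification event carries the id of the source record it derives from. RecordTracker and its methods are hypothetical names for illustration only, not Flink APIs; the replay callback stands in for resending over a BufferWriter.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of FT-layer bookkeeping (not a Flink API).
 * Counts outstanding records per source record and replays on failure.
 */
class RecordTracker {
    // Persisted source records, kept until their whole record tree is processed.
    private final Map<Long, String> sourceRecords = new HashMap<>();
    // Number of not-yet-processed records derived from each source record.
    private final Map<Long, Integer> pending = new HashMap<>();
    // Hypothetical hook for resending a record to the appropriate task (point 2).
    private final Consumer<String> replay;

    RecordTracker(Consumer<String> replay) {
        this.replay = replay;
    }

    /** A source emitted a record: persist it; the record itself is pending. */
    void onSourceRecord(long sourceId, String record) {
        sourceRecords.put(sourceId, record);
        pending.put(sourceId, 1);
    }

    /** A task created a new record derived from sourceId (point 1). */
    void onRecordCreated(long sourceId) {
        if (!pending.containsKey(sourceId)) return;
        pending.merge(sourceId, 1, Integer::sum);
    }

    /** A task finished processing a record derived from sourceId (point 1). */
    void onRecordProcessed(long sourceId) {
        if (!pending.containsKey(sourceId)) return;
        if (pending.merge(sourceId, -1, Integer::sum) == 0) {
            // Whole tree processed: the persisted copy is no longer needed.
            pending.remove(sourceId);
            sourceRecords.remove(sourceId);
        }
    }

    /** Processing failed: resend the original source record (point 2). */
    void onRecordFailed(long sourceId) {
        String record = sourceRecords.get(sourceId);
        if (record == null) return;
        pending.put(sourceId, 1); // the replayed record starts a fresh tree
        replay.accept(record);
    }

    boolean isFullyProcessed(long sourceId) {
        return !sourceRecords.containsKey(sourceId);
    }
}
```

A simple counter is used here for readability; Storm itself uses XOR-based acking, and a real implementation would also need per-attempt ids so that late notifications from a failed attempt are not counted against the replayed one.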

The problem is that backward events are only sent and received when
the channel is initialized (for now, in the case of a single channel),
i.e. when there's a live connection (its state is not UNKNOWN; please
correct me if I'm wrong). But sometimes the connection is not yet
initialized when we send these events from the tasks to the
FTLayerVertex.

We've tried a workaround: sending an "initializer" forward event from
the FTLayerVertex to all the tasks, and waiting for this event at the
beginning of every task's invoke, so that backward events are only sent
once the channel is surely up. That did not work out, because in order
to receive an event we need to read that input, and we only want to
read and process that input after we have made sure we can use the
channel.

Another workaround was sending an "initializer" record instead of an
event. That did not work out either, because we wanted to read that
input (from the FTLayerVertex) as a regular input, so the given
BufferReader was also passed to a UnionBufferReader, and reading the
given BufferReader separately (wrapped in a RecordReader) resulted in
the UnionBufferReader getting stuck. (It seemed as if the
UnionBufferReader put the given BufferReader in its queue because of
this "initializer" message, but the message had already been read from
the BufferReader separately, so it got stuck waiting for a new record
from the BufferReader.)

These workarounds don't seem natural anyway, because we're pulling the
"synchronization" of connection initialization up a layer. It would be
nice to have some kind of event that is guaranteed to reach its
destination. Is that possible? Would that be useful for other purposes
too?
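One way to picture such a guaranteed event, assuming the sending side can be told when its channel becomes connected, is a sender-side buffer: events sent while the channel state is still unknown are queued instead of dropped, then flushed in order once the connection is up. This is a hypothetical sketch; BufferedEventSender and onChannelInitialized are illustrative names, not existing Flink classes or callbacks.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/**
 * Hypothetical sender-side buffer for backward events (not a Flink API).
 * Events sent before the channel is connected are kept, not lost.
 */
class BufferedEventSender<E> {
    private final Queue<E> pendingEvents = new ArrayDeque<>();
    // Null while the channel is not yet connected (the "UNKNOWN" case).
    private Consumer<E> channel;

    /** Deliver immediately if connected, otherwise remember the event. */
    synchronized void send(E event) {
        if (channel == null) {
            pendingEvents.add(event);
        } else {
            channel.accept(event);
        }
    }

    /** Assumed to be called once the underlying connection is established. */
    synchronized void onChannelInitialized(Consumer<E> connectedChannel) {
        this.channel = connectedChannel;
        // Flush in the original order so the receiver sees events as sent.
        while (!pendingEvents.isEmpty()) {
            connectedChannel.accept(pendingEvents.poll());
        }
    }
}
```

Keeping the buffering inside the sender means tasks never have to wait for an "initializer" message, which is the layer-crossing synchronization the workarounds above run into.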

Does anyone know a workaround that might work? Are we missing
something? (Please correct me if I've got something wrong.) Should we
use something else instead of events? (I think we cannot avoid this
two-way communication between vertices.) All suggestions are welcome!

Best regards,
Gabor