Posted to dev@flink.apache.org by Stephan Ewen <se...@apache.org> on 2015/06/01 01:17:45 UTC

Re: [DISCUSS] Streaming Sources (again)

I am by now also in favor of the locking variant.

It pains me to say that, because I was the one pushing so heavily for the
other variant (reachedEnd() / next()) in the first place. The concept and
design of that variant are nice, but reality seems to speak against it.
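A minimal sketch of why the pull-style design is attractive. It assumes a hypothetical PullSource interface and driver loop (the names are illustrative, not Flink's actual API). The task thread owns the loop, so it can snapshot state between two next() calls without any lock. The catch is that a next() call that blocks waiting for data keeps the loop from ever reaching the checkpoint step. That is why this contract required sources to be interruptible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical pull-style source contract (illustrative names, not Flink's API). */
interface PullSource<T> {
    boolean reachedEnd();

    T next();
}

/** Toy source backed by an in-memory iterator; it never blocks. */
final class IteratorSource<T> implements PullSource<T> {
    private final Iterator<T> it;

    IteratorSource(Iterator<T> it) { this.it = it; }

    @Override public boolean reachedEnd() { return !it.hasNext(); }

    @Override public T next() { return it.next(); }
}

final class PullDriverSketch {

    /**
     * Drains the source on the calling (task) thread and, every {@code interval}
     * records, records the emitted count as a stand-in for taking a checkpoint.
     */
    static List<Integer> drive(PullSource<?> source, int interval) {
        List<Integer> checkpoints = new ArrayList<>();
        int emitted = 0;
        while (!source.reachedEnd()) {
            source.next(); // forwarding the record downstream is omitted in this sketch
            emitted++;
            if (emitted % interval == 0) {
                // No lock needed: source code only runs inside reachedEnd()/next(),
                // so its state cannot change while the snapshot is taken. If next()
                // blocked waiting for data, though, this point would never be reached.
                checkpoints.add(emitted);
            }
        }
        return checkpoints;
    }

    public static void main(String[] args) {
        PullSource<Integer> source = new IteratorSource<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7).iterator());
        System.out.println(drive(source, 3)); // prints [3, 6]
    }
}
```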

Writing code that is properly interruptible where possible (and
non-interruptible where necessary) is hard to get right, and most
I/O code that is written without that in mind will almost certainly leak
resources when interrupted frequently. Not even our primary source (Kafka)
seems to handle that correctly, not to mention the number of errors such
sources would log on each checkpoint.
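Here is one concrete JDK example of how ordinary I/O code reacts to interruption. It is an illustration only, not necessarily the failure seen in the Kafka source. When a thread blocked on an NIO InterruptibleChannel (for example a SocketChannel or a Pipe) is interrupted, the JDK closes the channel and the read throws ClosedByInterruptException. A source that gets interrupted for every checkpoint would therefore lose its channel every time. The class and method names below are hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.Pipe;

final class InterruptClosesChannel {

    /**
     * Blocks a reader thread on an empty pipe, interrupts it, and reports whether
     * the read failed with ClosedByInterruptException and left the channel closed.
     */
    static boolean interruptedReadClosesChannel() {
        try {
            Pipe pipe = Pipe.open();
            Pipe.SourceChannel in = pipe.source();
            boolean[] sawClosedByInterrupt = {false};

            Thread reader = new Thread(() -> {
                try {
                    in.read(ByteBuffer.allocate(16)); // blocks: nothing is ever written
                } catch (ClosedByInterruptException e) {
                    sawClosedByInterrupt[0] = true;   // the JDK closed the channel for us
                } catch (IOException e) {
                    // any other I/O failure leaves sawClosedByInterrupt[0] == false
                }
            });
            reader.start();
            reader.interrupt(); // e.g. "interrupt the source so a checkpoint can be taken"
            reader.join();      // join() makes the reader's write to the array visible

            boolean channelStillOpen = in.isOpen();
            pipe.sink().close();
            return sawClosedByInterrupt[0] && !channelStillOpen;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The channel is gone; a real source would have to reconnect after every interrupt.
        System.out.println(interruptedReadClosesChannel()); // prints true
    }
}
```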

From the experiences Aljoscha and I gathered over the past days (debugging
and testing the exactly-once guarantees), it seems that this is just not
realistic to expect.

On the upside: I ran a few ad-hoc micro-benchmarks on my laptop just now,
and uncontended locks are really cheap in Java by now.
Also, if we consolidate the checkpointing lock with the buffer-flushing
lock, we eliminate any additional locking overhead.
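Here is a minimal sketch of the locking variant. The names LockingSource, CountingSource, checkpointLock and snapshotState() are hypothetical and are not the API of the PR under discussion. The source updates its state and emits each element inside one synchronized block on a lock it shares with the task. The task takes its snapshot while holding the same lock, so a snapshot can never see an element that was emitted but not yet reflected in the state. In the uncontended case, each element costs one cheap lock acquisition. A buffer-flushing thread could synchronize on the same lock object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical locking-style source contract (illustrative, not Flink's API). */
interface LockingSource<T> {
    /** Runs until done or cancelled; must hold checkpointLock while updating state and emitting. */
    void run(Object checkpointLock, Consumer<T> out);

    void cancel();
}

/** Emits 0, 1, 2, ... up to a limit and keeps its next offset as checkpointable state. */
final class CountingSource implements LockingSource<Long> {
    private final long limit;
    private volatile boolean running = true;
    private long offset; // guarded by checkpointLock

    CountingSource(long limit) { this.limit = limit; }

    @Override
    public void run(Object checkpointLock, Consumer<Long> out) {
        while (running) {
            synchronized (checkpointLock) {
                if (offset >= limit) {
                    return;
                }
                // Emission and state update form one atomic step with respect to snapshots.
                out.accept(offset);
                offset++;
            }
        }
    }

    @Override
    public void cancel() { running = false; }

    /** Must be called while holding the same checkpointLock. */
    long snapshotState() { return offset; }
}

final class LockingSourceDemo {

    /** Takes {@code snapshots} snapshots concurrently with the source and checks each one. */
    static boolean snapshotsAreConsistent(long limit, int snapshots) {
        Object checkpointLock = new Object();
        List<Long> emitted = new ArrayList<>(); // only mutated while holding checkpointLock
        CountingSource source = new CountingSource(limit);
        Thread sourceThread = new Thread(() -> source.run(checkpointLock, emitted::add));
        sourceThread.start();

        boolean consistent = true;
        for (int i = 0; i < snapshots; i++) {
            synchronized (checkpointLock) {
                // Under the lock, the snapshot offset must equal the number of emitted records.
                consistent &= source.snapshotState() == emitted.size();
            }
        }
        try {
            sourceThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return consistent && emitted.size() == limit;
    }

    public static void main(String[] args) {
        System.out.println(snapshotsAreConsistent(100_000, 1_000)); // prints true
    }
}
```

In this sketch, cancel() only flips a volatile flag that run() checks between elements. It never interrupts the source thread, so blocking I/O inside the source is left undisturbed.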

Stephan



On Sun, May 31, 2015 at 3:57 PM, Ufuk Celebi <uc...@apache.org> wrote:

> +1 to the locking interface for the release. I agree with Marton that
> interruptibility seems to be too much of a burden on the sources.
>
> The code docs should be very clear (and concise (!)) about why the locking
> is needed etc.
>
> – Ufuk
>
> On 31 May 2015, at 14:52, Gyula Fóra <gy...@gmail.com> wrote:
>
> > Alright, let's do the locking then :)
> >
> > Let's keep only one interface for the release.
> >
> > On Sun, May 31, 2015 at 12:58 PM, Márton Balassi <balassi.marton@gmail.com> wrote:
> >
> >> I am also for having only one source interface. It seems that
> >> interruptibility is too much of a burden on the sources; the locking
> >> version should still be acceptable from the user's point of view. We are
> >> dealing with inherently concurrent tasks, so I suppose our users are
> >> familiar with locking, especially those who need exactly-once processing.
> >>
> >> On Sat, May 30, 2015 at 2:44 AM, Aljoscha Krettek <al...@apache.org>
> >> wrote:
> >>
> >>> I would also prefer having only one source interface. The PR still has
> >>> both variants so that people can check them out.
> >>>
> >>> In my opinion the assumptions about interruptibility are easier to
> >>> break than the requirement of locking. Even if we get the Kafka source
> >>> to work with interruptions (which I doubt, because this fails
> >>> somewhere in their code), this would not guarantee that it will
> >>> always work in future versions. With locking, either you take the
> >>> lock, and then it is correct (even for future versions), or you don't,
> >>> and then it is immediately incorrect.
> >>>
> >>> On Fri, May 29, 2015 at 10:56 PM, Gyula Fóra <gy...@gmail.com> wrote:
> >>>> Hey,
> >>>>
> >>>> It seems like both interfaces are pretty much capable of doing the same
> >>>> thing but work on slightly different assumptions.
> >>>>
> >>>> Isn't there a way that the Kafka source can work with the interruptions?
> >>>> I think the reachedEnd/next interface is slightly easier to grasp than
> >>>> run() with the locks. But in any case I would slightly prefer having
> >>>> only one of them if they can technically do the same thing.
> >>>>
> >>>> Also, adding a new interface means we add a new StreamTask
> >>>> implementation, which is also getting to be slightly too much.
> >>>>
> >>>> What is your opinion on this?
> >>>>
> >>>> Gyula
> >>>>
> >>>>
> >>>>
> >>>> On Fri, May 29, 2015 at 6:51 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> >>>>
> >>>>> Hi All,
> >>>>> After finishing my pull request that should fix the problems with the
> >>>>> synchronisation of checkpoints and element emission (the reason for
> >>>>> the faulty results of the exactly-once tests), I discovered that the
> >>>>> Kafka source does not deal well with being interrupted. We recently
> >>>>> changed the SourceFunction to the reachedEnd()/next() interface, with
> >>>>> the contract that the source must be interruptible to be able to
> >>>>> perform checkpoints. Now this doesn't seem to work with Kafka. I added
> >>>>> another Source interface in my PR
> >>>>> (https://github.com/apache/flink/pull/742). This is similar to the old
> >>>>> run()/cancel() interface, with the addition that the source must
> >>>>> acquire a lock before updating state and emitting elements. The update
> >>>>> of state and the emission of elements must happen in the same
> >>>>> synchronized block to ensure consistency. This seems to solve the
> >>>>> problem, but now we have two source interfaces.
> >>>>>
> >>>>> The question now is: what do you think about the two interfaces?
> >>>>> Should we keep both? Remove one?
> >>>>>
> >>>>> Cheers,
> >>>>> Aljoscha
> >>>>>
> >>>
> >>
>
>