Posted to dev@flink.apache.org by Hermann Gábor <re...@gmail.com> on 2014/07/01 16:49:12 UTC

Moving streaming to 0.6

Hey,

We are trying to move the streaming code from the 0.5 release to 0.6, and
we've run into a problem. We extended AbstractInputTask, AbstractTask and
AbstractOutputTask classes to implement our components (StreamSource,
StreamTask, StreamSink), and it seems like they are replaced by
DataSourceTask, RegularPactTask and DataSinkTask respectively, so we
replaced them in our code too. The problem is that the RecordWriter's
numChannels is not set because the RecordWriter's initializeSerializers()
method does not get called. Should we call this manually somewhere? Also, I
don't know whether we should use DataSourceTask and the others or extend
AbstractInvokable and implement our own classes similar to
AbstractInputTask, AbstractTask and AbstractOutputTask. Can you please help
us with this? Thanks!

Regards,
Gábor

Re: Moving streaming to 0.6

Posted by Stephan Ewen <se...@apache.org>.
Hi!

Thanks, Ufuk, for answering this. That's correct: you can now use
"AbstractInvokable" everywhere. There is no need to distinguish the task
classes between inputs, intermediates, and outputs.

Let me add the following: I am reworking the abstraction for Job Graphs and
scheduling right now. It will make all vertices the same (no distinction
between input / output / intermediate), which is also important for the
upcoming incremental roll-out feature.

Stephan

Re: Moving streaming to 0.6

Posted by Gyula Fóra <gy...@gmail.com>.
Hey,

Thank you!
We could fix it by calling initializeSerializers() in the invoke()
method.
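
For readers following along, here is a minimal sketch of what that fix
looks like in shape. It does not use the actual Flink classes:
LateBoundWriter and SketchStreamSource are hypothetical stand-ins, and the
attachChannels() and selectChannel() methods are assumptions made only to
simulate the runtime wiring up channels after the task object exists.

```java
// Hypothetical stand-in for a writer whose channel count is set late by the runtime.
class LateBoundWriter {
    private int attachedChannels;  // set by the simulated runtime after construction
    private int numChannels = -1;  // stays -1 until initializeSerializers() runs

    // Simulates the runtime attaching channels after the writer was created.
    void attachChannels(int count) {
        attachedChannels = count;
    }

    // Picks up the channel count that was unknown at construction time.
    void initializeSerializers() {
        numChannels = attachedChannels;
    }

    // Fails loudly if the writer is used before the late initialization.
    int selectChannel(int key) {
        if (numChannels <= 0) {
            throw new IllegalStateException("initializeSerializers() was not called");
        }
        return Math.floorMod(key, numChannels);
    }
}

// Hypothetical stand-in for a streaming source task.
class SketchStreamSource {
    private LateBoundWriter writer;

    // The writer is created here, before any channels are attached.
    void registerInputOutput() {
        writer = new LateBoundWriter();
    }

    LateBoundWriter getWriter() {
        return writer;
    }

    // The fix: initialize the serializers first thing in invoke(),
    // when the channels are already attached.
    int invoke(int key) {
        writer.initializeSerializers();
        return writer.selectChannel(key);
    }
}

class InvokeFixDemo {
    public static void main(String[] args) {
        SketchStreamSource source = new SketchStreamSource();
        source.registerInputOutput();
        source.getWriter().attachChannels(4); // the simulated runtime step
        System.out.println("record routed to channel " + source.invoke(10));
    }
}
```

The point of the sketch is only the ordering: the writer exists before the
channels do, so the first safe place to initialize it is inside invoke().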

Regards,
Gyula

On Wed, Jul 2, 2014 at 12:20 AM, Ufuk Celebi <u....@fu-berlin.de> wrote:


Re: Moving streaming to 0.6

Posted by Ufuk Celebi <u....@fu-berlin.de>.
On 01 Jul 2014, at 16:49, Hermann Gábor <re...@gmail.com> wrote:

> Hey,
> 
> We are trying to move the streaming code from the 0.5 release to 0.6, and
> we've run into a problem. We extended AbstractInputTask, AbstractTask and
> AbstractOutputTask classes to implement our components (StreamSource,
> StreamTask, StreamSink), and it seems like they are replaced by
> DataSourceTask, RegularPactTask and DataSinkTask respectively, so we
> replaced them in our code too.

The execution engine's task hierarchy has been simplified in commit 8c1d82a8ec674de6525319501c6be2674e3143f1 [1]. It no longer differentiates between input and output tasks. I think the respective input and output task logic (e.g. for input splits) has moved to the classes you mentioned, but I'm not sure whether you really should subclass these if you only need a small subset of their functionality.

At least for the previous AbstractTask you could just extend AbstractInvokable, because it was just subclassing AbstractInvokable before.
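
To illustrate the idea of a single base class for all task kinds, here is a rough sketch. It is not Flink code: SketchInvokable, SketchSource, SketchDoubler and SketchSink are hypothetical names, the in-memory queues stand in for real gates and channels, and running the tasks one after another is a simplification of what a runtime would do.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical single base class: source, intermediate task and sink all extend it.
abstract class SketchInvokable {
    abstract void invoke();
}

// A "source" is just an invokable with an output and no input.
class SketchSource extends SketchInvokable {
    private final Deque<Integer> out;

    SketchSource(Deque<Integer> out) {
        this.out = out;
    }

    @Override
    void invoke() {
        out.addAll(Arrays.asList(1, 2, 3));
    }
}

// An intermediate task has both an input and an output.
class SketchDoubler extends SketchInvokable {
    private final Deque<Integer> in;
    private final Deque<Integer> out;

    SketchDoubler(Deque<Integer> in, Deque<Integer> out) {
        this.in = in;
        this.out = out;
    }

    @Override
    void invoke() {
        while (!in.isEmpty()) {
            out.add(in.poll() * 2);
        }
    }
}

// A "sink" is an invokable with an input and no output.
class SketchSink extends SketchInvokable {
    private final Deque<Integer> in;
    final List<Integer> received = new ArrayList<>();

    SketchSink(Deque<Integer> in) {
        this.in = in;
    }

    @Override
    void invoke() {
        while (!in.isEmpty()) {
            received.add(in.poll());
        }
    }
}

class UniformTasksDemo {
    // The caller treats every vertex the same way: it only calls invoke().
    static List<Integer> runPipeline() {
        Deque<Integer> sourceToTask = new ArrayDeque<>();
        Deque<Integer> taskToSink = new ArrayDeque<>();
        SketchSink sink = new SketchSink(taskToSink);
        List<SketchInvokable> vertices = Arrays.asList(
                new SketchSource(sourceToTask),
                new SketchDoubler(sourceToTask, taskToSink),
                sink);
        for (SketchInvokable vertex : vertices) {
            vertex.invoke();
        }
        return sink.received;
    }

    public static void main(String[] args) {
        System.out.println(runPipeline());
    }
}
```

The role of a vertex then follows from which inputs and outputs it registers, not from which abstract class it extends.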

I'd say wait for Stephan's take on this. ;-)

[1] https://github.com/apache/incubator-flink/commit/8c1d82a8ec674de6525319501c6be2674e3143f1


> The problem is that the RecordWriter's
> numChannels is not set because the RecordWriter's initializeSerializers()
> method does not get called. Should we call this manually somewhere?

Yes. This is needed to work around a flaw in the way that the runtime is instantiated, which is going to be refactored soon.

The RecordWriter creates the OutputGate, but the RuntimeEnvironment initializes the channels of the output gates at a later point. That's why it is not known at construction time how many channels are attached to a Gate and consequently how many serializers are needed for the RecordWriter. The initializeSerializers method is a workaround for that.
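
The ordering problem described above can be modelled in a few lines. This is only a simplified sketch with hypothetical classes (SketchGate, SketchWriter), not the actual RecordWriter or OutputGate code; the attachChannel() method and the StringBuilder placeholders for serializers are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an output gate; not the Flink class.
class SketchGate {
    private final List<String> channels = new ArrayList<>();

    // Called by the simulated runtime environment after the task was constructed.
    void attachChannel(String channelId) {
        channels.add(channelId);
    }

    int getNumChannels() {
        return channels.size();
    }
}

// Hypothetical stand-in for a record writer that needs one serializer per channel.
class SketchWriter {
    // The writer creates its own gate, which has no channels yet.
    private final SketchGate gate = new SketchGate();
    // Placeholder "serializers"; the array is empty until the second phase runs.
    private StringBuilder[] serializers = new StringBuilder[0];

    SketchGate getGate() {
        return gate;
    }

    // Second initialization phase: only meaningful once channels are attached.
    void initializeSerializers() {
        serializers = new StringBuilder[gate.getNumChannels()];
        for (int i = 0; i < serializers.length; i++) {
            serializers[i] = new StringBuilder();
        }
    }

    int getNumSerializers() {
        return serializers.length;
    }
}

class TwoPhaseInitDemo {
    public static void main(String[] args) {
        SketchWriter writer = new SketchWriter();
        System.out.println("serializers at construction: " + writer.getNumSerializers());

        // The simulated runtime attaches channels only after construction.
        writer.getGate().attachChannel("channel-0");
        writer.getGate().attachChannel("channel-1");

        writer.initializeSerializers();
        System.out.println("serializers after initializeSerializers(): " + writer.getNumSerializers());
    }
}
```

In this model the constructor cannot size the serializer array correctly, because the gate is still empty when the writer is built; a separate initialization call after channel setup is the simplest way out.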

This could also be refactored stand-alone, but will be subsumed by the upcoming runtime changes for the intermediate data set partitions (FLINK-986). If it is important to you, I could also do a quick fix for it now.
