Posted to user@flink.apache.org by Nick Dimiduk <nd...@apache.org> on 2016/02/08 07:14:28 UTC

OutputFormat vs SinkFunction

Heya,

Is there a plan to consolidate these two interfaces? They appear to provide
identical functionality, differing only in lifecycle management. I found
myself writing an adaptor so I can consume an OutputFormat where a
SinkFunction is expected; there's not much to it. This seems like code that
Flink should ship.
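
For illustration, the kind of adapter described above could look roughly like the following. This is a minimal sketch, not the code from this thread: SimpleOutputFormat and SimpleSink are hypothetical, stripped-down stand-ins for Flink's OutputFormat and a lifecycle-aware SinkFunction, so the example compiles without Flink on the classpath. The real interfaces have additional methods.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the batch-style interface: explicit open/write/close lifecycle.
interface SimpleOutputFormat<T> {
    void open(int taskNumber, int numTasks) throws IOException;
    void writeRecord(T record) throws IOException;
    void close() throws IOException;
}

// Hypothetical stand-in for a streaming sink with lifecycle hooks.
interface SimpleSink<T> {
    void open(int subtaskIndex, int parallelism) throws Exception;
    void invoke(T value) throws Exception;
    void close() throws Exception;
}

// The adapter wraps (rather than extends) any output format and forwards each call.
final class OutputFormatSinkAdapter<T> implements SimpleSink<T> {
    private final SimpleOutputFormat<T> format;

    OutputFormatSinkAdapter(SimpleOutputFormat<T> format) {
        this.format = format;
    }

    @Override
    public void open(int subtaskIndex, int parallelism) throws Exception {
        format.open(subtaskIndex, parallelism);
    }

    @Override
    public void invoke(T value) throws Exception {
        format.writeRecord(value);
    }

    @Override
    public void close() throws Exception {
        format.close();
    }
}

// In-memory format for tests, in the spirit of LocalCollectionOutputFormat.
final class CollectingFormat<T> implements SimpleOutputFormat<T> {
    final List<T> records = new ArrayList<>();
    boolean closed = false;

    @Override public void open(int taskNumber, int numTasks) { }
    @Override public void writeRecord(T record) { records.add(record); }
    @Override public void close() { closed = true; }
}
```

Wrapping instead of subclassing means one adapter class works for every output format, at the cost of forwarding each lifecycle call by hand.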

Maybe one interface or the other can be deprecated for the 1.0 API?

Thanks,
Nick

Re: OutputFormat vs SinkFunction

Posted by Nick Dimiduk <nd...@apache.org>.
I think managing a lifecycle around the existing MR OutputFormats makes a
lot of sense for the streaming environment. Having them unified in the
Flink Streaming API will make users' lives much better, and keep the
streaming world open to the large existing ecosystem.

Re: OutputFormat vs SinkFunction

Posted by Stephan Ewen <se...@apache.org>.
Most of the problems we had with OutputFormats stem from the fact that many
were implemented in a batchy way:
  - They buffer data and write large chunks at some points
  - They need the "close()" call before any consistent result is guaranteed

That is mostly the case for FileOutputFormats, but not exclusively (some DB
OutputFormats also buffer and batch-commit in some intervals).

In general, it should be possible to unify the two simply by adding a
"sync()" or "ensurePersistent()" call to the OutputFormats. That method
could be called upon checkpoints, or periodically, to ensure persistence
and result visibility.

The initial idea behind having SinkFunctions was to not interfere with the
batch code and not break things (by adding new abstract methods).
What we could do, however, is to allow OutputFormats to implement an
interface like "Streamable" which would add the above-mentioned methods and
make the OutputFormat safe for streaming.
We could then bit by bit add that interface to the existing output formats.
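
As a rough illustration of the proposal above, here is a minimal sketch of what such an opt-in interface might look like. Streamable and ensurePersistent() are the names floated in this message and are not an existing Flink API; BufferingFormat and CheckpointDriver are hypothetical stand-ins for a "batchy" output format and for the runtime's checkpoint hook.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical opt-in interface; the names come from the proposal, not from Flink.
interface Streamable {
    // Flush buffered data so that everything written so far is durable and visible.
    void ensurePersistent() throws Exception;
}

// A "batchy" format: buffers records and only publishes them on flush or close.
final class BufferingFormat implements Streamable {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> persisted = new ArrayList<>(); // stands in for the external system

    void writeRecord(String record) {
        buffer.add(record);
    }

    @Override
    public void ensurePersistent() {
        persisted.addAll(buffer);
        buffer.clear();
    }

    void close() {
        ensurePersistent();
    }

    // Defensive copy of what an outside reader could currently see.
    List<String> persisted() {
        return new ArrayList<>(persisted);
    }
}

// Stand-in for the runtime: flushes formats that opted in, at each simulated checkpoint.
final class CheckpointDriver {
    static void onCheckpoint(Object format) throws Exception {
        if (format instanceof Streamable) {
            ((Streamable) format).ensurePersistent();
        }
    }
}
```

Because the interface is optional, existing formats that do not implement it would keep compiling and would simply not be flushed at checkpoints.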


Any thoughts on that?

Greetings,
Stephan



Re: OutputFormat vs SinkFunction

Posted by Maximilian Michels <mx...@apache.org>.
I think you have a point, Nick. OutputFormats on their own have the same
fault-tolerance semantics as SinkFunctions. What kind of failure semantics
they guarantee depends on the actual implementation. For instance, the
RMQSource has exactly-once semantics but the RMQSink currently does not. If
you care about exactly-once semantics, you have to look into the
documentation and use the sources and sinks accordingly. It is not the case
that OutputFormats are dangerous while all SinkFunctions are failure-proof.

Consolidating the two interfaces would make sense. It might be a bit late
for the 1.0 release because I see that we would need to find a consensus
first and there are many things in the backlog :)

Re: OutputFormat vs SinkFunction

Posted by Nick Dimiduk <nd...@apache.org>.
I think this depends on the implementation of the OutputFormat. For
instance, an HBase, Cassandra or ES OF will handle most operations as
idempotent when the schema is designed appropriately.
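
To make the idempotency point concrete, here is a minimal sketch under stated assumptions: KeyedStore is a hypothetical in-memory stand-in for a key-value store such as an HBase table, and the event IDs are invented for the example. When the row key is derived deterministically from the record, a record replayed after recovery overwrites its earlier copy rather than creating a duplicate.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a key-value store; every write is an upsert keyed by row key.
final class KeyedStore {
    private final Map<String, String> rows = new LinkedHashMap<>();

    void put(String rowKey, String value) {
        rows.put(rowKey, value);
    }

    int size() {
        return rows.size();
    }

    String get(String rowKey) {
        return rows.get(rowKey);
    }
}

final class IdempotentReplayDemo {
    // The row key comes from the record itself, so writing the same record twice
    // leaves the store in the same state as writing it once.
    static void write(KeyedStore store, String eventId, String payload) {
        store.put(eventId, payload);
    }

    public static void main(String[] args) {
        KeyedStore store = new KeyedStore();
        write(store, "evt-1", "a");
        write(store, "evt-2", "b");
        // Simulated recovery: the job restarts from a checkpoint and re-emits evt-2.
        write(store, "evt-2", "b");
        write(store, "evt-3", "c");
        System.out.println(store.size()); // prints 3: four writes, three distinct rows
    }
}
```

This only holds if replayed records carry the same key and value as the originals; keys based on arrival time or a random ID would not be idempotent.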

You are (rightly) focusing on FileOFs, which also depend on the semantics
of their implementation. MR has always required an atomic rename from the
DFS, and only moves output files into place once the task commits its output.
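
The write-then-rename pattern mentioned above can be sketched as follows. This is an analogy on the local filesystem using java.nio, not MapReduce's actual output committer: RenameOnCommitWriter and the "_temporary-" prefix are hypothetical names chosen for the example, and a real DFS has its own rename API and guarantees.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

// Hypothetical writer: output goes to a temporary file and becomes visible only on commit.
final class RenameOnCommitWriter {
    private final Path tempFile;
    private final Path finalFile;

    RenameOnCommitWriter(Path dir, String name) {
        this.tempFile = dir.resolve("_temporary-" + name);
        this.finalFile = dir.resolve(name);
    }

    // Write the task's output to the temporary location only.
    void write(List<String> lines) throws IOException {
        Files.write(tempFile, lines, StandardCharsets.UTF_8);
    }

    // Publish the output in a single step; readers see either nothing or the whole file.
    void commit() throws IOException {
        Files.move(tempFile, finalFile, StandardCopyOption.ATOMIC_MOVE);
    }

    // Discard partial output from a failed attempt so it never becomes visible.
    void abort() throws IOException {
        Files.deleteIfExists(tempFile);
    }

    Path finalFile() {
        return finalFile;
    }
}
```

A failed attempt calls abort() and a retry starts from a clean temporary file, so partially written output is never observed at the final path.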

Also I think it unreasonable to bring exactly-once considerations into the
discussion because nothing provides this right now without a multi-stage
commit protocol. Such a protocol would be provided at the framework level
and, to the best of my knowledge, its semantic expectations on the output
handler are undefined.

My original question comes from wanting to use the LocalCollectionOF to
test a streaming flow that sinks to Kafka, without rewriting the flow in
test code. So in this case you're right that it does apply to tests. I
don't think correctness of tests is a trivial concern though.

As for RollingFileSink, I've not seen this conversation so I cannot
comment. Per my earlier examples, I think it's not correct to assume all OF
implementations are file-based.

Re: OutputFormat vs SinkFunction

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
one problem that I see with OutputFormats is that they are not made for a streaming world. By this, I mean that they don’t handle failure well and don’t consider fault-tolerant streaming, i.e. exactly-once semantics. For example, what would be expected to happen if a job with a FileOutputFormat fails and needs to recover? There might be some garbage left in the files, and those records would get emitted again after restoring to a checkpoint, thus leading to duplicate results.

Having OutputFormats in streaming programs can work well in toy examples and tests but can be dangerous in real-world jobs. I once talked with Robert about this and we came up with the idea (I think it was mostly him) of generalizing the RollingFileSink (which is fault-tolerance aware) so that it can easily be used with something akin to OutputFormats.

What do you think?

-Aljoscha

Re: OutputFormat vs SinkFunction

Posted by Nick Dimiduk <nd...@apache.org>.
On Mon, Feb 8, 2016 at 9:51 AM, Maximilian Michels <mx...@apache.org> wrote:

> Changing the class hierarchy would break backwards-compatibility of the
> API. However, we could add another method to DataStream to easily use
> OutputFormats in streaming.
>

Indeed, that's why I suggested deprecating one and moving toward a
consolidated class hierarchy. It won't happen overnight, but this can be
managed pretty easily with some adapter code like this and some additional
overrides in the public APIs.

> How did you write your adapter? I came up with the one below.
>

Our implementations are similar. This one is working fine with my test code.

https://gist.github.com/ndimiduk/18820fcd78412c6b4fc3

> On Mon, Feb 8, 2016 at 6:07 PM, Nick Dimiduk <nd...@apache.org> wrote:
>
>> In my case, I have my application code that is calling addSink, for which
>> I'm writing a test that needs to use LocalCollectionOutputFormat. Having
>> two separate class hierarchies is not helpful, hence the adapter. Much of
>> this code already exists in the implementation of FileSinkFunction, so the
>> project already supports it in a limited way.
>>
>> On Mon, Feb 8, 2016 at 4:16 AM, Maximilian Michels <mx...@apache.org>
>> wrote:
>>
>>> Hi Nick,
>>>
>>> SinkFunction just implements user-defined functions on incoming
>>> elements. OutputFormat offers more lifecycle methods. Thus it is a
>>> more powerful interface. The OutputFormat originally comes from the
>>> batch API, whereas the SinkFunction originates from streaming. Those
>>> were more separate code paths in the past. Ultimately, it would make
>>> sense to have only the OutputFormat interface but I think we have to
>>> keep it to not break the API.
>>>
>>> If you need the lifecycle methods in streaming, there is
>>> RichSinkFunction, which implements SinkFunction and adds open() and
>>> close(). In addition, it gives you access to the RuntimeContext. You can
>>> pass this directly to the "addSink(sinkFunction)" API method.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Mon, Feb 8, 2016 at 7:14 AM, Nick Dimiduk <nd...@apache.org>
>>> wrote:
>>> > Heya,
>>> >
>>> > Is there a plan to consolidate these two interfaces? They appear to provide
>>> > identical functionality, differing only in lifecycle management. I found
>>> > myself writing an adaptor so I can consume an OutputFormat where a
>>> > SinkFunction is expected; there's not much to it. This seems like code that
>>> > Flink should ship.
>>> >
>>> > Maybe one interface or the other can be deprecated for 1.0 API?
>>> >
>>> > Thanks,
>>> > Nick
>>>
>>
>>
>

Re: OutputFormat vs SinkFunction

Posted by Maximilian Michels <mx...@apache.org>.
Changing the class hierarchy would break backwards-compatibility of the
API. However, we could add another method to DataStream to easily use
OutputFormats in streaming.

How did you write your adapter? I came up with the one below. Admittedly,
it is sort of a hack but works fine. By the way, you can also use the
DataStream.write(OutputFormat format) method to use any OutputFormat. The
code below is only needed if you really want to use
DataStream.addSink(SinkFunction function).

Cheers,
Max

import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.Collection;

/**
 * Exposes a LocalCollectionOutputFormat as a SinkFunction so that it can be
 * passed to DataStream.addSink(). The RichFunction methods not declared here
 * are inherited from LocalCollectionOutputFormat.
 */
public class OutputFormatAdapter<T> extends LocalCollectionOutputFormat<T>
   implements SinkFunction<T>, RichFunction {

   public OutputFormatAdapter(Collection<T> out) {
      super(out);
   }

   /** SinkFunction entry point: forward each element to the output format. */
   @Override
   public void invoke(T value) throws Exception {
      super.writeRecord(value);
   }

   /** RichFunction lifecycle hook: open the format for this parallel subtask. */
   @Override
   public void open(Configuration parameters) throws Exception {
      super.open(getRuntimeContext().getIndexOfThisSubtask(),
         getRuntimeContext().getNumberOfParallelSubtasks());
   }

   /** Iterations are not supported by this adapter. */
   @Override
   public IterationRuntimeContext getIterationRuntimeContext() {
      throw new UnsupportedOperationException("This is not supported.");
   }

   /** Small test */
   public static void main(String[] args) throws Exception {

      StreamExecutionEnvironment env =
         StreamExecutionEnvironment.getExecutionEnvironment();

      final DataStreamSource<Long> longDataStreamSource =
         env.generateSequence(0, 1000);

      // Target collection that receives the sink's output.
      final ArrayList<Long> longs = new ArrayList<>();

      longDataStreamSource.addSink(new OutputFormatAdapter<>(longs));

      env.execute();

      for (long l : longs) {
         System.out.println(l);
      }
   }
}
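
The adapter above inherits from one concrete format. As a rough alternative,
the same idea can be sketched with composition, so that any format can be
wrapped. The interfaces below (SimpleOutputFormat, SimpleSinkFunction) and the
classes DelegatingSinkAdapter and ListFormat are simplified stand-ins made up
for this sketch. They are not Flink's real classes, and the real lifecycle
wiring in Flink may differ.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for this sketch only; these are NOT Flink's real interfaces.
interface SimpleOutputFormat<T> {
    void open(int taskNumber, int numTasks) throws IOException;
    void writeRecord(T record) throws IOException;
    void close() throws IOException;
}

interface SimpleSinkFunction<T> {
    void invoke(T value) throws Exception;
}

// Hypothetical adapter: wraps any format instead of extending a specific one.
class DelegatingSinkAdapter<T> implements SimpleSinkFunction<T>, AutoCloseable {
    private final SimpleOutputFormat<T> format;
    private boolean opened;

    DelegatingSinkAdapter(SimpleOutputFormat<T> format) {
        this.format = format;
    }

    // Called once before the first element, mirroring a rich function's open().
    void open(int subtaskIndex, int parallelism) throws IOException {
        format.open(subtaskIndex, parallelism);
        opened = true;
    }

    // Per-element entry point: forwards the value to the wrapped format.
    @Override
    public void invoke(T value) throws Exception {
        if (!opened) {
            throw new IllegalStateException("open() must be called before invoke()");
        }
        format.writeRecord(value);
    }

    // Closes the wrapped format if it was opened.
    @Override
    public void close() throws IOException {
        if (opened) {
            format.close();
            opened = false;
        }
    }
}

// Collects records in memory, loosely analogous to a collection-backed format.
class ListFormat<T> implements SimpleOutputFormat<T> {
    final List<T> records = new ArrayList<>();
    boolean closed;

    public void open(int taskNumber, int numTasks) { closed = false; }
    public void writeRecord(T record) { records.add(record); }
    public void close() { closed = true; }
}
```

With composition the adapter does not depend on which format it wraps, at the
cost of the caller (or the framework) having to drive open() and close()
explicitly.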



On Mon, Feb 8, 2016 at 6:07 PM, Nick Dimiduk <nd...@apache.org> wrote:

> In my case, I have my application code that is calling addSink, for which
> I'm writing a test that needs to use LocalCollectionOutputFormat. Having
> two separate class hierarchies is not helpful, hence the adapter. Much of
> this code already exists in the implementation of FileSinkFunction, so the
> project already supports it in a limited way.
>
> On Mon, Feb 8, 2016 at 4:16 AM, Maximilian Michels <mx...@apache.org> wrote:
>
>> Hi Nick,
>>
>> SinkFunction just implements user-defined functions on incoming
>> elements. OutputFormat offers more lifecycle methods. Thus it is a
>> more powerful interface. The OutputFormat originally comes from the
>> batch API, whereas the SinkFunction originates from streaming. Those
>> were more separate code paths in the past. Ultimately, it would make
>> sense to have only the OutputFormat interface but I think we have to
>> keep it to not break the API.
>>
>> If you need the lifecycle methods in streaming, there is
>> RichSinkFunction, which implements SinkFunction and adds the open()
>> and close() lifecycle methods of RichFunction. In addition, it gives
>> you access to the RuntimeContext. You can pass this directly to the
>> "addSink(sinkFunction)" API method.
>>
>> Cheers,
>> Max
>>
>> On Mon, Feb 8, 2016 at 7:14 AM, Nick Dimiduk <nd...@apache.org> wrote:
>> > Heya,
>> >
>> > Is there a plan to consolidate these two interfaces? They appear to provide
>> > identical functionality, differing only in lifecycle management. I found
>> > myself writing an adaptor so I can consume an OutputFormat where a
>> > SinkFunction is expected; there's not much to it. This seems like code that
>> > Flink should ship.
>> >
>> > Maybe one interface or the other can be deprecated for 1.0 API?
>> >
>> > Thanks,
>> > Nick
>>
>
>

Re: OutputFormat vs SinkFunction

Posted by Nick Dimiduk <nd...@apache.org>.
In my case, I have my application code that is calling addSink, for which
I'm writing a test that needs to use LocalCollectionOutputFormat. Having
two separate class hierarchies is not helpful, hence the adapter. Much of
this code already exists in the implementation of FileSinkFunction, so the
project already supports it in a limited way.

On Mon, Feb 8, 2016 at 4:16 AM, Maximilian Michels <mx...@apache.org> wrote:

> Hi Nick,
>
> SinkFunction just implements user-defined functions on incoming
> elements. OutputFormat offers more lifecycle methods. Thus it is a
> more powerful interface. The OutputFormat originally comes from the
> batch API, whereas the SinkFunction originates from streaming. Those
> were more separate code paths in the past. Ultimately, it would make
> sense to have only the OutputFormat interface but I think we have to
> keep it to not break the API.
>
> If you need the lifecycle methods in streaming, there is
> RichSinkFunction, which implements SinkFunction and adds the open()
> and close() lifecycle methods of RichFunction. In addition, it gives
> you access to the RuntimeContext. You can pass this directly to the
> "addSink(sinkFunction)" API method.
>
> Cheers,
> Max
>
> On Mon, Feb 8, 2016 at 7:14 AM, Nick Dimiduk <nd...@apache.org> wrote:
> > Heya,
> >
> > Is there a plan to consolidate these two interfaces? They appear to provide
> > identical functionality, differing only in lifecycle management. I found
> > myself writing an adaptor so I can consume an OutputFormat where a
> > SinkFunction is expected; there's not much to it. This seems like code that
> > Flink should ship.
> >
> > Maybe one interface or the other can be deprecated for 1.0 API?
> >
> > Thanks,
> > Nick
>

Re: OutputFormat vs SinkFunction

Posted by Maximilian Michels <mx...@apache.org>.
Hi Nick,

SinkFunction just implements user-defined functions on incoming
elements. OutputFormat offers more lifecycle methods. Thus it is a
more powerful interface. The OutputFormat originally comes from the
batch API, whereas the SinkFunction originates from streaming. Those
were more separate code paths in the past. Ultimately, it would make
sense to have only the OutputFormat interface but I think we have to
keep it to not break the API.

If you need the lifecycle methods in streaming, there is
RichSinkFunction, which implements SinkFunction and adds the open()
and close() lifecycle methods of RichFunction. In addition, it gives
you access to the RuntimeContext. You can pass this directly to the
"addSink(sinkFunction)" API method.

Cheers,
Max
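
To make the lifecycle difference discussed in this thread concrete, here is a
small, Flink-free sketch. ImmediateSink and BufferingFormat are hypothetical
stand-ins invented for illustration, not Flink classes. The first handles each
element on its own. The second assumes a "batchy" format that buffers records
and only publishes them when close() is called.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a per-element sink: no lifecycle, each value is visible immediately.
class ImmediateSink {
    final List<String> visible = new ArrayList<>();

    void invoke(String value) {
        visible.add(value);
    }
}

// Stand-in for a "batchy" format: values are buffered and only published on close().
class BufferingFormat {
    final List<String> visible = new ArrayList<>();
    private final List<String> buffer = new ArrayList<>();
    private int taskNumber = -1;

    // Lifecycle start: remember which parallel task this instance serves.
    void open(int taskNumber, int numTasks) {
        this.taskNumber = taskNumber;
        buffer.clear();
    }

    // Records are tagged with the task number and held back until close().
    void writeRecord(String record) {
        buffer.add(taskNumber + ":" + record);
    }

    // Lifecycle end: publish everything that was buffered.
    void close() {
        visible.addAll(buffer);
        buffer.clear();
    }
}

class LifecycleDemo {
    public static void main(String[] args) {
        ImmediateSink sink = new ImmediateSink();
        BufferingFormat format = new BufferingFormat();

        format.open(0, 1);
        for (String s : new String[] {"a", "b"}) {
            sink.invoke(s);
            format.writeRecord(s);
        }
        System.out.println("sink before close:   " + sink.visible);
        System.out.println("format before close: " + format.visible);

        format.close();
        System.out.println("format after close:  " + format.visible);
    }
}
```

In an unbounded stream close() may never be reached, which is why a format
written this way would need some intermediate flush to make results visible.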

On Mon, Feb 8, 2016 at 7:14 AM, Nick Dimiduk <nd...@apache.org> wrote:
> Heya,
>
> Is there a plan to consolidate these two interfaces? They appear to provide
> identical functionality, differing only in lifecycle management. I found
> myself writing an adaptor so I can consume an OutputFormat where a
> SinkFunction is expected; there's not much to it. This seems like code that
> Flink should ship.
>
> Maybe one interface or the other can be deprecated for 1.0 API?
>
> Thanks,
> Nick