Posted to user@flink.apache.org by Andrew Roberts <ar...@fuze.com> on 2020/01/24 16:58:56 UTC

Is there anything strictly special about sink functions?

Hello,

I’m trying to push some behavior that we’ve currently got in a large, stateful SinkFunction implementation into Flink’s windowing system. The task at hand is similar to what StreamingFileSink provides, but more flexible. I don’t want to re-implement that sink, because it uses StreamingRuntimeContext.getProcessingTimeService() via a cast; that class is marked as internal, and I’d like to avoid exposure to an interface that could change. Extending it would likewise introduce complexity I would rather not add to our codebase.

WindowedStream.process() provides more or less the pieces I need, but the stream continues on after the window function; there’s no way to process() directly into a sink. I could use a ProcessWindowFunction[In, Unit, Key, Window] and follow it immediately with a no-op sink that discards the Unit values, or I could just leave the stream “unfinished”, with no sink.

Is there a downside to either of these approaches? Is there anything special about doing sink-like work in a ProcessFunction or FlatMapFunction instead of a SinkFunction?

Thanks,

Andrew



-- 
*Confidentiality Notice: The information contained in this e-mail and any
attachments may be confidential. If you are not an intended recipient, you
are hereby notified that any dissemination, distribution or copying of this
e-mail is strictly prohibited. If you have received this e-mail in error,
please notify the sender and permanently delete the e-mail and any
attachments immediately. You should not retain, copy or use this e-mail or
any attachment for any purpose, nor disclose all or any part of the
contents to any other person. Thank you.*

Re: Is there anything strictly special about sink functions?

Posted by Till Rohrmann <tr...@apache.org>.
Yes, checkpointing should behave normally without a sink. If I am not
mistaken, then sinks should indeed be isomorphic to FlatMap[A, Nothing].
However, there is no guarantee that this will always stay like this.

Cheers,
Till

On Wed, Jan 29, 2020 at 2:53 PM Andrew Roberts <ar...@fuze.com> wrote:

> Can I expect checkpointing to behave normally without a sink, or do sink
> functions invoke some special behavior?
>
> My hope is that sinks are isomorphic to FlatMap[A, Nothing], but it’s a
> challenge to verify all the bits of behavior observationally.
>
> Thanks for all your help!
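To make the FlatMap[A, Nothing] comparison in this thread concrete, here is a minimal plain-Java sketch. It does not use Flink: `SinkLike` and `FlatMapLike` are hypothetical stand-ins loosely shaped like `SinkFunction.invoke` and `FlatMapFunction.flatMap`, with a `java.util.function.Consumer` playing the role of Flink's `Collector`. A sink becomes a flatMap that performs the same side effect and never emits a record, whatever its declared output type. A flatMap becomes a sink by discarding whatever it emits, much like following it with a DiscardingSink. This only shows the functional equivalence; it says nothing about how the Flink runtime treats the two operator types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SinkAsFlatMap {

    /** Hypothetical stand-in for a sink: consumes a record, returns nothing. */
    public interface SinkLike<A> {
        void invoke(A value);
    }

    /** Hypothetical stand-in for a flatMap: emits zero or more Bs per A via a collector. */
    public interface FlatMapLike<A, B> {
        void flatMap(A value, Consumer<B> out);
    }

    /** Sink to flatMap: perform the same side effect and never call the collector. */
    public static <A, B> FlatMapLike<A, B> toFlatMap(SinkLike<A> sink) {
        return (value, out) -> sink.invoke(value);
    }

    /** flatMap to sink: run the flatMap and drop everything it emits (a discarding collector). */
    public static <A, B> SinkLike<A> toSink(FlatMapLike<A, B> fn) {
        return value -> fn.flatMap(value, ignored -> { });
    }

    public static void main(String[] args) {
        List<String> written = new ArrayList<>(); // side effects performed by the "sink"
        List<Object> emitted = new ArrayList<>(); // records the flatMap view sends downstream
        SinkLike<String> sink = written::add;

        FlatMapLike<String, Object> asFlatMap = toFlatMap(sink);
        asFlatMap.flatMap("a", emitted::add);

        SinkLike<String> roundTrip = toSink(asFlatMap);
        roundTrip.invoke("b");

        System.out.println(written + " " + emitted); // prints "[a, b] []"
    }
}
```

Both conversions keep the side effect and neither produces downstream records, which is the sense in which a sink and a flatMap with no output are interchangeable.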

Re: Is there anything strictly special about sink functions?

Posted by Andrew Roberts <ar...@fuze.com>.
Can I expect checkpointing to behave normally without a sink, or do sink functions invoke some special behavior?

My hope is that sinks are isomorphic to FlatMap[A, Nothing], but it’s a challenge to verify all the bits of behavior observationally. 

Thanks for all your help!

> On Jan 29, 2020, at 7:58 AM, Till Rohrmann <tr...@apache.org> wrote:
> 
> 
> As far as I know you don't have to define a sink in order to define a valid Flink program (using Flink >= 1.9). Your topology can simply end in a map function and it should be executable once you call env.execute().
> 
> Cheers,
> Till

Re: Is there anything strictly special about sink functions?

Posted by Till Rohrmann <tr...@apache.org>.
As far as I know you don't have to define a sink in order to define a valid
Flink program (using Flink >= 1.9). Your topology can simply end in a map
function and it should be executable once you call env.execute().

Cheers,
Till

On Tue, Jan 28, 2020 at 10:06 AM Arvid Heise <ar...@ververica.com> wrote:

> As Konstantin said, you need to use a sink, but you could use
> `org.apache.flink.streaming.api.functions.sink.DiscardingSink`.
>
> There is nothing inherently wrong with outputting things through a UDF.
> You need to solve the same challenges as in a SinkFunction: you need to
> implement your own state management. Also make sure that you can handle
> duplicates occurring during recovery after a restart.

Re: Is there anything strictly special about sink functions?

Posted by Arvid Heise <ar...@ververica.com>.
As Konstantin said, you need to use a sink, but you could use
`org.apache.flink.streaming.api.functions.sink.DiscardingSink`.

There is nothing inherently wrong with outputting things through a UDF. You
need to solve the same challenges as in a SinkFunction: you need to
implement your own state management. Also make sure that you can handle
duplicates occurring during recovery after a restart.

On Tue, Jan 28, 2020 at 6:43 AM Konstantin Knauf <ko...@ververica.com>
wrote:

> Hi Andrew,
>
> as far as I know, there is nothing particularly special about the sink in
> terms of how it handles state or time. You cannot leave the pipeline
> "unfinished"; only sinks trigger the execution of the whole pipeline.
>
> Cheers,
>
> Konstantin
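One common way to make the duplicates Arvid mentions harmless is to make the side effect idempotent. You derive a deterministic id for every write and upsert instead of appending, so re-firing the same window after a restore overwrites the earlier result. The plain-Java sketch below makes several hypothetical choices: a `WindowResult` class (window key, window end timestamp, count), an id built from key and window end, and a `LinkedHashMap` standing in for the external system. A real target would need an equivalent upsert or conditional write. The sketch covers only the duplicates; any state the function keeps between records still has to be managed separately, as Arvid notes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class IdempotentWrites {

    /** Hypothetical result of one fired window: its key, its end timestamp, and an aggregate. */
    public static final class WindowResult {
        final String key;
        final long windowEnd;
        final long count;

        public WindowResult(String key, long windowEnd, long count) {
            this.key = key;
            this.windowEnd = windowEnd;
            this.count = count;
        }
    }

    /** Hypothetical external store; a map keyed by a deterministic id models an upsert. */
    public static final class UpsertStore {
        private final Map<String, Long> rows = new LinkedHashMap<>();

        public void upsert(WindowResult r) {
            // Same window, same id: a replayed write replaces the earlier row instead of adding one.
            String id = r.key + "@" + r.windowEnd;
            rows.put(id, r.count);
        }

        public Map<String, Long> rows() {
            return rows;
        }
    }

    public static void main(String[] args) {
        UpsertStore store = new UpsertStore();
        List<WindowResult> fired = List.of(
                new WindowResult("user-1", 60_000L, 3),
                new WindowResult("user-2", 60_000L, 5));

        fired.forEach(store::upsert);
        // Simulate recovery from an earlier checkpoint: the same windows fire again.
        fired.forEach(store::upsert);

        System.out.println(store.rows()); // prints {user-1@60000=3, user-2@60000=5}
    }
}
```

The id has to come from the data or the window itself. An id generated at write time, such as a random UUID or the current wall-clock time, would differ on replay and let the duplicate through.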

Re: Is there anything strictly special about sink functions?

Posted by Konstantin Knauf <ko...@ververica.com>.
Hi Andrew,

as far as I know, there is nothing particularly special about the sink in
terms of how it handles state or time. You cannot leave the pipeline
"unfinished"; only sinks trigger the execution of the whole pipeline.

Cheers,

Konstantin

-- 
Konstantin Knauf | Solutions Architect
+49 160 91394525

Follow us @VervericaData Ververica <https://www.ververica.com/>

--
Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Tony) Cheng