Posted to user@flink.apache.org by Christopher Lee <ch...@gmail.com> on 2023/04/03 05:45:39 UTC

Access ExecutionConfig from new Source and Sink API

Hello,

I'm trying to develop Flink connectors for NATS using the new FLIP-27 and
FLIP-143 APIs. The scaffolding is more complicated than the old
SourceFunction and SinkFunction, but not terrible. However, I can't figure
out how to access the ExecutionConfig under these new APIs. This was
possible in the old APIs by way of the RuntimeContext of the
AbstractRichFunction (which is extended by RichSourceFunction and
RichSinkFunction).

The reason I would like this: some interactions with external systems
may be invalid under certain Flink job execution parameters. Consider a
system like NATS, which allows received messages to be acknowledged.
Ideally, the source connector would acknowledge all received messages
during checkpointing. If it fails to acknowledge a delivered message
within a pre-configured amount of time, NATS resends the message (which
is good in my case for fault tolerance).
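
To make the acknowledge-on-checkpoint idea concrete, here is a minimal
sketch of the bookkeeping in plain Java, with no Flink or NATS dependency.
PendingAcks and its method names are hypothetical, and the Consumer stands
in for whatever call the NATS client uses to acknowledge a message. I am
assuming the two hooks would be called from SourceReader#snapshotState(long)
and notifyCheckpointComplete(long) respectively.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Hypothetical helper: tracks delivered-but-unacknowledged message IDs per checkpoint. */
final class PendingAcks {
    // IDs received since the last snapshot.
    private List<String> inFlight = new ArrayList<>();
    // IDs frozen by a snapshot, keyed by checkpoint ID, waiting for that checkpoint to complete.
    private final TreeMap<Long, List<String>> byCheckpoint = new TreeMap<>();
    // Stand-in for the real acknowledgement call of the messaging client.
    private final Consumer<String> acker;

    PendingAcks(Consumer<String> acker) {
        this.acker = acker;
    }

    /** Record a message that was handed to Flink but not yet acknowledged. */
    void onMessage(String messageId) {
        inFlight.add(messageId);
    }

    /** Assumed to be called when the reader snapshots its state for a checkpoint. */
    void onSnapshot(long checkpointId) {
        byCheckpoint.put(checkpointId, inFlight);
        inFlight = new ArrayList<>();
    }

    /** Assumed to be called when the checkpoint is reported complete. */
    void onCheckpointComplete(long checkpointId) {
        // Everything up to and including this checkpoint is now durable, so acknowledge it.
        NavigableMap<Long, List<String>> done = byCheckpoint.headMap(checkpointId, true);
        done.values().forEach(ids -> ids.forEach(acker));
        done.clear(); // the view is backed by the map, so this removes the acknowledged entries
    }
}
```

Messages received after a snapshot stay unacknowledged until the next
checkpoint completes, which is exactly why the checkpoint settings matter
for the redelivery timeout.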

However, if a Flink job using these connectors has disabled checkpointing
or made the interval too large, the connector will acknowledge delivered
messages too late or not at all, and NATS may send the messages again and
cause duplicate data. I would be able to avoid this if I could access the
ExecutionConfig to check these parameters and throw early.
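
The early check I have in mind could look roughly like the sketch below.
It is plain Java and deliberately leaves open where the checkpoint interval
comes from, since that is the part I cannot obtain. CheckpointAckGuard,
validate and the ackWait parameter are hypothetical names; ackWait stands
for the redelivery timeout configured on the NATS side.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical guard: fails fast when checkpoint settings cannot keep up with redelivery. */
final class CheckpointAckGuard {
    private CheckpointAckGuard() {}

    /**
     * @param checkpointInterval empty if checkpointing is disabled for the job
     * @param ackWait            time after which the broker resends an unacknowledged message
     */
    static void validate(Optional<Duration> checkpointInterval, Duration ackWait) {
        if (!checkpointInterval.isPresent()) {
            throw new IllegalStateException(
                    "Checkpointing is disabled, so messages would never be acknowledged");
        }
        Duration interval = checkpointInterval.get();
        // Simplification: this ignores how long a checkpoint takes to complete,
        // so a real check would probably want some safety margin.
        if (interval.compareTo(ackWait) >= 0) {
            throw new IllegalStateException(
                    "Checkpoint interval " + interval + " is not shorter than ack wait " + ackWait);
        }
    }
}
```

For example, validate(Optional.of(Duration.ofSeconds(10)), Duration.ofSeconds(30))
passes, while an empty interval or a 60-second interval against the same
30-second ack wait throws IllegalStateException.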

I know that the SourceReaderContext gives me access to the Configuration,
but that doesn't handle the case where the execution environment is set up
programmatically in a job definition rather than through configuration. Any
ideas?

Thanks,
Chris

Re: Access ExecutionConfig from new Source and Sink API

Posted by Hang Ruan <ru...@gmail.com>.
Hi Christopher,

I think there is already a discussion about the ExecutionConfig for the new
Sink API in FLIP-287 [1]. What we actually need is a read-only
ExecutionConfig for both the Source API and the Sink API.
Maybe we could continue to discuss this topic under FLIP-287.

Best,
Hang

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853

On Mon, Apr 3, 2023 at 14:06, Yufan Sheng <sy...@gmail.com> wrote:

> I agree with you. Access to the ExecutionConfig would be quite useful in
> the Source API. When I developed flink-connector-pulsar, the only
> configuration I couldn't access was the checkpoint configuration, which is
> defined in ExecutionConfig. With it, I could switch the connector's
> behavior automatically depending on whether checkpointing is enabled.
> Without it, I have to add more custom configuration options to the Pulsar
> Source.

Re: Access ExecutionConfig from new Source and Sink API

Posted by Yufan Sheng <sy...@gmail.com>.
I agree with you. Access to the ExecutionConfig would be quite useful in
the Source API. When I developed flink-connector-pulsar, the only
configuration I couldn't access was the checkpoint configuration, which is
defined in ExecutionConfig. With it, I could switch the connector's
behavior automatically depending on whether checkpointing is enabled.
Without it, I have to add more custom configuration options to the Pulsar
Source.
