Posted to dev@flink.apache.org by Lijie Wang <wa...@gmail.com> on 2023/02/01 11:27:45 UTC

Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

+1 for Option 2, if we can abstract a "ReadableExecutionConfig"
interface (containing all the is/get methods) and let ExecutionConfig
implement ReadableExecutionConfig.
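
A minimal sketch of the ReadableExecutionConfig idea, assuming simplified stand-in types. The ReadableExecutionConfig and InitContext interfaces, the cut-down ExecutionConfig class and the contextFor helper are all hypothetical illustrations, not Flink's actual classes, and the job ID is modeled as a plain String. The sink only ever sees the read-only interface.

```java
/**
 * Sketch of the ReadableExecutionConfig idea. All types are simplified,
 * hypothetical stand-ins, not Flink's real classes.
 */
public class ReadableConfigSketch {

    /** Read-only view: only is/get methods. */
    public interface ReadableExecutionConfig {
        boolean isObjectReuseEnabled();
        long getAutoWatermarkInterval();
    }

    /** Cut-down stand-in for the mutable ExecutionConfig, implementing the view. */
    public static class ExecutionConfig implements ReadableExecutionConfig {
        private boolean objectReuse;
        private long autoWatermarkInterval;

        public ExecutionConfig enableObjectReuse() { objectReuse = true; return this; }
        public ExecutionConfig setAutoWatermarkInterval(long interval) {
            autoWatermarkInterval = interval;
            return this;
        }

        @Override public boolean isObjectReuseEnabled() { return objectReuse; }
        @Override public long getAutoWatermarkInterval() { return autoWatermarkInterval; }
    }

    /** Hypothetical extended sink InitContext exposing the read-only view and the job ID. */
    public interface InitContext {
        String getJobId();
        ReadableExecutionConfig getExecutionConfig();
    }

    /** Hypothetical helper: the runtime hands the sink only the read-only view. */
    public static InitContext contextFor(String jobId, ExecutionConfig config) {
        return new InitContext() {
            @Override public String getJobId() { return jobId; }
            @Override public ReadableExecutionConfig getExecutionConfig() { return config; }
        };
    }

    public static void main(String[] args) {
        ExecutionConfig config = new ExecutionConfig().enableObjectReuse();
        InitContext context = contextFor("example-job", config);
        ReadableExecutionConfig view = context.getExecutionConfig();
        // A sink can read settings through the view ...
        System.out.println(context.getJobId() + " objectReuse=" + view.isObjectReuseEnabled());
        // ... but the interface declares no setters, so view.enableObjectReuse() does not compile.
    }
}
```

One design caveat: an interface only expresses a read-only contract. The object behind it is still the mutable config, so a caller could cast it back to ExecutionConfig and change it; a separate wrapper class that delegates to the getters closes that gap at the cost of maintaining the wrapper.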

Best,
Lijie

João Boto <es...@apache.org> wrote on Tue, Jan 17, 2023 at 20:39:

> Hi all,
>
> As establishing a read-only contract seems to be the consensus approach,
> after talking to Lijie we see two ways of doing this:
>
> Option 1: UnmodifiableExecutionConfig that extends ExecutionConfig (just
> like UnmodifiableConfiguration)
> Pros:
> - we have all the get methods
> - no need to change TypeInformation.createSerializer(ExecutionConfig
> config)
> Cons:
> - we have to override the 34 methods that modify the config
> - new methods added to ExecutionConfig will also need to be overridden in
> UnmodifiableExecutionConfig
>
>
> Option 2: UnmodifiableExecutionConfig without extending ExecutionConfig.
> Pros:
> - it is a new class, so we don't need to override anything
> - modifications to ExecutionConfig don't affect this class
> Cons:
> - need to change TypeInformation by adding
> createSerializer(UnmodifiableExecutionConfig config)
> - need to add all the get methods, or only the ones needed (which could be
> a pro)
>
>
> Which option do you think is better?
>
>
>
> On 2023/01/13 14:15:04 Joao Boto wrote:
> > Hi flink devs,
> >
> > I'd like to start a discussion thread for FLIP-287[1].
> > This comes from an offline discussion with @Lijie Wang about FLIP-239[2],
> > specifically about the sink[3].
> >
> > Basically, the goal is to expose the ExecutionConfig and JobId on
> > SinkV2#InitContext. These changes are necessary to correctly migrate the
> > current sinks that rely on RuntimeContext, like JdbcSink, KafkaTableSink
> > and so on, to SinkV2.
> >
> > Comments are welcome!
> > Thanks,
> >
> > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> > [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> > [3] https://issues.apache.org/jira/browse/FLINK-25421
> >
>
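
Option 1 can be sketched as follows, modeled on the UnmodifiableConfiguration pattern: a read-only subclass that inherits the getters and overrides every mutator to throw UnsupportedOperationException. The cut-down ExecutionConfig here is a hypothetical stand-in with only three mutators instead of the real class's many, and taking a snapshot copy in the constructor is an assumption of this sketch.

```java
/**
 * Sketch of Option 1: a read-only subclass of a simplified, hypothetical
 * ExecutionConfig stand-in. Not Flink's real classes.
 */
public class UnmodifiableConfigSketch {

    /** Hypothetical, cut-down stand-in for the mutable ExecutionConfig. */
    public static class ExecutionConfig {
        private boolean objectReuse;
        private long autoWatermarkInterval;

        public ExecutionConfig() {}

        /** Copy constructor, used to take a snapshot of another config. */
        protected ExecutionConfig(ExecutionConfig other) {
            this.objectReuse = other.objectReuse;
            this.autoWatermarkInterval = other.autoWatermarkInterval;
        }

        public ExecutionConfig enableObjectReuse() { objectReuse = true; return this; }
        public ExecutionConfig disableObjectReuse() { objectReuse = false; return this; }
        public ExecutionConfig setAutoWatermarkInterval(long interval) {
            autoWatermarkInterval = interval;
            return this;
        }

        public boolean isObjectReuseEnabled() { return objectReuse; }
        public long getAutoWatermarkInterval() { return autoWatermarkInterval; }
    }

    /** Option 1: getters are inherited; every mutator is overridden to throw. */
    public static class UnmodifiableExecutionConfig extends ExecutionConfig {
        public UnmodifiableExecutionConfig(ExecutionConfig source) {
            super(source); // snapshot: later changes to source are not visible here
        }

        @Override public ExecutionConfig enableObjectReuse() { throw readOnly(); }
        @Override public ExecutionConfig disableObjectReuse() { throw readOnly(); }
        @Override public ExecutionConfig setAutoWatermarkInterval(long interval) { throw readOnly(); }

        private static UnsupportedOperationException readOnly() {
            return new UnsupportedOperationException("This ExecutionConfig is read-only.");
        }
    }

    public static void main(String[] args) {
        ExecutionConfig mutable = new ExecutionConfig().enableObjectReuse();
        ExecutionConfig readOnly = new UnmodifiableExecutionConfig(mutable);
        System.out.println(readOnly.isObjectReuseEnabled()); // prints true
        try {
            readOnly.disableObjectReuse();
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because UnmodifiableExecutionConfig is still an ExecutionConfig, it can be passed unchanged to existing methods that accept an ExecutionConfig, such as TypeInformation.createSerializer(ExecutionConfig). The flip side is the con listed in the quoted mail: a setter later added to the base class is inherited as-is and stays writable until someone remembers to override it.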

Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

Posted by Lijie Wang <wa...@gmail.com>.
Hi Konstantin,

I checked the usage of ExecutionConfig in the Kinesis and Kafka sinks:
- Kinesis sink: ExecutionConfig is not used by the Kinesis sink. The one that
uses getAutoWatermarkInterval is the Kinesis source; Joao may have made a
mistake.
- Kafka sink: isObjectReuseEnabled and ExecutionConfig are used in the upsert
Kafka table sink. The upsert Kafka table sink obtains the ExecutionConfig
through DataStreamSinkProvider; however, this approach cannot be used for the
DataStream sink or for other SinkRuntimeProviders.

Besides, I know that all the JDBC sinks (DataStream/Table) need
isObjectReuseEnabled and the ExecutionConfig. The JDBC sink buffers the
records it receives and only flushes them when the buffer is full, a
periodic timer fires, or a checkpoint happens. The JDBC sinks decide whether
to buffer copies or the original records based on isObjectReuseEnabled: when
object reuse is enabled, they must buffer copies (because the content of the
objects may change before the flush); otherwise they can buffer the original
records. And they need the ExecutionConfig to create the TypeSerializer used
to copy the records.
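
The buffering behaviour described above can be sketched in a few lines. This is a simplified illustration under stated assumptions, not the JDBC connector's actual writer: BufferingWriter, Row and run are hypothetical names, only the buffer-full flush trigger is modelled (timer and checkpoint flushes are left out), and a plain UnaryOperator stands in for the TypeSerializer#copy that, per this discussion, the real sinks create from the ExecutionConfig.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

/**
 * Simplified sketch of a buffering sink writer (hypothetical names, not the
 * JDBC connector's code). A UnaryOperator stands in for TypeSerializer#copy.
 */
public class BufferingSinkSketch {

    /** Mutable record type, so that an upstream operator can reuse one instance. */
    public static class Row {
        public String value;
        public Row(String value) { this.value = value; }
    }

    public static class BufferingWriter<T> {
        private final boolean objectReuseEnabled;
        private final UnaryOperator<T> copier;   // stand-in for TypeSerializer#copy
        private final int maxBufferSize;
        private final Consumer<List<T>> flusher; // e.g. one batched write to the database
        private final List<T> buffer = new ArrayList<>();

        public BufferingWriter(boolean objectReuseEnabled, UnaryOperator<T> copier,
                               int maxBufferSize, Consumer<List<T>> flusher) {
            this.objectReuseEnabled = objectReuseEnabled;
            this.copier = copier;
            this.maxBufferSize = maxBufferSize;
            this.flusher = flusher;
        }

        public void write(T record) {
            // With object reuse, the caller may overwrite the record after write()
            // returns, so buffer a copy; otherwise the reference itself is safe.
            buffer.add(objectReuseEnabled ? copier.apply(record) : record);
            if (buffer.size() >= maxBufferSize) {
                flush();
            }
        }

        /** Called when the buffer is full; a real sink also flushes on a timer and on checkpoints. */
        public void flush() {
            if (buffer.isEmpty()) {
                return;
            }
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    /** Writes "a", "b", "c" and returns what reached the external system. */
    public static List<String> run(boolean objectReuseEnabled, boolean upstreamReusesObjects) {
        List<String> written = new ArrayList<>();
        BufferingWriter<Row> writer = new BufferingWriter<Row>(
                objectReuseEnabled, original -> new Row(original.value), 10,
                batch -> batch.forEach(r -> written.add(r.value)));
        Row shared = new Row(null);
        for (String v : new String[] {"a", "b", "c"}) {
            Row row = upstreamReusesObjects ? shared : new Row(null);
            row.value = v; // with reuse, this overwrites the previously written record
            writer.write(row);
        }
        writer.flush();
        return written;
    }

    public static void main(String[] args) {
        System.out.println(run(true, true));   // [a, b, c]: copies protect against reuse
        System.out.println(run(false, false)); // [a, b, c]: fresh objects, no copy needed
        System.out.println(run(false, true));  // [c, c, c]: reuse without copying corrupts the buffer
    }
}
```

The third call shows why the flag matters: when the runtime reuses one object and the sink buffers only the reference, every buffered entry ends up holding the last record's contents by the time the flush happens.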

Actually, the upsert Kafka table sink is similar to the JDBC sink. I think all
sinks with this "buffer records" behavior need isObjectReuseEnabled and the
ExecutionConfig.

Best,
Lijie

Konstantin Knauf <kn...@apache.org> wrote on Sat, Feb 4, 2023 at 01:41:

> Hi everyone,
>
> If I am not mistaken, of the sinks mentioned by Joao, Kafka, Kinesis &
> Kinesis already use the Sink2 API. How were those implemented without
> exposing the ExecutionConfig?
>
> Best,
>
> Konstantin
>
> --
> https://twitter.com/snntrable
> https://github.com/knaufk
>

Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

Posted by Konstantin Knauf <kn...@apache.org>.
Hi everyone,

If I am not mistaken, of the sinks mentioned by Joao, Kafka, Kinesis &
Kinesis already use the Sink2 API. How were those implemented without
exposing the ExecutionConfig?

Best,

Konstantin


-- 
https://twitter.com/snntrable
https://github.com/knaufk