Posted to dev@flink.apache.org by Zhu Zhu <re...@gmail.com> on 2023/04/03 09:42:28 UTC

Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

Hi João,

Thanks for creating this FLIP!
I'm overall +1 for it to unblock the migration of sinks to SinkV2.

Yet I think it's better to let the `ReadableExecutionConfig` extend
`ExecutionConfig`, because otherwise we have to introduce a new method
`TypeInformation#createSerializer(ReadableExecutionConfig)`. The new
method may require every `TypeInformation` to implement it, including
Flink built-in ones and custom ones, otherwise exceptions will happen.
That goal, however, is pretty hard to achieve.
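
A minimal sketch of the subtyping argument above, assuming simplified stand-in classes (`ConfigSketch`, `ReadableConfigSketch`, `TypeInfoSketch` and `CustomTypeInfo` are hypothetical names, not Flink APIs). If the read-only type is a subtype of the config type, an untouched custom type information class keeps working through its existing factory method:

```java
// Hypothetical stand-ins for illustration; these are not the real Flink classes.
class ConfigSketch {
    private final boolean objectReuse;

    ConfigSketch(boolean objectReuse) {
        this.objectReuse = objectReuse;
    }

    boolean isObjectReuseEnabled() {
        return objectReuse;
    }
}

// The read-only type is a subtype of the existing config type.
class ReadableConfigSketch extends ConfigSketch {
    ReadableConfigSketch(ConfigSketch source) {
        super(source.isObjectReuseEnabled());
    }
}

abstract class TypeInfoSketch {
    // The only factory method; no overload for the read-only type is needed.
    abstract String createSerializer(ConfigSketch config);
}

// Stands in for a user-defined type information class that is never updated.
class CustomTypeInfo extends TypeInfoSketch {
    @Override
    String createSerializer(ConfigSketch config) {
        return "serializer(objectReuse=" + config.isObjectReuseEnabled() + ")";
    }
}

class SubtypeDemo {
    public static void main(String[] args) {
        ConfigSketch readOnly = new ReadableConfigSketch(new ConfigSketch(true));
        // The existing method accepts the read-only subtype unchanged.
        System.out.println(new CustomTypeInfo().createSerializer(readOnly));
    }
}
```

With an unrelated read-only type, `CustomTypeInfo` would instead need a second factory method, which is the migration cost described above.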

Thanks,
Zhu

On Tue, Feb 28, 2023 at 23:34, João Boto <es...@apache.org> wrote:
>
> I have updated the FLIP with the two options that we have discussed.
>
> Option 1: Expose ExecutionConfig directly on InitContext.
> This has minimal impact, as we only have to expose the new methods.
>
> Option 2: Expose ReadableExecutionConfig on InitContext.
> This option has more impact, as we need to add a new method to TypeInformation and change all of its implementations (there are currently 72).
>
> Waiting for feedback or concerns about the two options.

Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

> I think we can get the serializer directly in InitContextImpl through
> `getOperatorConfig().getTypeSerializerIn(0,
> getUserCodeClassloader()).duplicate()`.

This should work, yes.

+1 to the updated FLIP so far. Thank you, Joao, for being on top of this!

Thanks,
Gordon

On Tue, May 30, 2023 at 12:34 AM João Boto <es...@apache.org> wrote:

> Hi Lijie,
>
> I left the two options to use whatever you want, but I can clean the FLIP
> to have only one..
>
> Updated the FLIP
>
> Regards
>
> On 2023/05/23 07:23:45 Lijie Wang wrote:
> > Hi Joao,
> >
> > I noticed the FLIP currently contains the following 2 methods about type
> > serializer:
> >
> > (1) <IN> TypeSerializer<IN> createInputSerializer();
> > (2) <IN> TypeSerializer<IN> createSerializer(TypeInformation<IN> inType);
> >
> > Is the method (2) still needed now?
> >
> > Best,
> > Lijie
> >
> > On Fri, May 19, 2023 at 16:53, João Boto <es...@apache.org> wrote:
> >
> > > Updated the FLIP to use this option.
> > >
> >
>

Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

Posted by João Boto <es...@apache.org>.
Hi Lijie,

I left both options in so that you could use whichever you prefer, but I can clean up the FLIP to keep only one.

Updated the FLIP

Regards

On 2023/05/23 07:23:45 Lijie Wang wrote:
> Hi Joao,
> 
> I noticed the FLIP currently contains the following 2 methods about type
> serializer:
> 
> (1) <IN> TypeSerializer<IN> createInputSerializer();
> (2) <IN> TypeSerializer<IN> createSerializer(TypeInformation<IN> inType);
> 
> Is the method (2) still needed now?
> 
> Best,
> Lijie
> 
> On Fri, May 19, 2023 at 16:53, João Boto <es...@apache.org> wrote:
> 
> > Updated the FLIP to use this option.
> >
> 

Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

Posted by Lijie Wang <wa...@gmail.com>.
Hi Joao,

I noticed the FLIP currently contains the following 2 methods about type
serializer:

(1) <IN> TypeSerializer<IN> createInputSerializer();
(2) <IN> TypeSerializer<IN> createSerializer(TypeInformation<IN> inType);

Is the method (2) still needed now?

Best,
Lijie

On Fri, May 19, 2023 at 16:53, João Boto <es...@apache.org> wrote:

> Updated the FLIP to use this option.
>

Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

Posted by João Boto <es...@apache.org>.
Updated the FLIP to use this option.

Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

Posted by Lijie Wang <wa...@gmail.com>.
Hi,

+1 for `InitContext#createInputSerializer()`.

I think we can get the serializer directly in InitContextImpl through
`getOperatorConfig().getTypeSerializerIn(0,
getUserCodeClassloader()).duplicate()`.
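
As a rough illustration of why the `duplicate()` call matters, here is a sketch with simplified stand-in types (`SerializerSketch`, `StatefulListSerializer` and `InitContextSketch` are hypothetical and only mimic the shape of the real classes). It assumes that a serializer may carry mutable state, so the context hands out a fresh instance instead of the operator's own:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified serializer contract; not the Flink TypeSerializer API.
interface SerializerSketch<T> {
    T copy(T value);

    SerializerSketch<T> duplicate();
}

// A serializer with mutable internal state, so instances must not be shared.
final class StatefulListSerializer implements SerializerSketch<List<String>> {
    int copies; // mutable state, incremented on every copy

    @Override
    public List<String> copy(List<String> value) {
        copies++;
        return new ArrayList<>(value);
    }

    @Override
    public SerializerSketch<List<String>> duplicate() {
        return new StatefulListSerializer();
    }
}

// Hypothetical context that wraps the operator's input serializer.
final class InitContextSketch<IN> {
    private final SerializerSketch<IN> operatorInputSerializer;

    InitContextSketch(SerializerSketch<IN> operatorInputSerializer) {
        this.operatorInputSerializer = operatorInputSerializer;
    }

    // Each caller gets its own instance; the operator's serializer stays private.
    SerializerSketch<IN> createInputSerializer() {
        return operatorInputSerializer.duplicate();
    }
}
```

In this sketch, a sink writer that copies records never touches the state of the operator's own serializer.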

Best,
Lijie.

On Mon, Apr 24, 2023 at 19:04, João Boto <es...@apache.org> wrote:

> Hi @Gordon,
>
> `InitContext#createInputSerializer()` is a great option and will solve
> more than one problem, but I cant find a way to get the TypeInformation<T>
> on InitContextImpl (I can be missing something)
>
> On current (legacy) implementations we rely on interface ´
> InputTypeConfigurable´ to get the TypeInformation but this will not work
> for Sink2 as is not implemented (DataStream.sinkTo vs DataStream.addSink)
> As a side note, the ExecutionConfig provided by this interface could not
> be used as can be changed after the call is made, for Table Planning for
> example on DefaultExecutor.configureBatchSpecificProperties()
>
> At the end what we need to do is something like:
> if (isObjectReuseEnabled()) serializer.copy(record) else record;
>
> So responding to your question, yes last option is ok for this but I dont
> see how to implementing it as Im missing the TypeInformation on
> InitContextImpl.
>
> Best regards,
>
> On 2023/04/21 15:04:24 "Tzu-Li (Gordon) Tai" wrote:
> > Do we have to introduce
> `InitContext#createSerializer(TypeInformation<T>)`
> > which returns TypeSerializer<T>, or is it sufficient to only provide
> > `InitContext#createInputSerializer()` which returns TypeSerializer<IN>?
> >
> > I had the impression that buffering sinks like JDBC only need the
> > latter. @Joao, could you confirm?
> >
> > If that's the case, +1 to adding the following method signatures to
> > InitContext:
> > * TypeSerializer<IN> createInputSerializer()
> > * boolean isObjectReuseEnabled()
> >
> > Thanks,
> > Gordon
> >
> > On Fri, Apr 21, 2023 at 3:04 AM Zhu Zhu <re...@gmail.com> wrote:
> >
> > > Good point! @Gordon
> > > Introducing an `InitContext#createSerializer(TypeInformation)` looks a
> > > better option to me, so we do not need to introduce an unmodifiable
> > > `ExecutionConfig` at this moment.
> > >
> > > Hope that we can make `ExecutionConfig` a read-only interface in
> > > Flink 2.0. It is exposed in `RuntimeContext` to user functions already,
> > > while mutating the values at runtime is actually an undefined behavior.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > On Tue, Apr 18, 2023 at 01:02, Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:
> > > >
> > > > Hi,
> > > >
> > > > Sorry for chiming in late.
> > > >
> > > > I'm not so sure that exposing ExecutionConfig / ReadExecutionConfig
> > > > directly through Sink#InitContext is the right thing to do.
> > > >
> > > > 1. A lot of the read-only getter methods on ExecutionConfig are
> > > irrelevant
> > > > for sinks. Expanding the scope of the InitContext interface with so
> many
> > > > irrelevant methods is probably going to make writing unit tests a
> pain.
> > > >
> > > > 2. There's actually a few getter methods on `InitContext` that have
> > > > duplicate/redundant info for what ExecutionConfig exposes. For
> example,
> > > > InitContext#getNumberOfParallelSubtasks and
> InitContext#getAttemptNumber
> > > > currently exist and it can be confusing if users find 2 sources of
> that
> > > > information (either via the `InitContext` and via the wrapped
> > > > `ExecutionConfig`).
> > > >
> > > > All in all, it feels like `Sink#InitContext` was introduced
> initially as
> > > a
> > > > means to selectively only expose certain information to sinks.
> > > >
> > > > It looks like right now, the only requirement is that some sinks
> require
> > > 1)
> > > > isObjectReuseEnabled, and 2) TypeSerializer for the input type.
> Would it
> > > > make sense to follow the original intent and only selectively expose
> > > these?
> > > > For 1), we can just add a new method to `InitContext` and forward the
> > > > information from `ExecutionConfig` accessible at the operator level.
> > > > For 2), would it make sense to create the serializer at the operator
> > > level
> > > > and then provide it through `InitContext`?
> > > >
> > > > Thanks,
> > > > Gordon
> > > >
> > > > On Mon, Apr 17, 2023 at 8:23 AM Zhu Zhu <re...@gmail.com> wrote:
> > > >
> > > > > We can let the `InitContext` return `ExecutionConfig` in the
> interface.
> > > > > However, a `ReadableExecutionConfig` implementation should be
> returned
> > > > > so that exceptions will be thrown if users tries to modify the
> > > > > `ExecutionConfig`.
> > > > >
> > > > > We can rework all the setters of `ExecutionConfig` to internally
> > > invoke a
> > > > > `setConfiguration(...)` method. Then the `ReadableExecutionConfig`
> can
> > > > > just override that method. But pay attention to a few exceptional
> > > > > setters, i.e. those for globalJobParameters and serializers.
> > > > >
> > > > > We should also explicitly state in the documentation of
> > > > > `InitContext #getExecutionConfig()`, that the returned
> > > `ExecutionConfig`
> > > > > is unmodifiable.
> > > > >
> > > > > Thanks,
> > > > > Zhu
> > > > >
> > > > > On Mon, Apr 17, 2023 at 16:51, João Boto <es...@apache.org> wrote:
> > > > > >
> > > > > > Hi Zhu,
> > > > > >
> > > > > > Thanks for you time for reviewing this.
> > > > > >
> > > > > > Extending ´ExecutionConfig´ will allow to modify the values in
> the
> > > > > config (this is what we want to prevent with Option2)
> > > > > >
> > > > > > To extend the ExecutionConfig is not simpler to do Option1
> (expose
> > > > > ExecutionConfig directly).
> > > > > >
> > > > > > Regards
> > > > > >
> > > > > >
> > > > > >
> > > > > > On 2023/04/03 09:42:28 Zhu Zhu wrote:
> > > > > > > Hi João,
> > > > > > >
> > > > > > > Thanks for creating this FLIP!
> > > > > > > I'm overall +1 for it to unblock the migration of sinks to
> SinkV2.
> > > > > > >
> > > > > > > Yet I think it's better to let the `ReadableExecutionConfig`
> extend
> > > > > > > `ExecutionConfig`, because otherwise we have to introduce a new
> > > method
> > > > > > > `TypeInformation#createSerializer(ReadableExecutionConfig)`.
> The
> > > new
> > > > > > > method may require every `TypeInformation` to implement it,
> > > including
> > > > > > > Flink built-in ones and custom ones, otherwise exceptions will
> > > happen.
> > > > > > > That goal, however, is pretty hard to achieve.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Zhu
> > > > > > >
> > > > > > > On Tue, Feb 28, 2023 at 23:34, João Boto <es...@apache.org> wrote:
> > > > > > > >
> > > > > > > > I have update the FLIP with the 2 options that we have
> > > discussed..
> > > > > > > >
> > > > > > > > Option 1: Expose ExecutionConfig directly on InitContext
> > > > > > > > this have a minimal impact as we only have to expose the new
> > > methods
> > > > > > > >
> > > > > > > > Option 2: Expose ReadableExecutionConfig on InitContext
> > > > > > > > with this option we have more impact as we need to add a new
> > > method
> > > > > to TypeInformation and change all implementations (current exists
> 72
> > > > > implementations)
> > > > > > > >
> > > > > > > > Waiting for feedback or concerns about the two options
> > > > > > >
> > > > >
> > >
> >
>

Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

Posted by João Boto <es...@apache.org>.
Hi @Gordon,

`InitContext#createInputSerializer()` is a great option and would solve more than one problem, but I can't find a way to get the TypeInformation<T> in InitContextImpl (I may be missing something).

In the current (legacy) implementations we rely on the `InputTypeConfigurable` interface to get the TypeInformation, but this will not work for Sink2 because it is not implemented there (DataStream.sinkTo vs DataStream.addSink).
As a side note, the ExecutionConfig provided by this interface cannot be relied on, because it can change after the call is made; Table planning does this, for example, in DefaultExecutor.configureBatchSpecificProperties().

In the end, what we need to do is something like:
record = isObjectReuseEnabled() ? serializer.copy(record) : record;

So, to answer your question: yes, the last option is fine for this, but I don't see how to implement it, as I'm missing the TypeInformation in InitContextImpl.
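
To make the copy-on-buffer requirement concrete, here is a small self-contained sketch. `MutableRecord` and `BufferingWriterSketch` are hypothetical names, and the `UnaryOperator` stands in for `serializer.copy(record)`; none of this is the actual sink writer interface. It assumes the runtime reuses one record instance between calls when object reuse is enabled:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical mutable record that the runtime may reuse between calls.
final class MutableRecord {
    int value;

    MutableRecord(int value) {
        this.value = value;
    }
}

// Hypothetical buffering writer; not the Flink SinkWriter interface.
final class BufferingWriterSketch {
    private final List<MutableRecord> buffer = new ArrayList<>();
    private final boolean objectReuseEnabled;
    private final UnaryOperator<MutableRecord> copier; // stands in for serializer.copy(record)

    BufferingWriterSketch(boolean objectReuseEnabled, UnaryOperator<MutableRecord> copier) {
        this.objectReuseEnabled = objectReuseEnabled;
        this.copier = copier;
    }

    // Copy before buffering only when the caller may mutate the instance later.
    void write(MutableRecord record) {
        buffer.add(objectReuseEnabled ? copier.apply(record) : record);
    }

    List<Integer> bufferedValues() {
        List<Integer> values = new ArrayList<>();
        for (MutableRecord record : buffer) {
            values.add(record.value);
        }
        return values;
    }
}
```

Without the copy, every buffered entry would point at the same reused instance and would show only its latest value at flush time.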

Best regards,

On 2023/04/21 15:04:24 "Tzu-Li (Gordon) Tai" wrote:
> Do we have to introduce `InitContext#createSerializer(TypeInformation<T>)`
> which returns TypeSerializer<T>, or is it sufficient to only provide
> `InitContext#createInputSerializer()` which returns TypeSerializer<IN>?
> 
> I had the impression that buffering sinks like JDBC only need the
> latter. @Joao, could you confirm?
> 
> If that's the case, +1 to adding the following method signatures to
> InitContext:
> * TypeSerializer<IN> createInputSerializer()
> * boolean isObjectReuseEnabled()
> 
> Thanks,
> Gordon
> 
> On Fri, Apr 21, 2023 at 3:04 AM Zhu Zhu <re...@gmail.com> wrote:
> 
> > Good point! @Gordon
> > Introducing an `InitContext#createSerializer(TypeInformation)` looks a
> > better option to me, so we do not need to introduce an unmodifiable
> > `ExecutionConfig` at this moment.
> >
> > Hope that we can make `ExecutionConfig` a read-only interface in
> > Flink 2.0. It is exposed in `RuntimeContext` to user functions already,
> > while mutating the values at runtime is actually an undefined behavior.
> >
> > Thanks,
> > Zhu
> >
> > On Tue, Apr 18, 2023 at 01:02, Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:
> > >
> > > Hi,
> > >
> > > Sorry for chiming in late.
> > >
> > > I'm not so sure that exposing ExecutionConfig / ReadExecutionConfig
> > > directly through Sink#InitContext is the right thing to do.
> > >
> > > 1. A lot of the read-only getter methods on ExecutionConfig are
> > irrelevant
> > > for sinks. Expanding the scope of the InitContext interface with so many
> > > irrelevant methods is probably going to make writing unit tests a pain.
> > >
> > > 2. There's actually a few getter methods on `InitContext` that have
> > > duplicate/redundant info for what ExecutionConfig exposes. For example,
> > > InitContext#getNumberOfParallelSubtasks and InitContext#getAttemptNumber
> > > currently exist and it can be confusing if users find 2 sources of that
> > > information (either via the `InitContext` and via the wrapped
> > > `ExecutionConfig`).
> > >
> > > All in all, it feels like `Sink#InitContext` was introduced initially as
> > a
> > > means to selectively only expose certain information to sinks.
> > >
> > > It looks like right now, the only requirement is that some sinks require
> > 1)
> > > isObjectReuseEnabled, and 2) TypeSerializer for the input type. Would it
> > > make sense to follow the original intent and only selectively expose
> > these?
> > > For 1), we can just add a new method to `InitContext` and forward the
> > > information from `ExecutionConfig` accessible at the operator level.
> > > For 2), would it make sense to create the serializer at the operator
> > level
> > > and then provide it through `InitContext`?
> > >
> > > Thanks,
> > > Gordon
> > >
> > > On Mon, Apr 17, 2023 at 8:23 AM Zhu Zhu <re...@gmail.com> wrote:
> > >
> > > > We can let the `InitContext` return `ExecutionConfig` in the interface.
> > > > However, a `ReadableExecutionConfig` implementation should be returned
> > > > so that exceptions will be thrown if users tries to modify the
> > > > `ExecutionConfig`.
> > > >
> > > > We can rework all the setters of `ExecutionConfig` to internally
> > invoke a
> > > > `setConfiguration(...)` method. Then the `ReadableExecutionConfig` can
> > > > just override that method. But pay attention to a few exceptional
> > > > setters, i.e. those for globalJobParameters and serializers.
> > > >
> > > > We should also explicitly state in the documentation of
> > > > `InitContext #getExecutionConfig()`, that the returned
> > `ExecutionConfig`
> > > > is unmodifiable.
> > > >
> > > > Thanks,
> > > > Zhu
> > > >
> > > > On Mon, Apr 17, 2023 at 16:51, João Boto <es...@apache.org> wrote:
> > > > >
> > > > > Hi Zhu,
> > > > >
> > > > > Thanks for you time for reviewing this.
> > > > >
> > > > > Extending ´ExecutionConfig´ will allow to modify the values in the
> > > > config (this is what we want to prevent with Option2)
> > > > >
> > > > > To extend the ExecutionConfig is not simpler to do Option1 (expose
> > > > ExecutionConfig directly).
> > > > >
> > > > > Regards
> > > > >
> > > > >
> > > > >
> > > > > On 2023/04/03 09:42:28 Zhu Zhu wrote:
> > > > > > Hi João,
> > > > > >
> > > > > > Thanks for creating this FLIP!
> > > > > > I'm overall +1 for it to unblock the migration of sinks to SinkV2.
> > > > > >
> > > > > > Yet I think it's better to let the `ReadableExecutionConfig` extend
> > > > > > `ExecutionConfig`, because otherwise we have to introduce a new
> > method
> > > > > > `TypeInformation#createSerializer(ReadableExecutionConfig)`. The
> > new
> > > > > > method may require every `TypeInformation` to implement it,
> > including
> > > > > > Flink built-in ones and custom ones, otherwise exceptions will
> > happen.
> > > > > > That goal, however, is pretty hard to achieve.
> > > > > >
> > > > > > Thanks,
> > > > > > Zhu
> > > > > >
> > > > > > On Tue, Feb 28, 2023 at 23:34, João Boto <es...@apache.org> wrote:
> > > > > > >
> > > > > > > I have update the FLIP with the 2 options that we have
> > discussed..
> > > > > > >
> > > > > > > Option 1: Expose ExecutionConfig directly on InitContext
> > > > > > > this have a minimal impact as we only have to expose the new
> > methods
> > > > > > >
> > > > > > > Option 2: Expose ReadableExecutionConfig on InitContext
> > > > > > > with this option we have more impact as we need to add a new
> > method
> > > > to TypeInformation and change all implementations (current exists 72
> > > > implementations)
> > > > > > >
> > > > > > > Waiting for feedback or concerns about the two options
> > > > > >
> > > >
> >
> 

Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Do we have to introduce `InitContext#createSerializer(TypeInformation<T>)`
which returns TypeSerializer<T>, or is it sufficient to only provide
`InitContext#createInputSerializer()` which returns TypeSerializer<IN>?

I had the impression that buffering sinks like JDBC only need the
latter. @Joao, could you confirm?

If that's the case, +1 to adding the following method signatures to
InitContext:
* TypeSerializer<IN> createInputSerializer()
* boolean isObjectReuseEnabled()

Thanks,
Gordon

On Fri, Apr 21, 2023 at 3:04 AM Zhu Zhu <re...@gmail.com> wrote:

> Good point! @Gordon
> Introducing an `InitContext#createSerializer(TypeInformation)` looks a
> better option to me, so we do not need to introduce an unmodifiable
> `ExecutionConfig` at this moment.
>
> Hope that we can make `ExecutionConfig` a read-only interface in
> Flink 2.0. It is exposed in `RuntimeContext` to user functions already,
> while mutating the values at runtime is actually an undefined behavior.
>
> Thanks,
> Zhu
>
> On Tue, Apr 18, 2023 at 01:02, Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:
> >
> > Hi,
> >
> > Sorry for chiming in late.
> >
> > I'm not so sure that exposing ExecutionConfig / ReadExecutionConfig
> > directly through Sink#InitContext is the right thing to do.
> >
> > 1. A lot of the read-only getter methods on ExecutionConfig are
> irrelevant
> > for sinks. Expanding the scope of the InitContext interface with so many
> > irrelevant methods is probably going to make writing unit tests a pain.
> >
> > 2. There's actually a few getter methods on `InitContext` that have
> > duplicate/redundant info for what ExecutionConfig exposes. For example,
> > InitContext#getNumberOfParallelSubtasks and InitContext#getAttemptNumber
> > currently exist and it can be confusing if users find 2 sources of that
> > information (either via the `InitContext` and via the wrapped
> > `ExecutionConfig`).
> >
> > All in all, it feels like `Sink#InitContext` was introduced initially as
> a
> > means to selectively only expose certain information to sinks.
> >
> > It looks like right now, the only requirement is that some sinks require
> 1)
> > isObjectReuseEnabled, and 2) TypeSerializer for the input type. Would it
> > make sense to follow the original intent and only selectively expose
> these?
> > For 1), we can just add a new method to `InitContext` and forward the
> > information from `ExecutionConfig` accessible at the operator level.
> > For 2), would it make sense to create the serializer at the operator
> level
> > and then provide it through `InitContext`?
> >
> > Thanks,
> > Gordon
> >
> > On Mon, Apr 17, 2023 at 8:23 AM Zhu Zhu <re...@gmail.com> wrote:
> >
> > > We can let the `InitContext` return `ExecutionConfig` in the interface.
> > > However, a `ReadableExecutionConfig` implementation should be returned
> > > so that exceptions will be thrown if users tries to modify the
> > > `ExecutionConfig`.
> > >
> > > We can rework all the setters of `ExecutionConfig` to internally
> invoke a
> > > `setConfiguration(...)` method. Then the `ReadableExecutionConfig` can
> > > just override that method. But pay attention to a few exceptional
> > > setters, i.e. those for globalJobParameters and serializers.
> > >
> > > We should also explicitly state in the documentation of
> > > `InitContext #getExecutionConfig()`, that the returned
> `ExecutionConfig`
> > > is unmodifiable.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > On Mon, Apr 17, 2023 at 16:51, João Boto <es...@apache.org> wrote:
> > > >
> > > > Hi Zhu,
> > > >
> > > > Thanks for you time for reviewing this.
> > > >
> > > > Extending ´ExecutionConfig´ will allow to modify the values in the
> > > config (this is what we want to prevent with Option2)
> > > >
> > > > To extend the ExecutionConfig is not simpler to do Option1 (expose
> > > ExecutionConfig directly).
> > > >
> > > > Regards
> > > >
> > > >
> > > >
> > > > On 2023/04/03 09:42:28 Zhu Zhu wrote:
> > > > > Hi João,
> > > > >
> > > > > Thanks for creating this FLIP!
> > > > > I'm overall +1 for it to unblock the migration of sinks to SinkV2.
> > > > >
> > > > > Yet I think it's better to let the `ReadableExecutionConfig` extend
> > > > > `ExecutionConfig`, because otherwise we have to introduce a new
> method
> > > > > `TypeInformation#createSerializer(ReadableExecutionConfig)`. The
> new
> > > > > method may require every `TypeInformation` to implement it,
> including
> > > > > Flink built-in ones and custom ones, otherwise exceptions will
> happen.
> > > > > That goal, however, is pretty hard to achieve.
> > > > >
> > > > > Thanks,
> > > > > Zhu
> > > > >
> > > > > On Tue, Feb 28, 2023 at 23:34, João Boto <es...@apache.org> wrote:
> > > > > >
> > > > > > I have update the FLIP with the 2 options that we have
> discussed..
> > > > > >
> > > > > > Option 1: Expose ExecutionConfig directly on InitContext
> > > > > > this have a minimal impact as we only have to expose the new
> methods
> > > > > >
> > > > > > Option 2: Expose ReadableExecutionConfig on InitContext
> > > > > > with this option we have more impact as we need to add a new
> method
> > > to TypeInformation and change all implementations (current exists 72
> > > implementations)
> > > > > >
> > > > > > Waiting for feedback or concerns about the two options
> > > > >
> > >
>

Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

Posted by Zhu Zhu <re...@gmail.com>.
Good point! @Gordon
Introducing an `InitContext#createSerializer(TypeInformation)` looks like a
better option to me, so we do not need to introduce an unmodifiable
`ExecutionConfig` at this moment.

Hope that we can make `ExecutionConfig` a read-only interface in
Flink 2.0. It is exposed in `RuntimeContext` to user functions already,
while mutating the values at runtime is actually undefined behavior.

Thanks,
Zhu

On Tue, Apr 18, 2023 at 01:02, Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:
>
> Hi,
>
> Sorry for chiming in late.
>
> I'm not so sure that exposing ExecutionConfig / ReadExecutionConfig
> directly through Sink#InitContext is the right thing to do.
>
> 1. A lot of the read-only getter methods on ExecutionConfig are irrelevant
> for sinks. Expanding the scope of the InitContext interface with so many
> irrelevant methods is probably going to make writing unit tests a pain.
>
> 2. There's actually a few getter methods on `InitContext` that have
> duplicate/redundant info for what ExecutionConfig exposes. For example,
> InitContext#getNumberOfParallelSubtasks and InitContext#getAttemptNumber
> currently exist and it can be confusing if users find 2 sources of that
> information (either via the `InitContext` and via the wrapped
> `ExecutionConfig`).
>
> All in all, it feels like `Sink#InitContext` was introduced initially as a
> means to selectively only expose certain information to sinks.
>
> It looks like right now, the only requirement is that some sinks require 1)
> isObjectReuseEnabled, and 2) TypeSerializer for the input type. Would it
> make sense to follow the original intent and only selectively expose these?
> For 1), we can just add a new method to `InitContext` and forward the
> information from `ExecutionConfig` accessible at the operator level.
> For 2), would it make sense to create the serializer at the operator level
> and then provide it through `InitContext`?
>
> Thanks,
> Gordon
>
> On Mon, Apr 17, 2023 at 8:23 AM Zhu Zhu <re...@gmail.com> wrote:
>
> > We can let the `InitContext` return `ExecutionConfig` in the interface.
> > However, a `ReadableExecutionConfig` implementation should be returned
> > so that exceptions will be thrown if users tries to modify the
> > `ExecutionConfig`.
> >
> > We can rework all the setters of `ExecutionConfig` to internally invoke a
> > `setConfiguration(...)` method. Then the `ReadableExecutionConfig` can
> > just override that method. But pay attention to a few exceptional
> > setters, i.e. those for globalJobParameters and serializers.
> >
> > We should also explicitly state in the documentation of
> > `InitContext #getExecutionConfig()`, that the returned `ExecutionConfig`
> > is unmodifiable.
> >
> > Thanks,
> > Zhu
> >
> > On Mon, Apr 17, 2023 at 16:51, João Boto <es...@apache.org> wrote:
> > >
> > > Hi Zhu,
> > >
> > > Thanks for you time for reviewing this.
> > >
> > > Extending ´ExecutionConfig´ will allow to modify the values in the
> > config (this is what we want to prevent with Option2)
> > >
> > > To extend the ExecutionConfig is not simpler to do Option1 (expose
> > ExecutionConfig directly).
> > >
> > > Regards
> > >
> > >
> > >
> > > On 2023/04/03 09:42:28 Zhu Zhu wrote:
> > > > Hi João,
> > > >
> > > > Thanks for creating this FLIP!
> > > > I'm overall +1 for it to unblock the migration of sinks to SinkV2.
> > > >
> > > > Yet I think it's better to let the `ReadableExecutionConfig` extend
> > > > `ExecutionConfig`, because otherwise we have to introduce a new method
> > > > `TypeInformation#createSerializer(ReadableExecutionConfig)`. The new
> > > > method may require every `TypeInformation` to implement it, including
> > > > Flink built-in ones and custom ones, otherwise exceptions will happen.
> > > > That goal, however, is pretty hard to achieve.
> > > >
> > > > Thanks,
> > > > Zhu
> > > >
> > > > On Tue, Feb 28, 2023 at 23:34, João Boto <es...@apache.org> wrote:
> > > > >
> > > > > I have update the FLIP with the 2 options that we have discussed..
> > > > >
> > > > > Option 1: Expose ExecutionConfig directly on InitContext
> > > > > this have a minimal impact as we only have to expose the new methods
> > > > >
> > > > > Option 2: Expose ReadableExecutionConfig on InitContext
> > > > > with this option we have more impact as we need to add a new method
> > to TypeInformation and change all implementations (current exists 72
> > implementations)
> > > > >
> > > > > Waiting for feedback or concerns about the two options
> > > >
> >

Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

Sorry for chiming in late.

I'm not so sure that exposing ExecutionConfig / ReadableExecutionConfig
directly through Sink#InitContext is the right thing to do.

1. A lot of the read-only getter methods on ExecutionConfig are irrelevant
for sinks. Expanding the scope of the InitContext interface with so many
irrelevant methods is probably going to make writing unit tests a pain.

2. There are actually a few getter methods on `InitContext` that
duplicate what ExecutionConfig already exposes. For example,
InitContext#getNumberOfParallelSubtasks and InitContext#getAttemptNumber
currently exist, and it can be confusing if users find 2 sources of that
information (either via the `InitContext` or via the wrapped
`ExecutionConfig`).

All in all, it feels like `Sink#InitContext` was introduced initially as a
means to selectively only expose certain information to sinks.

It looks like right now, the only requirement is that some sinks require 1)
isObjectReuseEnabled, and 2) TypeSerializer for the input type. Would it
make sense to follow the original intent and only selectively expose these?
For 1), we can just add a new method to `InitContext` and forward the
information from `ExecutionConfig` accessible at the operator level.
For 2), would it make sense to create the serializer at the operator level
and then provide it through `InitContext`?
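
The selective-exposure idea above could look roughly like the following sketch. All names here (`ExecutionConfigSketch`, `NarrowInitContext`, `SinkOperatorSketch`) are hypothetical and simplified; the sketch only assumes that the operator holds the full config and forwards the two pieces of information the sinks need:

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the full config, with getters a sink does not need.
final class ExecutionConfigSketch {
    private final boolean objectReuse;
    private final int parallelism;

    ExecutionConfigSketch(boolean objectReuse, int parallelism) {
        this.objectReuse = objectReuse;
        this.parallelism = parallelism;
    }

    boolean isObjectReuseEnabled() {
        return objectReuse;
    }

    int getParallelism() {
        return parallelism;
    }
}

// Narrow context: exposes only what the sinks in this thread asked for.
interface NarrowInitContext<S> {
    boolean isObjectReuseEnabled();

    S createInputSerializer();
}

// Hypothetical operator that owns the full config and builds the context.
final class SinkOperatorSketch<S> {
    private final ExecutionConfigSketch config;
    private final Supplier<S> inputSerializerFactory;

    SinkOperatorSketch(ExecutionConfigSketch config, Supplier<S> inputSerializerFactory) {
        this.config = config;
        this.inputSerializerFactory = inputSerializerFactory;
    }

    NarrowInitContext<S> createInitContext() {
        return new NarrowInitContext<S>() {
            @Override
            public boolean isObjectReuseEnabled() {
                // Forwarded from the operator-level config; the config itself is not exposed.
                return config.isObjectReuseEnabled();
            }

            @Override
            public S createInputSerializer() {
                // Created at the operator level and handed out through the context.
                return inputSerializerFactory.get();
            }
        };
    }
}
```

A unit test for a sink then only has to stub two methods instead of a full config object.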

Thanks,
Gordon

On Mon, Apr 17, 2023 at 8:23 AM Zhu Zhu <re...@gmail.com> wrote:

> We can let the `InitContext` return `ExecutionConfig` in the interface.
> However, a `ReadableExecutionConfig` implementation should be returned
> so that exceptions will be thrown if users tries to modify the
> `ExecutionConfig`.
>
> We can rework all the setters of `ExecutionConfig` to internally invoke a
> `setConfiguration(...)` method. Then the `ReadableExecutionConfig` can
> just override that method. But pay attention to a few exceptional
> setters, i.e. those for globalJobParameters and serializers.
>
> We should also explicitly state in the documentation of
> `InitContext #getExecutionConfig()`, that the returned `ExecutionConfig`
> is unmodifiable.
>
> Thanks,
> Zhu
>
> On Mon, Apr 17, 2023 at 16:51, João Boto <es...@apache.org> wrote:
> >
> > Hi Zhu,
> >
> > Thanks for you time for reviewing this.
> >
> > Extending ´ExecutionConfig´ will allow to modify the values in the
> config (this is what we want to prevent with Option2)
> >
> > To extend the ExecutionConfig is not simpler to do Option1 (expose
> ExecutionConfig directly).
> >
> > Regards
> >
> >
> >
> > On 2023/04/03 09:42:28 Zhu Zhu wrote:
> > > Hi João,
> > >
> > > Thanks for creating this FLIP!
> > > I'm overall +1 for it to unblock the migration of sinks to SinkV2.
> > >
> > > Yet I think it's better to let the `ReadableExecutionConfig` extend
> > > `ExecutionConfig`, because otherwise we have to introduce a new method
> > > `TypeInformation#createSerializer(ReadableExecutionConfig)`. The new
> > > method may require every `TypeInformation` to implement it, including
> > > Flink built-in ones and custom ones, otherwise exceptions will happen.
> > > That goal, however, is pretty hard to achieve.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > On Tue, Feb 28, 2023 at 23:34, João Boto <es...@apache.org> wrote:
> > > >
> > > > I have update the FLIP with the 2 options that we have discussed..
> > > >
> > > > Option 1: Expose ExecutionConfig directly on InitContext
> > > > this have a minimal impact as we only have to expose the new methods
> > > >
> > > > Option 2: Expose ReadableExecutionConfig on InitContext
> > > > with this option we have more impact as we need to add a new method
> to TypeInformation and change all implementations (current exists 72
> implementations)
> > > >
> > > > Waiting for feedback or concerns about the two options
> > >
>

Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

Posted by Zhu Zhu <re...@gmail.com>.
We can let the `InitContext` return `ExecutionConfig` in the interface.
However, a `ReadableExecutionConfig` implementation should be returned
so that exceptions will be thrown if users try to modify the
`ExecutionConfig`.

We can rework all the setters of `ExecutionConfig` to internally invoke a
`setConfiguration(...)` method. Then the `ReadableExecutionConfig` can
just override that method. But pay attention to a few exceptional
setters, i.e. those for globalJobParameters and serializers.

We should also explicitly state in the documentation of
`InitContext#getExecutionConfig()`, that the returned `ExecutionConfig`
is unmodifiable.
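
A minimal sketch of the setter-funnel idea, under the assumption that every setter can be routed through one method. `FunnelConfigSketch` and `ReadOnlyFunnelConfig` are hypothetical names, the string keys are made up for illustration, and the exceptional setters mentioned above (global job parameters, serializers) are not covered:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical config whose setters all delegate to a single method.
class FunnelConfigSketch {
    private final Map<String, Object> values = new HashMap<>();

    // The single write path that subclasses can override.
    protected void setConfiguration(String key, Object value) {
        values.put(key, value);
    }

    void enableObjectReuse() {
        setConfiguration("object-reuse", true);
    }

    void setParallelism(int parallelism) {
        setConfiguration("parallelism", parallelism);
    }

    boolean isObjectReuseEnabled() {
        return Boolean.TRUE.equals(values.get("object-reuse"));
    }

    int getParallelism() {
        Object parallelism = values.get("parallelism");
        return parallelism == null ? -1 : (Integer) parallelism;
    }

    protected Map<String, Object> snapshot() {
        return new HashMap<>(values);
    }
}

// Read-only view: copies the source values once, then rejects every write.
final class ReadOnlyFunnelConfig extends FunnelConfigSketch {
    ReadOnlyFunnelConfig(FunnelConfigSketch source) {
        // super::setConfiguration bypasses the throwing override below.
        source.snapshot().forEach(super::setConfiguration);
    }

    @Override
    protected void setConfiguration(String key, Object value) {
        throw new UnsupportedOperationException("config is read-only: " + key);
    }
}
```

Because every setter goes through `setConfiguration`, one override is enough to make all of them throw.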

Thanks,
Zhu

On Mon, Apr 17, 2023 at 16:51, João Boto <es...@apache.org> wrote:
>
> Hi Zhu,
>
> Thanks for you time for reviewing this.
>
> Extending ´ExecutionConfig´ will allow to modify the values in the config (this is what we want to prevent with Option2)
>
> To extend the ExecutionConfig is not simpler to do Option1 (expose ExecutionConfig directly).
>
> Regards
>
>
>
> On 2023/04/03 09:42:28 Zhu Zhu wrote:
> > Hi João,
> >
> > Thanks for creating this FLIP!
> > I'm overall +1 for it to unblock the migration of sinks to SinkV2.
> >
> > Yet I think it's better to let the `ReadableExecutionConfig` extend
> > `ExecutionConfig`, because otherwise we have to introduce a new method
> > `TypeInformation#createSerializer(ReadableExecutionConfig)`. The new
> > method may require every `TypeInformation` to implement it, including
> > Flink built-in ones and custom ones, otherwise exceptions will happen.
> > That goal, however, is pretty hard to achieve.
> >
> > Thanks,
> > Zhu
> >
> > On Tue, Feb 28, 2023 at 23:34, João Boto <es...@apache.org> wrote:
> > >
> > > I have update the FLIP with the 2 options that we have discussed..
> > >
> > > Option 1: Expose ExecutionConfig directly on InitContext
> > > this have a minimal impact as we only have to expose the new methods
> > >
> > > Option 2: Expose ReadableExecutionConfig on InitContext
> > > with this option we have more impact as we need to add a new method to TypeInformation and change all implementations (current exists 72 implementations)
> > >
> > > Waiting for feedback or concerns about the two options
> >

Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

Posted by João Boto <es...@apache.org>.
Hi Zhu,

Thanks for taking the time to review this.

Extending `ExecutionConfig` would allow the values in the config to be modified, which is exactly what we want to prevent with Option 2.

If we were going to extend `ExecutionConfig`, wouldn't it be simpler to go with Option 1 (expose `ExecutionConfig` directly)?

Regards



On 2023/04/03 09:42:28 Zhu Zhu wrote:
> Hi João,
> 
> Thanks for creating this FLIP!
> I'm overall +1 for it to unblock the migration of sinks to SinkV2.
> 
> Yet I think it's better to let the `ReadableExecutionConfig` extend
> `ExecutionConfig`, because otherwise we have to introduce a new method
> `TypeInformation#createSerializer(ReadableExecutionConfig)`. The new
> method may require every `TypeInformation` to implement it, including
> Flink built-in ones and custom ones, otherwise exceptions will happen.
> That goal, however, is pretty hard to achieve.
> 
> Thanks,
> Zhu
> 
> On Tue, Feb 28, 2023 at 23:34, João Boto <es...@apache.org> wrote:
> >
> > I have update the FLIP with the 2 options that we have discussed..
> >
> > Option 1: Expose ExecutionConfig directly on InitContext
> > this have a minimal impact as we only have to expose the new methods
> >
> > Option 2: Expose ReadableExecutionConfig on InitContext
> > with this option we have more impact as we need to add a new method to TypeInformation and change all implementations (current exists 72 implementations)
> >
> > Waiting for feedback or concerns about the two options
>