Posted to dev@flink.apache.org by Jingsong Li <ji...@gmail.com> on 2020/10/09 11:57:08 UTC

Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Hi Aljoscha,

I want to discuss `Customized parallelism` and `Parallelism inference` separately.

### Customized parallelism

First, I want to explain the current DataStream parallelism setting:
`env.fromSource(...).setParallelism(...)`.
This is how users explicitly specify parallelism, and it is the only way to
set parallelism.

The underlying Source (e.g. SourceFunction) is completely independent of any
specific parallelism; the surrounding DataStream is responsible for setting
it. The Table layer needs to provide an equivalent capability.
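A minimal Java sketch of the split described above, assuming hypothetical classes `UnderlyingSource`, `SourceTransformation` and `ScanProvider` (none of them are Flink APIs): the source itself carries no parallelism, the wrapping layer owns it much like `setParallelism(...)` does on a DataStream, and a Table-layer provider can forward an optional user-specified value in the same way.

```java
import java.util.Optional;

// Hypothetical stand-in for an underlying source such as a SourceFunction:
// it knows how to read data but carries no parallelism of its own.
final class UnderlyingSource {
    final String name;

    UnderlyingSource(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for the DataStream layer around a source: this
// wrapper, not the source, owns the parallelism (cf. setParallelism(...)).
final class SourceTransformation {
    final UnderlyingSource source;
    private int parallelism;

    SourceTransformation(UnderlyingSource source, int defaultParallelism) {
        this.source = source;
        this.parallelism = defaultParallelism;
    }

    SourceTransformation setParallelism(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        this.parallelism = parallelism;
        return this;
    }

    int getParallelism() {
        return parallelism;
    }
}

// Hypothetical Table-layer provider offering the same capability: an
// optional user-specified parallelism, applied when the source is translated.
final class ScanProvider {
    private final UnderlyingSource source;
    private final Optional<Integer> userParallelism;

    ScanProvider(UnderlyingSource source, Optional<Integer> userParallelism) {
        this.source = source;
        this.userParallelism = userParallelism;
    }

    SourceTransformation translate(int defaultParallelism) {
        SourceTransformation transformation =
                new SourceTransformation(source, defaultParallelism);
        // Only override the framework default if the user asked for it.
        userParallelism.ifPresent(p -> transformation.setParallelism(p));
        return transformation;
    }
}
```

Keeping the value optional means the framework default still applies whenever the user says nothing.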

### Parallelism inference

Some sources can infer their parallelism; for Kafka, for example, the
parallelism can be inferred from the number of partitions.

I think you are right that we should provide this in the underlying Source.
This capability belongs to the underlying Source (e.g. SourceFunction), so
it requires a new interface on the underlying Source.

The Table layer just tells the underlying Source that the user wants to
enable parallelism inference:

new MyRealSource(path, and, whatnot, parallelismInfer = true)
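One way the inference flag could look on the underlying source, as a sketch: `MyRealSource` is the placeholder name from the mail, while its constructor, the `parallelismInfer` field and `inferredParallelism()` are assumptions for illustration, with Kafka-style inference modeled as one reader per partition.

```java
import java.util.List;
import java.util.OptionalInt;

// Hypothetical underlying source (placeholder name from the mail) that can
// infer its own parallelism, e.g. one reader per Kafka partition.
// This is not a Flink interface.
final class MyRealSource {
    private final List<String> partitions;   // partitions known at planning time
    private final boolean parallelismInfer;  // set by the Table layer on user request

    MyRealSource(List<String> partitions, boolean parallelismInfer) {
        this.partitions = List.copyOf(partitions);
        this.parallelismInfer = parallelismInfer;
    }

    // Returns the inferred parallelism, or empty when inference is disabled
    // (or nothing can be inferred), leaving the decision to the framework.
    OptionalInt inferredParallelism() {
        if (!parallelismInfer || partitions.isEmpty()) {
            return OptionalInt.empty();
        }
        // One parallel reader per partition; extra readers would have no work.
        return OptionalInt.of(partitions.size());
    }
}
```

Returning an empty `OptionalInt` rather than a number leaves the final decision to the framework when inference is off.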

What do you think?

Best,
Jingsong

On Tue, Sep 29, 2020 at 8:48 PM Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
>
> I'll only respond regarding the parallelism for now because I need to
> think some more about DataStream.
>
> What I'm saying is that exposing a parallelism only for Table Connectors
> is not the right thing. If we want to allow sources to tell the
> system/framework what would be a good parallelism it would be at the
> underlying level.
>
> I'll explain with the SourceFunction. A Table API Source connector is
> basically a factory that will give you a SourceFunction that corresponds
> to whatever the user configured via properties and other means. If the
> Table Connector somehow happens to know what would be a good parallelism
> for the source it could "tell" the source when creating it, i.e.
>
>    new MyRealSource(path, and, whatnot, parallelismHint)
>
> Then the source could either act on that information, for example by
> shutting down some of its parallel instances at runtime, or we could
> extend the Source (SourceFunction) API to expose a "parallelism hint" to
> the system.
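A sketch of the "tell the source when creating it" idea, assuming a hypothetical `HintingTableConnector` factory and `HintedSource` class (the `parallelism-hint` property key is likewise made up): the Table connector only parses properties and forwards the hint, and the source itself decides at runtime which parallel instances shut down.

```java
import java.util.Map;

// Hypothetical "real" source created by a Table connector. It receives a
// parallelism hint and decides at runtime which parallel instances do work.
final class HintedSource {
    private final String path;
    private final int parallelismHint;

    HintedSource(String path, int parallelismHint) {
        if (parallelismHint < 1) {
            throw new IllegalArgumentException("parallelismHint must be >= 1");
        }
        this.path = path;
        this.parallelismHint = parallelismHint;
    }

    // Asked by each parallel instance when it starts. Instances whose index
    // is at or beyond the hint shut themselves down instead of reading.
    boolean shouldRun(int subtaskIndex) {
        return subtaskIndex < parallelismHint;
    }

    String path() {
        return path;
    }
}

// Hypothetical Table connector: just a factory that turns user-configured
// properties into the underlying source and forwards the hint to it.
final class HintingTableConnector {
    // Illustrative property key; not a real Flink connector option.
    static final String HINT_KEY = "parallelism-hint";

    static HintedSource create(Map<String, String> properties) {
        String path = properties.get("path");
        // Without a hint, every parallel instance is allowed to run.
        int hint = properties.containsKey(HINT_KEY)
                ? Integer.parseInt(properties.get(HINT_KEY))
                : Integer.MAX_VALUE;
        return new HintedSource(path, hint);
    }
}
```

The design keeps all parallelism knowledge in the real source, so the same hint mechanism would work whether or not the Table API is involved.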
>
> The basic thing is that Table Connectors are not the real connectors,
> they just delegate to underlying real connectors. So those underlying
> connectors are where we need to change things. Otherwise we would just
> have special-case solutions for the Table API.
>
> Best,
> Aljoscha
>
> On 25.09.20 14:30, admin wrote:
> > Hi everyone,
> > Thanks for the proposal.
> >
> > In our company, we face the same situation as @liu shouwei.
> > We developed some features based on Flink, such as parallelism for SQL
> > source/sink connectors, and a Kafka delay consumer, which adds a flatMap
> > and a keyBy transformation after the source DataStream.
> > What put us in a difficult position is that when we migrated these
> > features to Flink 1.11, we found that DataStream was no longer available,
> > so we modified Blink's code to support parallelism. But the Kafka delay
> > consumer is still unsolved.
> >
> > From the user's perspective, it is necessary to manipulate DataStream or
> > to have interoperability between the Table API and DataStream.
> >
> > Best
> >
> >
> >
> >> On Sep 25, 2020, at 4:18 PM, Rui Li <li...@gmail.com> wrote:
> >>
> >> Hi Jingsong,
> >>
> >> Thanks for driving this effort. I have two minor comments.
> >>
> >>
> >>    1. IMHO, parallelism is a concept that applies to all
> >>    ScanTableSources. So instead of defining a new interface, would it be
> >>    more natural to incorporate parallelism inference into existing
> >>    interfaces, e.g. ScanTableSource or ScanRuntimeProvider?
> >>    2. `scan.infer-parallelism.enabled` doesn't seem very useful to me.
> >>    From a user's perspective, parallelism is either set by
> >>    `scan.parallelism` or decided automatically by Flink. If a user
> >>    doesn't want the connector to infer parallelism, he/she can simply
> >>    set `scan.parallelism`, no?
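Rui Li's second point amounts to a simple precedence rule. Below is a hedged sketch using the `scan.parallelism` option name proposed in this thread; `ScanParallelismResolver` is a hypothetical helper, not Flink's planner logic: an explicit value wins, otherwise a connector-inferred value is used, otherwise the job default.

```java
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical helper; the option name is the one proposed in this thread.
final class ScanParallelismResolver {
    static final String SCAN_PARALLELISM = "scan.parallelism";

    static int resolve(Map<String, String> tableOptions,
                       OptionalInt inferredByConnector,
                       int defaultParallelism) {
        // 1. An explicit user setting always wins; setting it is therefore
        //    also the way to turn inference off.
        String explicit = tableOptions.get(SCAN_PARALLELISM);
        if (explicit != null) {
            int parallelism = Integer.parseInt(explicit.trim());
            if (parallelism < 1) {
                throw new IllegalArgumentException(SCAN_PARALLELISM + " must be >= 1");
            }
            return parallelism;
        }
        // 2. Otherwise use what the connector inferred, if anything.
        if (inferredByConnector.isPresent()) {
            return inferredByConnector.getAsInt();
        }
        // 3. Fall back to the job's default parallelism.
        return defaultParallelism;
    }
}
```

Under this rule, setting `scan.parallelism` already switches inference off, which is why a separate enable flag looks redundant.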
> >>
> >>
> >> On Fri, Sep 25, 2020 at 3:33 PM Jingsong Li <ji...@gmail.com>
> >> wrote:
> >>
> >>> Hi Aljoscha,
> >>>
> >>> Thank you for your feedback,
> >>>
> >>> ## Connector parallelism
> >>>
> >>> Requirements:
> >>> Parallelism is either specified by the user or inferred by the connector.
> >>>
> >>> How to configure parallelism in DataStream:
> >>> In the DataStream world, the only way to configure parallelism is
> >>> `SingleOutputStreamOperator.setParallelism`. In practice, users need
> >>> access to the DataStream when using a connector, not just the
> >>> `SourceFunction` / `Source` interface.
> >>> Is parallelism related to connectors? I think yes: many connectors can
> >>> provide information from which the parallelism can be derived, and users
> >>> do exactly that. This is parallelism inference (from connectors).
> >>> The key is that `DataStream` is an open programming API, and users can
> >>> freely write code to set parallelism.
> >>>
> >>> How to configure parallelism in Table/SQL:
> >>> Table/SQL, however, is not an open programming API: every feature needs
> >>> a corresponding mechanism, because the user can no longer write code.
> >>> Our current connector interfaces, SourceFunctionProvider and
> >>> SinkFunctionProvider, offer no way to supply connector-related
> >>> parallelism.
> >>> Back to our original intention: to avoid users directly manipulating
> >>> `DataStream`. If we want to avoid it, we need to provide the
> >>> corresponding features.
> >>>
> >>> And parallelism is runtime information of the connector, so it fits the
> >>> name `ScanRuntimeProvider`.
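To illustrate attaching parallelism to the runtime provider rather than exposing DataStream, here is a sketch with a hypothetical `RuntimeProviderWithParallelism` interface (the interface and class names are assumptions, not the FLIP's final API). A default method returning an empty `Optional` keeps existing providers unchanged, while a provider that knows a static, planning-time parallelism can report it.

```java
import java.util.Optional;

// Hypothetical runtime-provider interface. The default method keeps
// existing providers source-compatible: they report "no preference".
interface RuntimeProviderWithParallelism {
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

// A provider that does not care about parallelism.
final class PlainProvider implements RuntimeProviderWithParallelism {
}

// A provider carrying a parallelism that was chosen by the user or inferred
// by the connector when the job is compiled (a static value).
final class FixedParallelismProvider implements RuntimeProviderWithParallelism {
    private final int parallelism;

    FixedParallelismProvider(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        this.parallelism = parallelism;
    }

    @Override
    public Optional<Integer> getParallelism() {
        return Optional.of(parallelism);
    }
}
```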
> >>>
> >>>> If we wanted to add a "get parallelism" it would be in those underlying
> >>>> connectors but I'm also skeptical about adding such a method there
> >>>> because it is a static assignment and would preclude clever
> >>>> optimizations about the parallelism of a connector at runtime.
> >>>
> >>> I think that when a job is submitted, we are still at compile time, so
> >>> it should only provide static parallelism.
> >>>
> >>> ## DataStream in table connector
> >>>
> >>> As I said before, if we want to completely remove DataStream from the
> >>> table connector, we need to provide corresponding functions in
> >>> `xxRuntimeProvider`.
> >>> Otherwise, we and our users may not be able to migrate the old
> >>> connectors, including the Hive/FileSystem connectors and the user cases
> >>> I mentioned above.
> >>> CC: @liu shouwei
> >>>
> >>> We really need to consider these cases.
> >>> If there is no alternative in the short term, users will have to keep
> >>> using the old table connector API, which has been deprecated, for a long
> >>> time.
> >>>
> >>> Why not use StreamTableEnvironment fromDataStream/toDataStream?
> >>> - These tables are just temporary tables. They cannot be integrated into
> >>>   or stored in a Catalog.
> >>> - DDL for creating tables cannot be used...
> >>> - We would lose many useful Table/SQL features on the connector, for
> >>>   example projection pushdown, filter pushdown, partitions, etc.
> >>>
> >>> But I believe you are right in the long run. The source and sink APIs
> >>> should be powerful enough to cover all reasonable cases.
> >>> Maybe we can introduce them in a minimal way. For example, we could
> >>> introduce only `DataStreamSinkProvider` in the planner as an internal
> >>> API.
> >>>
> >>> Your points are very meaningful; I look forward to your reply.
> >>>
> >>> Best,
> >>> Jingsong
> >>>
> >>> On Fri, Sep 25, 2020 at 10:55 AM wenlong.lwl <we...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Aljoscha, I would like to share a use case in support of setting the
> >>>> parallelism of table sinks (or limiting their parallelism range): when
> >>>> writing data to databases, there are limits on the number of JDBC
> >>>> connections and on query TPS. If the optimizer didn't know about these
> >>>> limits and set a large sink parallelism to match the parallelism of its
> >>>> input, we would get "too many connections" errors or high database
> >>>> load, and poor performance because of too many small requests.
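The JDBC use case can be expressed as a small sizing rule. This is a sketch under a stated assumption: each parallel sink instance holds exactly one database connection, and `JdbcSinkParallelism` is a hypothetical helper. An explicit sink parallelism is respected; otherwise the sink does not blindly match its input but stays within the connection limit.

```java
import java.util.OptionalInt;

// Hypothetical helper. Assumes every parallel sink instance holds exactly
// one database connection.
final class JdbcSinkParallelism {
    static int choose(int inputParallelism,
                      OptionalInt userSinkParallelism,
                      int maxDbConnections) {
        if (inputParallelism < 1 || maxDbConnections < 1) {
            throw new IllegalArgumentException(
                    "parallelism and connection limit must be >= 1");
        }
        // A parallelism the user set explicitly on the sink is respected as-is.
        if (userSinkParallelism.isPresent()) {
            return userSinkParallelism.getAsInt();
        }
        // Otherwise, instead of matching the input parallelism blindly,
        // never open more connections than the database allows.
        return Math.min(inputParallelism, maxDbConnections);
    }
}
```

With an input parallelism of 64 and a limit of 10 connections, this rule picks 10 sink instances instead of 64.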
> >>>>
> >>>> On Thu, 24 Sep 2020 at 22:41, Aljoscha Krettek <al...@apache.org>
> >>>> wrote:
> >>>>
> >>>>> Thanks for the proposal! I think the use cases that we are trying to
> >>>>> solve are indeed valid. However, I think we might have to take a step
> >>>>> back to look at what we're trying to solve and how we can solve it.
> >>>>>
> >>>>> The FLIP seems to have two broader topics: 1) add "get parallelism" to
> >>>>> sinks/sources 2) let users write DataStream topologies for
> >>>>> sinks/sources. I'll treat them separately below.
> >>>>>
> >>>>> I think we should not add "get parallelism" to the Table Sink API
> >>>>> because I think it's the wrong level of abstraction. The Table API
> >>>>> connectors are (or should be) more or less thin wrappers around
> >>>>> "physical" connectors. By "physical" I mean the underlying (mostly
> >>>>> DataStream API) connectors. For example, with the Kafka Connector the
> >>>>> Table API connector just does the configuration parsing and determines a
> >>>>> good (de)serialization format and then creates the underlying
> >>>>> FlinkKafkaConsumer/FlinkKafkaProducer.
> >>>>>
> >>>>> If we wanted to add a "get parallelism" it would be in those underlying
> >>>>> connectors but I'm also skeptical about adding such a method there
> >>>>> because it is a static assignment and would preclude clever
> >>>>> optimizations about the parallelism of a connector at runtime. But maybe
> >>>>> that's thinking too much about future work so I'm open to discussion
> >>>>> there.
> >>>>>
> >>>>> Regarding the second point of letting Table connector developers use
> >>>>> DataStream: I think we should not do it. One of the purposes of FLIP-95
> >>>>> [1] was to decouple the Table API from the DataStream API for the basic
> >>>>> interfaces. Coupling the two too closely at that basic level will make
> >>>>> our lives harder in the future when we want to evolve those APIs or when
> >>>>> we want the system to be better at choosing how to execute sources and
> >>>>> sinks. An example of this is actually the past of the Table API. Before
> >>>>> FLIP-95 we had connectors that dealt directly with DataSet and
> >>>>> DataStream, meaning that if users wanted their Table Sink to work in
> >>>>> both BATCH and STREAMING mode they had to provide two implementations.
> >>>>> The trend is towards unifying the sources/sinks to common interfaces
> >>>>> that can be used for both BATCH and STREAMING execution but, again, I
> >>>>> think exposing DataStream here would be a step back in the wrong
> >>>>> direction.
> >>>>>
> >>>>> I think the solution to the existing user requirement of using
> >>>>> DataStream sources and sinks with the Table API should be better
> >>>>> interoperability between the two APIs, which is being tackled right now
> >>>>> in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases that
> >>>>> we're trying to solve here, maybe we should think about FLIP-136 some
> >>>>> more.
> >>>>>
> >>>>> What do you think?
> >>>>>
> >>>>> Best,
> >>>>> Aljoscha
> >>>>>
> >>>>> [1]
> >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> >>>>> [2]
> >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> Best, Jingsong Lee
> >>>
> >>
> >>
> >> --
> >> Best regards!
> >> Rui Li
> >
>
>

-- 
Best, Jingsong Lee

Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Posted by Aljoscha Krettek <al...@apache.org>.
Agreed!

Aljoscha

On 14.10.20 06:38, Jingsong Li wrote:
> Hi Aljoscha,
> 
> Thanks for your feedback.
> 
> Yes, we should add DataStream Providers to the table bridge module.
> 
> I think your concerns are valid, including those about the relationship
> between DataStream and Table.
> My understanding is that the parallelism specified by the user is only the
> initial parallelism. After that, the framework still has some freedom; the
> user's setting works like a hint to the optimizer.
> 
> At present, the final design of the source parallelism setting is not clear
> (DataStream is not ready, and 1.12 is imminent), so I suggest shelving
> source parallelism for now. Let's focus on the sink parallelism setting
> (users specify the parallelism on the sink) and the rest of this FLIP.
> That way we can make sure that users can migrate their connectors to the
> new table source/sink interfaces in Flink 1.12.
> What do you think?
> 
> Best,
> Jingsong
> 
> On Tue, Oct 13, 2020 at 5:01 PM Aljoscha Krettek <al...@apache.org>
> wrote:
> 
>> Hi Jingsong,
>>
>> I'm sorry, I didn't want to block you for so long on this. I thought
>> about it again.
>>
>> I think it's fine to add a DataStream Provider if this really unblocks
>> users from migrating to newer Flink versions. I'm guessing you will add
>> that to the table bridge module?
>>
>> Regarding the parallelism: I see your point of letting users set that
>> explicitly. I'm still skeptical about it but I also think it wasn't such
>> a good idea to let users specify the parallelism of individual
>> operations in the DataStream API because it again takes freedom away
>> from the framework. So if it's really sth that users need we should go
>> ahead.
>>
>> Best,
>> Aljoscha
>>
> 


Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Posted by Jingsong Li <ji...@gmail.com>.
Hi Aljoscha,

Thanks for your feedback.

Yes, we should add DataStream Providers to the table bridge module.

I think your concerns are valid, including those about the relationship
between DataStream and Table.
My understanding is that the parallelism specified by the user is only the
initial parallelism. After that, the framework still has some freedom; the
user's setting works like a hint to the optimizer.

At present, the final design of the source parallelism setting is not clear
(DataStream is not ready, and 1.12 is imminent), so I suggest shelving
source parallelism for now. Let's focus on the sink parallelism setting
(users specify the parallelism on the sink) and the rest of this FLIP.
That way we can make sure that users can migrate their connectors to the
new table source/sink interfaces in Flink 1.12.
What do you think?

Best,
Jingsong

On Tue, Oct 13, 2020 at 5:01 PM Aljoscha Krettek <al...@apache.org>
wrote:

> Hi Jingsong,
>
> I'm sorry, I didn't want to block you for so long on this. I thought
> about it again.
>
> I think it's fine to add a DataStream Provider if this really unblocks
> users from migrating to newer Flink versions. I'm guessing you will add
> that to the table bridge module?
>
> Regarding the parallelism: I see your point of letting users set that
> explicitly. I'm still skeptical about it but I also think it wasn't such
> a good idea to let users specify the parallelism of individual
> operations in the DataStream API because it again takes freedom away
> from the framework. So if it's really sth that users need we should go
> ahead.
>
> Best,
> Aljoscha
>
> On 09.10.20 13:57, Jingsong Li wrote:
> > Hi Aljoscha,
> >
> > I want to separate `Customized parallelism` and `Parallelism inference`.
> >
> > ### Customized parallelism
> >
> > First, I want to explain the current DataStream parallelism setting:
> > `env.fromSource(...).setParallelism(...)`.
> > This is how users explicitly specify parallelism, and it is the only way
> to
> > set parallelism.
> >
> > The underlying Source (Eg.: SourceFunction) is completely independent of
> > specific parallelism. The peripheral DataStream is responsible for
> setting
> > parallelism.
> > The table layer also needs to provide peer-to-peer capability.
> >
> > ### Parallelism inference
> >
> > Some sources have the ability to infer parallelism, like Kafka,
> parallelism
> > can be inferred from the partition number.
> >
> > I think you are right, we should provide this to the underlying Source.
> > This capability must be related to the underlying Source (Eg.:
> > SourceFunction), so this capability must introduce a new interface for
> the
> > underlying Source.
> >
> > The Table layer just tell underlying Source that user want to open
> > parallelism inference:
> >
> > new MyRealSource(path, and, whatnot, parallelismInfer = true)
> >
> > What do you think?
> >
> > Best,
> > Jingsong
> >
> > On Tue, Sep 29, 2020 at 8:48 PM Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> >> Hi,
> >>
> >> I'll only respond regarding the parallelism for now because I need to
> >> think some more about DataStream.
> >>
> >> What I'm saying is that exposing a parallelism only for Table Connectors
> >> is not the right thing. If we want to allow sources to tell the
> >> system/framework what would be a good parallelism it would be at the
> >> underlying level.
> >>
> >> I'll explain with the SourceFunction. A Table API Source connector is
> >> basically a factory that will give you a SourceFunction that corresponds
> >> to whatever the user configured via properties and other means. If the
> >> Table Connector somehow happens to know what would be a good parallelism
> >> for the source it could "tell" the source when creating it, i.e.
> >>
> >>     new MyRealSource(path, and, whatnot, parallelismHint)
> >>
> >> Then the source could either work with that information it got, by
> >> shutting down (at runtime) some of its parallel instances. Or we could
> >> extend the Source (SourceFunction) API to expose a "parallelism hint" to
> >> the system.
> >>
> >> The basic thing is that Table Connectors are not the real connectors,
> >> they just delegate to underlying real connectors. So those underlying
> >> connectors are where we need to change things. Otherwise we would just
> >> have special-case solutions for the Table API.
> >>
> >> Best,
> >> Aljoscha
> >>
> >> On 25.09.20 14:30, admin wrote:
> >>> Hi everyone,
> >>> Thanks for the proposal.
> >>>
> >>> In our company,we meet the same situation as @liu shouwei.
> >>> We developed some features base on flink.Such as parallelism of sql
> >> source/sink  connector, and kafka delay consumer which is adding a
> flatmap
> >> and a keyby transformation after the source Datastream.
> >>> What make us embarrassing is that when we migrate this features to
> Flink
> >> 1.11,we found that the DataSteam is missing,So we modify the blink’s
> code
> >> to support parallelism.But kafka delay comsumer is unsolved until now.
> >>>
> >>>   From user’s perspective,it necessary to manipulate DataStream or have
> >> the interoperability between Table API and DataStream.
> >>>
> >>> Best
> >>>
> >>>
> >>>
> >>>> 2020年9月25日 下午4:18,Rui Li <li...@gmail.com> 写道:
> >>>>
> >>>> Hi Jingsong,
> >>>>
> >>>> Thanks for driving this effort. I have two minor comments.
> >>>>
> >>>>
> >>>>     1. IMHO, parallelism is a concept that applies to all
> >>>>     ScanTableSources. So instead of defining a new interface, isn't it
> >>>>     more natural to incorporate parallelism inference into existing
> >>>>     interfaces, e.g. ScanTableSource or ScanRuntimeProvider?
> >>>>     2. `scan.infer-parallelism.enabled` doesn't seem very useful to me.
> >>>>     From a user's perspective, parallelism is either set by
> >>>>     `scan.parallelism` or automatically decided by Flink. If a user
> >>>>     doesn't want the connector to infer parallelism, they can simply
> >>>>     set `scan.parallelism`, no?
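The second comment describes how the two options interact, and this Java sketch shows one possible version of that. The option keys are the ones quoted above. Several details are assumptions for illustration, not Flink's actual planner logic: the precedence (explicit value, then inferred value, then a default), the default of `true` for the flag, and the `ScanParallelismResolver` name.

```java
import java.util.Map;
import java.util.OptionalInt;

final class ScanParallelismResolver {
    static final String SCAN_PARALLELISM = "scan.parallelism";
    static final String INFER_ENABLED = "scan.infer-parallelism.enabled";

    private ScanParallelismResolver() {}

    // Assumed precedence: an explicit scan.parallelism always wins; otherwise
    // use the connector's inferred value if inference is on; otherwise the default.
    static int resolve(Map<String, String> options, OptionalInt inferred, int defaultParallelism) {
        String explicit = options.get(SCAN_PARALLELISM);
        if (explicit != null) {
            return Integer.parseInt(explicit.trim());
        }
        boolean inferEnabled =
                Boolean.parseBoolean(options.getOrDefault(INFER_ENABLED, "true"));
        if (inferEnabled && inferred.isPresent()) {
            return inferred.getAsInt();
        }
        return defaultParallelism;
    }
}
```

Under this ordering, the flag only has an effect when `scan.parallelism` is absent. Even then, all it does is pick the default over the inferred value, which is essentially the redundancy the comment points out.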
> >>>>
> >>>>
> >>>> On Fri, Sep 25, 2020 at 3:33 PM Jingsong Li <ji...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi Aljoscha,
> >>>>>
> >>>>> Thank you for your feedback.
> >>>>>
> >>>>> ## Connector parallelism
> >>>>>
> >>>>> Requirements:
> >>>>> Parallelism is either specified by the user or inferred by the connector.
> >>>>>
> >>>>> How to configure parallelism in DataStream:
> >>>>> In the DataStream world, the only way to configure parallelism is
> >>>>> `SingleOutputStreamOperator.setParallelism`. In practice, users need
> >>>>> access to the DataStream when using a connector, not just the
> >>>>> `SourceFunction` / `Source` interface.
> >>>>> Is parallelism related to connectors? I think so: many connectors can
> >>>>> provide parallelism-related information, and users rely on exactly
> >>>>> that. This is parallelism inference (from connectors).
> >>>>> The key is that `DataStream` is an open programming API, so users can
> >>>>> freely write code to set the parallelism.
> >>>>>
> >>>>> How to configure parallelism in Table/SQL:
> >>>>> Table/SQL, however, is not an open programming API: users can no
> >>>>> longer write code, so every feature needs a corresponding mechanism.
> >>>>> Our current connector interfaces, SourceFunctionProvider and
> >>>>> SinkFunctionProvider, offer no way to provide connector-related
> >>>>> parallelism.
> >>>>> Back to our original intention: to avoid users directly manipulating
> >>>>> `DataStream`. If we want to avoid that, we need to provide the
> >>>>> corresponding features.
> >>>>>
> >>>>> Also, parallelism is runtime information of a connector, so it fits
> >>>>> the name `ScanRuntimeProvider`.
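Below is a minimal Java sketch of exposing parallelism through the runtime provider, as proposed above. `ParallelismAwareProvider`, `PartitionedSourceProvider`, and `PlannerSketch` are hypothetical names, not Flink interfaces. The value is computed once at planning time (i.e. statically), and an empty result leaves the choice to the planner.

```java
import java.util.Optional;

// Hypothetical mix-in for a runtime provider (not a Flink interface).
interface ParallelismAwareProvider {
    // Empty means "no opinion, let the planner decide".
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

// Hypothetical provider for a partitioned (e.g. Kafka-like) source:
// a user-specified value wins, otherwise one subtask per partition.
final class PartitionedSourceProvider implements ParallelismAwareProvider {
    private final Integer userParallelism; // null if the user did not set one
    private final int partitionCount;     // 0 if unknown

    PartitionedSourceProvider(Integer userParallelism, int partitionCount) {
        this.userParallelism = userParallelism;
        this.partitionCount = partitionCount;
    }

    @Override
    public Optional<Integer> getParallelism() {
        if (userParallelism != null) {
            return Optional.of(userParallelism);
        }
        return partitionCount > 0 ? Optional.of(partitionCount) : Optional.empty();
    }
}

final class PlannerSketch {
    private PlannerSketch() {}

    // The planner applies the provider's value where a DataStream user
    // would have called setParallelism(...) by hand.
    static int sourceParallelism(ParallelismAwareProvider provider, int defaultParallelism) {
        return provider.getParallelism().orElse(defaultParallelism);
    }
}
```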
> >>>>>
> >>>>>> If we wanted to add a "get parallelism" it would be in those
> >>>>>> underlying connectors but I'm also skeptical about adding such a
> >>>>>> method there because it is a static assignment and would preclude
> >>>>>> clever optimizations about the parallelism of a connector at runtime.
> >>>>>
> >>>>> I think that when a job is submitted, we are at compile time, so the
> >>>>> connector should only provide a static parallelism.
> >>>>>
> >>>>> ## DataStream in table connector
> >>>>>
> >>>>> As I said before, if we want to completely remove DataStream from the
> >>>>> table connector, we need to provide the corresponding functionality in
> >>>>> `xxRuntimeProvider`.
> >>>>> Otherwise, we and our users may not be able to migrate the old
> >>>>> connectors, including the Hive/FileSystem connectors and the user
> >>>>> cases I mentioned above.
> >>>>> CC: @liu shouwei
> >>>>>
> >>>>> We really need to consider these cases.
> >>>>> If there is no alternative soon, users will have to keep using the
> >>>>> old, deprecated table connector API for a long time.
> >>>>>
> >>>>> Why not use StreamTableEnvironment fromDataStream/toDataStream?
> >>>>> - These tables are just temporary tables; they cannot be
> >>>>>   integrated into or stored in a Catalog.
> >>>>> - CREATE TABLE DDL cannot work.
> >>>>> - We would lose many useful Table/SQL features on the connector,
> >>>>>   for example projection pushdown, filter pushdown, partitions, etc.
> >>>>>
> >>>>> But I believe you are right in the long run. The source and sink APIs
> >>>>> should be powerful enough to cover all reasonable cases.
> >>>>> Maybe we can introduce them in a minimal way for now, for example by
> >>>>> only introducing `DataStreamSinkProvider` in the planner as an
> >>>>> internal API.
> >>>>>
> >>>>> Your points are very valuable; I look forward to your reply.
> >>>>>
> >>>>> Best,
> >>>>> Jingsong
> >>>>>
> >>>>> On Fri, Sep 25, 2020 at 10:55 AM wenlong.lwl <wenlong88.lwl@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Aljoscha, I would like to share a use case in support of setting
> >>>>>> the parallelism of a table sink (or limiting its parallelism range):
> >>>>>> when writing data to databases, there are limits on the number of
> >>>>>> JDBC connections and on query TPS. If the optimizer doesn't know
> >>>>>> this and sets a large sink parallelism to match the parallelism of
> >>>>>> its input, we get "too many connections" errors, high load on the
> >>>>>> database, and poor performance due to many small requests.
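To illustrate this constraint, the sketch below caps a sink's parallelism by a connection budget instead of simply matching the input parallelism. The rule and its parameters (`maxDbConnections`, `connectionsPerSubtask`) are assumptions made for this example. It is not part of Flink or of its JDBC connector.

```java
final class JdbcSinkParallelism {
    private JdbcSinkParallelism() {}

    // Match the input parallelism, but never open more connections than the
    // database allows; always keep at least one writer.
    static int cap(int inputParallelism, int maxDbConnections, int connectionsPerSubtask) {
        if (inputParallelism < 1 || maxDbConnections < 1 || connectionsPerSubtask < 1) {
            throw new IllegalArgumentException("all arguments must be positive");
        }
        int byConnections = Math.max(1, maxDbConnections / connectionsPerSubtask);
        return Math.min(inputParallelism, byConnections);
    }
}
```

For example, with an input parallelism of 128, a limit of 20 connections, and one connection per subtask, `cap(128, 20, 1)` returns 20 rather than 128.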
> >>>>>>
> >>>>>> On Thu, 24 Sep 2020 at 22:41, Aljoscha Krettek <aljoscha@apache.org>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Thanks for the proposal! I think the use cases that we are trying to
> >>>>>>> solve are indeed valid. However, I think we might have to take a step
> >>>>>>> back to look at what we're trying to solve and how we can solve it.
> >>>>>>>
> >>>>>>> The FLIP seems to have two broader topics: 1) add "get parallelism" to
> >>>>>>> sinks/sources, and 2) let users write DataStream topologies for
> >>>>>>> sinks/sources. I'll treat them separately below.
> >>>>>>>
> >>>>>>> I think we should not add "get parallelism" to the Table Sink API
> >>>>>>> because I think it's the wrong level of abstraction. The Table API
> >>>>>>> connectors are (or should be) more or less thin wrappers around
> >>>>>>> "physical" connectors. By "physical" I mean the underlying (mostly
> >>>>>>> DataStream API) connectors. For example, with the Kafka Connector the
> >>>>>>> Table API connector just does the configuration parsing and determines
> >>>>>>> a good (de)serialization format and then creates the underlying
> >>>>>>> FlinkKafkaConsumer/FlinkKafkaProducer.
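The thin-wrapper structure described above can be sketched in plain Java. The Table-level class only validates options and constructs the physical source, with no runtime logic of its own. `PhysicalSource` is a hypothetical stand-in for something like FlinkKafkaConsumer, and the option keys `topic` and `format` are assumptions for this sketch.

```java
import java.util.Map;

// Hypothetical stand-in for a "physical" DataStream-level connector.
final class PhysicalSource {
    final String topic;
    final String format;

    PhysicalSource(String topic, String format) {
        this.topic = topic;
        this.format = format;
    }
}

// Table-level wrapper: parses configuration, then delegates.
final class TableSourceWrapper {
    private final Map<String, String> options;

    TableSourceWrapper(Map<String, String> options) {
        this.options = Map.copyOf(options); // immutable defensive copy
    }

    PhysicalSource createRuntimeSource() {
        return new PhysicalSource(require("topic"), require("format"));
    }

    private String require(String key) {
        String value = options.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("missing required option: " + key);
        }
        return value;
    }
}
```

Anything the connector knows about parallelism would then belong in `PhysicalSource`, not in the wrapper.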
> >>>>>>>
> >>>>>>> If we wanted to add a "get parallelism" it would be in those
> >>>>>>> underlying connectors but I'm also skeptical about adding such a
> >>>>>>> method there because it is a static assignment and would preclude
> >>>>>>> clever optimizations about the parallelism of a connector at runtime.
> >>>>>>> But maybe that's thinking too much about future work, so I'm open to
> >>>>>>> discussion there.
> >>>>>>>
> >>>>>>> Regarding the second point of letting Table connector developers use
> >>>>>>> DataStream: I think we should not do it. One of the purposes of
> >>>>>>> FLIP-95 [1] was to decouple the Table API from the DataStream API for
> >>>>>>> the basic interfaces. Coupling the two too closely at that basic level
> >>>>>>> will make our lives harder in the future when we want to evolve those
> >>>>>>> APIs or when we want the system to be better at choosing how to
> >>>>>>> execute sources and sinks. The history of the Table API is actually
> >>>>>>> an example of this. Before FLIP-95 we had connectors that dealt
> >>>>>>> directly with DataSet and DataStream, meaning that if users wanted
> >>>>>>> their Table Sink to work in both BATCH and STREAMING mode they had to
> >>>>>>> provide two implementations.
> >>>>>>> The trend is towards unifying the sources/sinks to common interfaces
> >>>>>>> that can be used for both BATCH and STREAMING execution but, again, I
> >>>>>>> think exposing DataStream here would be a step back in the wrong
> >>>>>>> direction.
> >>>>>>>
> >>>>>>> I think the solution to the existing user requirement of using
> >>>>>>> DataStream sources and sinks with the Table API should be better
> >>>>>>> interoperability between the two APIs, which is being tackled right
> >>>>>>> now in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases
> >>>>>>> that we're trying to solve here, maybe we should think about FLIP-136
> >>>>>>> some more.
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Aljoscha
> >>>>>>>
> >>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> >>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> Best, Jingsong Lee
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> Best regards!
> >>>> Rui Li
> >>>
> >>
> >>
> >
>
>

-- 
Best, Jingsong Lee

Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Jingsong,

I'm sorry, I didn't want to block you for so long on this. I thought 
about it again.

I think it's fine to add a DataStream Provider if this really unblocks 
users from migrating to newer Flink versions. I'm guessing you will add 
that to the table bridge module?

Regarding the parallelism: I see your point of letting users set it
explicitly. I'm still skeptical about it, but I also think it wasn't such
a good idea to let users specify the parallelism of individual
operations in the DataStream API, because it again takes freedom away
from the framework. So if it's really something that users need, we
should go ahead.

Best,
Aljoscha
