You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Jingsong Li <ji...@gmail.com> on 2020/09/23 10:22:51 UTC

[DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Hi all,

I'd like to start a discussion about improving the new TableSource and
TableSink interfaces.

Most connectors have been migrated to FLIP-95, but there are still the
Filesystem and Hive that have not been migrated. They have some
requirements on table connector API. And users also have some additional
requirements:
- Some connectors have the ability to infer parallelism, the parallelism is
good for most cases.
- Users have customized parallelism configuration requirements for source
and sink.
- The connectors need to use topology to build their source/sink instead of
a single function. Like JIRA[1], Partition Commit feature and File
Compaction feature.

Details are in [2].

[1]https://issues.apache.org/jira/browse/FLINK-18674
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-146%3A+Improve+new+TableSource+and+TableSink+interfaces

Best,
Jingsong

Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Posted by Aljoscha Krettek <al...@apache.org>.
Agreed!

Aljoscha

On 14.10.20 06:38, Jingsong Li wrote:
> Hi Aljoscha,
> 
> Thanks for your feedback.
> 
> Yes, we should add DataStream Providers to the table bridge module.
> 
> I think your concerns are right, including the relationship between
> DataStream and table.
> My understanding is that the parallelism specified by the user is only the
> initialization parallelism. After that, the framework still has certain
> freedom, which is just like the user's personalized intervention on the
> optimizer.
> 
> At present, the final state of the source parallelism setting is not clear
> (DataStream is not ready, and 1.12 is imminent). So I consider shelving the
> parallelism of source. Let's focus on the parallelism setting of sink and
> rest of this FLIP. (users specify the parallelism on sink)
> And we can confirm that users can migrate their connectors to new table
> source/sink interfaces in Flink 1.12.
> What do you think?
> 
> Best,
> Jingsong
> 
> On Tue, Oct 13, 2020 at 5:01 PM Aljoscha Krettek <al...@apache.org>
> wrote:
> 
>> Hi Jingsong,
>>
>> I'm sorry, I didn't want to block you for so long on this. I thought
>> about it again.
>>
>> I think it's fine to add a DataStream Provider if this really unblocks
>> users from migrating to newer Flink versions. I'm guessing you will add
>> that to the table bridge module?
>>
>> Regarding the parallelism: I see your point of letting users set that
>> explicitly. I'm still skeptical about it but I also think it wasn't such
>> a good idea to let users specify the parallelism of individual
>> operations in the DataStream API because it again takes freedom away
>> from the framework. So if it's really sth that users need we should go
>> ahead.
>>
>> Best,
>> Aljoscha
>>
>> On 09.10.20 13:57, Jingsong Li wrote:
>>> Hi Aljoscha,
>>>
>>> I want to separate `Customized parallelism` and `Parallelism inference`.
>>>
>>> ### Customized parallelism
>>>
>>> First, I want to explain the current DataStream parallelism setting:
>>> `env.fromSource(...).setParallelism(...)`.
>>> This is how users explicitly specify parallelism, and it is the only way
>> to
>>> set parallelism.
>>>
>>> The underlying Source (Eg.: SourceFunction) is completely independent of
>>> specific parallelism. The peripheral DataStream is responsible for
>> setting
>>> parallelism.
>>> The table layer also needs to provide peer-to-peer capability.
>>>
>>> ### Parallelism inference
>>>
>>> Some sources have the ability to infer parallelism, like Kafka,
>> parallelism
>>> can be inferred from the partition number.
>>>
>>> I think you are right, we should provide this to the underlying Source.
>>> This capability must be related to the underlying Source (Eg.:
>>> SourceFunction), so this capability must introduce a new interface for
>> the
>>> underlying Source.
>>>
>>> The Table layer just tell underlying Source that user want to open
>>> parallelism inference:
>>>
>>> new MyRealSource(path, and, whatnot, parallelismInfer = true)
>>>
>>> What do you think?
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Tue, Sep 29, 2020 at 8:48 PM Aljoscha Krettek <al...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'll only respond regarding the parallelism for now because I need to
>>>> think some more about DataStream.
>>>>
>>>> What I'm saying is that exposing a parallelism only for Table Connectors
>>>> is not the right thing. If we want to allow sources to tell the
>>>> system/framework what would be a good parallelism it would be at the
>>>> underlying level.
>>>>
>>>> I'll explain with the SourceFunction. A Table API Source connector is
>>>> basically a factory that will give you a SourceFunction that corresponds
>>>> to whatever the user configured via properties and other means. If the
>>>> Table Connector somehow happens to know what would be a good parallelism
>>>> for the source it could "tell" the source when creating it, i.e.
>>>>
>>>>      new MyRealSource(path, and, whatnot, parallelismHint)
>>>>
>>>> Then the source could either work with that information it got, by
>>>> shutting down (at runtime) some of its parallel instances. Or we could
>>>> extend the Source (SourceFunction) API to expose a "parallelism hint" to
>>>> the system.
>>>>
>>>> The basic thing is that Table Connectors are not the real connectors,
>>>> they just delegate to underlying real connectors. So those underlying
>>>> connectors are where we need to change things. Otherwise we would just
>>>> have special-case solutions for the Table API.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 25.09.20 14:30, admin wrote:
>>>>> Hi everyone,
>>>>> Thanks for the proposal.
>>>>>
>>>>> In our company,we meet the same situation as @liu shouwei.
>>>>> We developed some features base on flink.Such as parallelism of sql
>>>> source/sink  connector, and kafka delay consumer which is adding a
>> flatmap
>>>> and a keyby transformation after the source Datastream.
>>>>> What make us embarrassing is that when we migrate this features to
>> Flink
>>>> 1.11,we found that the DataSteam is missing,So we modify the blink’s
>> code
>>>> to support parallelism.But kafka delay comsumer is unsolved until now.
>>>>>
>>>>>    From user’s perspective,it necessary to manipulate DataStream or have
>>>> the interoperability between Table API and DataStream.
>>>>>
>>>>> Best
>>>>>
>>>>>
>>>>>
>>>>>> 2020年9月25日 下午4:18,Rui Li <li...@gmail.com> 写道:
>>>>>>
>>>>>> Hi Jingsong,
>>>>>>
>>>>>> Thanks for driving this effort. I have two minor comments.
>>>>>>
>>>>>>
>>>>>>      1. IMHO, parallelism is a concept that applies to all
>>>> ScanTableSource.
>>>>>>      So instead of defining a new interface, is it more natural to
>>>> incorporate
>>>>>>      parallel inference to existing interfaces, e.g. ScanTableSource
>>>>>>      or ScanRuntimeProvider?
>>>>>>      2. `scan.infer-parallelism.enabled` doesn't seem very useful to
>> me.
>>>> From
>>>>>>      a user's perspective, parallelism is either set by
>>>> `scan.parallelism`, or
>>>>>>      automatically decided by Flink. If a user doesn't want the
>> connector
>>>> to
>>>>>>      infer parallelism, he/she can simply set `scan.parallelism`, no?
>>>>>>
>>>>>>
>>>>>> On Fri, Sep 25, 2020 at 3:33 PM Jingsong Li <ji...@gmail.com>
>>>> wrote:
>>>>>>
>>>>>>> Hi Aljoscha,
>>>>>>>
>>>>>>> Thank you for your feedback,
>>>>>>>
>>>>>>> ## Connector parallelism
>>>>>>>
>>>>>>> Requirements:
>>>>>>> Set parallelism by user specified or inferred by connector.
>>>>>>>
>>>>>>> How to configure parallelism in DataStream:
>>>>>>> In the DataStream world, the only way to configure parallelism is
>>>>>>> `SingleOutputStreamOperator.setParallelism`. Actually, users need to
>>>> have
>>>>>>> access to DataStream when using a connector, not just the
>>>> `SourceFunction`
>>>>>>> / `Source` interface.
>>>>>>> Is parallelism related to connectors? I think yes, there are many
>>>>>>> connectors that can support obtaining parallelism related information
>>>> from
>>>>>>> them, and users do exactly that. This means parallelism inference
>> (From
>>>>>>> connectors).
>>>>>>> The key is that `DataStream` is an open programming API, and users
>> can
>>>>>>> freely program to set parallelism.
>>>>>>>
>>>>>>> How to configure parallelism in Table/SQL:
>>>>>>> But Table/SQL is not an open programming API, every feature needs a
>>>>>>> corresponding mechanism, because the user is no longer able to
>>>> program. Our
>>>>>>> current connector interface: SourceFunctionProvider,
>>>> SinkFunctionProvider,
>>>>>>> through these interfaces, there is no ability to generate connector
>>>> related
>>>>>>> parallelism.
>>>>>>> Back to our original intention: to avoid users directly manipulating
>>>>>>> `DataStream`. Since we want to avoid it, we need to provide
>>>> corresponding
>>>>>>> features.
>>>>>>>
>>>>>>> And parallelism is the runtime information of connectors, It fits the
>>>> name
>>>>>>> of `ScanRuntimeProvider`.
>>>>>>>
>>>>>>>> If we wanted to add a "get parallelism" it would be in those
>>>> underlying
>>>>>>> connectors but I'm also skeptical about adding such a method there
>>>> because
>>>>>>> it is a static assignment and would preclude clever optimizations
>>>> about the
>>>>>>> parallelism of a connector at runtime.
>>>>>>>
>>>>>>> I think that when a job is submitted, it is in compile time. It
>> should
>>>> only
>>>>>>> provide static parallelism.
>>>>>>>
>>>>>>> ## DataStream in table connector
>>>>>>>
>>>>>>> As I said before, if we want to completely cancel DataStream in the
>>>> table
>>>>>>> connector, we need to provide corresponding functions in
>>>>>>> `xxRuntimeProvider`.
>>>>>>> Otherwise, we and users may not be able to migrate the old
>> connectors.
>>>>>>> Including Hive/FileSystem connectors and the user cases I mentioned
>>>> above.
>>>>>>> CC: @liu shouwei
>>>>>>>
>>>>>>> We really need to consider these cases.
>>>>>>> If there is no alternative in a short period of time, for a long
>>>>>>> time, users need to continue to use the old table connector API,
>> which
>>>> has
>>>>>>> been deprecated.
>>>>>>>
>>>>>>> Why not use StreamTableEnvironment fromDataStream/toDataStream?
>>>>>>> - These tables are just temporary tables. Can not be
>> integrated/stored
>>>> into
>>>>>>> Catalog.
>>>>>>> - Creating table DDL can not work...
>>>>>>> - We need to lose the kinds of useful features of Table/SQL on the
>>>>>>> connector. For example, projection pushdown, filter pushdown,
>>>> partitions
>>>>>>> and etc...
>>>>>>>
>>>>>>> But I believe you are right in the long run. The source and sink APIs
>>>>>>> should be powerful enough to cover all reasonable cases.
>>>>>>> Maybe we can just introduce them in a minimal way. For example, we
>> only
>>>>>>> introduce `DataStreamSinkProvider` in planner as an internal API.
>>>>>>>
>>>>>>> Your points are very meaningful, hope to get your reply.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jingsong
>>>>>>>
>>>>>>> On Fri, Sep 25, 2020 at 10:55 AM wenlong.lwl <
>> wenlong88.lwl@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,Aljoscha, I would like to share a use case to second setting
>>>>>>> parallelism
>>>>>>>> of table sink(or limiting parallelism range of table sink): When
>>>> writing
>>>>>>>> data to databases, there is limitation for number of jdbc
>> connections
>>>> and
>>>>>>>> query TPS. we would get errors of too many connections or high load
>>>> for
>>>>>>>> db and poor performance because of too many small requests if the
>>>>>>> optimizer
>>>>>>>> didn't know such information, and set a large parallelism for sink
>>>> when
>>>>>>>> matching the parallelism of its input.
>>>>>>>>
>>>>>>>> On Thu, 24 Sep 2020 at 22:41, Aljoscha Krettek <aljoscha@apache.org
>>>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks for the proposal! I think the use cases that we are trying
>> to
>>>>>>>>> solve are indeed valid. However, I think we might have to take a
>> step
>>>>>>>>> back to look at what we're trying to solve and how we can solve it.
>>>>>>>>>
>>>>>>>>> The FLIP seems to have two broader topics: 1) add "get parallelism"
>>>> to
>>>>>>>>> sinks/sources 2) let users write DataStream topologies for
>>>>>>>>> sinks/sources. I'll treat them separately below.
>>>>>>>>>
>>>>>>>>> I think we should not add "get parallelism" to the Table Sink API
>>>>>>>>> because I think it's the wrong level of abstraction. The Table API
>>>>>>>>> connectors are (or should be) more or less thin wrappers around
>>>>>>>>> "physical" connectors. By "physical" I mean the underlying (mostly
>>>>>>>>> DataStream API) connectors. For example, with the Kafka Connector
>> the
>>>>>>>>> Table API connector just does the configuration parsing and
>>>> determines
>>>>>>> a
>>>>>>>>> good (de)serialization format and then creates the underlying
>>>>>>>>> FlinkKafkaConsumer/FlinkKafkaProducer.
>>>>>>>>>
>>>>>>>>> If we wanted to add a "get parallelism" it would be in those
>>>> underlying
>>>>>>>>> connectors but I'm also skeptical about adding such a method there
>>>>>>>>> because it is a static assignment and would preclude clever
>>>>>>>>> optimizations about the parallelism of a connector at runtime. But
>>>>>>> maybe
>>>>>>>>> that's thinking too much about future work so I'm open to
>> discussion
>>>>>>>> there.
>>>>>>>>>
>>>>>>>>> Regarding the second point of letting Table connector developers
>> use
>>>>>>>>> DataStream: I think we should not do it. One of the purposes of
>>>> FLIP-95
>>>>>>>>> [1] was to decouple the Table API from the DataStream API for the
>>>> basic
>>>>>>>>> interfaces. Coupling the two too closely at that basic level will
>>>> make
>>>>>>>>> our live harder in the future when we want to evolve those APIs or
>>>> when
>>>>>>>>> we want the system to be better at choosing how to execute sources
>>>> and
>>>>>>>>> sinks. An example of this is actually the past of the Table API.
>>>> Before
>>>>>>>>> FLIP-95 we had connectors that dealt directly with DataSet and
>>>>>>>>> DataStream, meaning that if users wanted their Table Sink to work
>> in
>>>>>>>>> both BATCH and STREAMING mode they had to provide two
>>>> implementations.
>>>>>>>>> The trend is towards unifying the sources/sinks to common
>> interfaces
>>>>>>>>> that can be used for both BATCH and STREAMING execution but,
>> again, I
>>>>>>>>> think exposing DataStream here would be a step back in the wrong
>>>>>>>> direction.
>>>>>>>>>
>>>>>>>>> I think the solution to the existing user requirement of using
>>>>>>>>> DataStream sources and sinks with the Table API should be better
>>>>>>>>> interoperability between the two APIs, which is being tackled right
>>>> now
>>>>>>>>> in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases that
>>>>>>>>> we're trying to solve here, maybe we should think about FLIP-136
>> some
>>>>>>>> more.
>>>>>>>>>
>>>>>>>>> What do you think?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Aljoscha
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>>>>>> [2]
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best, Jingsong Lee
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards!
>>>>>> Rui Li
>>>>>
>>>>
>>>>
>>>
>>
>>
> 


Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Posted by Jingsong Li <ji...@gmail.com>.
Hi Aljoscha,

Thanks for your feedback.

Yes, we should add DataStream Providers to the table bridge module.

I think your concerns are right, including the relationship between
DataStream and table.
My understanding is that the parallelism specified by the user is only the
initialization parallelism. After that, the framework still has certain
freedom, which is just like the user's personalized intervention on the
optimizer.

At present, the final state of the source parallelism setting is not clear
(DataStream is not ready, and 1.12 is imminent). So I consider shelving the
parallelism of source. Let's focus on the parallelism setting of sink and
rest of this FLIP. (users specify the parallelism on sink)
And we can confirm that users can migrate their connectors to new table
source/sink interfaces in Flink 1.12.
What do you think?

Best,
Jingsong

On Tue, Oct 13, 2020 at 5:01 PM Aljoscha Krettek <al...@apache.org>
wrote:

> Hi Jingsong,
>
> I'm sorry, I didn't want to block you for so long on this. I thought
> about it again.
>
> I think it's fine to add a DataStream Provider if this really unblocks
> users from migrating to newer Flink versions. I'm guessing you will add
> that to the table bridge module?
>
> Regarding the parallelism: I see your point of letting users set that
> explicitly. I'm still skeptical about it but I also think it wasn't such
> a good idea to let users specify the parallelism of individual
> operations in the DataStream API because it again takes freedom away
> from the framework. So if it's really sth that users need we should go
> ahead.
>
> Best,
> Aljoscha
>
> On 09.10.20 13:57, Jingsong Li wrote:
> > Hi Aljoscha,
> >
> > I want to separate `Customized parallelism` and `Parallelism inference`.
> >
> > ### Customized parallelism
> >
> > First, I want to explain the current DataStream parallelism setting:
> > `env.fromSource(...).setParallelism(...)`.
> > This is how users explicitly specify parallelism, and it is the only way
> to
> > set parallelism.
> >
> > The underlying Source (Eg.: SourceFunction) is completely independent of
> > specific parallelism. The peripheral DataStream is responsible for
> setting
> > parallelism.
> > The table layer also needs to provide peer-to-peer capability.
> >
> > ### Parallelism inference
> >
> > Some sources have the ability to infer parallelism, like Kafka,
> parallelism
> > can be inferred from the partition number.
> >
> > I think you are right, we should provide this to the underlying Source.
> > This capability must be related to the underlying Source (Eg.:
> > SourceFunction), so this capability must introduce a new interface for
> the
> > underlying Source.
> >
> > The Table layer just tell underlying Source that user want to open
> > parallelism inference:
> >
> > new MyRealSource(path, and, whatnot, parallelismInfer = true)
> >
> > What do you think?
> >
> > Best,
> > Jingsong
> >
> > On Tue, Sep 29, 2020 at 8:48 PM Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> >> Hi,
> >>
> >> I'll only respond regarding the parallelism for now because I need to
> >> think some more about DataStream.
> >>
> >> What I'm saying is that exposing a parallelism only for Table Connectors
> >> is not the right thing. If we want to allow sources to tell the
> >> system/framework what would be a good parallelism it would be at the
> >> underlying level.
> >>
> >> I'll explain with the SourceFunction. A Table API Source connector is
> >> basically a factory that will give you a SourceFunction that corresponds
> >> to whatever the user configured via properties and other means. If the
> >> Table Connector somehow happens to know what would be a good parallelism
> >> for the source it could "tell" the source when creating it, i.e.
> >>
> >>     new MyRealSource(path, and, whatnot, parallelismHint)
> >>
> >> Then the source could either work with that information it got, by
> >> shutting down (at runtime) some of its parallel instances. Or we could
> >> extend the Source (SourceFunction) API to expose a "parallelism hint" to
> >> the system.
> >>
> >> The basic thing is that Table Connectors are not the real connectors,
> >> they just delegate to underlying real connectors. So those underlying
> >> connectors are where we need to change things. Otherwise we would just
> >> have special-case solutions for the Table API.
> >>
> >> Best,
> >> Aljoscha
> >>
> >> On 25.09.20 14:30, admin wrote:
> >>> Hi everyone,
> >>> Thanks for the proposal.
> >>>
> >>> In our company,we meet the same situation as @liu shouwei.
> >>> We developed some features base on flink.Such as parallelism of sql
> >> source/sink  connector, and kafka delay consumer which is adding a
> flatmap
> >> and a keyby transformation after the source Datastream.
> >>> What make us embarrassing is that when we migrate this features to
> Flink
> >> 1.11,we found that the DataSteam is missing,So we modify the blink’s
> code
> >> to support parallelism.But kafka delay comsumer is unsolved until now.
> >>>
> >>>   From user’s perspective,it necessary to manipulate DataStream or have
> >> the interoperability between Table API and DataStream.
> >>>
> >>> Best
> >>>
> >>>
> >>>
> >>>> 2020年9月25日 下午4:18,Rui Li <li...@gmail.com> 写道:
> >>>>
> >>>> Hi Jingsong,
> >>>>
> >>>> Thanks for driving this effort. I have two minor comments.
> >>>>
> >>>>
> >>>>     1. IMHO, parallelism is a concept that applies to all
> >> ScanTableSource.
> >>>>     So instead of defining a new interface, is it more natural to
> >> incorporate
> >>>>     parallel inference to existing interfaces, e.g. ScanTableSource
> >>>>     or ScanRuntimeProvider?
> >>>>     2. `scan.infer-parallelism.enabled` doesn't seem very useful to
> me.
> >> From
> >>>>     a user's perspective, parallelism is either set by
> >> `scan.parallelism`, or
> >>>>     automatically decided by Flink. If a user doesn't want the
> connector
> >> to
> >>>>     infer parallelism, he/she can simply set `scan.parallelism`, no?
> >>>>
> >>>>
> >>>> On Fri, Sep 25, 2020 at 3:33 PM Jingsong Li <ji...@gmail.com>
> >> wrote:
> >>>>
> >>>>> Hi Aljoscha,
> >>>>>
> >>>>> Thank you for your feedback,
> >>>>>
> >>>>> ## Connector parallelism
> >>>>>
> >>>>> Requirements:
> >>>>> Set parallelism by user specified or inferred by connector.
> >>>>>
> >>>>> How to configure parallelism in DataStream:
> >>>>> In the DataStream world, the only way to configure parallelism is
> >>>>> `SingleOutputStreamOperator.setParallelism`. Actually, users need to
> >> have
> >>>>> access to DataStream when using a connector, not just the
> >> `SourceFunction`
> >>>>> / `Source` interface.
> >>>>> Is parallelism related to connectors? I think yes, there are many
> >>>>> connectors that can support obtaining parallelism related information
> >> from
> >>>>> them, and users do exactly that. This means parallelism inference
> (From
> >>>>> connectors).
> >>>>> The key is that `DataStream` is an open programming API, and users
> can
> >>>>> freely program to set parallelism.
> >>>>>
> >>>>> How to configure parallelism in Table/SQL:
> >>>>> But Table/SQL is not an open programming API, every feature needs a
> >>>>> corresponding mechanism, because the user is no longer able to
> >> program. Our
> >>>>> current connector interface: SourceFunctionProvider,
> >> SinkFunctionProvider,
> >>>>> through these interfaces, there is no ability to generate connector
> >> related
> >>>>> parallelism.
> >>>>> Back to our original intention: to avoid users directly manipulating
> >>>>> `DataStream`. Since we want to avoid it, we need to provide
> >> corresponding
> >>>>> features.
> >>>>>
> >>>>> And parallelism is the runtime information of connectors, It fits the
> >> name
> >>>>> of `ScanRuntimeProvider`.
> >>>>>
> >>>>>> If we wanted to add a "get parallelism" it would be in those
> >> underlying
> >>>>> connectors but I'm also skeptical about adding such a method there
> >> because
> >>>>> it is a static assignment and would preclude clever optimizations
> >> about the
> >>>>> parallelism of a connector at runtime.
> >>>>>
> >>>>> I think that when a job is submitted, it is in compile time. It
> should
> >> only
> >>>>> provide static parallelism.
> >>>>>
> >>>>> ## DataStream in table connector
> >>>>>
> >>>>> As I said before, if we want to completely cancel DataStream in the
> >> table
> >>>>> connector, we need to provide corresponding functions in
> >>>>> `xxRuntimeProvider`.
> >>>>> Otherwise, we and users may not be able to migrate the old
> connectors.
> >>>>> Including Hive/FileSystem connectors and the user cases I mentioned
> >> above.
> >>>>> CC: @liu shouwei
> >>>>>
> >>>>> We really need to consider these cases.
> >>>>> If there is no alternative in a short period of time, for a long
> >>>>> time, users need to continue to use the old table connector API,
> which
> >> has
> >>>>> been deprecated.
> >>>>>
> >>>>> Why not use StreamTableEnvironment fromDataStream/toDataStream?
> >>>>> - These tables are just temporary tables. Can not be
> integrated/stored
> >> into
> >>>>> Catalog.
> >>>>> - Creating table DDL can not work...
> >>>>> - We need to lose the kinds of useful features of Table/SQL on the
> >>>>> connector. For example, projection pushdown, filter pushdown,
> >> partitions
> >>>>> and etc...
> >>>>>
> >>>>> But I believe you are right in the long run. The source and sink APIs
> >>>>> should be powerful enough to cover all reasonable cases.
> >>>>> Maybe we can just introduce them in a minimal way. For example, we
> only
> >>>>> introduce `DataStreamSinkProvider` in planner as an internal API.
> >>>>>
> >>>>> Your points are very meaningful, hope to get your reply.
> >>>>>
> >>>>> Best,
> >>>>> Jingsong
> >>>>>
> >>>>> On Fri, Sep 25, 2020 at 10:55 AM wenlong.lwl <
> wenlong88.lwl@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi,Aljoscha, I would like to share a use case to second setting
> >>>>> parallelism
> >>>>>> of table sink(or limiting parallelism range of table sink): When
> >> writing
> >>>>>> data to databases, there is limitation for number of jdbc
> connections
> >> and
> >>>>>> query TPS. we would get errors of too many connections or high load
> >> for
> >>>>>> db and poor performance because of too many small requests if the
> >>>>> optimizer
> >>>>>> didn't know such information, and set a large parallelism for sink
> >> when
> >>>>>> matching the parallelism of its input.
> >>>>>>
> >>>>>> On Thu, 24 Sep 2020 at 22:41, Aljoscha Krettek <aljoscha@apache.org
> >
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Thanks for the proposal! I think the use cases that we are trying
> to
> >>>>>>> solve are indeed valid. However, I think we might have to take a
> step
> >>>>>>> back to look at what we're trying to solve and how we can solve it.
> >>>>>>>
> >>>>>>> The FLIP seems to have two broader topics: 1) add "get parallelism"
> >> to
> >>>>>>> sinks/sources 2) let users write DataStream topologies for
> >>>>>>> sinks/sources. I'll treat them separately below.
> >>>>>>>
> >>>>>>> I think we should not add "get parallelism" to the Table Sink API
> >>>>>>> because I think it's the wrong level of abstraction. The Table API
> >>>>>>> connectors are (or should be) more or less thin wrappers around
> >>>>>>> "physical" connectors. By "physical" I mean the underlying (mostly
> >>>>>>> DataStream API) connectors. For example, with the Kafka Connector
> the
> >>>>>>> Table API connector just does the configuration parsing and
> >> determines
> >>>>> a
> >>>>>>> good (de)serialization format and then creates the underlying
> >>>>>>> FlinkKafkaConsumer/FlinkKafkaProducer.
> >>>>>>>
> >>>>>>> If we wanted to add a "get parallelism" it would be in those
> >> underlying
> >>>>>>> connectors but I'm also skeptical about adding such a method there
> >>>>>>> because it is a static assignment and would preclude clever
> >>>>>>> optimizations about the parallelism of a connector at runtime. But
> >>>>> maybe
> >>>>>>> that's thinking too much about future work so I'm open to
> discussion
> >>>>>> there.
> >>>>>>>
> >>>>>>> Regarding the second point of letting Table connector developers
> use
> >>>>>>> DataStream: I think we should not do it. One of the purposes of
> >> FLIP-95
> >>>>>>> [1] was to decouple the Table API from the DataStream API for the
> >> basic
> >>>>>>> interfaces. Coupling the two too closely at that basic level will
> >> make
> >>>>>>> our live harder in the future when we want to evolve those APIs or
> >> when
> >>>>>>> we want the system to be better at choosing how to execute sources
> >> and
> >>>>>>> sinks. An example of this is actually the past of the Table API.
> >> Before
> >>>>>>> FLIP-95 we had connectors that dealt directly with DataSet and
> >>>>>>> DataStream, meaning that if users wanted their Table Sink to work
> in
> >>>>>>> both BATCH and STREAMING mode they had to provide two
> >> implementations.
> >>>>>>> The trend is towards unifying the sources/sinks to common
> interfaces
> >>>>>>> that can be used for both BATCH and STREAMING execution but,
> again, I
> >>>>>>> think exposing DataStream here would be a step back in the wrong
> >>>>>> direction.
> >>>>>>>
> >>>>>>> I think the solution to the existing user requirement of using
> >>>>>>> DataStream sources and sinks with the Table API should be better
> >>>>>>> interoperability between the two APIs, which is being tackled right
> >> now
> >>>>>>> in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases that
> >>>>>>> we're trying to solve here, maybe we should think about FLIP-136
> some
> >>>>>> more.
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Aljoscha
> >>>>>>>
> >>>>>>> [1]
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> >>>>>>> [2]
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> Best, Jingsong Lee
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> Best regards!
> >>>> Rui Li
> >>>
> >>
> >>
> >
>
>

-- 
Best, Jingsong Lee

Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Jingsong,

I'm sorry, I didn't want to block you for so long on this. I thought 
about it again.

I think it's fine to add a DataStream Provider if this really unblocks 
users from migrating to newer Flink versions. I'm guessing you will add 
that to the table bridge module?

Regarding the parallelism: I see your point of letting users set that 
explicitly. I'm still skeptical about it but I also think it wasn't such 
a good idea to let users specify the parallelism of individual 
operations in the DataStream API because it again takes freedom away 
from the framework. So if it's really sth that users need we should go 
ahead.

Best,
Aljoscha

On 09.10.20 13:57, Jingsong Li wrote:
> Hi Aljoscha,
> 
> I want to separate `Customized parallelism` and `Parallelism inference`.
> 
> ### Customized parallelism
> 
> First, I want to explain the current DataStream parallelism setting:
> `env.fromSource(...).setParallelism(...)`.
> This is how users explicitly specify parallelism, and it is the only way to
> set parallelism.
> 
> The underlying Source (Eg.: SourceFunction) is completely independent of
> specific parallelism. The peripheral DataStream is responsible for setting
> parallelism.
> The table layer also needs to provide peer-to-peer capability.
> 
> ### Parallelism inference
> 
> Some sources have the ability to infer parallelism, like Kafka, parallelism
> can be inferred from the partition number.
> 
> I think you are right, we should provide this to the underlying Source.
> This capability must be related to the underlying Source (Eg.:
> SourceFunction), so this capability must introduce a new interface for the
> underlying Source.
> 
> The Table layer just tell underlying Source that user want to open
> parallelism inference:
> 
> new MyRealSource(path, and, whatnot, parallelismInfer = true)
> 
> What do you think?
> 
> Best,
> Jingsong
> 
> On Tue, Sep 29, 2020 at 8:48 PM Aljoscha Krettek <al...@apache.org>
> wrote:
> 
>> Hi,
>>
>> I'll only respond regarding the parallelism for now because I need to
>> think some more about DataStream.
>>
>> What I'm saying is that exposing a parallelism only for Table Connectors
>> is not the right thing. If we want to allow sources to tell the
>> system/framework what would be a good parallelism it would be at the
>> underlying level.
>>
>> I'll explain with the SourceFunction. A Table API Source connector is
>> basically a factory that will give you a SourceFunction that corresponds
>> to whatever the user configured via properties and other means. If the
>> Table Connector somehow happens to know what would be a good parallelism
>> for the source it could "tell" the source when creating it, i.e.
>>
>>     new MyRealSource(path, and, whatnot, parallelismHint)
>>
>> Then the source could either work with that information it got, by
>> shutting down (at runtime) some of its parallel instances. Or we could
>> extend the Source (SourceFunction) API to expose a "parallelism hint" to
>> the system.
>>
>> The basic thing is that Table Connectors are not the real connectors,
>> they just delegate to underlying real connectors. So those underlying
>> connectors are where we need to change things. Otherwise we would just
>> have special-case solutions for the Table API.
>>
>> Best,
>> Aljoscha
>>
>> On 25.09.20 14:30, admin wrote:
>>> Hi everyone,
>>> Thanks for the proposal.
>>>
>>> In our company,we meet the same situation as @liu shouwei.
>>> We developed some features base on flink.Such as parallelism of sql
>> source/sink  connector, and kafka delay consumer which is adding a flatmap
>> and a keyby transformation after the source Datastream.
>>> What make us embarrassing is that when we migrate this features to Flink
>> 1.11,we found that the DataSteam is missing,So we modify the blink’s code
>> to support parallelism.But kafka delay comsumer is unsolved until now.
>>>
>>>   From user’s perspective,it necessary to manipulate DataStream or have
>> the interoperability between Table API and DataStream.
>>>
>>> Best
>>>
>>>
>>>
>>>> 2020年9月25日 下午4:18,Rui Li <li...@gmail.com> 写道:
>>>>
>>>> Hi Jingsong,
>>>>
>>>> Thanks for driving this effort. I have two minor comments.
>>>>
>>>>
>>>>     1. IMHO, parallelism is a concept that applies to all
>> ScanTableSource.
>>>>     So instead of defining a new interface, is it more natural to
>> incorporate
>>>>     parallel inference to existing interfaces, e.g. ScanTableSource
>>>>     or ScanRuntimeProvider?
>>>>     2. `scan.infer-parallelism.enabled` doesn't seem very useful to me.
>> From
>>>>     a user's perspective, parallelism is either set by
>> `scan.parallelism`, or
>>>>     automatically decided by Flink. If a user doesn't want the connector
>> to
>>>>     infer parallelism, he/she can simply set `scan.parallelism`, no?
>>>>
>>>>
>>>> On Fri, Sep 25, 2020 at 3:33 PM Jingsong Li <ji...@gmail.com>
>> wrote:
>>>>
>>>>> Hi Aljoscha,
>>>>>
>>>>> Thank you for your feedback,
>>>>>
>>>>> ## Connector parallelism
>>>>>
>>>>> Requirements:
>>>>> Set parallelism by user specified or inferred by connector.
>>>>>
>>>>> How to configure parallelism in DataStream:
>>>>> In the DataStream world, the only way to configure parallelism is
>>>>> `SingleOutputStreamOperator.setParallelism`. Actually, users need to
>> have
>>>>> access to DataStream when using a connector, not just the
>> `SourceFunction`
>>>>> / `Source` interface.
>>>>> Is parallelism related to connectors? I think yes, there are many
>>>>> connectors that can support obtaining parallelism related information
>> from
>>>>> them, and users do exactly that. This means parallelism inference (From
>>>>> connectors).
>>>>> The key is that `DataStream` is an open programming API, and users can
>>>>> freely program to set parallelism.
>>>>>
>>>>> How to configure parallelism in Table/SQL:
>>>>> But Table/SQL is not an open programming API, every feature needs a
>>>>> corresponding mechanism, because the user is no longer able to
>> program. Our
>>>>> current connector interface: SourceFunctionProvider,
>> SinkFunctionProvider,
>>>>> through these interfaces, there is no ability to generate connector
>> related
>>>>> parallelism.
>>>>> Back to our original intention: to avoid users directly manipulating
>>>>> `DataStream`. Since we want to avoid it, we need to provide
>> corresponding
>>>>> features.
>>>>>
>>>>> And parallelism is the runtime information of connectors, It fits the
>> name
>>>>> of `ScanRuntimeProvider`.
>>>>>
>>>>>> If we wanted to add a "get parallelism" it would be in those
>> underlying
>>>>> connectors but I'm also skeptical about adding such a method there
>> because
>>>>> it is a static assignment and would preclude clever optimizations
>> about the
>>>>> parallelism of a connector at runtime.
>>>>>
>>>>> I think that when a job is submitted, it is in compile time. It should
>> only
>>>>> provide static parallelism.
>>>>>
>>>>> ## DataStream in table connector
>>>>>
>>>>> As I said before, if we want to completely cancel DataStream in the
>> table
>>>>> connector, we need to provide corresponding functions in
>>>>> `xxRuntimeProvider`.
>>>>> Otherwise, we and users may not be able to migrate the old connectors.
>>>>> Including Hive/FileSystem connectors and the user cases I mentioned
>> above.
>>>>> CC: @liu shouwei
>>>>>
>>>>> We really need to consider these cases.
>>>>> If there is no alternative in a short period of time, for a long
>>>>> time, users need to continue to use the old table connector API, which
>> has
>>>>> been deprecated.
>>>>>
>>>>> Why not use StreamTableEnvironment fromDataStream/toDataStream?
>>>>> - These tables are just temporary tables. Can not be integrated/stored
>> into
>>>>> Catalog.
>>>>> - Creating table DDL can not work...
>>>>> - We need to lose the kinds of useful features of Table/SQL on the
>>>>> connector. For example, projection pushdown, filter pushdown,
>> partitions
>>>>> and etc...
>>>>>
>>>>> But I believe you are right in the long run. The source and sink APIs
>>>>> should be powerful enough to cover all reasonable cases.
>>>>> Maybe we can just introduce them in a minimal way. For example, we only
>>>>> introduce `DataStreamSinkProvider` in planner as an internal API.
>>>>>
>>>>> Your points are very meaningful, hope to get your reply.
>>>>>
>>>>> Best,
>>>>> Jingsong
>>>>>
>>>>> On Fri, Sep 25, 2020 at 10:55 AM wenlong.lwl <we...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,Aljoscha, I would like to share a use case to second setting
>>>>> parallelism
>>>>>> of table sink(or limiting parallelism range of table sink): When
>> writing
>>>>>> data to databases, there is limitation for number of jdbc connections
>> and
>>>>>> query TPS. we would get errors of too many connections or high load
>> for
>>>>>> db and poor performance because of too many small requests if the
>>>>> optimizer
>>>>>> didn't know such information, and set a large parallelism for sink
>> when
>>>>>> matching the parallelism of its input.
>>>>>>
>>>>>> On Thu, 24 Sep 2020 at 22:41, Aljoscha Krettek <al...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the proposal! I think the use cases that we are trying to
>>>>>>> solve are indeed valid. However, I think we might have to take a step
>>>>>>> back to look at what we're trying to solve and how we can solve it.
>>>>>>>
>>>>>>> The FLIP seems to have two broader topics: 1) add "get parallelism"
>> to
>>>>>>> sinks/sources 2) let users write DataStream topologies for
>>>>>>> sinks/sources. I'll treat them separately below.
>>>>>>>
>>>>>>> I think we should not add "get parallelism" to the Table Sink API
>>>>>>> because I think it's the wrong level of abstraction. The Table API
>>>>>>> connectors are (or should be) more or less thin wrappers around
>>>>>>> "physical" connectors. By "physical" I mean the underlying (mostly
>>>>>>> DataStream API) connectors. For example, with the Kafka Connector the
>>>>>>> Table API connector just does the configuration parsing and
>> determines
>>>>> a
>>>>>>> good (de)serialization format and then creates the underlying
>>>>>>> FlinkKafkaConsumer/FlinkKafkaProducer.
>>>>>>>
>>>>>>> If we wanted to add a "get parallelism" it would be in those
>> underlying
>>>>>>> connectors but I'm also skeptical about adding such a method there
>>>>>>> because it is a static assignment and would preclude clever
>>>>>>> optimizations about the parallelism of a connector at runtime. But
>>>>> maybe
>>>>>>> that's thinking too much about future work so I'm open to discussion
>>>>>> there.
>>>>>>>
>>>>>>> Regarding the second point of letting Table connector developers use
>>>>>>> DataStream: I think we should not do it. One of the purposes of
>> FLIP-95
>>>>>>> [1] was to decouple the Table API from the DataStream API for the
>> basic
>>>>>>> interfaces. Coupling the two too closely at that basic level will
>> make
>>>>>>> our live harder in the future when we want to evolve those APIs or
>> when
>>>>>>> we want the system to be better at choosing how to execute sources
>> and
>>>>>>> sinks. An example of this is actually the past of the Table API.
>> Before
>>>>>>> FLIP-95 we had connectors that dealt directly with DataSet and
>>>>>>> DataStream, meaning that if users wanted their Table Sink to work in
>>>>>>> both BATCH and STREAMING mode they had to provide two
>> implementations.
>>>>>>> The trend is towards unifying the sources/sinks to common interfaces
>>>>>>> that can be used for both BATCH and STREAMING execution but, again, I
>>>>>>> think exposing DataStream here would be a step back in the wrong
>>>>>> direction.
>>>>>>>
>>>>>>> I think the solution to the existing user requirement of using
>>>>>>> DataStream sources and sinks with the Table API should be better
>>>>>>> interoperability between the two APIs, which is being tackled right
>> now
>>>>>>> in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases that
>>>>>>> we're trying to solve here, maybe we should think about FLIP-136 some
>>>>>> more.
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> [1]
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>>>> [2]
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best, Jingsong Lee
>>>>>
>>>>
>>>>
>>>> --
>>>> Best regards!
>>>> Rui Li
>>>
>>
>>
> 


Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Posted by Jingsong Li <ji...@gmail.com>.
Hi Aljoscha,

I want to separate `Customized parallelism` and `Parallelism inference`.

### Customized parallelism

First, I want to explain the current DataStream parallelism setting:
`env.fromSource(...).setParallelism(...)`.
This is how users explicitly specify parallelism, and it is the only way to
set parallelism.

The underlying Source (Eg.: SourceFunction) is completely independent of
specific parallelism. The peripheral DataStream is responsible for setting
parallelism.
The table layer also needs to provide peer-to-peer capability.

### Parallelism inference

Some sources have the ability to infer parallelism, like Kafka, parallelism
can be inferred from the partition number.

I think you are right, we should provide this to the underlying Source.
This capability must be related to the underlying Source (Eg.:
SourceFunction), so this capability must introduce a new interface for the
underlying Source.

The Table layer just tell underlying Source that user want to open
parallelism inference:

new MyRealSource(path, and, whatnot, parallelismInfer = true)

What do you think?

Best,
Jingsong

On Tue, Sep 29, 2020 at 8:48 PM Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
>
> I'll only respond regarding the parallelism for now because I need to
> think some more about DataStream.
>
> What I'm saying is that exposing a parallelism only for Table Connectors
> is not the right thing. If we want to allow sources to tell the
> system/framework what would be a good parallelism it would be at the
> underlying level.
>
> I'll explain with the SourceFunction. A Table API Source connector is
> basically a factory that will give you a SourceFunction that corresponds
> to whatever the user configured via properties and other means. If the
> Table Connector somehow happens to know what would be a good parallelism
> for the source it could "tell" the source when creating it, i.e.
>
>    new MyRealSource(path, and, whatnot, parallelismHint)
>
> Then the source could either work with that information it got, by
> shutting down (at runtime) some of its parallel instances. Or we could
> extend the Source (SourceFunction) API to expose a "parallelism hint" to
> the system.
>
> The basic thing is that Table Connectors are not the real connectors,
> they just delegate to underlying real connectors. So those underlying
> connectors are where we need to change things. Otherwise we would just
> have special-case solutions for the Table API.
>
> Best,
> Aljoscha
>
> On 25.09.20 14:30, admin wrote:
> > Hi everyone,
> > Thanks for the proposal.
> >
> > In our company,we meet the same situation as @liu shouwei.
> > We developed some features base on flink.Such as parallelism of sql
> source/sink  connector, and kafka delay consumer which is adding a flatmap
> and a keyby transformation after the source Datastream.
> > What make us embarrassing is that when we migrate this features to Flink
> 1.11,we found that the DataSteam is missing,So we modify the blink’s code
> to support parallelism.But kafka delay comsumer is unsolved until now.
> >
> >  From user’s perspective,it necessary to manipulate DataStream or have
> the interoperability between Table API and DataStream.
> >
> > Best
> >
> >
> >
> >> 2020年9月25日 下午4:18,Rui Li <li...@gmail.com> 写道:
> >>
> >> Hi Jingsong,
> >>
> >> Thanks for driving this effort. I have two minor comments.
> >>
> >>
> >>    1. IMHO, parallelism is a concept that applies to all
> ScanTableSource.
> >>    So instead of defining a new interface, is it more natural to
> incorporate
> >>    parallel inference to existing interfaces, e.g. ScanTableSource
> >>    or ScanRuntimeProvider?
> >>    2. `scan.infer-parallelism.enabled` doesn't seem very useful to me.
> From
> >>    a user's perspective, parallelism is either set by
> `scan.parallelism`, or
> >>    automatically decided by Flink. If a user doesn't want the connector
> to
> >>    infer parallelism, he/she can simply set `scan.parallelism`, no?
> >>
> >>
> >> On Fri, Sep 25, 2020 at 3:33 PM Jingsong Li <ji...@gmail.com>
> wrote:
> >>
> >>> Hi Aljoscha,
> >>>
> >>> Thank you for your feedback,
> >>>
> >>> ## Connector parallelism
> >>>
> >>> Requirements:
> >>> Set parallelism by user specified or inferred by connector.
> >>>
> >>> How to configure parallelism in DataStream:
> >>> In the DataStream world, the only way to configure parallelism is
> >>> `SingleOutputStreamOperator.setParallelism`. Actually, users need to
> have
> >>> access to DataStream when using a connector, not just the
> `SourceFunction`
> >>> / `Source` interface.
> >>> Is parallelism related to connectors? I think yes, there are many
> >>> connectors that can support obtaining parallelism related information
> from
> >>> them, and users do exactly that. This means parallelism inference (From
> >>> connectors).
> >>> The key is that `DataStream` is an open programming API, and users can
> >>> freely program to set parallelism.
> >>>
> >>> How to configure parallelism in Table/SQL:
> >>> But Table/SQL is not an open programming API, every feature needs a
> >>> corresponding mechanism, because the user is no longer able to
> program. Our
> >>> current connector interface: SourceFunctionProvider,
> SinkFunctionProvider,
> >>> through these interfaces, there is no ability to generate connector
> related
> >>> parallelism.
> >>> Back to our original intention: to avoid users directly manipulating
> >>> `DataStream`. Since we want to avoid it, we need to provide
> corresponding
> >>> features.
> >>>
> >>> And parallelism is the runtime information of connectors, It fits the
> name
> >>> of `ScanRuntimeProvider`.
> >>>
> >>>> If we wanted to add a "get parallelism" it would be in those
> underlying
> >>> connectors but I'm also skeptical about adding such a method there
> because
> >>> it is a static assignment and would preclude clever optimizations
> about the
> >>> parallelism of a connector at runtime.
> >>>
> >>> I think that when a job is submitted, it is in compile time. It should
> only
> >>> provide static parallelism.
> >>>
> >>> ## DataStream in table connector
> >>>
> >>> As I said before, if we want to completely cancel DataStream in the
> table
> >>> connector, we need to provide corresponding functions in
> >>> `xxRuntimeProvider`.
> >>> Otherwise, we and users may not be able to migrate the old connectors.
> >>> Including Hive/FileSystem connectors and the user cases I mentioned
> above.
> >>> CC: @liu shouwei
> >>>
> >>> We really need to consider these cases.
> >>> If there is no alternative in a short period of time, for a long
> >>> time, users need to continue to use the old table connector API, which
> has
> >>> been deprecated.
> >>>
> >>> Why not use StreamTableEnvironment fromDataStream/toDataStream?
> >>> - These tables are just temporary tables. Can not be integrated/stored
> into
> >>> Catalog.
> >>> - Creating table DDL can not work...
> >>> - We need to lose the kinds of useful features of Table/SQL on the
> >>> connector. For example, projection pushdown, filter pushdown,
> partitions
> >>> and etc...
> >>>
> >>> But I believe you are right in the long run. The source and sink APIs
> >>> should be powerful enough to cover all reasonable cases.
> >>> Maybe we can just introduce them in a minimal way. For example, we only
> >>> introduce `DataStreamSinkProvider` in planner as an internal API.
> >>>
> >>> Your points are very meaningful, hope to get your reply.
> >>>
> >>> Best,
> >>> Jingsong
> >>>
> >>> On Fri, Sep 25, 2020 at 10:55 AM wenlong.lwl <we...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi,Aljoscha, I would like to share a use case to second setting
> >>> parallelism
> >>>> of table sink(or limiting parallelism range of table sink): When
> writing
> >>>> data to databases, there is limitation for number of jdbc connections
> and
> >>>> query TPS. we would get errors of too many connections or high load
> for
> >>>> db and poor performance because of too many small requests if the
> >>> optimizer
> >>>> didn't know such information, and set a large parallelism for sink
> when
> >>>> matching the parallelism of its input.
> >>>>
> >>>> On Thu, 24 Sep 2020 at 22:41, Aljoscha Krettek <al...@apache.org>
> >>>> wrote:
> >>>>
> >>>>> Thanks for the proposal! I think the use cases that we are trying to
> >>>>> solve are indeed valid. However, I think we might have to take a step
> >>>>> back to look at what we're trying to solve and how we can solve it.
> >>>>>
> >>>>> The FLIP seems to have two broader topics: 1) add "get parallelism"
> to
> >>>>> sinks/sources 2) let users write DataStream topologies for
> >>>>> sinks/sources. I'll treat them separately below.
> >>>>>
> >>>>> I think we should not add "get parallelism" to the Table Sink API
> >>>>> because I think it's the wrong level of abstraction. The Table API
> >>>>> connectors are (or should be) more or less thin wrappers around
> >>>>> "physical" connectors. By "physical" I mean the underlying (mostly
> >>>>> DataStream API) connectors. For example, with the Kafka Connector the
> >>>>> Table API connector just does the configuration parsing and
> determines
> >>> a
> >>>>> good (de)serialization format and then creates the underlying
> >>>>> FlinkKafkaConsumer/FlinkKafkaProducer.
> >>>>>
> >>>>> If we wanted to add a "get parallelism" it would be in those
> underlying
> >>>>> connectors but I'm also skeptical about adding such a method there
> >>>>> because it is a static assignment and would preclude clever
> >>>>> optimizations about the parallelism of a connector at runtime. But
> >>> maybe
> >>>>> that's thinking too much about future work so I'm open to discussion
> >>>> there.
> >>>>>
> >>>>> Regarding the second point of letting Table connector developers use
> >>>>> DataStream: I think we should not do it. One of the purposes of
> FLIP-95
> >>>>> [1] was to decouple the Table API from the DataStream API for the
> basic
> >>>>> interfaces. Coupling the two too closely at that basic level will
> make
> >>>>> our live harder in the future when we want to evolve those APIs or
> when
> >>>>> we want the system to be better at choosing how to execute sources
> and
> >>>>> sinks. An example of this is actually the past of the Table API.
> Before
> >>>>> FLIP-95 we had connectors that dealt directly with DataSet and
> >>>>> DataStream, meaning that if users wanted their Table Sink to work in
> >>>>> both BATCH and STREAMING mode they had to provide two
> implementations.
> >>>>> The trend is towards unifying the sources/sinks to common interfaces
> >>>>> that can be used for both BATCH and STREAMING execution but, again, I
> >>>>> think exposing DataStream here would be a step back in the wrong
> >>>> direction.
> >>>>>
> >>>>> I think the solution to the existing user requirement of using
> >>>>> DataStream sources and sinks with the Table API should be better
> >>>>> interoperability between the two APIs, which is being tackled right
> now
> >>>>> in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases that
> >>>>> we're trying to solve here, maybe we should think about FLIP-136 some
> >>>> more.
> >>>>>
> >>>>> What do you think?
> >>>>>
> >>>>> Best,
> >>>>> Aljoscha
> >>>>>
> >>>>> [1]
> >>>>>
> >>>>>
> >>>>
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> >>>>> [2]
> >>>>>
> >>>>>
> >>>>
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> Best, Jingsong Lee
> >>>
> >>
> >>
> >> --
> >> Best regards!
> >> Rui Li
> >
>
>

-- 
Best, Jingsong Lee

Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

I'll only respond regarding the parallelism for now because I need to 
think some more about DataStream.

What I'm saying is that exposing a parallelism only for Table Connectors 
is not the right thing. If we want to allow sources to tell the 
system/framework what would be a good parallelism it would be at the 
underlying level.

I'll explain with the SourceFunction. A Table API Source connector is 
basically a factory that will give you a SourceFunction that corresponds 
to whatever the user configured via properties and other means. If the 
Table Connector somehow happens to know what would be a good parallelism 
for the source it could "tell" the source when creating it, i.e.

   new MyRealSource(path, and, whatnot, parallelismHint)

Then the source could either work with that information it got, by 
shutting down (at runtime) some of its parallel instances. Or we could 
extend the Source (SourceFunction) API to expose a "parallelism hint" to 
the system.

The basic thing is that Table Connectors are not the real connectors, 
they just delegate to underlying real connectors. So those underlying 
connectors are where we need to change things. Otherwise we would just 
have special-case solutions for the Table API.

Best,
Aljoscha

On 25.09.20 14:30, admin wrote:
> Hi everyone,
> Thanks for the proposal.
> 
> In our company,we meet the same situation as @liu shouwei.
> We developed some features base on flink.Such as parallelism of sql source/sink  connector, and kafka delay consumer which is adding a flatmap and a keyby transformation after the source Datastream.
> What make us embarrassing is that when we migrate this features to Flink 1.11,we found that the DataSteam is missing,So we modify the blink’s code to support parallelism.But kafka delay comsumer is unsolved until now.
> 
>  From user’s perspective,it necessary to manipulate DataStream or have the interoperability between Table API and DataStream.
> 
> Best
> 
> 
> 
>> 2020年9月25日 下午4:18,Rui Li <li...@gmail.com> 写道:
>>
>> Hi Jingsong,
>>
>> Thanks for driving this effort. I have two minor comments.
>>
>>
>>    1. IMHO, parallelism is a concept that applies to all ScanTableSource.
>>    So instead of defining a new interface, is it more natural to incorporate
>>    parallel inference to existing interfaces, e.g. ScanTableSource
>>    or ScanRuntimeProvider?
>>    2. `scan.infer-parallelism.enabled` doesn't seem very useful to me. From
>>    a user's perspective, parallelism is either set by `scan.parallelism`, or
>>    automatically decided by Flink. If a user doesn't want the connector to
>>    infer parallelism, he/she can simply set `scan.parallelism`, no?
>>
>>
>> On Fri, Sep 25, 2020 at 3:33 PM Jingsong Li <ji...@gmail.com> wrote:
>>
>>> Hi Aljoscha,
>>>
>>> Thank you for your feedback,
>>>
>>> ## Connector parallelism
>>>
>>> Requirements:
>>> Set parallelism by user specified or inferred by connector.
>>>
>>> How to configure parallelism in DataStream:
>>> In the DataStream world, the only way to configure parallelism is
>>> `SingleOutputStreamOperator.setParallelism`. Actually, users need to have
>>> access to DataStream when using a connector, not just the `SourceFunction`
>>> / `Source` interface.
>>> Is parallelism related to connectors? I think yes, there are many
>>> connectors that can support obtaining parallelism related information from
>>> them, and users do exactly that. This means parallelism inference (From
>>> connectors).
>>> The key is that `DataStream` is an open programming API, and users can
>>> freely program to set parallelism.
>>>
>>> How to configure parallelism in Table/SQL:
>>> But Table/SQL is not an open programming API, every feature needs a
>>> corresponding mechanism, because the user is no longer able to program. Our
>>> current connector interface: SourceFunctionProvider, SinkFunctionProvider,
>>> through these interfaces, there is no ability to generate connector related
>>> parallelism.
>>> Back to our original intention: to avoid users directly manipulating
>>> `DataStream`. Since we want to avoid it, we need to provide corresponding
>>> features.
>>>
>>> And parallelism is the runtime information of connectors, It fits the name
>>> of `ScanRuntimeProvider`.
>>>
>>>> If we wanted to add a "get parallelism" it would be in those underlying
>>> connectors but I'm also skeptical about adding such a method there because
>>> it is a static assignment and would preclude clever optimizations about the
>>> parallelism of a connector at runtime.
>>>
>>> I think that when a job is submitted, it is in compile time. It should only
>>> provide static parallelism.
>>>
>>> ## DataStream in table connector
>>>
>>> As I said before, if we want to completely cancel DataStream in the table
>>> connector, we need to provide corresponding functions in
>>> `xxRuntimeProvider`.
>>> Otherwise, we and users may not be able to migrate the old connectors.
>>> Including Hive/FileSystem connectors and the user cases I mentioned above.
>>> CC: @liu shouwei
>>>
>>> We really need to consider these cases.
>>> If there is no alternative in a short period of time, for a long
>>> time, users need to continue to use the old table connector API, which has
>>> been deprecated.
>>>
>>> Why not use StreamTableEnvironment fromDataStream/toDataStream?
>>> - These tables are just temporary tables. Can not be integrated/stored into
>>> Catalog.
>>> - Creating table DDL can not work...
>>> - We need to lose the kinds of useful features of Table/SQL on the
>>> connector. For example, projection pushdown, filter pushdown, partitions
>>> and etc...
>>>
>>> But I believe you are right in the long run. The source and sink APIs
>>> should be powerful enough to cover all reasonable cases.
>>> Maybe we can just introduce them in a minimal way. For example, we only
>>> introduce `DataStreamSinkProvider` in planner as an internal API.
>>>
>>> Your points are very meaningful, hope to get your reply.
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Fri, Sep 25, 2020 at 10:55 AM wenlong.lwl <we...@gmail.com>
>>> wrote:
>>>
>>>> Hi,Aljoscha, I would like to share a use case to second setting
>>> parallelism
>>>> of table sink(or limiting parallelism range of table sink): When writing
>>>> data to databases, there is limitation for number of jdbc connections and
>>>> query TPS. we would get errors of too many connections or high load for
>>>> db and poor performance because of too many small requests if the
>>> optimizer
>>>> didn't know such information, and set a large parallelism for sink when
>>>> matching the parallelism of its input.
>>>>
>>>> On Thu, 24 Sep 2020 at 22:41, Aljoscha Krettek <al...@apache.org>
>>>> wrote:
>>>>
>>>>> Thanks for the proposal! I think the use cases that we are trying to
>>>>> solve are indeed valid. However, I think we might have to take a step
>>>>> back to look at what we're trying to solve and how we can solve it.
>>>>>
>>>>> The FLIP seems to have two broader topics: 1) add "get parallelism" to
>>>>> sinks/sources 2) let users write DataStream topologies for
>>>>> sinks/sources. I'll treat them separately below.
>>>>>
>>>>> I think we should not add "get parallelism" to the Table Sink API
>>>>> because I think it's the wrong level of abstraction. The Table API
>>>>> connectors are (or should be) more or less thin wrappers around
>>>>> "physical" connectors. By "physical" I mean the underlying (mostly
>>>>> DataStream API) connectors. For example, with the Kafka Connector the
>>>>> Table API connector just does the configuration parsing and determines
>>> a
>>>>> good (de)serialization format and then creates the underlying
>>>>> FlinkKafkaConsumer/FlinkKafkaProducer.
>>>>>
>>>>> If we wanted to add a "get parallelism" it would be in those underlying
>>>>> connectors but I'm also skeptical about adding such a method there
>>>>> because it is a static assignment and would preclude clever
>>>>> optimizations about the parallelism of a connector at runtime. But
>>> maybe
>>>>> that's thinking too much about future work so I'm open to discussion
>>>> there.
>>>>>
>>>>> Regarding the second point of letting Table connector developers use
>>>>> DataStream: I think we should not do it. One of the purposes of FLIP-95
>>>>> [1] was to decouple the Table API from the DataStream API for the basic
>>>>> interfaces. Coupling the two too closely at that basic level will make
>>>>> our live harder in the future when we want to evolve those APIs or when
>>>>> we want the system to be better at choosing how to execute sources and
>>>>> sinks. An example of this is actually the past of the Table API. Before
>>>>> FLIP-95 we had connectors that dealt directly with DataSet and
>>>>> DataStream, meaning that if users wanted their Table Sink to work in
>>>>> both BATCH and STREAMING mode they had to provide two implementations.
>>>>> The trend is towards unifying the sources/sinks to common interfaces
>>>>> that can be used for both BATCH and STREAMING execution but, again, I
>>>>> think exposing DataStream here would be a step back in the wrong
>>>> direction.
>>>>>
>>>>> I think the solution to the existing user requirement of using
>>>>> DataStream sources and sinks with the Table API should be better
>>>>> interoperability between the two APIs, which is being tackled right now
>>>>> in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases that
>>>>> we're trying to solve here, maybe we should think about FLIP-136 some
>>>> more.
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>> [1]
>>>>>
>>>>>
>>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>>> [2]
>>>>>
>>>>>
>>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>>
>> -- 
>> Best regards!
>> Rui Li
> 


Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Posted by admin <17...@163.com>.
Hi everyone,
Thanks for the proposal.

In our company,we meet the same situation as @liu shouwei.
We developed some features base on flink.Such as parallelism of sql source/sink  connector, and kafka delay consumer which is adding a flatmap and a keyby transformation after the source Datastream.
What make us embarrassing is that when we migrate this features to Flink 1.11,we found that the DataSteam is missing,So we modify the blink’s code to support parallelism.But kafka delay comsumer is unsolved until now.

From user’s perspective,it necessary to manipulate DataStream or have the interoperability between Table API and DataStream.

Best



> 2020年9月25日 下午4:18,Rui Li <li...@gmail.com> 写道:
> 
> Hi Jingsong,
> 
> Thanks for driving this effort. I have two minor comments.
> 
> 
>   1. IMHO, parallelism is a concept that applies to all ScanTableSource.
>   So instead of defining a new interface, is it more natural to incorporate
>   parallel inference to existing interfaces, e.g. ScanTableSource
>   or ScanRuntimeProvider?
>   2. `scan.infer-parallelism.enabled` doesn't seem very useful to me. From
>   a user's perspective, parallelism is either set by `scan.parallelism`, or
>   automatically decided by Flink. If a user doesn't want the connector to
>   infer parallelism, he/she can simply set `scan.parallelism`, no?
> 
> 
> On Fri, Sep 25, 2020 at 3:33 PM Jingsong Li <ji...@gmail.com> wrote:
> 
>> Hi Aljoscha,
>> 
>> Thank you for your feedback,
>> 
>> ## Connector parallelism
>> 
>> Requirements:
>> Set parallelism by user specified or inferred by connector.
>> 
>> How to configure parallelism in DataStream:
>> In the DataStream world, the only way to configure parallelism is
>> `SingleOutputStreamOperator.setParallelism`. Actually, users need to have
>> access to DataStream when using a connector, not just the `SourceFunction`
>> / `Source` interface.
>> Is parallelism related to connectors? I think yes, there are many
>> connectors that can support obtaining parallelism related information from
>> them, and users do exactly that. This means parallelism inference (From
>> connectors).
>> The key is that `DataStream` is an open programming API, and users can
>> freely program to set parallelism.
>> 
>> How to configure parallelism in Table/SQL:
>> But Table/SQL is not an open programming API, every feature needs a
>> corresponding mechanism, because the user is no longer able to program. Our
>> current connector interface: SourceFunctionProvider, SinkFunctionProvider,
>> through these interfaces, there is no ability to generate connector related
>> parallelism.
>> Back to our original intention: to avoid users directly manipulating
>> `DataStream`. Since we want to avoid it, we need to provide corresponding
>> features.
>> 
>> And parallelism is the runtime information of connectors, It fits the name
>> of `ScanRuntimeProvider`.
>> 
>>> If we wanted to add a "get parallelism" it would be in those underlying
>> connectors but I'm also skeptical about adding such a method there because
>> it is a static assignment and would preclude clever optimizations about the
>> parallelism of a connector at runtime.
>> 
>> I think that when a job is submitted, it is in compile time. It should only
>> provide static parallelism.
>> 
>> ## DataStream in table connector
>> 
>> As I said before, if we want to completely cancel DataStream in the table
>> connector, we need to provide corresponding functions in
>> `xxRuntimeProvider`.
>> Otherwise, we and users may not be able to migrate the old connectors.
>> Including Hive/FileSystem connectors and the user cases I mentioned above.
>> CC: @liu shouwei
>> 
>> We really need to consider these cases.
>> If there is no alternative in a short period of time, for a long
>> time, users need to continue to use the old table connector API, which has
>> been deprecated.
>> 
>> Why not use StreamTableEnvironment fromDataStream/toDataStream?
>> - These tables are just temporary tables. Can not be integrated/stored into
>> Catalog.
>> - Creating table DDL can not work...
>> - We need to lose the kinds of useful features of Table/SQL on the
>> connector. For example, projection pushdown, filter pushdown, partitions
>> and etc...
>> 
>> But I believe you are right in the long run. The source and sink APIs
>> should be powerful enough to cover all reasonable cases.
>> Maybe we can just introduce them in a minimal way. For example, we only
>> introduce `DataStreamSinkProvider` in planner as an internal API.
>> 
>> Your points are very meaningful, hope to get your reply.
>> 
>> Best,
>> Jingsong
>> 
>> On Fri, Sep 25, 2020 at 10:55 AM wenlong.lwl <we...@gmail.com>
>> wrote:
>> 
>>> Hi,Aljoscha, I would like to share a use case to second setting
>> parallelism
>>> of table sink(or limiting parallelism range of table sink): When writing
>>> data to databases, there is limitation for number of jdbc connections and
>>> query TPS. we would get errors of too many connections or high load for
>>> db and poor performance because of too many small requests if the
>> optimizer
>>> didn't know such information, and set a large parallelism for sink when
>>> matching the parallelism of its input.
>>> 
>>> On Thu, 24 Sep 2020 at 22:41, Aljoscha Krettek <al...@apache.org>
>>> wrote:
>>> 
>>>> Thanks for the proposal! I think the use cases that we are trying to
>>>> solve are indeed valid. However, I think we might have to take a step
>>>> back to look at what we're trying to solve and how we can solve it.
>>>> 
>>>> The FLIP seems to have two broader topics: 1) add "get parallelism" to
>>>> sinks/sources 2) let users write DataStream topologies for
>>>> sinks/sources. I'll treat them separately below.
>>>> 
>>>> I think we should not add "get parallelism" to the Table Sink API
>>>> because I think it's the wrong level of abstraction. The Table API
>>>> connectors are (or should be) more or less thin wrappers around
>>>> "physical" connectors. By "physical" I mean the underlying (mostly
>>>> DataStream API) connectors. For example, with the Kafka Connector the
>>>> Table API connector just does the configuration parsing and determines
>> a
>>>> good (de)serialization format and then creates the underlying
>>>> FlinkKafkaConsumer/FlinkKafkaProducer.
>>>> 
>>>> If we wanted to add a "get parallelism" it would be in those underlying
>>>> connectors but I'm also skeptical about adding such a method there
>>>> because it is a static assignment and would preclude clever
>>>> optimizations about the parallelism of a connector at runtime. But
>> maybe
>>>> that's thinking too much about future work so I'm open to discussion
>>> there.
>>>> 
>>>> Regarding the second point of letting Table connector developers use
>>>> DataStream: I think we should not do it. One of the purposes of FLIP-95
>>>> [1] was to decouple the Table API from the DataStream API for the basic
>>>> interfaces. Coupling the two too closely at that basic level will make
>>>> our live harder in the future when we want to evolve those APIs or when
>>>> we want the system to be better at choosing how to execute sources and
>>>> sinks. An example of this is actually the past of the Table API. Before
>>>> FLIP-95 we had connectors that dealt directly with DataSet and
>>>> DataStream, meaning that if users wanted their Table Sink to work in
>>>> both BATCH and STREAMING mode they had to provide two implementations.
>>>> The trend is towards unifying the sources/sinks to common interfaces
>>>> that can be used for both BATCH and STREAMING execution but, again, I
>>>> think exposing DataStream here would be a step back in the wrong
>>> direction.
>>>> 
>>>> I think the solution to the existing user requirement of using
>>>> DataStream sources and sinks with the Table API should be better
>>>> interoperability between the two APIs, which is being tackled right now
>>>> in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases that
>>>> we're trying to solve here, maybe we should think about FLIP-136 some
>>> more.
>>>> 
>>>> What do you think?
>>>> 
>>>> Best,
>>>> Aljoscha
>>>> 
>>>> [1]
>>>> 
>>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>>>> [2]
>>>> 
>>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>>>> 
>>>> 
>>> 
>> 
>> 
>> --
>> Best, Jingsong Lee
>> 
> 
> 
> -- 
> Best regards!
> Rui Li


Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Posted by Rui Li <li...@gmail.com>.
Hi Jingsong,

Thanks for driving this effort. I have two minor comments.


   1. IMHO, parallelism is a concept that applies to all ScanTableSource.
   So instead of defining a new interface, is it more natural to incorporate
   parallel inference to existing interfaces, e.g. ScanTableSource
   or ScanRuntimeProvider?
   2. `scan.infer-parallelism.enabled` doesn't seem very useful to me. From
   a user's perspective, parallelism is either set by `scan.parallelism`, or
   automatically decided by Flink. If a user doesn't want the connector to
   infer parallelism, he/she can simply set `scan.parallelism`, no?


On Fri, Sep 25, 2020 at 3:33 PM Jingsong Li <ji...@gmail.com> wrote:

> Hi Aljoscha,
>
> Thank you for your feedback,
>
> ## Connector parallelism
>
> Requirements:
> Set parallelism by user specified or inferred by connector.
>
> How to configure parallelism in DataStream:
> In the DataStream world, the only way to configure parallelism is
> `SingleOutputStreamOperator.setParallelism`. Actually, users need to have
> access to DataStream when using a connector, not just the `SourceFunction`
> / `Source` interface.
> Is parallelism related to connectors? I think yes, there are many
> connectors that can support obtaining parallelism related information from
> them, and users do exactly that. This means parallelism inference (From
> connectors).
> The key is that `DataStream` is an open programming API, and users can
> freely program to set parallelism.
>
> How to configure parallelism in Table/SQL:
> But Table/SQL is not an open programming API, every feature needs a
> corresponding mechanism, because the user is no longer able to program. Our
> current connector interface: SourceFunctionProvider, SinkFunctionProvider,
> through these interfaces, there is no ability to generate connector related
> parallelism.
> Back to our original intention: to avoid users directly manipulating
> `DataStream`. Since we want to avoid it, we need to provide corresponding
> features.
>
> And parallelism is the runtime information of connectors, It fits the name
> of `ScanRuntimeProvider`.
>
> > If we wanted to add a "get parallelism" it would be in those underlying
> connectors but I'm also skeptical about adding such a method there because
> it is a static assignment and would preclude clever optimizations about the
> parallelism of a connector at runtime.
>
> I think that when a job is submitted, it is in compile time. It should only
> provide static parallelism.
>
> ## DataStream in table connector
>
> As I said before, if we want to completely cancel DataStream in the table
> connector, we need to provide corresponding functions in
> `xxRuntimeProvider`.
> Otherwise, we and users may not be able to migrate the old connectors.
> Including Hive/FileSystem connectors and the user cases I mentioned above.
> CC: @liu shouwei
>
> We really need to consider these cases.
> If there is no alternative in a short period of time, for a long
> time, users need to continue to use the old table connector API, which has
> been deprecated.
>
> Why not use StreamTableEnvironment fromDataStream/toDataStream?
> - These tables are just temporary tables. Can not be integrated/stored into
> Catalog.
> - Creating table DDL can not work...
> - We need to lose the kinds of useful features of Table/SQL on the
> connector. For example, projection pushdown, filter pushdown, partitions
> and etc...
>
> But I believe you are right in the long run. The source and sink APIs
> should be powerful enough to cover all reasonable cases.
> Maybe we can just introduce them in a minimal way. For example, we only
> introduce `DataStreamSinkProvider` in planner as an internal API.
>
> Your points are very meaningful, hope to get your reply.
>
> Best,
> Jingsong
>
> On Fri, Sep 25, 2020 at 10:55 AM wenlong.lwl <we...@gmail.com>
> wrote:
>
> > Hi,Aljoscha, I would like to share a use case to second setting
> parallelism
> > of table sink(or limiting parallelism range of table sink): When writing
> > data to databases, there is limitation for number of jdbc connections and
> > query TPS. we would get errors of too many connections or high load for
> > db and poor performance because of too many small requests if the
> optimizer
> > didn't know such information, and set a large parallelism for sink when
> > matching the parallelism of its input.
> >
> > On Thu, 24 Sep 2020 at 22:41, Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> > > Thanks for the proposal! I think the use cases that we are trying to
> > > solve are indeed valid. However, I think we might have to take a step
> > > back to look at what we're trying to solve and how we can solve it.
> > >
> > > The FLIP seems to have two broader topics: 1) add "get parallelism" to
> > > sinks/sources 2) let users write DataStream topologies for
> > > sinks/sources. I'll treat them separately below.
> > >
> > > I think we should not add "get parallelism" to the Table Sink API
> > > because I think it's the wrong level of abstraction. The Table API
> > > connectors are (or should be) more or less thin wrappers around
> > > "physical" connectors. By "physical" I mean the underlying (mostly
> > > DataStream API) connectors. For example, with the Kafka Connector the
> > > Table API connector just does the configuration parsing and determines
> a
> > > good (de)serialization format and then creates the underlying
> > > FlinkKafkaConsumer/FlinkKafkaProducer.
> > >
> > > If we wanted to add a "get parallelism" it would be in those underlying
> > > connectors but I'm also skeptical about adding such a method there
> > > because it is a static assignment and would preclude clever
> > > optimizations about the parallelism of a connector at runtime. But
> maybe
> > > that's thinking too much about future work so I'm open to discussion
> > there.
> > >
> > > Regarding the second point of letting Table connector developers use
> > > DataStream: I think we should not do it. One of the purposes of FLIP-95
> > > [1] was to decouple the Table API from the DataStream API for the basic
> > > interfaces. Coupling the two too closely at that basic level will make
> > > our live harder in the future when we want to evolve those APIs or when
> > > we want the system to be better at choosing how to execute sources and
> > > sinks. An example of this is actually the past of the Table API. Before
> > > FLIP-95 we had connectors that dealt directly with DataSet and
> > > DataStream, meaning that if users wanted their Table Sink to work in
> > > both BATCH and STREAMING mode they had to provide two implementations.
> > > The trend is towards unifying the sources/sinks to common interfaces
> > > that can be used for both BATCH and STREAMING execution but, again, I
> > > think exposing DataStream here would be a step back in the wrong
> > direction.
> > >
> > > I think the solution to the existing user requirement of using
> > > DataStream sources and sinks with the Table API should be better
> > > interoperability between the two APIs, which is being tackled right now
> > > in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases that
> > > we're trying to solve here, maybe we should think about FLIP-136 some
> > more.
> > >
> > > What do you think?
> > >
> > > Best,
> > > Aljoscha
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> > > [2]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> > >
> > >
> >
>
>
> --
> Best, Jingsong Lee
>


-- 
Best regards!
Rui Li

Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Posted by Jingsong Li <ji...@gmail.com>.
Hi Aljoscha,

Thank you for your feedback,

## Connector parallelism

Requirements:
Set parallelism by user specified or inferred by connector.

How to configure parallelism in DataStream:
In the DataStream world, the only way to configure parallelism is
`SingleOutputStreamOperator.setParallelism`. Actually, users need to have
access to DataStream when using a connector, not just the `SourceFunction`
/ `Source` interface.
Is parallelism related to connectors? I think yes, there are many
connectors that can support obtaining parallelism related information from
them, and users do exactly that. This means parallelism inference (From
connectors).
The key is that `DataStream` is an open programming API, and users can
freely program to set parallelism.

How to configure parallelism in Table/SQL:
But Table/SQL is not an open programming API, every feature needs a
corresponding mechanism, because the user is no longer able to program. Our
current connector interface: SourceFunctionProvider, SinkFunctionProvider,
through these interfaces, there is no ability to generate connector related
parallelism.
Back to our original intention: to avoid users directly manipulating
`DataStream`. Since we want to avoid it, we need to provide corresponding
features.

And parallelism is the runtime information of connectors, It fits the name
of `ScanRuntimeProvider`.

> If we wanted to add a "get parallelism" it would be in those underlying
connectors but I'm also skeptical about adding such a method there because
it is a static assignment and would preclude clever optimizations about the
parallelism of a connector at runtime.

I think that when a job is submitted, it is in compile time. It should only
provide static parallelism.

## DataStream in table connector

As I said before, if we want to completely cancel DataStream in the table
connector, we need to provide corresponding functions in
`xxRuntimeProvider`.
Otherwise, we and users may not be able to migrate the old connectors.
Including Hive/FileSystem connectors and the user cases I mentioned above.
CC: @liu shouwei

We really need to consider these cases.
If there is no alternative in a short period of time, for a long
time, users need to continue to use the old table connector API, which has
been deprecated.

Why not use StreamTableEnvironment fromDataStream/toDataStream?
- These tables are just temporary tables. Can not be integrated/stored into
Catalog.
- Creating table DDL can not work...
- We need to lose the kinds of useful features of Table/SQL on the
connector. For example, projection pushdown, filter pushdown, partitions
and etc...

But I believe you are right in the long run. The source and sink APIs
should be powerful enough to cover all reasonable cases.
Maybe we can just introduce them in a minimal way. For example, we only
introduce `DataStreamSinkProvider` in planner as an internal API.

Your points are very meaningful, hope to get your reply.

Best,
Jingsong

On Fri, Sep 25, 2020 at 10:55 AM wenlong.lwl <we...@gmail.com>
wrote:

> Hi,Aljoscha, I would like to share a use case to second setting parallelism
> of table sink(or limiting parallelism range of table sink): When writing
> data to databases, there is limitation for number of jdbc connections and
> query TPS. we would get errors of too many connections or high load for
> db and poor performance because of too many small requests if the optimizer
> didn't know such information, and set a large parallelism for sink when
> matching the parallelism of its input.
>
> On Thu, 24 Sep 2020 at 22:41, Aljoscha Krettek <al...@apache.org>
> wrote:
>
> > Thanks for the proposal! I think the use cases that we are trying to
> > solve are indeed valid. However, I think we might have to take a step
> > back to look at what we're trying to solve and how we can solve it.
> >
> > The FLIP seems to have two broader topics: 1) add "get parallelism" to
> > sinks/sources 2) let users write DataStream topologies for
> > sinks/sources. I'll treat them separately below.
> >
> > I think we should not add "get parallelism" to the Table Sink API
> > because I think it's the wrong level of abstraction. The Table API
> > connectors are (or should be) more or less thin wrappers around
> > "physical" connectors. By "physical" I mean the underlying (mostly
> > DataStream API) connectors. For example, with the Kafka Connector the
> > Table API connector just does the configuration parsing and determines a
> > good (de)serialization format and then creates the underlying
> > FlinkKafkaConsumer/FlinkKafkaProducer.
> >
> > If we wanted to add a "get parallelism" it would be in those underlying
> > connectors but I'm also skeptical about adding such a method there
> > because it is a static assignment and would preclude clever
> > optimizations about the parallelism of a connector at runtime. But maybe
> > that's thinking too much about future work so I'm open to discussion
> there.
> >
> > Regarding the second point of letting Table connector developers use
> > DataStream: I think we should not do it. One of the purposes of FLIP-95
> > [1] was to decouple the Table API from the DataStream API for the basic
> > interfaces. Coupling the two too closely at that basic level will make
> > our live harder in the future when we want to evolve those APIs or when
> > we want the system to be better at choosing how to execute sources and
> > sinks. An example of this is actually the past of the Table API. Before
> > FLIP-95 we had connectors that dealt directly with DataSet and
> > DataStream, meaning that if users wanted their Table Sink to work in
> > both BATCH and STREAMING mode they had to provide two implementations.
> > The trend is towards unifying the sources/sinks to common interfaces
> > that can be used for both BATCH and STREAMING execution but, again, I
> > think exposing DataStream here would be a step back in the wrong
> direction.
> >
> > I think the solution to the existing user requirement of using
> > DataStream sources and sinks with the Table API should be better
> > interoperability between the two APIs, which is being tackled right now
> > in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases that
> > we're trying to solve here, maybe we should think about FLIP-136 some
> more.
> >
> > What do you think?
> >
> > Best,
> > Aljoscha
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> > [2]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >
> >
>


-- 
Best, Jingsong Lee

Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Posted by "wenlong.lwl" <we...@gmail.com>.
Hi,Aljoscha, I would like to share a use case to second setting parallelism
of table sink(or limiting parallelism range of table sink): When writing
data to databases, there is limitation for number of jdbc connections and
query TPS. we would get errors of too many connections or high load for
db and poor performance because of too many small requests if the optimizer
didn't know such information, and set a large parallelism for sink when
matching the parallelism of its input.

On Thu, 24 Sep 2020 at 22:41, Aljoscha Krettek <al...@apache.org> wrote:

> Thanks for the proposal! I think the use cases that we are trying to
> solve are indeed valid. However, I think we might have to take a step
> back to look at what we're trying to solve and how we can solve it.
>
> The FLIP seems to have two broader topics: 1) add "get parallelism" to
> sinks/sources 2) let users write DataStream topologies for
> sinks/sources. I'll treat them separately below.
>
> I think we should not add "get parallelism" to the Table Sink API
> because I think it's the wrong level of abstraction. The Table API
> connectors are (or should be) more or less thin wrappers around
> "physical" connectors. By "physical" I mean the underlying (mostly
> DataStream API) connectors. For example, with the Kafka Connector the
> Table API connector just does the configuration parsing and determines a
> good (de)serialization format and then creates the underlying
> FlinkKafkaConsumer/FlinkKafkaProducer.
>
> If we wanted to add a "get parallelism" it would be in those underlying
> connectors but I'm also skeptical about adding such a method there
> because it is a static assignment and would preclude clever
> optimizations about the parallelism of a connector at runtime. But maybe
> that's thinking too much about future work so I'm open to discussion there.
>
> Regarding the second point of letting Table connector developers use
> DataStream: I think we should not do it. One of the purposes of FLIP-95
> [1] was to decouple the Table API from the DataStream API for the basic
> interfaces. Coupling the two too closely at that basic level will make
> our live harder in the future when we want to evolve those APIs or when
> we want the system to be better at choosing how to execute sources and
> sinks. An example of this is actually the past of the Table API. Before
> FLIP-95 we had connectors that dealt directly with DataSet and
> DataStream, meaning that if users wanted their Table Sink to work in
> both BATCH and STREAMING mode they had to provide two implementations.
> The trend is towards unifying the sources/sinks to common interfaces
> that can be used for both BATCH and STREAMING execution but, again, I
> think exposing DataStream here would be a step back in the wrong direction.
>
> I think the solution to the existing user requirement of using
> DataStream sources and sinks with the Table API should be better
> interoperability between the two APIs, which is being tackled right now
> in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases that
> we're trying to solve here, maybe we should think about FLIP-136 some more.
>
> What do you think?
>
> Best,
> Aljoscha
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>
>

Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Posted by Aljoscha Krettek <al...@apache.org>.
Thanks for the proposal! I think the use cases that we are trying to 
solve are indeed valid. However, I think we might have to take a step 
back to look at what we're trying to solve and how we can solve it.

The FLIP seems to have two broader topics: 1) add "get parallelism" to 
sinks/sources 2) let users write DataStream topologies for 
sinks/sources. I'll treat them separately below.

I think we should not add "get parallelism" to the Table Sink API 
because I think it's the wrong level of abstraction. The Table API 
connectors are (or should be) more or less thin wrappers around 
"physical" connectors. By "physical" I mean the underlying (mostly 
DataStream API) connectors. For example, with the Kafka Connector the 
Table API connector just does the configuration parsing and determines a 
good (de)serialization format and then creates the underlying 
FlinkKafkaConsumer/FlinkKafkaProducer.

If we wanted to add a "get parallelism" it would be in those underlying 
connectors but I'm also skeptical about adding such a method there 
because it is a static assignment and would preclude clever 
optimizations about the parallelism of a connector at runtime. But maybe 
that's thinking too much about future work so I'm open to discussion there.

Regarding the second point of letting Table connector developers use 
DataStream: I think we should not do it. One of the purposes of FLIP-95 
[1] was to decouple the Table API from the DataStream API for the basic 
interfaces. Coupling the two too closely at that basic level will make 
our live harder in the future when we want to evolve those APIs or when 
we want the system to be better at choosing how to execute sources and 
sinks. An example of this is actually the past of the Table API. Before 
FLIP-95 we had connectors that dealt directly with DataSet and 
DataStream, meaning that if users wanted their Table Sink to work in 
both BATCH and STREAMING mode they had to provide two implementations. 
The trend is towards unifying the sources/sinks to common interfaces 
that can be used for both BATCH and STREAMING execution but, again, I 
think exposing DataStream here would be a step back in the wrong direction.

I think the solution to the existing user requirement of using 
DataStream sources and sinks with the Table API should be better 
interoperability between the two APIs, which is being tackled right now 
in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases that 
we're trying to solve here, maybe we should think about FLIP-136 some more.

What do you think?

Best,
Aljoscha

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API


Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Posted by Benchao Li <li...@apache.org>.
Hi Jingsong,

Thanks for preparing this FLIP.
WRT ParallelismProvider, it looks good to me.

Kurt Young <yk...@gmail.com> 于2020年9月24日周四 下午4:14写道:

> Yeah, JDBC is definitely a popular use case we should consider.
>
> Best,
> Kurt
>
>
> On Thu, Sep 24, 2020 at 4:11 PM Flavio Pompermaier <po...@okkam.it>
> wrote:
>
> > Hi Kurt, in the past we had a very interesting use case in this regard:
> our
> > customer (oracle) db was quite big and running too many queries in
> parallel
> > was too heavy and it was causing the queries to fail.
> > So we had to limit the source parallelism to 2 threads. After the
> fetching
> > of the data the other operators could use the max parallelism as usual..
> >
> > Best,
> > Flavio
> >
> > On Thu, Sep 24, 2020 at 9:59 AM Kurt Young <yk...@gmail.com> wrote:
> >
> > > Thanks Jingsong for driving this, this is indeed a useful feature and
> > lots
> > > of users are asking for it.
> > >
> > > For setting a fixed source parallelism, I'm wondering whether this is
> > > necessary. For kafka,
> > > I can imagine users would expect Flink will use the number of
> partitions
> > as
> > > the parallelism. If it's too
> > > large, one can use the max parallelism to make it smaller.
> > > But for ES, which doesn't have ability to decide a reasonable
> parallelism
> > > on its own, it might make sense
> > > to introduce a user specified parallelism for such table source.
> > >
> > > So I think it would be better to reorganize the document a little bit,
> to
> > > explain the connectors one by one. Briefly
> > > introduce use cases and what kind of options are needed in your
> opinion.
> > >
> > > Regarding the interface `DataStreamScanProvider`, a concrete example
> > would
> > > help the discussion. What kind
> > > of scenarios do you want to support? And what kind of connectors need
> > such
> > > an interface?
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Wed, Sep 23, 2020 at 9:30 PM admin <17...@163.com> wrote:
> > >
> > > > +1,it’s a good news
> > > >
> > > > > 2020年9月23日 下午6:22,Jingsong Li <ji...@gmail.com> 写道:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > I'd like to start a discussion about improving the new TableSource
> > and
> > > > > TableSink interfaces.
> > > > >
> > > > > Most connectors have been migrated to FLIP-95, but there are still
> > the
> > > > > Filesystem and Hive that have not been migrated. They have some
> > > > > requirements on table connector API. And users also have some
> > > additional
> > > > > requirements:
> > > > > - Some connectors have the ability to infer parallelism, the
> > > parallelism
> > > > is
> > > > > good for most cases.
> > > > > - Users have customized parallelism configuration requirements for
> > > source
> > > > > and sink.
> > > > > - The connectors need to use topology to build their source/sink
> > > instead
> > > > of
> > > > > a single function. Like JIRA[1], Partition Commit feature and File
> > > > > Compaction feature.
> > > > >
> > > > > Details are in [2].
> > > > >
> > > > > [1]https://issues.apache.org/jira/browse/FLINK-18674
> > > > > [2]
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-146%3A+Improve+new+TableSource+and+TableSink+interfaces
> > > > >
> > > > > Best,
> > > > > Jingsong
> > > >
> > > >
> >
>


-- 

Best,
Benchao Li

Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Posted by Jingsong Li <ji...@gmail.com>.
Thanks Kurt and Flavio for your feedback.

To Kurt:

> Briefly introduce use cases and what kind of options are needed in your
opinion.

In the "Choose Scan Parallelism" chapter:
- I explained the user cases
- I adjusted the relationship to make user specified parallelism more
convenient

To Flavio:

Yes, you can configure `scan.infer-parallelism.max` or directly use
`scan.parallelism`.

To Kurt:

> Regarding the interface `DataStreamScanProvider`, a concrete example would
help the discussion.

From the user feedback in [1]. There are two users have similar following
feedback (CC: liu shouwei):
(It is from user-zh, Translate to English)

Briefly introduce the background. One of the tasks of our group is that
users write SQL on the page. We are responsible for converting SQL
processing into Flink jobs and running them on our platform. The conversion
process depends on our SQL SDK.
Let me give you a few examples that we often use and feel that the new API
1.11 is not easy to implement:
1. We now have a specific Kafka data format. One Kafka data will be
converted into n (n is a positive integer) row data. Our approach is to add
a process / flatmap phase to emit datastream to deal with this situation,
which is transparent to users.
2. At present, we have encapsulated some of our own sinks. We will add a
process / filter before the sink to perform buffer pool / micro batch /
data filtering functions.
3. Adjust or specify the source / sink parallelism to the user specified
value. We also do this on the datastream level.
4. For some special source sinks, they will be combined with keyby
operations (transparent to users). We also do this on the datastream level.

For example, in question 2 above, we can implement it in the sinkfunction,
but I personally think it may not be ideal in design. When designing and
arranging functions / operators, I am used to following the principle of
"single responsibility of operators". This is why I split multiple process
/ filter operators in front of sink functions instead of coupling these
functions to sink functions. On the other hand, without datastream, the
cost of migrating to the new API is relatively higher. Or, we have some
special reasons. When operators are arranged, we will modify the task chain
strategy. At this time, the flexibility of datastream is essential.

[1]https://issues.apache.org/jira/browse/FLINK-18674

Best,
Jingsong

On Thu, Sep 24, 2020 at 4:15 PM Kurt Young <yk...@gmail.com> wrote:

> Yeah, JDBC is definitely a popular use case we should consider.
>
> Best,
> Kurt
>
>
> On Thu, Sep 24, 2020 at 4:11 PM Flavio Pompermaier <po...@okkam.it>
> wrote:
>
> > Hi Kurt, in the past we had a very interesting use case in this regard:
> our
> > customer (oracle) db was quite big and running too many queries in
> parallel
> > was too heavy and it was causing the queries to fail.
> > So we had to limit the source parallelism to 2 threads. After the
> fetching
> > of the data the other operators could use the max parallelism as usual..
> >
> > Best,
> > Flavio
> >
> > On Thu, Sep 24, 2020 at 9:59 AM Kurt Young <yk...@gmail.com> wrote:
> >
> > > Thanks Jingsong for driving this, this is indeed a useful feature and
> > lots
> > > of users are asking for it.
> > >
> > > For setting a fixed source parallelism, I'm wondering whether this is
> > > necessary. For kafka,
> > > I can imagine users would expect Flink will use the number of
> partitions
> > as
> > > the parallelism. If it's too
> > > large, one can use the max parallelism to make it smaller.
> > > But for ES, which doesn't have ability to decide a reasonable
> parallelism
> > > on its own, it might make sense
> > > to introduce a user specified parallelism for such table source.
> > >
> > > So I think it would be better to reorganize the document a little bit,
> to
> > > explain the connectors one by one. Briefly
> > > introduce use cases and what kind of options are needed in your
> opinion.
> > >
> > > Regarding the interface `DataStreamScanProvider`, a concrete example
> > would
> > > help the discussion. What kind
> > > of scenarios do you want to support? And what kind of connectors need
> > such
> > > an interface?
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Wed, Sep 23, 2020 at 9:30 PM admin <17...@163.com> wrote:
> > >
> > > > +1,it’s a good news
> > > >
> > > > > 2020年9月23日 下午6:22,Jingsong Li <ji...@gmail.com> 写道:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > I'd like to start a discussion about improving the new TableSource
> > and
> > > > > TableSink interfaces.
> > > > >
> > > > > Most connectors have been migrated to FLIP-95, but there are still
> > the
> > > > > Filesystem and Hive that have not been migrated. They have some
> > > > > requirements on table connector API. And users also have some
> > > additional
> > > > > requirements:
> > > > > - Some connectors have the ability to infer parallelism, the
> > > parallelism
> > > > is
> > > > > good for most cases.
> > > > > - Users have customized parallelism configuration requirements for
> > > source
> > > > > and sink.
> > > > > - The connectors need to use topology to build their source/sink
> > > instead
> > > > of
> > > > > a single function. Like JIRA[1], Partition Commit feature and File
> > > > > Compaction feature.
> > > > >
> > > > > Details are in [2].
> > > > >
> > > > > [1]https://issues.apache.org/jira/browse/FLINK-18674
> > > > > [2]
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-146%3A+Improve+new+TableSource+and+TableSink+interfaces
> > > > >
> > > > > Best,
> > > > > Jingsong
> > > >
> > > >
> >
>


-- 
Best, Jingsong Lee

Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Posted by Kurt Young <yk...@gmail.com>.
Yeah, JDBC is definitely a popular use case we should consider.

Best,
Kurt


On Thu, Sep 24, 2020 at 4:11 PM Flavio Pompermaier <po...@okkam.it>
wrote:

> Hi Kurt, in the past we had a very interesting use case in this regard: our
> customer (oracle) db was quite big and running too many queries in parallel
> was too heavy and it was causing the queries to fail.
> So we had to limit the source parallelism to 2 threads. After the fetching
> of the data the other operators could use the max parallelism as usual..
>
> Best,
> Flavio
>
> On Thu, Sep 24, 2020 at 9:59 AM Kurt Young <yk...@gmail.com> wrote:
>
> > Thanks Jingsong for driving this, this is indeed a useful feature and
> lots
> > of users are asking for it.
> >
> > For setting a fixed source parallelism, I'm wondering whether this is
> > necessary. For kafka,
> > I can imagine users would expect Flink will use the number of partitions
> as
> > the parallelism. If it's too
> > large, one can use the max parallelism to make it smaller.
> > But for ES, which doesn't have ability to decide a reasonable parallelism
> > on its own, it might make sense
> > to introduce a user specified parallelism for such table source.
> >
> > So I think it would be better to reorganize the document a little bit, to
> > explain the connectors one by one. Briefly
> > introduce use cases and what kind of options are needed in your opinion.
> >
> > Regarding the interface `DataStreamScanProvider`, a concrete example
> would
> > help the discussion. What kind
> > of scenarios do you want to support? And what kind of connectors need
> such
> > an interface?
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Sep 23, 2020 at 9:30 PM admin <17...@163.com> wrote:
> >
> > > +1,it’s a good news
> > >
> > > > 2020年9月23日 下午6:22,Jingsong Li <ji...@gmail.com> 写道:
> > > >
> > > > Hi all,
> > > >
> > > > I'd like to start a discussion about improving the new TableSource
> and
> > > > TableSink interfaces.
> > > >
> > > > Most connectors have been migrated to FLIP-95, but there are still
> the
> > > > Filesystem and Hive that have not been migrated. They have some
> > > > requirements on table connector API. And users also have some
> > additional
> > > > requirements:
> > > > - Some connectors have the ability to infer parallelism, the
> > parallelism
> > > is
> > > > good for most cases.
> > > > - Users have customized parallelism configuration requirements for
> > source
> > > > and sink.
> > > > - The connectors need to use topology to build their source/sink
> > instead
> > > of
> > > > a single function. Like JIRA[1], Partition Commit feature and File
> > > > Compaction feature.
> > > >
> > > > Details are in [2].
> > > >
> > > > [1]https://issues.apache.org/jira/browse/FLINK-18674
> > > > [2]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-146%3A+Improve+new+TableSource+and+TableSink+interfaces
> > > >
> > > > Best,
> > > > Jingsong
> > >
> > >
>

Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Posted by Flavio Pompermaier <po...@okkam.it>.
Hi Kurt, in the past we had a very interesting use case in this regard: our
customer (oracle) db was quite big and running too many queries in parallel
was too heavy and it was causing the queries to fail.
So we had to limit the source parallelism to 2 threads. After the fetching
of the data the other operators could use the max parallelism as usual..

Best,
Flavio

On Thu, Sep 24, 2020 at 9:59 AM Kurt Young <yk...@gmail.com> wrote:

> Thanks Jingsong for driving this, this is indeed a useful feature and lots
> of users are asking for it.
>
> For setting a fixed source parallelism, I'm wondering whether this is
> necessary. For kafka,
> I can imagine users would expect Flink will use the number of partitions as
> the parallelism. If it's too
> large, one can use the max parallelism to make it smaller.
> But for ES, which doesn't have ability to decide a reasonable parallelism
> on its own, it might make sense
> to introduce a user specified parallelism for such table source.
>
> So I think it would be better to reorganize the document a little bit, to
> explain the connectors one by one. Briefly
> introduce use cases and what kind of options are needed in your opinion.
>
> Regarding the interface `DataStreamScanProvider`, a concrete example would
> help the discussion. What kind
> of scenarios do you want to support? And what kind of connectors need such
> an interface?
>
> Best,
> Kurt
>
>
> On Wed, Sep 23, 2020 at 9:30 PM admin <17...@163.com> wrote:
>
> > +1,it’s a good news
> >
> > > 2020年9月23日 下午6:22,Jingsong Li <ji...@gmail.com> 写道:
> > >
> > > Hi all,
> > >
> > > I'd like to start a discussion about improving the new TableSource and
> > > TableSink interfaces.
> > >
> > > Most connectors have been migrated to FLIP-95, but there are still the
> > > Filesystem and Hive that have not been migrated. They have some
> > > requirements on table connector API. And users also have some
> additional
> > > requirements:
> > > - Some connectors have the ability to infer parallelism, the
> parallelism
> > is
> > > good for most cases.
> > > - Users have customized parallelism configuration requirements for
> source
> > > and sink.
> > > - The connectors need to use topology to build their source/sink
> instead
> > of
> > > a single function. Like JIRA[1], Partition Commit feature and File
> > > Compaction feature.
> > >
> > > Details are in [2].
> > >
> > > [1]https://issues.apache.org/jira/browse/FLINK-18674
> > > [2]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-146%3A+Improve+new+TableSource+and+TableSink+interfaces
> > >
> > > Best,
> > > Jingsong
> >
> >

Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Posted by Kurt Young <yk...@gmail.com>.
Thanks Jingsong for driving this, this is indeed a useful feature and lots
of users are asking for it.

For setting a fixed source parallelism, I'm wondering whether this is
necessary. For kafka,
I can imagine users would expect Flink will use the number of partitions as
the parallelism. If it's too
large, one can use the max parallelism to make it smaller.
But for ES, which doesn't have ability to decide a reasonable parallelism
on its own, it might make sense
to introduce a user specified parallelism for such table source.

So I think it would be better to reorganize the document a little bit, to
explain the connectors one by one. Briefly
introduce use cases and what kind of options are needed in your opinion.

Regarding the interface `DataStreamScanProvider`, a concrete example would
help the discussion. What kind
of scenarios do you want to support? And what kind of connectors need such
an interface?

Best,
Kurt


On Wed, Sep 23, 2020 at 9:30 PM admin <17...@163.com> wrote:

> +1,it’s a good news
>
> > 2020年9月23日 下午6:22,Jingsong Li <ji...@gmail.com> 写道:
> >
> > Hi all,
> >
> > I'd like to start a discussion about improving the new TableSource and
> > TableSink interfaces.
> >
> > Most connectors have been migrated to FLIP-95, but there are still the
> > Filesystem and Hive that have not been migrated. They have some
> > requirements on table connector API. And users also have some additional
> > requirements:
> > - Some connectors have the ability to infer parallelism, the parallelism
> is
> > good for most cases.
> > - Users have customized parallelism configuration requirements for source
> > and sink.
> > - The connectors need to use topology to build their source/sink instead
> of
> > a single function. Like JIRA[1], Partition Commit feature and File
> > Compaction feature.
> >
> > Details are in [2].
> >
> > [1]https://issues.apache.org/jira/browse/FLINK-18674
> > [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-146%3A+Improve+new+TableSource+and+TableSink+interfaces
> >
> > Best,
> > Jingsong
>
>

Re: [DISCUSS] FLIP-146: Improve new TableSource and TableSink interfaces

Posted by admin <17...@163.com>.
+1,it’s a good news

> 2020年9月23日 下午6:22,Jingsong Li <ji...@gmail.com> 写道:
> 
> Hi all,
> 
> I'd like to start a discussion about improving the new TableSource and
> TableSink interfaces.
> 
> Most connectors have been migrated to FLIP-95, but there are still the
> Filesystem and Hive that have not been migrated. They have some
> requirements on table connector API. And users also have some additional
> requirements:
> - Some connectors have the ability to infer parallelism, the parallelism is
> good for most cases.
> - Users have customized parallelism configuration requirements for source
> and sink.
> - The connectors need to use topology to build their source/sink instead of
> a single function. Like JIRA[1], Partition Commit feature and File
> Compaction feature.
> 
> Details are in [2].
> 
> [1]https://issues.apache.org/jira/browse/FLINK-18674
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-146%3A+Improve+new+TableSource+and+TableSink+interfaces
> 
> Best,
> Jingsong