Posted to dev@flink.apache.org by santhosh venkat <sa...@gmail.com> on 2021/06/02 21:19:16 UTC

Integrating new connector with Flink SQL.

Hi,

Please correct me if I'm wrong anywhere; I'm new to Flink and still
navigating the landscape.

Within my company, we're currently developing a Flink connector for our
internal change data capture system (Brooklin). We plan to use Flink SQL as
the primary API for building streaming applications.

While exploring the Flink contracts, we noticed that there are two
different flavors of APIs available in Flink for source integration.

a) Flink Table API: the Flink ScanTableSource abstractions currently rely
on the SourceFunction interfaces to integrate with the underlying
messaging-client libraries. For instance, KafkaDynamicSource and
KinesisDynamicSource currently use FlinkKafkaConsumer and
FlinkKinesisConsumer (both implementations of RichParallelSourceFunction),
respectively, to read from the broker.
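
To illustrate this flavor, here is a stripped-down sketch of such a
function (BrooklinConsumer and pollBrooklin are hypothetical placeholders,
not existing classes):

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

// Hypothetical legacy-style source; each parallel subtask runs this function.
public class BrooklinConsumer extends RichParallelSourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String record = pollBrooklin(); // placeholder for the real client call
            // Emit under the checkpoint lock so emission and checkpointed
            // state updates happen atomically.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(record);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String pollBrooklin() {
        throw new UnsupportedOperationException("placeholder");
    }
}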

b) FLIP-27-style connector implementations: there are connectors that
implement the SplitEnumerator and SourceReader abstractions, where the
enumerator runs on the JobMaster and the readers run within the
TaskManager processes.
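
For comparison, my mental model of this flavor is roughly the following
skeleton (a minimal sketch; all the Brooklin* types are hypothetical
placeholders, with BrooklinSplit assumed to implement SourceSplit):

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;

// Hypothetical FLIP-27 source; the reader, enumerator, split, state, and
// serializer classes are placeholders, not real Flink or Brooklin types.
public class BrooklinSource implements Source<String, BrooklinSplit, BrooklinEnumState> {

    @Override
    public Boundedness getBoundedness() {
        // A change data capture stream never ends.
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    @Override
    public SourceReader<String, BrooklinSplit> createReader(SourceReaderContext ctx) {
        // Runs inside the TaskManagers; reads the splits assigned to it.
        return new BrooklinSourceReader(ctx);
    }

    @Override
    public SplitEnumerator<BrooklinSplit, BrooklinEnumState> createEnumerator(
            SplitEnumeratorContext<BrooklinSplit> ctx) {
        // Runs on the JobMaster; discovers partitions and assigns them to readers.
        return new BrooklinSplitEnumerator(ctx);
    }

    @Override
    public SplitEnumerator<BrooklinSplit, BrooklinEnumState> restoreEnumerator(
            SplitEnumeratorContext<BrooklinSplit> ctx, BrooklinEnumState checkpoint) {
        return new BrooklinSplitEnumerator(ctx, checkpoint);
    }

    @Override
    public SimpleVersionedSerializer<BrooklinSplit> getSplitSerializer() {
        return new BrooklinSplitSerializer();
    }

    @Override
    public SimpleVersionedSerializer<BrooklinEnumState> getEnumeratorCheckpointSerializer() {
        return new BrooklinEnumStateSerializer();
    }
}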

Questions:

1. If I want to integrate a new connector and use it from Flink SQL, what
is the recommendation? Are users supposed to implement
RichParallelSourceFunction, CheckpointListener, etc., similar to
FlinkKafkaConsumer, and wire that into the ScanTableSource API?

2. Just wondering, what is the long-term plan for the ScanTableSource APIs?
Are there plans for them to integrate with the SplitEnumerator and
SourceReader abstractions?

3. If I want to offer my connector implementation through both the Flink
DataStream and Flink SQL APIs, should I implement both flavors of the
source APIs (SplitEnumerator/SourceReader as well as SourceFunction) in Flink?

I would really appreciate it if someone could help answer the above
questions.

Thanks.

Re: Integrating new connector with Flink SQL.

Posted by Jingsong Li <ji...@gmail.com>.
Hi santhosh,

1. Yes.

2. I'd be very glad if you could contribute.

Best,
Jingsong

On Fri, Jun 4, 2021 at 1:13 AM santhosh venkat <sa...@gmail.com>
wrote:

> Hi Jingsong,
>
> Thanks for taking the time to respond to my questions. I really
> appreciate it.
>
> 1. Just to make sure we are on the same page: are you recommending that
> we implement the Source, SourceReader, and SplitEnumerator abstractions
> for the new source connector, and then use either the
> DataStreamScanProvider or SourceProvider types in flink-table-api-bridge
> to integrate the Source (factory) implementation with the ScanTableSource
> abstraction?
>
> 2. Also, if we integrate the KafkaDynamicSource in the Flink Table API
> with the KafkaSourceReader and KafkaSplitEnumerator abstractions, would
> it be possible for us to contribute it back to the community?
>
> Thanks.


-- 
Best, Jingsong Lee

Re: Integrating new connector with Flink SQL.

Posted by santhosh venkat <sa...@gmail.com>.
Hi Jingsong,

Thanks for taking the time to respond to my questions. I really appreciate it.

1. Just to make sure we are on the same page: are you recommending that we
implement the Source, SourceReader, and SplitEnumerator abstractions for
the new source connector, and then use either the DataStreamScanProvider
or SourceProvider types in flink-table-api-bridge to integrate the
Source (factory) implementation with the ScanTableSource abstraction?
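
A rough sketch of the wiring I have in mind, assuming hypothetical Brooklin
names throughout (the factory would be registered through a
META-INF/services/org.apache.flink.table.factories.Factory file):

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

// Hypothetical factory exposing the connector as 'connector' = 'brooklin'.
public class BrooklinDynamicTableFactory implements DynamicTableSourceFactory {

    public static final ConfigOption<String> STREAM =
            ConfigOptions.key("stream").stringType().noDefaultValue();

    @Override
    public String factoryIdentifier() {
        return "brooklin"; // the value of the 'connector' option in DDL
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate(); // checks required/optional options
        // BrooklinDynamicSource is a hypothetical ScanTableSource implementation.
        return new BrooklinDynamicSource();
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(STREAM);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
}

With that on the classpath, a table declared with 'connector' = 'brooklin'
should resolve to this source.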

2. Also, if we integrate the KafkaDynamicSource in the Flink Table API with
the KafkaSourceReader and KafkaSplitEnumerator abstractions, would it be
possible for us to contribute it back to the community?

Thanks.

On Wed, Jun 2, 2021 at 7:36 PM Jingsong Li <ji...@gmail.com> wrote:

> Hi santhosh,
>
> 1. I recommend using the new source with ScanTableSource.
>
> 2. You can use `org.apache.flink.table.connector.source.SourceProvider` to
> integrate it with ScanTableSource. (Introduced in 1.12.)
>
> 3. You can just implement a new source; it can be used by both the Flink
> DataStream and Flink SQL APIs. (A SourceFunction can likewise be used by
> both.)
>
> Actually, the table connector is just a wrapper around the DataStream
> one; they should not have core differences.
>
> I believe we should migrate KafkaDynamicSource to the new KafkaSource in
> Flink 1.14. Maybe @Qingsheng Ren is working on this.
>
> Best,
> Jingsong

Re: Integrating new connector with Flink SQL.

Posted by Jingsong Li <ji...@gmail.com>.
Hi santhosh,

1. I recommend using the new source with ScanTableSource.

2. You can use `org.apache.flink.table.connector.source.SourceProvider` to
integrate it with ScanTableSource. (Introduced in 1.12.)
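
For example, here is a minimal sketch of a ScanTableSource that hands a new
(FLIP-27) source to the planner through SourceProvider; the Brooklin names
are hypothetical placeholders:

import org.apache.flink.api.connector.source.Source;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.data.RowData;

// Hypothetical table source bridging a FLIP-27 source into Flink SQL.
public class BrooklinDynamicSource implements ScanTableSource {

    @Override
    public ChangelogMode getChangelogMode() {
        // A CDC stream produces the full changelog; a plain append-only
        // source would return ChangelogMode.insertOnly().
        return ChangelogMode.all();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        // Hand the FLIP-27 source to the planner.
        Source<RowData, ?, ?> source = createBrooklinSource();
        return SourceProvider.of(source);
    }

    private Source<RowData, ?, ?> createBrooklinSource() {
        throw new UnsupportedOperationException("placeholder for the real source");
    }

    @Override
    public DynamicTableSource copy() {
        return new BrooklinDynamicSource();
    }

    @Override
    public String asSummaryString() {
        return "Brooklin";
    }
}

DataStreamScanProvider is the alternative when you need to build an
arbitrary DataStream; SourceProvider is the more direct bridge for a
FLIP-27 source.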

3. You can just implement a new source; it can be used by both the Flink
DataStream and Flink SQL APIs. (A SourceFunction can likewise be used by
both.)
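
For instance, the following runnable sketch attaches a FLIP-27 source in
the DataStream API; the built-in NumberSequenceSource stands in for a
custom source, which would be passed in exactly the same way:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Flip27DataStreamExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Any FLIP-27 Source is attached with env.fromSource(); a custom
        // source would be passed here in exactly the same way.
        env.fromSource(
                new NumberSequenceSource(1L, 100L),
                WatermarkStrategy.noWatermarks(),
                "numbers")
           .print();

        env.execute("FLIP-27 source in the DataStream API");
    }
}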

Actually, the table connector is just a wrapper around the DataStream one;
they should not have core differences.

I believe we should migrate KafkaDynamicSource to the new KafkaSource in
Flink 1.14. Maybe @Qingsheng Ren is working on this.

Best,
Jingsong

On Thu, Jun 3, 2021 at 5:19 AM santhosh venkat <sa...@gmail.com>
wrote:



-- 
Best, Jingsong Lee