Posted to dev@flink.apache.org by Ran Tao <ch...@gmail.com> on 2023/04/10 07:33:48 UTC

Re: [DISCUSS] FLIP-278: Hybrid Source Connector

Hi devs, I would like to reopen this discussion because some of the earlier
questions have been resolved and others need further discussion.

My replies to the questions and problems raised in the previous discussion
are below.

@Timo
1. About the option prefix: we decided to use identifiers, e.g.

```
create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'source-identifiers'='historical,realtime',
 'historical.connector'='filesystem',
 'historical.path' = '/tmp/a.csv',
 'historical.format' = 'csv',
 'realtime.connector'='kafka',
 'realtime.topic' = 'test',
 'realtime.properties.bootstrap.servers' = 'localhost:9092',
 'realtime.properties.group.id' = 'test',
 'realtime.scan.startup.mode' = 'earliest-offset',
 'realtime.format' = 'csv'
);
```
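
To illustrate how identifier-prefixed options could be grouped per child
source, here is a minimal sketch in plain Java. It is not the FLIP's
implementation: the class and method names are hypothetical, and enforcing
the [a-z0-9_] identifier pattern is an assumption based on the suggestion
quoted later in this thread. Only the option keys come from the DDL above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only.
final class HybridOptionsSketch {

    // Assumption: identifiers are restricted to simple [a-z0-9_] names.
    private static final Pattern IDENTIFIER = Pattern.compile("[a-z0-9_]+");

    /**
     * Groups 'id.key' options by child source and strips the 'id.' prefix.
     * The result keeps the order given in 'source-identifiers'.
     */
    static Map<String, Map<String, String>> splitByIdentifier(Map<String, String> tableOptions) {
        String ids = tableOptions.get("source-identifiers");
        if (ids == null || ids.trim().isEmpty()) {
            throw new IllegalArgumentException("'source-identifiers' must be set");
        }
        Map<String, Map<String, String>> perSource = new LinkedHashMap<>();
        for (String rawId : ids.split(",")) {
            String id = rawId.trim();
            if (!IDENTIFIER.matcher(id).matches()) {
                throw new IllegalArgumentException("Invalid source identifier: " + id);
            }
            String prefix = id + ".";
            Map<String, String> childOptions = new LinkedHashMap<>();
            for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
                if (entry.getKey().startsWith(prefix)) {
                    // 'realtime.properties.group.id' becomes 'properties.group.id'
                    childOptions.put(entry.getKey().substring(prefix.length()), entry.getValue());
                }
            }
            perSource.put(id, childOptions);
        }
        return perSource;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "hybrid");
        options.put("source-identifiers", "historical,realtime");
        options.put("historical.connector", "filesystem");
        options.put("historical.path", "/tmp/a.csv");
        options.put("realtime.connector", "kafka");
        options.put("realtime.properties.group.id", "test");
        System.out.println(splitByIdentifier(options));
    }
}
```

Each child map could then be handed to the child connector's own factory as
if it were a standalone table definition.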

@Martijn Visser <ma...@apache.org>
1. Table API usage

I have updated the FLIP to describe the Table API usage.

2. How does the dynamically switched start timestamp work?

In this FLIP, we introduce two interfaces to support it.
If switched-start-position-enabled is turned on to use a dynamically
switched start timestamp, the split enumerator of the first source needs to
implement SupportsGetEndTimestamp, and the next source needs to
implement SupportsSwitchedStartTimestamp.
Together, these two interfaces obtain the end timestamp of the preceding
bounded source and apply it to the next streaming source.
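
As a rough sketch of that handoff (not the code in the FLIP or the PR): the
interface names and the option name come from this thread, while the method
names getEndTimestamp and applySwitchedStartTimestamp, the toy classes, and
the handOff helper are assumptions made for illustration.

```java
// Hypothetical sketch; only the two interface names are taken from the FLIP discussion.
final class SwitchedStartSketch {

    /** Assumed shape: implemented by the split enumerator of the preceding bounded source. */
    interface SupportsGetEndTimestamp {
        long getEndTimestamp();
    }

    /** Assumed shape: implemented by the next (streaming) source. */
    interface SupportsSwitchedStartTimestamp {
        void applySwitchedStartTimestamp(long startTimestamp);
    }

    /** Toy bounded enumerator that knows the timestamp at which its data ends. */
    static final class BoundedEnumerator implements SupportsGetEndTimestamp {
        private final long endTimestamp;

        BoundedEnumerator(long endTimestamp) {
            this.endTimestamp = endTimestamp;
        }

        @Override
        public long getEndTimestamp() {
            return endTimestamp;
        }
    }

    /** Toy streaming source; a null value means "use my own startup strategy". */
    static final class StreamingSource implements SupportsSwitchedStartTimestamp {
        private Long switchedStart;

        @Override
        public void applySwitchedStartTimestamp(long startTimestamp) {
            this.switchedStart = startTimestamp;
        }

        Long switchedStart() {
            return switchedStart;
        }
    }

    /** Passes the previous source's end timestamp to the next source when the option is on. */
    static void handOff(Object previousEnumerator, Object nextSource, boolean switchedStartPositionEnabled) {
        if (!switchedStartPositionEnabled) {
            return; // fixed start position: the next source keeps its own startup strategy
        }
        if (!(previousEnumerator instanceof SupportsGetEndTimestamp)
                || !(nextSource instanceof SupportsSwitchedStartTimestamp)) {
            throw new IllegalStateException(
                    "switched-start-position-enabled requires both interfaces to be implemented");
        }
        long end = ((SupportsGetEndTimestamp) previousEnumerator).getEndTimestamp();
        ((SupportsSwitchedStartTimestamp) nextSource).applySwitchedStartTimestamp(end);
    }

    public static void main(String[] args) {
        StreamingSource next = new StreamingSource();
        handOff(new BoundedEnumerator(1_000L), next, true);
        System.out.println("switched start timestamp: " + next.switchedStart());
    }
}
```

Failing fast when either side lacks the interface is one possible design; the
actual behavior in that case is up to the FLIP.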

@John Roesler
1. Source handoff

We support both Fixed-Start-Position and Switched-Start-Position. The
default is Fixed-Start-Position; the option switched-start-position-enabled
controls which one is used.
In Fixed-Start-Position, the next streaming source uses its own startup
strategy, e.g. for Kafka, the scan.startup.mode predefined in the user's
SQL.
Switched-Start-Position is the same topic as Martijn's question above about
the dynamically switched start timestamp. The SupportsGetEndTimestamp
interface extracts the end timestamp from the first source's split
enumerator and passes it to the next source, which applies it through
SupportsSwitchedStartTimestamp.

2. More child sources

Yes. This is consistent with the HybridSource DataStream API: there is no
limit on the number of child sources.
For example, below is a case with three sources.

```
create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'source-identifiers'='historical01,historical02,realtime',
 'historical01.connector'='filesystem',
 'historical01.path' = '/tmp/a.csv',
 'historical01.format' = 'csv',
 'historical02.connector'='filesystem',
 'historical02.path' = '/tmp/a.csv',
 'historical02.format' = 'csv',
 'realtime.connector'='kafka',
 'realtime.topic' = 'test',
 'realtime.properties.bootstrap.servers' = 'localhost:9092',
 'realtime.properties.group.id' = 'testGroup',
 'realtime.scan.startup.mode' = 'earliest-offset',
 'realtime.format' = 'csv'
);
```

More details can be found at [1] and [2].
I look forward to hearing any further concerns and opinions.

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235836225
[2] https://github.com/apache/flink/pull/21841

Best Regards,
Ran Tao

On Thu, Dec 15, 2022 at 16:02, Ran Tao <ch...@gmail.com> wrote:

> Hi all. HybridSource is a good feature, but released versions have long
> lacked support for it in the Table & SQL API.
>
> I have written a FLIP for discussion:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235836225
>
> Sorry for the unclear subject of my previous email. I have copied Timo's
> response here and am resending it. I look forward to your comments.
>
> ```
> Hi Ran,
>
> Thanks for proposing a FLIP. Btw according to the process, the subject
> of this email should be `[DISCUSS] FLIP-278: Hybrid Source Connector` so
> that people can identify this discussion as a FLIP discussion.
>
> Supporting the hybrid source for SQL was a long-standing issue on our
> roadmap. Happy to give feedback here:
>
> 1) Options
>
> Coming up with stable long-term options should be a shared effort.
> Having an index as a key could cause unintended side effects if the
> index is not correctly chosen. I would suggest we use IDs instead.
>
> What do you think about the following structure?
>
> CREATE TABLE ... WITH (
>    'sources'='historical;realtime',   -- Config option of type string list
>    'historical.connector' = 'filesystem',
>    'historical.path' = '/tmp/a.csv',
>    'historical.format' = 'csv',
>    'realtime.path' = '/tmp/b.csv',
>    'realtime.format' = 'csv'
> )
>
> I would limit the IDs to simple [a-z0-9_] identifiers. Once we support
> metadata columns, we can also propagate these IDs easily.
>
> 2) Schema field mappings
>
> The FLIP mentions `schema-field-mappings`. Could you elaborate on this in
> the document?
>
> 3) Start position strategies
>
> Have you thought about how we can represent start position strategies?
> The FLIP is very minimal but it would be nice to at least hear some
> opinions on this topic. Maybe we can come up with some general strategy
> that makes the most common use case possible in the near future.
>
> Thanks,
> Timo
> ```
>
> --
> Best Regards,
> Ran Tao
> https://github.com/chucheng92
>

Re: [DISCUSS] FLIP-278: Hybrid Source Connector

Posted by Ran Tao <ch...@gmail.com>.
Hi devs,

Are there any other considerations for this FLIP? All feedback is welcome.
If there are no further opinions in the next few days, I will start a
vote. Thank you all.


Best Regards,
Ran Tao

