Posted to dev@flink.apache.org by Ran Tao <ch...@gmail.com> on 2022/12/15 08:02:52 UTC

[DISCUSS] FLIP-278: Hybrid Source Connector

Hi guys. HybridSource is a good feature, but the released versions have not
supported the Table & SQL API for a long time.

I have written a FLIP for discussion:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235836225

Sorry for the unclear subject of my previous email; I have copied Timo's
response below and sent this email. Looking forward to your comments.

```
Hi Ran,

Thanks for proposing a FLIP. Btw according to the process, the subject
of this email should be `[DISCUSS] FLIP-278: Hybrid Source Connector` so
that people can identify this discussion as a FLIP discussion.

Supporting the hybrid source for SQL was a long-standing issue on our
roadmap. Happy to give feedback here:

1) Options

Coming up with stable long-term options should be a shared effort.
Having an index as a key could cause unintended side effects if the
index is not correctly chosen, so I would suggest we use IDs instead.

What do you think about the following structure?

CREATE TABLE ... WITH (
   'sources'='historical;realtime',   -- Config option of type string list
   'historical.connector' = 'filesystem',
   'historical.path' = '/tmp/a.csv',
   'historical.format' = 'csv',
   'realtime.path' = '/tmp/b.csv',
   'realtime.format' = 'csv'
)

I would limit the IDs to simple [a-z0-9_] identifiers. Once we support
metadata columns, we can also propagate these IDs easily.

2) Schema field mappings

The FLIP mentions `schema-field-mappings`; could you elaborate on this in
the document?

3) Start position strategies

Have you thought about how we can represent start position strategies?
The FLIP is very minimal but it would be nice to at least hear some
opinions on this topic. Maybe we can come up with some general strategy
that makes the most common use case possible in the near future.

Thanks,
Timo
```
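
As a side note, the [a-z0-9_] restriction Timo suggests is easy to enforce
when parsing the source IDs; a minimal sketch (the class and method names
here are just for illustration, not part of the FLIP):

```
import java.util.regex.Pattern;

final class SourceIdValidator {
    // only lowercase letters, digits and underscores, per Timo's suggestion
    private static final Pattern SOURCE_ID = Pattern.compile("[a-z0-9_]+");

    static void validate(String id) {
        if (!SOURCE_ID.matcher(id).matches()) {
            throw new IllegalArgumentException(
                    "Invalid source id '" + id + "': only [a-z0-9_] is allowed.");
        }
    }
}
```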

-- 
Best Regards,
Ran Tao
https://github.com/chucheng92

Re: [DISCUSS] FLIP-278: Hybrid Source Connector

Posted by Ran Tao <ch...@gmail.com>.
Hi, devs.

I don't know if you have any other considerations for this FLIP. All
discussions are welcome.
If there are no other opinions in the coming days, I will try to initiate a
vote. Thank you all.


Best Regards,
Ran Tao



Re: [DISCUSS] FLIP-278: Hybrid Source Connector

Posted by Ran Tao <ch...@gmail.com>.
Hi, devs. I want to reopen this discussion because some questions have
been resolved and others need more discussion.

In the previous discussion, there were some questions and problems.

@Timo
1. About the option prefix: we decided to use identifiers, e.g.

```
create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'source-identifiers'='historical,realtime',
 'historical.connector'='filesystem',
 'historical.path' = '/tmp/a.csv',
 'historical.format' = 'csv',
 'realtime.connector'='kafka',
 'realtime.topic' = 'test',
 'realtime.properties.bootstrap.servers' = 'localhost:9092',
 'realtime.properties.group.id' = 'test',
 'realtime.scan.startup.mode' = 'earliest-offset',
 'realtime.format' = 'csv'
);
```

@Martijn Visser <ma...@apache.org>
1. Table API usage

I updated the FLIP with Table API usage; a sketch is shown below.
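
For reference, a minimal sketch of what the Table API usage could look like,
built on the generic TableDescriptor; the option keys mirror the DDL above,
and whether the FLIP adds a dedicated builder on top of this is an
implementation detail:

```
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

public class HybridTableApiExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // same options as the SQL DDL above, expressed via TableDescriptor
        tEnv.createTemporaryTable(
                "hybrid_source",
                TableDescriptor.forConnector("hybrid")
                        .schema(
                                Schema.newBuilder()
                                        .column("f0", DataTypes.STRING())
                                        .column("f1", DataTypes.STRING())
                                        .column("f2", DataTypes.BIGINT())
                                        .build())
                        .option("source-identifiers", "historical,realtime")
                        .option("historical.connector", "filesystem")
                        .option("historical.path", "/tmp/a.csv")
                        .option("historical.format", "csv")
                        .option("realtime.connector", "kafka")
                        .option("realtime.topic", "test")
                        .option("realtime.properties.bootstrap.servers", "localhost:9092")
                        .option("realtime.scan.startup.mode", "earliest-offset")
                        .option("realtime.format", "csv")
                        .build());

        Table hybrid = tEnv.from("hybrid_source");
        hybrid.execute().print();
    }
}
```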

2. How does the dynamically switched start timestamp work?

In this FLIP, we introduce 2 interfaces to support it.
If switched-start-position-enabled is turned on to use the dynamically
switched start timestamp, then the first source's split enumerator needs to
implement SupportsGetEndTimestamp, and the next source needs to
implement SupportsSwitchedStartTimestamp.
We use SupportsGetEndTimestamp and SupportsSwitchedStartTimestamp to get
the previous bounded source's end timestamp and apply it to the next
streaming source. A sketch of the two interfaces follows.
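
A minimal sketch of the two interfaces; the names come from this thread, but
the exact signatures are illustrative, not the final API:

```
// Implemented by the bounded (first) source's split enumerator, so the
// hybrid source can read where the bounded data ends at switch time.
public interface SupportsGetEndTimestamp {
    long getEndTimestamp();
}

// Implemented by the next (streaming) source, so the hybrid source can
// apply the previous source's end timestamp as its start position.
public interface SupportsSwitchedStartTimestamp {
    void applySwitchedStartTimestamp(long startTimestamp);
}
```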

@John Roesler
1. Source handoff

We support both Fixed-Start-Position and Switched-Start-Position; the
default is Fixed-Start-Position. The option switched-start-position-enabled
controls it.
In Fixed-Start-Position, the next streaming source uses its own startup
strategy, e.g. for Kafka we use the scan.startup.mode predefined in the
user's SQL.
In Switched-Start-Position, this is the same question as `how does the
dynamically switched start timestamp work` from @Martijn above. We offer
the SupportsGetEndTimestamp interface to extract the first source's split
enumerator endTimestamp and pass it to the next source, and the next source
uses SupportsSwitchedStartTimestamp to apply it.

2. More child sources

Yes, this is consistent with the hybrid source DataStream API: there is no
limit on the number of child sources.
E.g. below is a case with 3 sources.

```
create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'source-identifiers'='historical01,historical02,realtime',
 'historical01.connector'='filesystem',
 'historical01.path' = '/tmp/a.csv',
 'historical01.format' = 'csv',
 'historical02.connector'='filesystem',
 'historical02.path' = '/tmp/a.csv',
 'historical02.format' = 'csv',
 'realtime.connector'='kafka',
 'realtime.topic' = 'test',
 'realtime.properties.bootstrap.servers' = 'localhost:9092',
 'realtime.properties.group.id' = 'testGroup',
 'realtime.scan.startup.mode' = 'earliest-offset',
 'realtime.format' = 'csv'
);
```

More details can be found at [1] & [2].
Looking forward to more of your concerns and opinions.

1.https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235836225
2.https://github.com/apache/flink/pull/21841

Best Regards,
Ran Tao


Re: [DISCUSS] FLIP-278: Hybrid Source Connector

Posted by Ran Tao <ch...@gmail.com>.
Hi, Martijn. I have updated the FLIP regarding the Table API & the switched
start timestamp.
Thanks.



-- 
Best Regards,
Ran Tao
https://github.com/chucheng92

Re: [DISCUSS] FLIP-278: Hybrid Source Connector

Posted by Ran Tao <ch...@gmail.com>.
Hi, all. I have updated FLIP-278 [1]. I think all problems and comments
have been addressed:

1. About the option prefix, we use identifiers.
2. Table API implementation and demo.
3. About the switched dynamic position (the hybrid source uses it to switch
automatically from the previous source to the next one).

More details can be found in the draft PR [2]; it works well.

1.https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235836225
2.https://github.com/apache/flink/pull/21841



-- 
Best Regards,
Ran Tao
https://github.com/chucheng92

Re: [DISCUSS] FLIP-278: Hybrid Source Connector

Posted by Ran Tao <ch...@gmail.com>.
A mistake in my previous mail; the line should be:
childSources.get(sourceIndex).setStartTimestamp(switchedTimestamp);
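
In context, the corrected lambda of the Switched-start-position demo (see the
demo in the earlier mail below) would read as follows; setStartTimestamp is
still the assumed new Source method, not an existing API:

```
builder.addSource(
        switchContext -> {
            SplitEnumerator previousEnumerator =
                    switchContext.getPreviousEnumerator();
            long switchedTimestamp = previousEnumerator.getEndTimestamp();
            // corrected: set the start timestamp on the concrete child source
            childSources.get(sourceIndex).setStartTimestamp(switchedTimestamp);
            return childSources.get(sourceIndex);
        },
        boundedness);
```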



-- 
Best Regards,
Ran Tao
https://github.com/chucheng92

Re: [DISCUSS] FLIP-278: Hybrid Source Connector

Posted by Ran Tao <ch...@gmail.com>.
Hi, John. Thanks for your comments.
About question 2: the "handoff" is used for switching to the next source
seamlessly, but it's optional. Not every hybrid source job needs to use
this mode.

The hybrid source SQL or Table API needs to support two modes, like the
DataStream API below. One is a fixed position: the user can specify
earliest, latest, specific-offsets, etc.
The second is that the user can specify a timestamp to let the second
source consume the Kafka data from that timestamp (no need to specify
earliest, latest or specific-offsets; Flink does this conversion).

A simple example with FileSource and KafkaSource with a fixed Kafka start
position:

```
FileSource<String> fileSource =
        FileSource.forRecordStreamFormat(
                new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
KafkaSource<String> kafkaSource =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setGroupId("MyGroup")
                .setTopics(Arrays.asList("quickstart-events"))
                .setDeserializer(
                        KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
                .setStartingOffsets(OffsetsInitializer.earliest())
                .build();
HybridSource<String> hybridSource =
        HybridSource.builder(fileSource)
                .addSource(kafkaSource)
                .build();
```

A more complex example with the Kafka start position derived from the
previous source:

```
HybridSource<String> hybridSource =
        HybridSource.<String, StaticFileSplitEnumerator>builder(fileSource)
                .addSource(
                        switchContext -> {
                            StaticFileSplitEnumerator previousEnumerator =
                                    switchContext.getPreviousEnumerator();
                            // how to get the timestamp depends on the specific enumerator
                            long timestamp = previousEnumerator.getEndTimestamp();
                            OffsetsInitializer offsets =
                                    OffsetsInitializer.timestamp(timestamp);
                            KafkaSource<String> kafkaSource =
                                    KafkaSource.<String>builder()
                                            .setBootstrapServers("localhost:9092")
                                            .setGroupId("MyGroup")
                                            .setTopics(Arrays.asList("quickstart-events"))
                                            .setDeserializer(
                                                    KafkaRecordDeserializer.valueOnly(
                                                            StringDeserializer.class))
                                            .setStartingOffsets(offsets)
                                            .build();
                            return kafkaSource;
                        },
                        Boundedness.CONTINUOUS_UNBOUNDED)
                .build();
```

Currently Flink's SplitEnumerator interface does not expose
getEndTimestamp(). I think if we want to implement the "handoff" mode, we
need to let SplitEnumerator expose this method.
Then the question is: once we get the previous endTimestamp, how do we set
it back? We can't build the KafkaSource instance ourselves, because hybrid
is a generic implementation.
I think we need to add a method, for example startTimestamp(), to the new
Source; then we can implement this:

Switched-start-position demo:

```
HybridSource.HybridSourceBuilder<RowData, SplitEnumerator> builder =
        HybridSource.builder(childSources.get(0));
for (int i = 1; i < childSources.size(); i++) {
    final int sourceIndex = i;
    Boundedness boundedness = childSources.get(sourceIndex).getBoundedness();
    builder.addSource(
            switchContext -> {
                SplitEnumerator previousEnumerator =
                        switchContext.getPreviousEnumerator();
                // how to pass it to kafka or another connector? We add a
                // method in the new source api like startTimestamp();
                long switchedTimestamp = previousEnumerator.getEndTimestamp();
                childSources.setStartTimestamp(switchedTimestamp);
                return childSources.get(sourceIndex);
            },
            boundedness);
}
hybridSource = builder.build();
```

E.g. if Kafka is the last source, then Kafka uses this switchedTimestamp to
initialize the OffsetsInitializer and consumes from that timestamp.

On the last question, whether this source supports chaining together more
than two sources: absolutely yes, we support more than two sources, like
the DataStream API.
I have added a DDL example to the FLIP.




-- 
Best Regards,
Ran Tao
https://github.com/chucheng92

Re: [DISCUSS] FLIP-278: Hybrid Source Connector

Posted by John Roesler <vv...@apache.org>.
Hello all,

Thanks for the FLIP, Ran!

The HybridSource is a really cool feature, and I was glad to see a proposal to expose it in the Table and SQL APIs.

My main question is also about the switching control (question 2). It seems like the existing Kafka connector has all the options you'd want to define the switching point[1], and the issue is only how to specify a "handoff" from one source to the next. It seems like you could propose to add a reference to an extracted field or property of the first source to be used in the second one.

However, the more I think about it, the more I wonder whether a "handoff" operation ought to be necessary. For example, the use case I have in mind is to bootstrap the table using a snapshot of the data and then have it seamlessly switch over to consuming all the records since that snapshot. In order to support this use case with no loss or duplicates, timestamp isn't sufficient; I'd need to know the exact vector of offsets represented in that snapshot. Then again, if I control the snapshotting process, this should be trivial to compute and store next to the snapshots.

Further, when I register the table, I ought to know which exact snapshot I'm selecting, and therefore can just populate the `specific-offsets` as desired. Backing off to timestamp, if I again am naming a path to a specific snapshot of the data, it seems like I have enough information already to also specify the correct `timestamp` option.
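
For concreteness, a sketch of pinning such an exact offset vector with the
Kafka DataStream API's OffsetsInitializer; the topic and offsets here are
made up, and the SQL `specific-offsets` option would carry the same
per-partition values:

```
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.common.TopicPartition;

Map<TopicPartition, Long> snapshotOffsets = new HashMap<>();
snapshotOffsets.put(new TopicPartition("quickstart-events", 0), 42L);  // recorded next
snapshotOffsets.put(new TopicPartition("quickstart-events", 1), 300L); // to the snapshot
OffsetsInitializer offsets = OffsetsInitializer.offsets(snapshotOffsets);
```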

With this in mind, my question is whether it's necessary to specify some kind of dynamic property, like the DataStream API does[2]. If a fixed property is sufficient, it seems like the current proposal is actually sufficient as well. I think I just don't see the use case for dynamic configuration here.

Side question, out of curiosity: would this source support chaining together more than two sources? It seems like the proposed syntax would allow it. It seems like some snapshot-rollup strategies could benefit from it (e.g. if you want to combine your 2021 yearly rollup with your Jan-Nov monthly rollups, then your first two weekly rollups from Dec, and finally switch over to live data from Kafka or something).

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#start-reading-position
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/#dynamic-start-position-at-switch-time

Thanks again,
-John


Re: [DISCUSS] FLIP-278: Hybrid Source Connector

Posted by Ran Tao <ch...@gmail.com>.
Hi, Martijn, thanks for your comments.

Using an identifier as the child source prefix may be a better way than an
index. I will update the FLIP to illustrate how we can read the hybrid
schema to generate the child schemas, for question 1.

Question 2 is about the start position for the next Kafka source. Currently
we cannot get the end timestamp of the first bounded source; in the
DataStream API, the end timestamp can be obtained from the previous
enumerator. We need to add end-timestamp support to bounded sources (e.g.
filesystem).
If we can get the end timestamp, then Kafka will start from this offset. I
think we need an option here that allows the user to start the next Kafka
source either automatically from the previous one or from a user-defined
start offset (via a WITH option in the SQL DDL). Not every second source
needs to bind to the previous one; for example, if the next source is also
a file, it does not need a start position.

Question 3 is about the Table API, which I haven't added to the FLIP yet. I
will try to fix some current issues, update the FLIP, and add more details.
Thanks for your comments.




-- 
Best Regards,
Ran Tao
https://github.com/chucheng92

Re: [DISCUSS] FLIP-278: Hybrid Source Connector

Posted by Martijn Visser <ma...@apache.org>.
Hi Ran,

For completeness, this is a new thread that was already previously started
at https://lists.apache.org/thread/xptn2ddzj34q9f5vtbfb62lsybmvcwjq. I'm
linking them because I think Timo's comments are relevant to be kept with
this discussion thread.

I agree with Timo's comments from there that having an index key isn't the
best option, I would rather have an identifier.

I do wonder how this would work when you want to specify sources from a
catalog: could you elaborate on that?

What I'm also missing in the FLIP is an example of how to specify the
starting offset from Kafka. In the DataStream API, there
is OffsetsInitializer.timestamp(switchTimestamp + 1) but how would you
specify that in the SQL landscape?

Last but not least: your examples are all SQL only. How do you propose that
this works in the Table API?

Best regards,

Martijn


Re: [DISCUSS] FLIP-278: Hybrid Source Connector

Posted by Ran Tao <ch...@gmail.com>.
Fyi.

This FLIP used an index as the child source option prefix because we may
use the same connector for several hybrid child sources.
E.g.:

create table hybrid_source(
 f0 varchar,
 f1 varchar,
 f2 bigint
) with(
 'connector'='hybrid',
 'sources'='filesystem,filesystem',
 '0.path' = '/tmp/a.csv',
 '0.format' = 'csv',
 '1.path' = '/tmp/b.csv',
 '1.format' = 'csv'
);

In this case, we must distinguish which filesystem connector each format
and path option belongs to. But as Timo says, it's not clear. He suggests
another way, like this:

CREATE TABLE hybrid_source WITH (
   'sources'='historical;realtime',   -- Config option of type string list
   'historical.connector' = 'filesystem',
   'historical.path' = '/tmp/a.csv',
   'historical.format' = 'csv',
   'realtime.path' = '/tmp/b.csv',
   'realtime.format' = 'csv'
)

The `sources` option holds user-defined names instead of concrete connector
types. We use each user-defined name as a prefix and resolve
prefix.connector to the concrete connector implementation, e.g. as sketched
below.
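
A minimal sketch of how a hybrid factory could split the prefixed options
into per-source option sets; the method name and shapes are assumptions for
illustration, not the FLIP's final design:

```
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Group 'historical.path', 'realtime.topic', ... by their ID prefix and
// strip the prefix, so each child factory sees plain keys like 'path'.
static Map<String, Map<String, String>> splitByPrefix(
        Map<String, String> tableOptions, List<String> sourceIds) {
    Map<String, Map<String, String>> perSource = new HashMap<>();
    for (String id : sourceIds) {
        String prefix = id + ".";
        Map<String, String> childOptions = new HashMap<>();
        for (Map.Entry<String, String> e : tableOptions.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                childOptions.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        perSource.put(id, childOptions);
    }
    return perSource;
}
```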