Posted to dev@flink.apache.org by Shammon FY <zj...@gmail.com> on 2022/12/08 08:11:07 UTC

[DISCUSSION] Scan mode in Table Store for Flink Stream and Batch job

Hi devs:

I'm an engineer from ByteDance, and here I'd like to discuss "Scan mode in
Table Store for Flink Stream and Batch job".

Users can execute Flink Stream and Batch jobs on Table Store. In Table
Store 0.2 there are two items which determine how the Stream and Batch
jobs' sources read data: StartupMode and config in Options.
1. StartupMode
  a) DEFAULT. Determines the actual startup mode according to other table
properties. If "scan.timestamp-millis" is set, the actual startup mode
will be "from-timestamp". Otherwise, the actual startup mode will be
"full".
  b) FULL. For streaming sources, read the latest snapshot on the table
upon first startup, and continue to read the latest changes. For batch
sources, just consume the latest snapshot but do not read new changes.
  c) LATEST. For streaming sources, continuously reads the latest changes
without reading a snapshot at the beginning. For batch sources, behaves
the same as the "full" startup mode.
  d) FROM_TIMESTAMP. For streaming sources, continuously reads changes
starting from the timestamp specified by "scan.timestamp-millis", without
reading a snapshot at the beginning. For batch sources, read a snapshot at
the timestamp specified by "scan.timestamp-millis" but do not read new
changes.
2. Config in Options
  a) scan.timestamp-millis, log.scan.timestamp-millis. Optional timestamp
used in the case of the "from-timestamp" scan mode.
  b) read.compacted. Read the latest compacted snapshot only (see the
sketch after this list for how these options are set).
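
As an illustration, here is a minimal Flink SQL sketch of how these 0.2
options can be attached to a query with dynamic table options (the table
name "orders" and the timestamp value are assumed for the example, not
taken from a real setup):

  -- Streaming read: setting scan.timestamp-millis makes the DEFAULT
  -- startup mode resolve to "from-timestamp", so only changes after the
  -- timestamp are read.
  SET 'execution.runtime-mode' = 'streaming';
  SELECT * FROM orders
    /*+ OPTIONS('scan.timestamp-millis' = '1670486400000') */;

  -- Batch read of only the latest compacted snapshot via the separate
  -- read.compacted option.
  SET 'execution.runtime-mode' = 'batch';
  SELECT * FROM orders /*+ OPTIONS('read.compacted' = 'true') */;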

After discussing with @Jingsong Li and @Caizhi Weng, we found that the
config in Options and StartupMode are not orthogonal; for example,
read.compacted can be combined with FROM_TIMESTAMP mode, and the resulting
behavior in Stream and Batch sources is unclear. We want to improve
StartupMode to unify the data reading mode of Stream and Batch jobs, and
add the following StartupMode item:
COMPACTED: For streaming sources, read a snapshot after the latest
compaction on the table upon first startup, and continue to read the latest
changes. For batch sources, just read a snapshot after the latest
compaction but do not read new changes.
The advantage is that for Stream and Batch jobs, we only need to determine
their behavior through StartupMode, but we also found two main problems:
1. The behaviors of some StartupModes in Stream and Batch jobs are
inconsistent, which may cause user misunderstanding, such as
FROM_TIMESTAMP: the streaming job reads incremental data, while the batch
job reads full data.
2. StartupMode does not define all data reading modes. For example, a
streaming job that reads a snapshot according to a timestamp and then
reads the incremental data is not covered.

To support all data reading modes in Table Store, such as time travel, we
try to divide data reading into two orthogonal dimensions: data reading
range and startup position (a sketch with hypothetical option names
follows after this list).
1. Data reading range
  a) Incremental: read delta data only.
  b) Full: read a snapshot first, then read the incremental data according
to the job type (streaming jobs continue reading changes).
2. Startup position
  a) Latest: read the latest snapshot.
  b) From-timestamp: read a snapshot according to the timestamp.
  c) From-snapshot-id: read a snapshot according to the snapshot id.
  d) Compacted: read the latest compacted snapshot.
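
To make the two dimensions concrete, here is a purely hypothetical sketch;
"scan.range" and "scan.position" are illustrative names invented for this
proposal and are not existing Table Store options:

  -- Streaming job: read the snapshot at a timestamp first, then keep
  -- reading its incremental changes (the mode missing from StartupMode
  -- today).
  SELECT * FROM orders
    /*+ OPTIONS('scan.range' = 'full',
                'scan.position' = 'from-timestamp',
                'scan.timestamp-millis' = '1670486400000') */;

  -- Streaming job: read only the incremental changes after the timestamp.
  SELECT * FROM orders
    /*+ OPTIONS('scan.range' = 'incremental',
                'scan.position' = 'from-timestamp',
                'scan.timestamp-millis' = '1670486400000') */;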

Then there are two questions:

Q1: Should we divide this into two configuration items, or combine and
unify them in StartupMode?
If unified in StartupMode, it expands into eight modes. The advantage is
that StartupMode alone can clearly explain the behavior of stream and
batch jobs, but there are many defined items, some of which are
meaningless.
If there are two configurations, users need to understand the different
meanings of the two options, and after combining them, stream and batch
jobs will have different processing logic.

Q2: In Table Store 0.2, there are some conflicting definitions in Stream
and Batch. Does Table Store 0.3 need to be fully compatible, or should it
be implemented directly according to the new definitions? E.g. the
behavior in FROM_TIMESTAMP mode.
If 0.3 is fully compatible with the definitions of Table Store 0.2, users
can directly run their 0.2 queries on 0.3, but the semantic conflicts in
the original definitions will persist in later versions.
If only limited compatibility is provided and the default behavior follows
the new definitions, a query that runs successfully on 0.2 may fail on
0.3.

Looking forward to your feedback, thanks.

Best,
Shammon

Re: [DISCUSSION] Scan mode in Table Store for Flink Stream and Batch job

Posted by Caizhi Weng <ts...@gmail.com>.
Hi all.

It seems that we've reached an agreement. We'll rename the "full" scan.mode
to "latest-full" and the "compacted" scan.mode to "compacted-full".

I've created a ticket about this [1] and will work on it soon.

[1] https://issues.apache.org/jira/browse/FLINK-30410
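
For reference, assuming the rename lands as discussed, the new values
would be used like this (the table name and dynamic-option hint are only
illustrative):

  -- Streaming job: read the latest snapshot first, then continue reading
  -- new changes.
  SELECT * FROM orders /*+ OPTIONS('scan.mode' = 'latest-full') */;

  -- Batch job: read only the snapshot produced by the latest compaction.
  SELECT * FROM orders /*+ OPTIONS('scan.mode' = 'compacted-full') */;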


Re: [DISCUSSION] Scan mode in Table Store for Flink Stream and Batch job

Posted by Shammon FY <zj...@gmail.com>.
Hi Jingsong, Caizhi

Thank you for your reply.

I found that what really confused me was the starting position from which
a streaming job reads data. As @Caizhi mentioned, in Kafka streaming jobs
only read incremental data and the definition is very clear. In Table
Store, streaming jobs also have a mode that reads full data first, which
needs special consideration.

So I really agree with @Jingsong's "Solution 3"; it looks good to me. We
define that streaming and batch jobs have different read modes by default,
and read data from the starting position defined by scan.mode. Personally,
I think this is easy for users to understand. For reading modes beyond the
default, for example full reading for streaming jobs, adding suffixes such
as "full" to scan.mode looks nice. In this way, the definition will be
clearer, and the read mode will be unified through scan.mode. Thanks


Best,
Shammon



Re: [DISCUSSION] Scan mode in Table Store for Flink Stream and Batch job

Posted by Caizhi Weng <ts...@gmail.com>.
Thanks Shammon for bringing up the discussion.

My opinion is to combine everything into scan.mode so that we don't
have orthogonal options.

You first mention that there are two disadvantages for this solution.

*1. The behaviors of some StartupModes in Stream and Batch jobs are
inconsistent*
This is acceptable from my point of view, because streaming jobs and batch
jobs are different after all. Streaming jobs should monitor new changes,
while batch jobs should only see the records present when the job is
started. This behavior is expected by users.

*2. StartupMode does not define all data reading modes.*
Not all reading modes are useful. For example, you mention a reading mode
where the user first wants to read the snapshot at a timestamp, then reads
all incremental changes. Could you explain in what scenario a user would
need this?

About the solution you proposed, I think the biggest problem is the default
value. That is, whether we should read full snapshots by default.

When a user runs a streaming job with "from-timestamp", they expect to just
read incremental changes after the timestamp, just like using the
"from-timestamp" starting mode of Kafka.

However, when a user runs a streaming job without a starting timestamp,
they expect the result to be correct. In order to provide a correct result,
we have to produce the full snapshot first.

So you can see that, for different streaming jobs, the default behavior
about whether to read full snapshots is different. It is hard to pick a
default value without disturbing the users.
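
To illustrate the two cases with a sketch (the table name and timestamp
are assumed):

  -- Streaming job with an explicit timestamp: the user expects only the
  -- incremental changes after that point, like Kafka's timestamp start.
  SELECT * FROM orders
    /*+ OPTIONS('scan.mode' = 'from-timestamp',
                'scan.timestamp-millis' = '1670486400000') */;

  -- Streaming job without a timestamp: a correct result requires reading
  -- the full snapshot first and then following the changes.
  SELECT * FROM orders;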


Re: [DISCUSSION] Scan mode in Table Store for Flink Stream and Batch job

Posted by Jingsong Li <ji...@gmail.com>.
Thanks Shammon!

Your summary was very good and very detailed.

I thought about it again.

## Solution 1
Actually, according to what you said, in theory there are these dimensions:
- Runtime-mode: streaming or batch.
- Range: full or incremental.
- Position: Latest, timestamp, snapshot-id, compacted.

Advantages: The breakdown is very detailed, and every behavior is very clear.
Disadvantages: Orthogonality produces many combinations. Combined with the
stream or batch runtime-mode, there are 16 modes, many of which are
meaningless. As you said, the default behavior is also a problem.

## Solution 2
Currently [1]:
- The environment determines the runtime-mode, whether it is streaming
or batch.
- The `scan.mode` determines the position.
- No specific option determines the `range`; it is mostly determined by
the runtime-mode. However, it is not completely determined by the runtime
mode: `full` and `compacted` also read the full range under streaming.

Advantages: Simple. The default values of options are what we want for
streaming and batch.
Disadvantages:
1. The semantics of from-timestamp are different between streaming and
batch.
2. `full` and `compacted` are special.

## Solution 3

I understand that the core problem of solution 2 is mainly problem 2:
`full` and `compacted` are special.
How about:
- The runtime mode determines whether to read incremental data only or
full data.
- `scan.mode` contains: latest, timestamp, snapshot-id.
The default range is full in batch mode and incremental in stream mode.

However, we have two other choices for `scan.mode`: `latest-full` and
`compacted-full`. Regardless of the runtime-mode, these two choices
force a full-range read.

I think solution 3 is a compromise solution. It can also ensure the
availability of default values. Conceptually, it can at least explain
the current options.
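
A sketch of what Solution 3 would look like from the user side (the table
name is assumed; the option values are the ones named above):

  -- Batch mode: 'latest' reads the full latest snapshot by default.
  SET 'execution.runtime-mode' = 'batch';
  SELECT * FROM orders /*+ OPTIONS('scan.mode' = 'latest') */;

  -- Stream mode: the same 'latest' reads only incremental changes by
  -- default.
  SET 'execution.runtime-mode' = 'streaming';
  SELECT * FROM orders /*+ OPTIONS('scan.mode' = 'latest') */;

  -- The two extra values force a full-range read regardless of the
  -- runtime mode.
  SELECT * FROM orders /*+ OPTIONS('scan.mode' = 'compacted-full') */;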

What do you think?

[1] https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/development/configuration/

Best,
Jingsong
