Posted to user@flink.apache.org by Giannis Polyzos <ip...@gmail.com> on 2023/04/26 15:19:00 UTC

Flink SQL State

I have two input Kafka topics: a compacted one (read with upsert-kafka) and a
normal one.
When I perform a temporal join, I see the following state being created
in RocksDB, and I was hoping someone could help me understand what it all
means:


- deduplicate-state: does it refer to duplicate keys found by the
upsert-kafka connector?
- timers: what do timer and _timer_state/event_timers refer to, and what is
the difference between them? Do they keep track of when the join results
need to be materialised, or of when state should expire?
- next-index: what does it refer to?
- left: I'm also curious why the left CF has 407 entries. Are these records
that are being buffered because there is no match on the right table?

Thanks

Re: Flink SQL State

Posted by Giannis Polyzos <ip...@gmail.com>.
Will definitely do, as it's going to be part of a wider Flink course or book
(I haven't decided on the format yet) that I'm putting together.
I can share it before then if you want, though.

(In reply to Yaroslav Tkachenko <ya...@goldsky.com>, Thu, Apr 27, 2023 at 6:11 PM)

Re: Flink SQL State

Posted by Yaroslav Tkachenko <ya...@goldsky.com>.
Got it! Any chance you can open-source some of that? I think it can be
extremely useful for the community.

Thank you.

(In reply to Giannis Polyzos <ip...@gmail.com>, Thu, Apr 27, 2023 at 8:08 AM)

Re: Flink SQL State

Posted by Giannis Polyzos <ip...@gmail.com>.
Correct, it's some custom code I put together to investigate what gets
written to RocksDB.

(In reply to Yaroslav Tkachenko <ya...@goldsky.com>, Thu, Apr 27, 2023 at 6:06 PM)

Re: Flink SQL State

Posted by Yaroslav Tkachenko <ya...@goldsky.com>.
Hi Giannis,

I'm curious, what tool did you use for this analysis (what the screenshot
shows)? Is it something custom?

Thank you.

(In reply to Giannis Polyzos <ip...@gmail.com>, Wed, Apr 26, 2023 at 10:38 PM)

Re: Flink SQL State

Posted by Giannis Polyzos <ip...@gmail.com>.
This is really helpful,

Thanks

(In reply to Yanfei Lei <fr...@gmail.com>, Thu, Apr 27, 2023 at 5:46 AM)

Re: Flink SQL State

Posted by Yanfei Lei <fr...@gmail.com>.
Hi Giannis,

Except for the “default” column family (CF), every CF holds state in the
RocksDB state backend, and the name of a CF is the name of the
corresponding StateDescriptor.
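Applying that rule, a column-family listing like the one in the question can be grouped by the component that created each CF. The sketch below is a hypothetical Python helper (describe_cf is not a Flink API). Its owner table only covers the names discussed in this thread, and it assumes timer CFs follow the pattern _timer_state/<event|processing>_<timer service name>, which matches every timer name listed in this reply.

```python
TIMER_PREFIX = "_timer_state/"

# Owners of the non-timer column families discussed in this thread.
# Illustrative only: other operators register other state names.
KNOWN_OWNERS = {
    "default": "RocksDB's default column family (no Flink state)",
    "deduplicate-state": "DeduplicateFunctionBase / MiniBatchDeduplicateFunctionBase",
    "timer": "TemporalRowTimeJoinOperator",
    "next-index": "TemporalRowTimeJoinOperator",
    "left": "TemporalRowTimeJoinOperator",
    "right": "TemporalRowTimeJoinOperator",
}


def describe_cf(name: str) -> str:
    """Describe which component created a column family (hypothetical helper)."""
    if name.startswith(TIMER_PREFIX):
        # e.g. "event_user-timers" -> domain "event", timer service "user-timers"
        domain, _, service = name[len(TIMER_PREFIX):].partition("_")
        return f"{domain}-time timers of timer service '{service}' (internal time service)"
    return KNOWN_OWNERS.get(name, "unknown: search the code for a StateDescriptor with this name")


for cf in ["deduplicate-state", "timer", "_timer_state/event_timers", "next-index", "left"]:
    print(f"{cf:28} -> {describe_cf(cf)}")
```

Read this way, "timer" and "_timer_state/event_timers" are different things: the first is a named piece of operator state, the second is the event-time queue of a timer service that happens to be called "timers".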

- deduplicate-state is a ValueState; you can find it in
DeduplicateFunctionBase.java and
MiniBatchDeduplicateFunctionBase.java, which are used for
deduplication.
- _timer_state/event_user-timers, _timer_state/event_timers,
_timer_state/processing_timers and _timer_state/processing_user-timers
are created by the internal time service, which can be found in
InternalTimeServiceManagerImpl.java. Here is a blog post [1] on best
practices for using timers.
- timer, next-index, left and right can be found in
TemporalRowTimeJoinOperator.java. TemporalRowTimeJoinOperator
implements the logic of the event-time temporal join; this page [2]
may help in understanding how a temporal join works.
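To connect the four temporal-join state names to what the operator does with them, here is a toy Python model of the state kept for one join key. It is a simplified sketch, not TemporalRowTimeJoinOperator's code. It assumes an inner join, ignores deletes and state retention, and the class and method names are hypothetical. In this model, left buffers probe-side rows under an index taken from next-index. right keeps one entry per version of the versioned table, keyed by rowtime. timer remembers the single timer the key has registered so it can be swapped for an earlier one. timer_queue stands in for the time service's queue that _timer_state/event_timers persists.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple


@dataclass
class TemporalJoinKeyState:
    """Toy model of one join key's state in an event-time temporal join (sketch)."""

    next_index: int = 0                                             # "next-index"
    left: Dict[int, Tuple[int, str]] = field(default_factory=dict)  # "left": index -> (rowtime, row)
    right: Dict[int, str] = field(default_factory=dict)             # "right": rowtime -> version
    timer: Optional[int] = None                                     # "timer": timer this key registered
    timer_queue: Set[int] = field(default_factory=set)              # stand-in for "_timer_state/event_timers"

    def _register_smallest_timer(self, ts: int) -> None:
        # Keep a single pending timer per key, always the earliest one needed.
        if self.timer is None or ts < self.timer:
            if self.timer is not None:
                self.timer_queue.discard(self.timer)
            self.timer = ts
            self.timer_queue.add(ts)

    def on_left(self, rowtime: int, row: str) -> None:
        # Probe-side rows wait in "left" under a fresh index until the watermark passes them.
        self.left[self.next_index] = (rowtime, row)
        self.next_index += 1
        self._register_smallest_timer(rowtime)

    def on_right(self, rowtime: int, version: str) -> None:
        # Every version of the versioned (upsert) table is kept under its rowtime.
        self.right[rowtime] = version
        self._register_smallest_timer(rowtime)

    def on_watermark(self, watermark: int) -> List[Tuple[str, str]]:
        """Fire the timer if the watermark reached it and return the join results."""
        if self.timer is None or self.timer > watermark:
            return []  # no timer due yet: everything stays buffered
        self.timer_queue.discard(self.timer)
        self.timer = None
        results = []
        for idx in sorted(self.left):  # process in arrival order
            rowtime, row = self.left[idx]
            if rowtime > watermark:
                continue  # too early to decide: stays in "left"
            versions = [t for t in self.right if t <= rowtime]
            if versions:  # join with the version valid at the row's time
                results.append((row, self.right[max(versions)]))
            del self.left[idx]  # unmatched rows are dropped (inner join)
        # Versions older than the latest one at or before the watermark are no longer needed.
        for t in sorted(t for t in self.right if t <= watermark)[:-1]:
            del self.right[t]
        # Re-arm a timer for the earliest left row that is still waiting.
        waiting = [rowtime for rowtime, _ in self.left.values()]
        if waiting:
            self._register_smallest_timer(min(waiting))
        return results


state = TemporalJoinKeyState()
state.on_right(1, "v1")
state.on_right(5, "v2")
for t, r in [(3, "a"), (7, "b"), (10, "c")]:
    state.on_left(t, r)
print(state.on_watermark(7), state.left)
```

In this model a left row leaves "left" only once the watermark reaches its rowtime, whether or not it matches. So one plausible reading of the 407 entries, offered as a hypothesis rather than a diagnosis, is rows waiting for the watermark. Because a two-input operator's watermark is the minimum of both inputs' watermarks, a versioned side whose watermark stalls (for example, an idle compacted topic) would keep left rows buffered.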

[1] https://www.alibabacloud.com/help/en/realtime-compute-for-apache-flink/latest/datastream-timer
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins
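The deduplicate-state bullet can be illustrated with a keep-last-row model. Assuming this state comes from the changelog-normalization step the planner adds after an upsert source (a ChangelogNormalize node in the plan), it holds the latest row per primary key rather than a record of duplicate keys. That stored row is what allows an upsert stream to be turned into update-before/update-after pairs and full-row deletes. The function name normalize and the '+I'/'-U'/'+U'/'-D' strings are hypothetical stand-ins for Flink's RowKind values, and state TTL and mini-batching are ignored.

```python
def normalize(state: dict, kind: str, key, row, generate_update_before: bool = True) -> list:
    """Apply one upsert message to per-key 'deduplicate-state'; return emitted changes (sketch)."""
    out = []
    prev = state.get(key)  # the last row seen for this primary key
    if kind in ("+I", "+U"):
        if prev is None:
            out.append(("+I", row))  # first row for the key
        elif prev == row:
            return out  # identical value: nothing to emit
        else:
            if generate_update_before:
                out.append(("-U", prev))  # retract the old value
            out.append(("+U", row))
        state[key] = row
    elif prev is not None:  # delete / tombstone for a key we have seen
        out.append(("-D", prev))  # emit the stored full row; a tombstone carries only the key
        del state[key]
    return out  # deletes for unknown keys emit nothing


st = {}
print(normalize(st, "+I", "k1", ("k1", 10)))
print(normalize(st, "+U", "k1", ("k1", 20)))
print(normalize(st, "-D", "k1", None))
```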

(In reply to Giannis Polyzos <ip...@gmail.com>, Wed, Apr 26, 2023 at 23:19)



-- 
Best,
Yanfei