You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Krzysztof Chmielewski <kr...@gmail.com> on 2021/12/22 22:32:22 UTC

Operator state in New Source API

Hi,
Is it possible to use managed operator state like MapState in an
implementation of new unified source interface [1]. I'm especially
interested with using Managed State in SplitEnumerator implementation.

I have a use case that is a variation of File Source where I will have a
great number of files that I need to process, for example a million. I know
that FileSource maintains a collection of already processed paths
in ContinuousFileSplitEnumerator object.

In my case I cannot afford to have all million Strings sitting on my heap.
I'm hoping to use an operator state for this and build splits in batches,
periodically adding new files to the alreadyProcessedPaths collection.

Regards,
Krzysztof Chmielewski


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/

Re: Operator state in New Source API

Posted by Yun Tang <my...@live.com>.
Hi Krzysztof,

Non-keyed operator state only supports list-like state [1] as there exist no primary key in operator state. That is to say you cannot use map state in source operator.


[1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#using-operator-state


Best,
Yun Tang
________________________________
From: Krzysztof Chmielewski <kr...@gmail.com>
Sent: Thursday, December 23, 2021 6:32
To: user <us...@flink.apache.org>
Subject: Operator state in New Source API

Hi,
Is it possible to use managed operator state like MapState in an implementation of new unified source interface [1]. I'm especially interested with using Managed State in SplitEnumerator implementation.

I have a use case that is a variation of File Source where I will have a great number of files that I need to process, for example a million. I know that FileSource maintains a collection of already processed paths in ContinuousFileSplitEnumerator object.

In my case I cannot afford to have all million Strings sitting on my heap. I'm hoping to use an operator state for this and build splits in batches, periodically adding new files to the alreadyProcessedPaths collection.

Regards,
Krzysztof Chmielewski


[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/

Re: Re: Operator state in New Source API

Posted by Yun Gao <yu...@aliyun.com>.
Hi Krzysztof,

Sorry there are indeed no document said that the operator state is only kept in memory, but based on the
current implementation it is indeed the case.

And I might also need to fix one point: the Split Enumerate should be executed in the JM side inside the OperatorCoordinator, 
and its functionality is to coordinate the source readers running in the TM side, thus there are even no operator state in 
SplitEnumerator. It only support stores the states by implement the snapshotState method to generate the state object to store
in a special operator coordinator state. Thus it should be not possible to use operator state in the SplitEnumerator. 

Best,
Yun



 ------------------Original Mail ------------------
Sender:Krzysztof Chmielewski <kr...@gmail.com>
Send Date:Thu Dec 23 19:11:55 2021
Recipients:user <us...@flink.apache.org>
Subject:Re: Operator state in New Source API

Thank you both,
yes seems that the only option on a non keyed operate would be List State, my bad.

Yun Gao,
I'm wondering from where you get the information that " Flink only support in-memory operator state", can you point me to the documentation that says that?
I cannot find any mention in the documentation about it regarding regular operator state. 
I know that Broadcast State which is special type of an Operator State is kept in-memory [1].

What I was hoping to do is something similar to what is described here [2] - Statefulf Source Functions. The List State in that example is really always kept in memory?

Additionally I'm wondering Is it even possible to do something like [2] in source that is implementing the new Source API [3]? Especially in Source Enumerator implementation. 

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/broadcast_state/#important-considerations
[2] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#stateful-source-functions
[3] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/

Thanks,
Krzysztof Chmielewski




czw., 23 gru 2021 o 07:58 Yun Gao <yu...@aliyun.com> napisał(a):

Hi Krzysztof,

If I understand right, I think managed operator state might not help here since currently Flink
only support in-memory operator state.

Is it possible currently we first have a customized SplitEnumerator to skip the processed files
in some other way? For example, if these files have different created time, we may process them
in time order, and only maintains the latest file created time and the list of processed files with the
same time. 

Best,
Yun


 ------------------Original Mail ------------------
Sender:Krzysztof Chmielewski <kr...@gmail.com>
Send Date:Thu Dec 23 06:33:07 2021
Recipients:user <us...@flink.apache.org>
Subject:Operator state in New Source API

Hi,
Is it possible to use managed operator state like MapState in an implementation of new unified source interface [1]. I'm especially interested with using Managed State in SplitEnumerator implementation. 

I have a use case that is a variation of File Source where I will have a great number of files that I need to process, for example a million. I know that FileSource maintains a collection of already processed paths in ContinuousFileSplitEnumerator object.

In my case I cannot afford to have all million Strings sitting on my heap. I'm hoping to use an operator state for this and build splits in batches, periodically adding new files to the alreadyProcessedPaths collection.

Regards,
Krzysztof Chmielewski


[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/

Re: Operator state in New Source API

Posted by Krzysztof Chmielewski <kr...@gmail.com>.
Thank you both,
yes seems that the only option on a non keyed operate would be List State,
my bad.

Yun Gao,
I'm wondering from where you get the information that " Flink only support
in-memory operator state", can you point me to the documentation that says
that?
I cannot find any mention in the documentation about it regarding regular
operator state.
I know that Broadcast State which is special type of an Operator State is
kept in-memory [1].

What I was hoping to do is something similar to what is described here [2]
- Statefulf Source Functions. The List State in that example is really
always kept in memory?

Additionally I'm wondering Is it even possible to do something like [2] in
source that is implementing the new Source API [3]? Especially in Source
Enumerator implementation.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/broadcast_state/#important-considerations
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#stateful-source-functions
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/

Thanks,
Krzysztof Chmielewski




czw., 23 gru 2021 o 07:58 Yun Gao <yu...@aliyun.com> napisał(a):

> Hi Krzysztof,
>
> If I understand right, I think managed operator state might not help here
> since currently Flink
> only support in-memory operator state.
>
> Is it possible currently we first have a customized SplitEnumerator to
> skip the processed files
> in some other way? For example, if these files have different created
> time, we may process them
> in time order, and only maintains the latest file created time and the
> list of processed files with the
> same time.
>
> Best,
> Yun
>
> ------------------Original Mail ------------------
> *Sender:*Krzysztof Chmielewski <kr...@gmail.com>
> *Send Date:*Thu Dec 23 06:33:07 2021
> *Recipients:*user <us...@flink.apache.org>
> *Subject:*Operator state in New Source API
>
>> Hi,
>> Is it possible to use managed operator state like MapState in an
>> implementation of new unified source interface [1]. I'm especially
>> interested with using Managed State in SplitEnumerator implementation.
>>
>> I have a use case that is a variation of File Source where I will have a
>> great number of files that I need to process, for example a million. I know
>> that FileSource maintains a collection of already processed paths
>> in ContinuousFileSplitEnumerator object.
>>
>> In my case I cannot afford to have all million Strings sitting on my
>> heap. I'm hoping to use an operator state for this and build splits in
>> batches, periodically adding new files to the alreadyProcessedPaths
>> collection.
>>
>> Regards,
>> Krzysztof Chmielewski
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/
>>
>

Re: Operator state in New Source API

Posted by Yun Gao <yu...@aliyun.com>.
Hi Krzysztof,

If I understand right, I think managed operator state might not help here since currently Flink
only support in-memory operator state.

Is it possible currently we first have a customized SplitEnumerator to skip the processed files
in some other way? For example, if these files have different created time, we may process them
in time order, and only maintains the latest file created time and the list of processed files with the
same time. 

Best,
Yun


 ------------------Original Mail ------------------
Sender:Krzysztof Chmielewski <kr...@gmail.com>
Send Date:Thu Dec 23 06:33:07 2021
Recipients:user <us...@flink.apache.org>
Subject:Operator state in New Source API

Hi,
Is it possible to use managed operator state like MapState in an implementation of new unified source interface [1]. I'm especially interested with using Managed State in SplitEnumerator implementation. 

I have a use case that is a variation of File Source where I will have a great number of files that I need to process, for example a million. I know that FileSource maintains a collection of already processed paths in ContinuousFileSplitEnumerator object.

In my case I cannot afford to have all million Strings sitting on my heap. I'm hoping to use an operator state for this and build splits in batches, periodically adding new files to the alreadyProcessedPaths collection.

Regards,
Krzysztof Chmielewski


[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/