Posted to user@flink.apache.org by Sriram Ganesh <sr...@gmail.com> on 2022/10/21 08:03:44 UTC

Modify savepoints in Flink

Hi All,

I am working on a scenario where I need to modify the operator state of an
existing savepoint. For example, I want to remove some offsets from the
savepoint.

What is the best practice for such scenarios? Could you please help me
with an example?

Thanks in advance.

-- 
*Sriram G*
*Tech*

Re: Modify savepoints in Flink

Posted by Sriram Ganesh <sr...@gmail.com>.
Thanks, I'll check it out.

On Fri, Oct 21, 2022, 18:20 Piotr Nowojski <pn...@apache.org> wrote:

> Hi,
>
> Yes and no. The State Processor API can read any Flink state, but you have
> to describe the state you want it to access. Take a look at the example in
> the docs [1].
>
> First there is an example of a hypothetical production function,
> `StatefulFunctionWithTime`, whose state you want to modify. Note the
> `ValueState` and `ListState` fields and their descriptors: that is the
> state of that particular function. The descriptors determine how the state
> is serialised, and they are usually pretty simple.
> Below it is the `ReaderFunction` that you want to use to access/modify the
> state via the State Processor API. To do so, you have to specify the state
> you want to access, effectively mimicking (copy-pasting) the state
> descriptors from the production code.
>
> If you want to modify the state of a source/sink function, you would first
> have to look into the source code of that connector to know what to modify
> and copy its descriptors. Also note that for sources/sinks the state is
> most likely non-keyed.
>
> Best,
> Piotrek
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/#keyed-state

Re: Modify savepoints in Flink

Posted by Piotr Nowojski <pn...@apache.org>.
Hi,

Yes and no. The State Processor API can read any Flink state, but you have
to describe the state you want it to access. Take a look at the example in
the docs [1].

First there is an example of a hypothetical production function,
`StatefulFunctionWithTime`, whose state you want to modify. Note the
`ValueState` and `ListState` fields and their descriptors: that is the
state of that particular function. The descriptors determine how the state
is serialised, and they are usually pretty simple.
Below it is the `ReaderFunction` that you want to use to access/modify the
state via the State Processor API. To do so, you have to specify the state
you want to access, effectively mimicking (copy-pasting) the state
descriptors from the production code.

If you want to modify the state of a source/sink function, you would first
have to look into the source code of that connector to know what to modify
and copy its descriptors. Also note that for sources/sinks the state is
most likely non-keyed.
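A plain-Java model can make the role of the descriptor concrete. It has no Flink dependency and is a deliberate simplification of how Flink actually stores state: the "savepoint" keeps each state's bytes under the descriptor's name, and only a reader that reuses the same name and the same (de)serialisation gets the value back. `Descriptor`, `write`, `read` and the `"times"` name are hypothetical; in Flink itself the equivalent step is copying the production `ValueStateDescriptor`/`ListStateDescriptor` into the reader function shown in the linked docs.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model, NOT the Flink API: it only illustrates why a reader must
// reuse the production descriptor (state name + serialiser).
public class DescriptorModel {

    // Hypothetical stand-in for a state descriptor.
    record Descriptor<T>(String name, Function<T, byte[]> encode, Function<byte[], T> decode) {}

    // Hypothetical stand-in for one operator's state inside a savepoint:
    // state name -> serialised bytes.
    static final Map<String, byte[]> savepoint = new HashMap<>();

    // The "production" descriptor, e.g. copied from a connector's source code.
    static Descriptor<Long> timesDescriptor() {
        return new Descriptor<Long>(
                "times",
                v -> ByteBuffer.allocate(Long.BYTES).putLong(v).array(),
                b -> ByteBuffer.wrap(b).getLong());
    }

    static <T> void write(Descriptor<T> d, T value) {
        savepoint.put(d.name(), d.encode().apply(value));
    }

    // Returns null if nothing was stored under the descriptor's name.
    static <T> T read(Descriptor<T> d) {
        byte[] bytes = savepoint.get(d.name());
        return bytes == null ? null : d.decode().apply(bytes);
    }

    public static void main(String[] args) {
        // The production job writes its state.
        write(timesDescriptor(), 1666339424000L);

        // A reader using a copy of the same descriptor gets the value back.
        System.out.println(read(timesDescriptor())); // 1666339424000

        // A reader that guesses a different name finds nothing.
        Descriptor<Long> guessed = new Descriptor<Long>("updateTimes",
                timesDescriptor().encode(), timesDescriptor().decode());
        System.out.println(read(guessed)); // null
    }
}
```

In this model the reader is only as correct as the descriptor it copies, which is the "no" part of the answer: the API does not know a connector's state layout by itself.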

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/#keyed-state

On Fri, 21 Oct 2022 at 14:37, Sriram Ganesh <sr...@gmail.com> wrote:

> I have a question on this. Different connectors can have different
> serialisation and deserialisation techniques, right? Wouldn't that have an
> impact? If I use the State Processor API, would it be agnostic to all the
> sources and sinks?

Re: Modify savepoints in Flink

Posted by Sriram Ganesh <sr...@gmail.com>.
I have a question on this. Different connectors can have different
serialisation and deserialisation techniques, right? Wouldn't that have an
impact? If I use the State Processor API, would it be agnostic to all the
sources and sinks?

On Fri, Oct 21, 2022, 18:00 Piotr Nowojski <pn...@apache.org> wrote:

> Oops.
>
> > Alternatively, you can modify the code of the function/operator whose
> > state you want to modify. For example, in the
> > `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
> > method you could add some code that migrates your old state to a new
> > one.
> > And you can drop such code later, in the next savepoint.
>
> That was not entirely true. This works for non-keyed state. For keyed
> state there is no easy alternative (you would have to iterate through all
> of the keys, which I believe is not exposed via the Public API), so it is
> best to use the State Processor API.
>
> Best,
> Piotrek

Re: Modify savepoints in Flink

Posted by Piotr Nowojski <pn...@apache.org>.
Oops.

> Alternatively, you can modify the code of the function/operator whose
> state you want to modify. For example, in the
> `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
> method you could add some code that migrates your old state to a new
> one.
> And you can drop such code later, in the next savepoint.

That was not entirely true. This works for non-keyed state. For keyed
state there is no easy alternative (you would have to iterate through all
of the keys, which I believe is not exposed via the Public API), so it is
best to use the State Processor API.
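For the original use case (removing some offsets), the state of a source is most likely non-keyed, so a migration in `initializeState` reduces to rewriting the restored list once. The sketch below models only that filtering step in plain Java, without Flink; `PartitionOffset`, `removeTopic` and the topic names are hypothetical, and a real connector's state type and descriptor have to be taken from its source code.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model, NOT Flink code: the restored non-keyed list state of one
// subtask is represented as a List, and the migration filters it.
public class RemoveOffsetsModel {

    // Hypothetical entry type; a real connector defines its own state type.
    record PartitionOffset(String topic, int partition, long offset) {}

    // Keeps every entry except those belonging to the given topic.
    static List<PartitionOffset> removeTopic(List<PartitionOffset> restored, String topic) {
        List<PartitionOffset> kept = new ArrayList<>();
        for (PartitionOffset entry : restored) {
            if (!entry.topic().equals(topic)) {
                kept.add(entry);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<PartitionOffset> restored = List.of(
                new PartitionOffset("orders", 0, 120L),
                new PartitionOffset("orders", 1, 98L),
                new PartitionOffset("legacy", 0, 7L));

        List<PartitionOffset> migrated = removeTopic(restored, "legacy");
        System.out.println(migrated.size()); // 2

        // Re-running the filter is harmless. That matters because
        // initializeState runs again on every restore (e.g. after a failover)
        // for as long as the migration code stays deployed.
        System.out.println(removeTopic(migrated, "legacy").equals(migrated)); // true
    }
}
```

Inside a real `initializeState`, the filtered entries would be written back to the operator list state, for example with `ListState#update`.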

Best,
Piotrek

On Fri, 21 Oct 2022 at 10:54, Sriram Ganesh <sr...@gmail.com> wrote:

> Thanks! Will try this.

Re: Modify savepoints in Flink

Posted by Sriram Ganesh <sr...@gmail.com>.
Thanks! Will try this.

On Fri, Oct 21, 2022 at 2:22 PM Piotr Nowojski <pn...@apache.org> wrote:

> Hi Sriram,
>
> You can read and modify savepoints using the State Processor API [1].
>
> Alternatively, you can modify the code of the function/operator whose
> state you want to modify. For example, in the
> `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
> method you could add some code that migrates your old state to a new
> one.
>
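> ```
> import org.apache.flink.api.common.state.ListState;
> import org.apache.flink.api.common.state.ListStateDescriptor;
> import org.apache.flink.api.common.state.OperatorStateStore;
> import org.apache.flink.runtime.state.FunctionInitializationContext;
>
> // Inside a CheckpointedFunction (other members omitted); uses non-keyed
> // operator state. Foo and migrate(...) are placeholders.
> private transient ListState<Foo> oldState;
> private transient ListState<Foo> newState;
>
> @Override
> public void initializeState(FunctionInitializationContext context) throws Exception {
>   OperatorStateStore store = context.getOperatorStateStore();
>   oldState = store.getListState(new ListStateDescriptor<>("old", Foo.class));
>   newState = store.getListState(new ListStateDescriptor<>("new", Foo.class));
>   // Migrate only once: when restoring and the new state is still empty.
>   if (context.isRestored() && !newState.get().iterator().hasNext()) {
>     for (Foo value : oldState.get()) {
>       newState.add(migrate(value));
>     }
>     oldState.clear();
>   }
> }
> ```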
>
> And you can drop such code later, in the next savepoint.
>
> Best,
> Piotrek
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/

-- 
*Sriram G*
*Tech*

Re: Modify savepoints in Flink

Posted by Piotr Nowojski <pn...@apache.org>.
Hi Sriram,

You can read and modify savepoints using the State Processor API [1].

Alternatively, you can modify the code of the function/operator whose
state you want to modify. For example, in the
`org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
method you could add some code that migrates your old state to a new
one.

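```
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.runtime.state.FunctionInitializationContext;

// Inside a CheckpointedFunction (other members omitted); uses non-keyed
// operator state. Foo and migrate(...) are placeholders.
private transient ListState<Foo> oldState;
private transient ListState<Foo> newState;

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
  OperatorStateStore store = context.getOperatorStateStore();
  oldState = store.getListState(new ListStateDescriptor<>("old", Foo.class));
  newState = store.getListState(new ListStateDescriptor<>("new", Foo.class));
  // Migrate only once: when restoring and the new state is still empty.
  if (context.isRestored() && !newState.get().iterator().hasNext()) {
    for (Foo value : oldState.get()) {
      newState.add(migrate(value));
    }
    oldState.clear();
  }
}
```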

And you can drop such code later, in the next savepoint.

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/

On Fri, 21 Oct 2022 at 10:05, Sriram Ganesh <sr...@gmail.com> wrote:

> Hi All,
>
> I am working on a scenario where I need to modify the operator state of an
> existing savepoint. For example, I want to remove some offsets from the
> savepoint.
>
> What is the best practice for such scenarios? Could you please help me
> with an example?
>
> Thanks in advance.
>
> --
> *Sriram G*
> *Tech*
>
>