Posted to user@flink.apache.org by Dmitry Minaev <mi...@gmail.com> on 2020/03/19 01:54:12 UTC
Can't create a savepoint with State Processor API
Hi everyone,
I'm looking for a way to modify state inside an operator in Flink. I found the
State Processor API
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/state_processor_api.html#modifying-savepoints>
which allows modifying savepoints and looks great, but I can't make it
work.
I can read existing state from a savepoint, but if I try to create (or
modify) a savepoint, it is not written for some reason.
Questions:
1. Is the State Processor API the right way to achieve what I'm looking for? Are
there any other approaches?
2. Can I run this as a standalone Java program, or does it have to be part of a
Flink job?
3. I expect to have a new savepoint in the provided location after running
the code below. Is that the right expectation?
```
public static void main(String[] args) throws Exception {
    ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

    BootstrapTransformation<Integer> transform =
        OperatorTransformation.bootstrapWith(bEnv.fromElements(1, 2, 3))
            .keyBy(String::valueOf)
            .transform(new SimplestTransform());

    Savepoint.create(new MemoryStateBackend(), 16)
        .withOperator("my-operator-uid", transform)
        .write("file:///tmp/savepoints/");
}

public class SimplestTransform extends KeyedStateBootstrapFunction<String, Integer> {
    ValueState<Integer> state;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Integer> descriptor =
            new ValueStateDescriptor<>("total", Types.INT);
        state = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(Integer value, Context ctx) throws Exception {
        state.update(value);
    }
}
```
It finishes successfully, but it doesn't write anything to the specified
folder. I tried the path both with and without the "file://" prefix.
I feel I'm missing something.
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Can't create a savepoint with State Processor API
Posted by Dmitry Minaev <mi...@gmail.com>.
Yep, that works! Many thanks David, really appreciate it!
Re: Can't create a savepoint with State Processor API
Posted by David Anderson <da...@ververica.com>.
You are very close. I got your example to work by switching from the
MemoryStateBackend to the FsStateBackend, and adding
bEnv.execute();
at the end of main().
I'm not sure why either of those might be necessary, but it doesn't seem to
work without both changes.
See https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf for
my version.
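A possible explanation for the bEnv.execute() part, offered as a hedged note rather than something confirmed in this thread: batch (DataSet) programs in Flink are executed lazily, so a call such as write(...) would only add a sink to the plan, and nothing is written until the environment is told to run it. The sketch below is a toy model of that "declare now, run later" pattern in plain Java. ToyEnvironment, registerSink, and execute are hypothetical names made up for illustration; they are not Flink classes or methods, and the sketch says nothing about why the state backend change was also needed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a lazily executed batch plan. Every name here is hypothetical
// and NOT part of Flink; it only mimics the "declare now, run later" pattern.
public class LazyPlanSketch {

    /** Collects sink actions without running them, much like a plan being built. */
    static class ToyEnvironment {
        private final List<Runnable> sinks = new ArrayList<>();

        // Registers work to be done later; nothing is written at this point.
        void registerSink(Runnable sink) {
            sinks.add(sink);
        }

        // Runs every registered sink once and returns how many were run.
        int execute() {
            int count = sinks.size();
            for (Runnable sink : sinks) {
                sink.run();
            }
            sinks.clear();
            return count;
        }
    }

    public static void main(String[] args) {
        ToyEnvironment env = new ToyEnvironment();
        List<String> written = new ArrayList<>();

        // Stands in for a write(...) call: the sink is only recorded in the plan.
        env.registerSink(() -> written.add("savepoint"));
        System.out.println("before execute: " + written.size() + " item(s) written");

        // Stands in for bEnv.execute(): only now does the sink actually run.
        env.execute();
        System.out.println("after execute: " + written.size() + " item(s) written");
    }
}
```

Running main prints 0 items written before execute() and 1 item after it, which mirrors a program that "finishes successfully" without producing any output when the final execute call is missing.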
*David Anderson* | Training Coordinator