Posted to user@flink.apache.org by Jin Yi <el...@gmail.com> on 2020/01/21 00:07:28 UTC

Question regarding checkpoint/savepoint and State Processor API

Hi there,

1. In my job, I have a broadcast stream. Initially, there is no savepoint
that can be used to bootstrap the broadcast stream's state.
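  BootstrapTransformation transform =
      OperatorTransformation.bootstrapWith(dataSet).transform(bootstrapFunction);

  // Note: the second argument of Savepoint.create is the max parallelism.
  // write() only defines the output; the savepoint is produced when the
  // batch ExecutionEnvironment is executed.
  Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
      .withOperator(OPERATOR_UID, transform)
      .write("file:///tmp/new_savepoints");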

Question: bootstrapWith(dataSet) requires a DataSet. Normally the DataSet
comes from an old savepoint, but in this case I don't have one. How should
I deal with that? Or is it strictly required?

2. As messages come through the broadcast stream, the state gets updated.

3. I would like to periodically save the broadcast state to a file via
savepoints:
  Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
      .withOperator(OPERATOR_UID, transform)
      .write("file:///tmp/new_savepoints");

4. When the job is cancelled and later restarted, the initial broadcast
state should be loaded from the previous savepoint.

  ExistingSavepoint existingSavepoint = Savepoint.load(
      environment,
      "file:///tmp/smarts/checkpoints/85b69cb38897b9ac66a925fee4ecea2c/chk-5",
      new MemoryStateBackend());

  // The second argument is the name of the broadcast state (its descriptor
  // name), which is not necessarily the operator name.
  dataSet = existingSavepoint.readBroadcastState(
      OPERATOR_UID, OPERATOR_NAME,
      BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

Question: assuming I now have the old state as a DataSet, how can I use it
in the BroadcastProcessFunction as the initial broadcast state?
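On the first and last questions: a job started without any savepoint simply begins with empty broadcast state, so bootstrapping is only needed when there is initial data to load. As Seth's reply in this thread explains, that data can come from any DataSet (for example one built with ExecutionEnvironment#fromElements or read from a file), not only from an old savepoint. A BroadcastStateBootstrapFunction then puts each record into broadcast state, the result is written as a new savepoint, and the streaming job is started from it. The sketch below is a Flink-free, plain-Java model of only the per-record bootstrap step. It assumes records arrive as hypothetical "key,value" lines; BootstrapModel and parseLine are illustrative names, not Flink API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Flink-free model of bootstrapping broadcast state from an arbitrary source
// (file, database, ...). BootstrapModel and parseLine are hypothetical names.
final class BootstrapModel {

    // Plays the role of BroadcastStateBootstrapFunction#processElement:
    // each input record becomes one entry in the initial broadcast state.
    static Map<String, String> bootstrap(List<String> lines) {
        Map<String, String> state = new LinkedHashMap<>();
        for (String line : lines) {
            String[] kv = parseLine(line);
            // Later records overwrite earlier ones, like BroadcastState#put.
            state.put(kv[0], kv[1]);
        }
        return state;
    }

    // Assumed input format "key,value". A line without a comma maps the key
    // to itself, matching the key -> key entries used later in this thread.
    static String[] parseLine(String line) {
        int comma = line.indexOf(',');
        if (comma < 0) {
            String key = line.trim();
            return new String[] {key, key};
        }
        return new String[] {
            line.substring(0, comma).trim(), line.substring(comma + 1).trim()
        };
    }
}
```

An empty input yields empty state, which is exactly what a job started without a savepoint would see anyway.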

Thanks a lot for the help!

Eleanore

Re: Question regarding checkpoint/savepoint and State Processor API

Posted by Jin Yi <el...@gmail.com>.
Hi Seth,

Thanks for the prompt response! Regarding my second question: once I have
converted the existing savepoint into a DataSet, how can I turn that DataSet
into BroadcastState?

For example, in my BroadcastProcessFunction:

@Override
public void processBroadcastElement(String key, Context context,
        Collector<JsonNode> collector) throws Exception {
    // TODO: how do I add the existing BroadcastState from the savepoint beforehand?
    BroadcastState<String, String> broadcastState =
            context.getBroadcastState(keySetStateDescriptor);
    broadcastState.put(key, key);
}
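On this follow-up question: nothing has to be converted inside processBroadcastElement. If the streaming job is started from a savepoint that contains broadcast state for the same operator uid, and keySetStateDescriptor has the same name and key/value types as the descriptor used when bootstrapping, Flink restores that state before the first element is processed, so context.getBroadcastState(keySetStateDescriptor) already returns the old entries. The plain-Java sketch below models that behaviour with a Map standing in for BroadcastState; RestoredBroadcastFn and isKnown are hypothetical names, not Flink API.

```java
import java.util.HashMap;
import java.util.Map;

// Flink-free model of a BroadcastProcessFunction restored from a savepoint.
// RestoredBroadcastFn is a hypothetical name; the Map stands in for BroadcastState.
final class RestoredBroadcastFn {
    private final Map<String, String> broadcastState;

    // Models the restore step that happens before any element is processed.
    // Passing an empty map models a job started without a savepoint.
    RestoredBroadcastFn(Map<String, String> restoredEntries) {
        this.broadcastState = new HashMap<>(restoredEntries);
    }

    // Same logic as the processBroadcastElement above: there is no loading
    // step, new keys are simply added on top of the restored entries.
    void processBroadcastElement(String key) {
        broadcastState.put(key, key);
    }

    // Models a lookup from processElement, which only has read access.
    boolean isKnown(String key) {
        return broadcastState.containsKey(key);
    }
}
```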


Thanks a lot!
Eleanore

On Tue, Jan 21, 2020 at 7:12 AM Seth Wiesman <se...@ververica.com> wrote:

> Hi Eleanore,
>
> Bootstrap data is not required to come from an existing savepoint. It can
> come from any DataSet, which could be backed by a file, a database, or any
> other system. The State Processor API is also not a tool you are going to
> use between every start and stop of your job. It is only meant to bootstrap
> the initial state of your application. After that, you will use savepoints
> to carry over the current state of your application between runs.
>
>
> --
>
> Seth Wiesman | Solutions Architect
>
> +1 314 387 1463
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>
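Seth's point that the State Processor API is a one-time bootstrap tool, while savepoints carry state between runs, can be modelled in a few lines of plain Java. In this Flink-free sketch an immutable Map copy stands in for a savepoint. In a real deployment the savepoint would be taken by Flink itself (for example with "flink savepoint <jobId>" while the job runs) and the next run would be started from it with "flink run -s <savepointPath> ...", rather than being rebuilt with Savepoint.create(...). StateLifecycleModel and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Flink-free model of the state lifecycle described in this thread.
// A "savepoint" here is just an immutable Map copy; all names are hypothetical.
final class StateLifecycleModel {

    // One-time step (the State Processor API's job): turn initial data into a savepoint.
    static Map<String, String> bootstrapSavepoint(List<String> initialKeys) {
        Map<String, String> state = new HashMap<>();
        for (String key : initialKeys) {
            state.put(key, key);
        }
        return Map.copyOf(state);
    }

    // One run of the streaming job: restore from the previous savepoint, apply
    // broadcast updates, then stop with a new savepoint (an immutable copy).
    static Map<String, String> runJob(Map<String, String> restoredFrom, List<String> broadcastKeys) {
        Map<String, String> state = new HashMap<>(restoredFrom);
        for (String key : broadcastKeys) {
            state.put(key, key);
        }
        return Map.copyOf(state);
    }
}
```

Only the first call uses the bootstrap step; every later run restores from the savepoint produced by the run before it.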
