Posted to user-zh@flink.apache.org by sjf0115 <si...@163.com> on 2023/05/05 14:56:59 UTC
CheckpointedFunction and KeyedState
The initializeState method of the CheckpointedFunction interface gives access to a FunctionInitializationContext, and FunctionInitializationContext provides access not only to the OperatorStateStore but also to the KeyedStateStore. CheckpointedFunction is most commonly used to work with operator state, but keyed state can also be obtained with code like this:
```java
context.getKeyedStateStore().getState(stateDescriptor);
```
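One way to picture why CheckpointedFunction is usually paired with operator state is the difference in scoping. The stdlib-only toy model below uses hypothetical classes (ToyOperatorListState, ToyKeyedValueState) that only imitate the scoping rules; they are not Flink's API or state backends. Operator state is roughly one value per parallel instance and needs no key, whereas a keyed state handle such as the one returned by getKeyedStateStore().getState(...) resolves every read and write against whichever key is current at that moment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StateScopeDemo {
    // Hypothetical stand-in for operator (non-keyed) list state: one list per
    // parallel instance, readable and writable without any current key.
    static class ToyOperatorListState<T> {
        private final List<T> items = new ArrayList<>();

        void add(T item) { items.add(item); }

        List<T> get() { return items; }
    }

    // Hypothetical stand-in for keyed value state: the handle is shared by all
    // keys, and every value()/update() call is routed to the current key's entry.
    static class ToyKeyedValueState<K, V> {
        private final Map<K, V> perKey = new HashMap<>();
        private K currentKey;

        void setCurrentKey(K key) { currentKey = key; }

        V value() { return perKey.get(currentKey); }

        void update(V value) { perKey.put(currentKey, value); }
    }

    public static void main(String[] args) {
        ToyOperatorListState<String> seenSensors = new ToyOperatorListState<>();
        ToyKeyedValueState<String, Double> lastTemperature = new ToyKeyedValueState<>();

        // The "framework" sets the key before each element is processed.
        lastTemperature.setCurrentKey("sensor-1");
        lastTemperature.update(20.0);
        seenSensors.add("sensor-1");

        lastTemperature.setCurrentKey("sensor-2");
        lastTemperature.update(30.0);
        seenSensors.add("sensor-2");

        // Same handle, different value depending on which key is current.
        lastTemperature.setCurrentKey("sensor-1");
        System.out.println(lastTemperature.value()); // 20.0
        lastTemperature.setCurrentKey("sensor-2");
        System.out.println(lastTemperature.value()); // 30.0
        // Operator state is just one list for this instance; no key involved.
        System.out.println(seenSensors.get()); // [sensor-1, sensor-2]
    }
}
```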
I'd like to ask about using CheckpointedFunction to work with KeyedState, as in the code below. Is this a recommended approach? Are there any problems with it?
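```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

// Static nested class; the imports above belong at the top of the enclosing source file.
public static class TemperatureAlertFlatMapFunction
        extends RichFlatMapFunction<Tuple2<String, Double>, Tuple3<String, Double, Double>>
        implements CheckpointedFunction {
    // Alert threshold for the temperature difference
    private double threshold;
    // Keyed state holding the previous temperature
    private ValueState<Double> lastTemperatureState;
    // Previous temperature, kept in a plain field between snapshots
    private Double lastTemperature;

    public TemperatureAlertFlatMapFunction(double threshold) {
        this.threshold = threshold;
    }

    @Override
    public void flatMap(Tuple2<String, Double> sensor, Collector<Tuple3<String, Double, Double>> out) throws Exception {
        String sensorId = sensor.f0;
        // Current temperature
        double temperature = sensor.f1;
        // First reported temperature: nothing to compare against yet
        if (lastTemperature == null) {
            lastTemperature = temperature;
            return;
        }
        double diff = Math.abs(temperature - lastTemperature);
        // Remember the current temperature only after comparing it with the previous one
        lastTemperature = temperature;
        if (diff > threshold) {
            // Emit when the temperature change exceeds the threshold
            out.collect(Tuple3.of(sensorId, temperature, diff));
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Copy the latest temperature into the state that holds the previous temperature
        if (lastTemperature != null) {
            lastTemperatureState.update(lastTemperature);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ValueStateDescriptor<Double> stateDescriptor = new ValueStateDescriptor<>("lastTemperature", Double.class);
        lastTemperatureState = context.getKeyedStateStore().getState(stateDescriptor);
        if (context.isRestored()) {
            lastTemperature = lastTemperatureState.value();
        }
    }
}
```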
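Separately from where the state lives, the alert rule itself can be checked in isolation. The plain-Java sketch below uses hypothetical class and method names, with a HashMap standing in for per-sensor state: each reading is compared with the previous reading of the same sensor, the stored value is updated only after the comparison, and a sensor's first reading produces no output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TemperatureAlertSketch {
    // Hypothetical alert type: sensor id, current temperature, absolute change.
    static final class Alert {
        final String sensorId;
        final double temperature;
        final double diff;

        Alert(String sensorId, double temperature, double diff) {
            this.sensorId = sensorId;
            this.temperature = temperature;
            this.diff = diff;
        }

        @Override
        public String toString() {
            return "(" + sensorId + ", " + temperature + ", " + diff + ")";
        }
    }

    // Plain-Java version of the flatMap rule; the map plays the role of per-key state.
    static List<Alert> detect(String[] sensorIds, double[] temperatures, double threshold) {
        Map<String, Double> lastBySensor = new HashMap<>();
        List<Alert> alerts = new ArrayList<>();
        for (int i = 0; i < sensorIds.length; i++) {
            Double previous = lastBySensor.get(sensorIds[i]);
            if (previous != null) {
                double diff = Math.abs(temperatures[i] - previous);
                if (diff > threshold) {
                    alerts.add(new Alert(sensorIds[i], temperatures[i], diff));
                }
            }
            // Store the reading only after the comparison.
            lastBySensor.put(sensorIds[i], temperatures[i]);
        }
        return alerts;
    }

    public static void main(String[] args) {
        String[] ids = {"s1", "s2", "s1", "s2"};
        double[] temps = {20.0, 30.0, 25.5, 30.5};
        System.out.println(detect(ids, temps, 5.0)); // [(s1, 25.5, 5.5)]
    }
}
```

With a threshold of 5.0, only the second s1 reading triggers an alert (25.5 against 20.0, a change of 5.5); s2 changes by 0.5.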
Re:Re: CheckpointedFunction and KeyedState
Posted by sjf0115 <si...@163.com>.
Thanks.
In reply to "Hangxiang Yu" <ma...@gmail.com>, 2023-05-06 10:36:02.
Re: CheckpointedFunction and KeyedState
Posted by Hangxiang Yu <ma...@gmail.com>.
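Hi, initializing state in initializeState is fine, but try not to access KeyedState in initializeState or snapshotState; it is best done in the actual function, such as flatMap here.
The reason is that KeyedState access is bound to the current key. Before the actual function processes an element, the framework implicitly sets the current key, which guarantees that every KeyedState operation works on a well-defined key/value pair.
In initializeState and snapshotState, the framework does not set the key implicitly, so you would effectively be updating the value of some undetermined key. If you really must do it there, you would have to obtain the KeyContext and set the key yourself, but this is not recommended.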
In reply to sjf0115 <si...@163.com>, Fri, May 5, 2023 at 10:58 PM.
--
Best,
Hangxiang.
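The reply's point, that keyed-state access is bound to the current key and that nothing sets that key for you in snapshotState, can be illustrated with a stdlib-only toy model. ToyKeyedBackend and the two helper methods are hypothetical, not Flink classes. As a simplifying assumption, the toy leaves the current key at whatever the last processed element set, whereas the reply only says the key is undetermined. Copying the plain field into state at snapshot time stores a single value under one arbitrary key, while updating the state inside flatMap keeps one value per sensor.

```java
import java.util.HashMap;
import java.util.Map;

public class SnapshotKeyPitfall {
    // Hypothetical toy keyed backend: one map plus a "current key" pointer.
    static class ToyKeyedBackend {
        final Map<String, Double> entries = new HashMap<>();
        String currentKey; // in this toy, only the processing loop sets it

        void update(Double v) { entries.put(currentKey, v); }
    }

    // Pattern from the question: keep a plain field, copy it into keyed state at snapshot time.
    static Map<String, Double> writeInSnapshot(String[] keys, double[] temps) {
        ToyKeyedBackend backend = new ToyKeyedBackend();
        Double lastTemperature = null; // one field shared by every key on this instance
        for (int i = 0; i < keys.length; i++) {
            backend.currentKey = keys[i]; // "framework" sets the key before flatMap
            lastTemperature = temps[i];
        }
        // "snapshotState": nobody sets a key here; the toy simply keeps the last one.
        if (lastTemperature != null) {
            backend.update(lastTemperature);
        }
        return backend.entries;
    }

    // Pattern recommended in the reply: update keyed state inside flatMap itself.
    static Map<String, Double> writeInFlatMap(String[] keys, double[] temps) {
        ToyKeyedBackend backend = new ToyKeyedBackend();
        for (int i = 0; i < keys.length; i++) {
            backend.currentKey = keys[i]; // key is set before each element
            backend.update(temps[i]);
        }
        return backend.entries;
    }

    public static void main(String[] args) {
        String[] keys = {"sensor-1", "sensor-2"};
        double[] temps = {20.0, 30.0};
        // Only one entry survives, filed under whichever key happened to be current.
        System.out.println(writeInSnapshot(keys, temps)); // {sensor-2=30.0}
        // One entry per sensor.
        System.out.println(writeInFlatMap(keys, temps).size()); // 2
    }
}
```

In the snapshot variant, sensor-1's reading is never stored, so after a restore it would start with no previous temperature.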