Posted to user-zh@flink.apache.org by 阿华田 <a1...@163.com> on 2021/06/01 09:36:37 UTC
Custom stateful UDF
I wrote a custom UDF that implements CheckpointedFunction.
The pseudocode is below. I found that initializeState is never executed.
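import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterInfoCollectUdf extends ScalarFunction implements CheckpointedFunction {

    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterInfoCollectUdf.class);

    // Keyed map state; it is only assigned inside initializeState().
    private transient MapState<String, Integer> mapState;
    private MapStateDescriptor<String, Integer> mapStateDescriptor;

    // ... (rest of the class omitted)

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        LOGGER.info("the snapshotState is started ");
    }

    // This is the method that never appears to be called.
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        mapStateDescriptor = new MapStateDescriptor<>(
                "app-status-map",
                String.class,
                Integer.class);
        mapState = context.getKeyedStateStore().getMapState(mapStateDescriptor);
        LOGGER.info("the initializeState is started ");
    }
}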
阿华田
a15733178518@163.com