You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 阿华田 <a1...@163.com> on 2021/06/01 09:36:37 UTC

自定义带有状态的udf

自定义UDF 实现CheckpointedFunction 
伪代码如下 发现并没有执行initializeState






public class ClusterInfoCollectUdf   extends ScalarFunction implements CheckpointedFunction {
private static final Logger                            LOGGER = LoggerFactory.getLogger(ClusterInfoCollectUdf.class);
private transient     MapState<String,Integer >          mapState;
private               MapStateDescriptor<String,Integer> mapStateDescriptor;
   。。。。。



@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {


LOGGER.info("the snapshotState    is  started ");


}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
mapStateDescriptor = new MapStateDescriptor<>(
"app-status-map",
String.class,
Integer.class);

mapState = context.getKeyedStateStore().getMapState(mapStateDescriptor);
LOGGER.info("the initializeState    is  started ");




}





| |
阿华田
|
|
a15733178518@163.com
|
签名由网易邮箱大师定制