Posted to user@flink.apache.org by shengjk1 <js...@163.com> on 2019/09/19 10:59:26 UTC
MapState not support transfer value
Hi all,
This is my code:
.process(new KeyedProcessFunction<String, Tuple2<String, Object>, String>() {
    ...
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.days(7))
            .cleanupInBackground()
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // default
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();
        MapStateDescriptor noMatchDescriptor = new MapStateDescriptor<String, List<Object>>(
            "kuduError",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<List<Object>>() {
            })
        );
        ...
    }
    ...
    @Override
    public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context, Collector<String> collector) throws Exception {
        ...
        List<Object> objectList = new ArrayList<>();
        if (mapState.contains(key)) {
            objectList = mapState.get(key);
        } else {
            mapState.put(key, objectList);
        }
        logger.info("key{} add before objectList.size{}", key, objectList.size());
        objectList.add(stringObjectTuple2.f1);
        logger.info("key{} add after objectList.size{}", key, objectList.size());
        objectList = mapState.get(key);
        logger.info("key{} mapState objectList.size{}", key, objectList.size());
        ...
    }
When I run this in IDEA, the log output is:
Best,
Shengjk1
Re: MapState not support transfer value
Posted by Dian Fu <di...@gmail.com>.
Hi Shengjk1,
You should call "mapState.put(key,objectList);" explicitly after calling "objectList.add(stringObjectTuple2.f1);" to write the updated list to the state backend. This is because objectList is just an ordinary Java list object, and changes to it are not automatically synced to the state backend.
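To illustrate the point, here is a minimal, self-contained sketch that does not use Flink at all. CopyingMapState is a hypothetical stand-in (not Flink's MapState) for a state backend that serializes on write and deserializes on read, so get() and put() both work on copies. That copying behavior is an assumption of the sketch; a backend that keeps live objects on the heap may behave differently, which is one more reason to always write back explicitly. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapStateWriteBack {

    /** Hypothetical stand-in for a serializing state backend; NOT Flink's MapState. */
    static final class CopyingMapState<K, V> {
        private final Map<K, List<V>> store = new HashMap<>();

        boolean contains(K key) {
            return store.containsKey(key);
        }

        /** Returns a copy, mimicking a backend that deserializes on every read. */
        List<V> get(K key) {
            List<V> stored = store.get(key);
            return stored == null ? null : new ArrayList<>(stored);
        }

        /** Stores a copy, mimicking a backend that serializes on every write. */
        void put(K key, List<V> value) {
            store.put(key, new ArrayList<>(value));
        }
    }

    /** Pattern from the question: the list is mutated but never written back. */
    static int addWithoutPut(CopyingMapState<String, Object> state, String key, Object value) {
        List<Object> list = new ArrayList<>();
        if (state.contains(key)) {
            list = state.get(key);
        } else {
            state.put(key, list);
        }
        list.add(value);              // changes only the local copy
        return state.get(key).size(); // size as seen by the state
    }

    /** Suggested pattern: read, modify, then put the list back. */
    static int addWithPut(CopyingMapState<String, Object> state, String key, Object value) {
        List<Object> list = state.contains(key) ? state.get(key) : new ArrayList<>();
        list.add(value);
        state.put(key, list);         // explicit write-back
        return state.get(key).size();
    }

    public static void main(String[] args) {
        CopyingMapState<String, Object> lost = new CopyingMapState<>();
        System.out.println(addWithoutPut(lost, "k", "a")); // prints 0

        CopyingMapState<String, Object> kept = new CopyingMapState<>();
        System.out.println(addWithPut(kept, "k", "a"));    // prints 1
        System.out.println(addWithPut(kept, "k", "b"));    // prints 2
    }
}
```

In the original processElement, the equivalent change would be to call mapState.put(key, objectList) right after objectList.add(...), before reading the value back.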
Regards,
Dian