You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by shengjk1 <js...@163.com> on 2019/09/19 10:59:26 UTC

MapState not support transfer value

Hi,all


This is my code 


.process(new KeyedProcessFunction<String, Tuple2<String, Object>, String>() {
        ...
      @Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(7))
.cleanupInBackground()
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//default
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
MapStateDescriptor noMatchDescriptor = new MapStateDescriptor<String, List<Object>>(
"kuduError",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<List<Object>>() {
})
);
...
}
...
@Override
public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context, Collector<String> collector) throws Exception {
...
List<Object> objectList = new ArrayList<>();
if (mapState.contains(key)) {
objectList = mapState.get(key);
}else{
mapState.put(key,objectList);
}
logger.info("key{} add before objectList.size{}",key,objectList.size());
objectList.add(stringObjectTuple2.f1);
logger.info("key{} add after objectList.size{}",key,objectList.size());
objectList = mapState.get(key);
logger.info("key{} mapState objectList.size{}",key,objectList.size());
                                ...
}




If I run this in idea  the  log detail:




Best,
Shengjk1




Re: MapState not support transfer value

Posted by Dian Fu <di...@gmail.com>.
Hi Shengjk1,

You should call "mapState.put(key,objectList);" manually after calling "objectList.add(stringObjectTuple2.f1);" to write it to the state backend. This is because objectList is just a common Java list object and it will not be synced to state backend automatic when updated.

Regards,
Dian

> 在 2019年9月19日,下午6:59,shengjk1 <js...@163.com> 写道:
> 
> Hi,all
> 
> This is my code 
> 
> .process(new KeyedProcessFunction<String, Tuple2<String, Object>, String>() {
>         ...
>       	@Override
> public void open(Configuration parameters) throws Exception {
> 				super.open(parameters);
> 				
> 				StateTtlConfig ttlConfig = StateTtlConfig
> 						.newBuilder(Time.days(7))
> 						.cleanupInBackground()
> 						.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//default
> 						.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> 						.build();
> 				
> 				MapStateDescriptor noMatchDescriptor = new MapStateDescriptor<String, List<Object>>(
> 						"kuduError",
> 						BasicTypeInfo.STRING_TYPE_INFO,
> 						TypeInformation.of(new TypeHint<List<Object>>() {
> 						})
> 				);
> 				...
> 			}
> 			...
> 			
> 			@Override
> 	public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context, Collector<String> collector) throws Exception {
> 				...
> 				List<Object> objectList = new ArrayList<>();
> 				if (mapState.contains(key)) {
> 					objectList = mapState.get(key);
> 				}else{
> 					mapState.put(key,objectList);
> 				}
> 				logger.info <http://logger.info/>("key{} add before objectList.size{}",key,objectList.size());
> 				objectList.add(stringObjectTuple2.f1);
> 				logger.info <http://logger.info/>("key{} add after objectList.size{}",key,objectList.size());
> 				objectList = mapState.get(key);
> 				logger.info <http://logger.info/>("key{} mapState objectList.size{}",key,objectList.size());
>                                 ...
> 		}
> 
> 
> If I run this in idea  the  log detail:
> 
> 
> Best,
> Shengjk1
> 
> 
>