You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by shengjk1 <js...@163.com> on 2019/09/19 11:42:04 UTC

In yarn-cluster model, mapState not support transfer value

Hi,all


As we know java map support transfer value,such as :


               HashMap<String, List<String>> stringListHashMap = new HashMap<>();
for (int i = 0; i < 10; i++) {
List<String> a = stringListHashMap.get("a"+i%2);
if (a==null){
a=new ArrayList<>();
stringListHashMap.put("a"+i%2,a);
}
a.add("a"+i);
}
stringListHashMap.keySet().forEach(x-> System.out.println("keys========= "+x));
stringListHashMap.get("a0").forEach(x-> System.out.println("========x=== "+x));


 Result:
   keys========= a1
           keys========= a0
           ========x=== a0
           ========x=== a2  
           ========x=== a4
           ========x=== a6
           ========x=== a8




But flink on cluster,  mapstate not support transfer value, such as :


    .process(new KeyedProcessFunction<String, Tuple2<String, Object>, String>() {
            ...
                @Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(7))
.cleanupInBackground()
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//default
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
MapStateDescriptor noMatchDescriptor = new MapStateDescriptor<String, List<Object>>(
"kuduError",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<List<Object>>() {
})
);
...
}
...
@Override
public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context, Collector<String> collector) throws Exception {
...
List<Object> objectList = new ArrayList<>();
if (mapState.contains(key)) {
objectList = mapState.get(key);
}else{
mapState.put(key,objectList);
}
logger.info("key{} add before objectList.size {}",key,objectList.size());
objectList.add(stringObjectTuple2.f1);
logger.info("key{} add after objectList.size {}",key,objectList.size());
objectList = mapState.get(key);
logger.info("key{} mapState objectList.size {}",key,objectList.size());
        ...
                        }


If  I run this code in IDEA support transfer value, this log detail:
 
     keyorder2infos add before objectList.size 0
     keyorder2infos  add after objectList.size  1
     keyorder2infos mapState objectList.size 1   
                                           
    keyorder2infos add before objectList.size 1
    keyorder2infos  add after objectList.size  2
    keyorder2infos mapState objectList.size 2
     …  




But if run it on yarn-cluster not support transfer value ,this log detail:


    keyorder2infos add before objectList.size 0
    keyorder2infos  add after objectList.size  1
    keyorder2infos mapState objectList.size 0    
                                          
    keyorder2infos add before objectList.size 0
    keyorder2infos  add after objectList.size  1
    keyorder2infos mapState objectList.size 0
    …




So, I want to konw :
1.This is a bug or my code has some wrong?
2.Why mapstate on yarn-cluster  not support transfer value?  I have seen flink  source code ,but not find.




It runs on:
    Flink 1.9.0
    Java 1.8 
    Hadoop 2.6.0




Best,
Shengjk1




Re: In yarn-cluster model, mapState not support transfer value

Posted by shengjk1 <js...@163.com>.
Hi, Dian
Thanks for your reminder.  I saw HeapMaoState.java and TtlMapState.java, You are right. 


But  with objectList get large, every time call "mapState.put(key,objectList);”  is very influencing performance, even lead to checkpoint timeout. Now I am not have a better method to improve performance.  Maybe I need to redesign my program.


Best,
Shengjk1




On 09/19/2019 20:08,Dian Fu<di...@gmail.com> wrote:


Hi Shengjk1,


You should call "mapState.put(key,objectList);" manually after calling "objectList.add(stringObjectTuple2.f1);" to write it to the state backend. This is because objectList is just a common Java list object and it will not be synced to state backend automatic when updated.


I guess the result you see is because heap statebackend is used when running in IDE and rocksdb statebackend is used when running in YARN mode. You can check the configuration if it's the case.


Regards,
Dian


在 2019年9月19日,下午7:42,shengjk1 <js...@163.com> 写道:


Hi,all


As we know java map support transfer value,such as :


               HashMap<String, List<String>> stringListHashMap = new HashMap<>();
for (int i = 0; i < 10; i++) {
List<String> a = stringListHashMap.get("a"+i%2);
if (a==null){
a=new ArrayList<>();
stringListHashMap.put("a"+i%2,a);
}
a.add("a"+i);
}
stringListHashMap.keySet().forEach(x-> System.out.println("keys========= "+x));
stringListHashMap.get("a0").forEach(x-> System.out.println("========x=== "+x));


 Result:
   keys========= a1
           keys========= a0
           ========x=== a0
           ========x=== a2  
           ========x=== a4
           ========x=== a6
           ========x=== a8




But flink on cluster,  mapstate not support transfer value, such as :


    .process(new KeyedProcessFunction<String, Tuple2<String, Object>, String>() {
            ...
                @Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(7))
.cleanupInBackground()
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//default
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
MapStateDescriptor noMatchDescriptor = new MapStateDescriptor<String, List<Object>>(
"kuduError",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<List<Object>>() {
})
);
...
}
...
@Override
public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context, Collector<String> collector) throws Exception {
...
List<Object> objectList = new ArrayList<>();
if (mapState.contains(key)) {
objectList = mapState.get(key);
}else{
mapState.put(key,objectList);
}
logger.info("key{} add before objectList.size {}",key,objectList.size());
objectList.add(stringObjectTuple2.f1);
logger.info("key{} add after objectList.size {}",key,objectList.size());
objectList = mapState.get(key);
logger.info("key{} mapState objectList.size {}",key,objectList.size());
        ...
                        }


If  I run this code in IDEA support transfer value, this log detail:
 
     keyorder2infos add before objectList.size 0
     keyorder2infos  add after objectList.size  1
     keyorder2infos mapState objectList.size 1   
                                           
    keyorder2infos add before objectList.size 1
    keyorder2infos  add after objectList.size  2
    keyorder2infos mapState objectList.size 2
     …  




But if run it on yarn-cluster not support transfer value ,this log detail:


    keyorder2infos add before objectList.size 0
    keyorder2infos  add after objectList.size  1
    keyorder2infos mapState objectList.size 0    
                                          
    keyorder2infos add before objectList.size 0
    keyorder2infos  add after objectList.size  1
    keyorder2infos mapState objectList.size 0
    …




So, I want to konw :
1.This is a bug or my code has some wrong?
2.Why mapstate on yarn-cluster  not support transfer value?  I have seen flink  source code ,but not find.




It runs on:
    Flink 1.9.0
    Java 1.8 
    Hadoop 2.6.0




Best,
Shengjk1








Re: In yarn-cluster model, mapState not support transfer value

Posted by Dian Fu <di...@gmail.com>.
Hi Shengjk1,

You should call "mapState.put(key,objectList);" manually after calling "objectList.add(stringObjectTuple2.f1);" to write it to the state backend. This is because objectList is just a common Java list object and it will not be synced to state backend automatic when updated.

I guess the result you see is because heap statebackend is used when running in IDE and rocksdb statebackend is used when running in YARN mode. You can check the configuration if it's the case.

Regards,
Dian

> 在 2019年9月19日,下午7:42,shengjk1 <js...@163.com> 写道:
> 
> Hi,all
> 
> As we know java map support transfer value,such as :
> 
>                HashMap<String, List<String>> stringListHashMap = new HashMap<>();
> 		for (int i = 0; i < 10; i++) {
> 			List<String> a = stringListHashMap.get("a"+i%2);
> 				if (a==null){
> 					a=new ArrayList<>();
> 					stringListHashMap.put("a"+i%2,a);
> 			}
> 			a.add("a"+i);
> 		}
> 		stringListHashMap.keySet().forEach(x-> System.out.println("keys========= "+x));
> 		stringListHashMap.get("a0").forEach(x-> System.out.println("========x=== "+x));
> 
>  Result:
> 	   keys========= a1
>            keys========= a0
>            ========x=== a0
>            ========x=== a2  
>            ========x=== a4
>            ========x=== a6
>            ========x=== a8
> 
> 
> But flink on cluster,  mapstate not support transfer value, such as :
> 
>     .process(new KeyedProcessFunction<String, Tuple2<String, Object>, String>() {
>             ...
>                 	@Override
> 			public void open(Configuration parameters) throws Exception {
> 				super.open(parameters);
> 				
> 				StateTtlConfig ttlConfig = StateTtlConfig
> 						.newBuilder(Time.days(7))
> 						.cleanupInBackground()
> 						.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//default
> 						.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> 						.build();
> 				
> 				MapStateDescriptor noMatchDescriptor = new MapStateDescriptor<String, List<Object>>(
> 						"kuduError",
> 						BasicTypeInfo.STRING_TYPE_INFO,
> 						TypeInformation.of(new TypeHint<List<Object>>() {
> 						})
> 				);
> 				...
> 			}
> 			...
> 			
> 			@Override
> 			public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context, Collector<String> collector) throws Exception {
> 				...
> 				List<Object> objectList = new ArrayList<>();
> 				if (mapState.contains(key)) {
> 					objectList = mapState.get(key);
> 				}else{
> 					mapState.put(key,objectList);
> 				}
> 				logger.info <http://logger.info/>("key{} add before objectList.size {}",key,objectList.size());
> 				objectList.add(stringObjectTuple2.f1);
> 				logger.info <http://logger.info/>("key{} add after objectList.size {}",key,objectList.size());
> 				objectList = mapState.get(key);
> 				logger.info <http://logger.info/>("key{} mapState objectList.size {}",key,objectList.size());
> 			        ...	
>                         }
> 
> If  I run this code in IDEA support transfer value, this log detail:
>  
>      keyorder2infos add before objectList.size 0
>      keyorder2infos  add after objectList.size  1
>      keyorder2infos mapState objectList.size 1   
>                                            
>     keyorder2infos add before objectList.size 1
>     keyorder2infos  add after objectList.size  2
>     keyorder2infos mapState objectList.size 2
>      …  
> 
> 
> But if run it on yarn-cluster not support transfer value ,this log detail:
> 
>     keyorder2infos add before objectList.size 0
>     keyorder2infos  add after objectList.size  1
>     keyorder2infos mapState objectList.size 0    
>                                           
>     keyorder2infos add before objectList.size 0
>     keyorder2infos  add after objectList.size  1
>     keyorder2infos mapState objectList.size 0
>     …
> 
> 
> So, I want to konw :
> 1.This is a bug or my code has some wrong?
> 2.Why mapstate on yarn-cluster  not support transfer value?  I have seen flink  source code ,but not find.
> 
> 
> It runs on:
>     Flink 1.9.0
>     Java 1.8 
>     Hadoop 2.6.0
> 
> 
> Best,
> Shengjk1
> 
> 
>