You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Navneeth Krishnan <re...@gmail.com> on 2017/09/09 00:32:16 UTC

State Issue

Hi,

I'm experiencing a wired issue where any data put into map state when
retrieved with the same key is returning as null and hence it puts the same
value again and again. I used rocksdb state backend but tried with Memory
state backend too but the issue still exist.

Each time when I set the key and value into MapState it creates a new map I
couldn't access the previous value. But when I iterate over the MapState
keys and values, I can see the same key added multiple times.

Each put operation goes through the code lines marked in red.

*NestedMapsStateTable.java*

S get(K key, int keyGroupIndex, N namespace) {

   checkKeyNamespacePreconditions(key, namespace);

   Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);



* if (namespaceMap == null) {      return null;   }*

   Map<K, S> keyedMap = namespaceMap.get(namespace);

   if (keyedMap == null) {
      return null;
   }

   return keyedMap.get(key);
}


*HeapMapState.java*

@Override
public void put(UK userKey, UV userValue) {

   HashMap<UK, UV> userMap = stateTable.get(currentNamespace);



* if (userMap == null) {      userMap = new HashMap<>();
stateTable.put(currentNamespace, userMap);   }*

   userMap.put(userKey, userValue);
}


*My Code:*

*open()*

MapStateDescriptor<String, String> testStateDescriptor = new
MapStateDescriptor<>("test-state",
        TypeInformation.of(new TypeHint<String>() {}),
TypeInformation.of(new TypeHint<String>() {}));

testState = getRuntimeContext().getMapState(testStateDescriptor);


*flatMap:*

if(testState.contains(user)){
    *// DO Something*
} else {
    testState.put(user, userInfo);
}


streamEnv.setStateBackend(new MemoryStateBackend());

streamEnv.setParallelism(1);


Thanks

Re: State Issue

Posted by Navneeth Krishnan <re...@gmail.com>.
Sorry my bad, figured out it was a change done at our end which created
different keys. Thanks.

On Fri, Sep 8, 2017 at 5:32 PM, Navneeth Krishnan <re...@gmail.com>
wrote:

> Hi,
>
> I'm experiencing a wired issue where any data put into map state when
> retrieved with the same key is returning as null and hence it puts the same
> value again and again. I used rocksdb state backend but tried with Memory
> state backend too but the issue still exist.
>
> Each time when I set the key and value into MapState it creates a new map
> I couldn't access the previous value. But when I iterate over the MapState
> keys and values, I can see the same key added multiple times.
>
> Each put operation goes through the code lines marked in red.
>
> *NestedMapsStateTable.java*
>
> S get(K key, int keyGroupIndex, N namespace) {
>
>    checkKeyNamespacePreconditions(key, namespace);
>
>    Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
>
>
>
> * if (namespaceMap == null) {      return null;   }*
>
>    Map<K, S> keyedMap = namespaceMap.get(namespace);
>
>    if (keyedMap == null) {
>       return null;
>    }
>
>    return keyedMap.get(key);
> }
>
>
> *HeapMapState.java*
>
> @Override
> public void put(UK userKey, UV userValue) {
>
>    HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
>
>
>
> * if (userMap == null) {      userMap = new HashMap<>();      stateTable.put(currentNamespace, userMap);   }*
>
>    userMap.put(userKey, userValue);
> }
>
>
> *My Code:*
>
> *open()*
>
> MapStateDescriptor<String, String> testStateDescriptor = new MapStateDescriptor<>("test-state",
>         TypeInformation.of(new TypeHint<String>() {}), TypeInformation.of(new TypeHint<String>() {}));
>
> testState = getRuntimeContext().getMapState(testStateDescriptor);
>
>
> *flatMap:*
>
> if(testState.contains(user)){
>     *// DO Something*
> } else {
>     testState.put(user, userInfo);
> }
>
>
> streamEnv.setStateBackend(new MemoryStateBackend());
>
> streamEnv.setParallelism(1);
>
>
> Thanks
>
>