Posted to user@flink.apache.org by Navneeth Krishnan <re...@gmail.com> on 2017/09/09 00:32:16 UTC
State Issue
Hi,
I'm experiencing a weird issue where any data put into map state comes back
as null when retrieved with the same key, so the same value is put again and
again. I was using the RocksDB state backend, but the issue also occurs with
the memory state backend.
Each time I put a key and value into MapState, a new map is created and I
cannot access the previous value. But when I iterate over the MapState keys
and values, I can see the same key added multiple times.
Each put operation goes through the code lines marked with asterisks below
(highlighted in red in the original message).
*NestedMapsStateTable.java*
S get(K key, int keyGroupIndex, N namespace) {
    checkKeyNamespacePreconditions(key, namespace);
    Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
    *if (namespaceMap == null) { return null; }*
    Map<K, S> keyedMap = namespaceMap.get(namespace);
    if (keyedMap == null) {
        return null;
    }
    return keyedMap.get(key);
}
*HeapMapState.java*
@Override
public void put(UK userKey, UV userValue) {
    HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
    *if (userMap == null) { userMap = new HashMap<>(); stateTable.put(currentNamespace, userMap); }*
    userMap.put(userKey, userValue);
}
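For readers following the two excerpts above, here is a minimal plain-Java sketch of that lookup path. It assumes a much-simplified model (namespace -> key -> user map, with key groups left out), and the class NestedStateModel and all of its methods are hypothetical, not Flink's API. It is only meant to show that the null branch is expected on the first put for a given key and namespace, and that later puts under the same key should reuse the existing map.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of a nested state table: namespace -> key -> user map. */
public class NestedStateModel {
    private final Map<String, Map<String, Map<String, String>>> table = new HashMap<>();
    private int mapsCreated = 0;

    /** Mirrors the shape of get(): returns null when either level is missing. */
    public Map<String, String> get(String key, String namespace) {
        Map<String, Map<String, String>> keyedMap = table.get(namespace);
        if (keyedMap == null) {
            return null;
        }
        return keyedMap.get(key);
    }

    /** Mirrors the shape of put(): creates a user map only if none exists for (key, namespace). */
    public void put(String key, String namespace, String userKey, String userValue) {
        Map<String, String> userMap = get(key, namespace);
        if (userMap == null) {
            userMap = new HashMap<>();
            table.computeIfAbsent(namespace, n -> new HashMap<>()).put(key, userMap);
            mapsCreated++;
        }
        userMap.put(userKey, userValue);
    }

    /** Number of times put() had to create a fresh user map. */
    public int mapsCreated() {
        return mapsCreated;
    }

    public static void main(String[] args) {
        NestedStateModel state = new NestedStateModel();
        state.put("stream-key-1", "ns", "alice", "info"); // first put: map is created
        state.put("stream-key-1", "ns", "bob", "info");   // same key: map is reused
        state.put("stream-key-2", "ns", "alice", "info"); // different key: another map
        System.out.println("user maps created: " + state.mapsCreated());
    }
}
```

In this model, hitting the null branch on every put would mean that the key or the namespace is different on every call.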
*My Code:*
*open()*
MapStateDescriptor<String, String> testStateDescriptor =
    new MapStateDescriptor<>("test-state",
        TypeInformation.of(new TypeHint<String>() {}),
        TypeInformation.of(new TypeHint<String>() {}));
testState = getRuntimeContext().getMapState(testStateDescriptor);
*flatMap:*
if (testState.contains(user)) {
    *// Do something*
} else {
    testState.put(user, userInfo);
}
streamEnv.setStateBackend(new MemoryStateBackend());
streamEnv.setParallelism(1);
Thanks
Re: State Issue
Posted by Navneeth Krishnan <re...@gmail.com>.
Sorry, my bad. I figured out it was a change made on our end that created
different keys. Thanks.
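The reply does not say which keys differed. One reading consistent with "creates a new map" on every put is that the stream key varied between records, since keyed state is only visible under the key it was written with. The plain-Java sketch below illustrates that reading under stated assumptions: KeyScopedMapState, setCurrentKey, process, and the sample keys are all hypothetical, and the whole-table scan in allUserKeys exists only in this model for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model: map state is visible only under the stream key it was written with. */
public class KeyScopedMapState {
    private final Map<String, Map<String, String>> perKey = new HashMap<>();
    private String currentKey;

    public void setCurrentKey(String key) {
        currentKey = key;
    }

    public boolean contains(String userKey) {
        Map<String, String> userMap = perKey.get(currentKey);
        return userMap != null && userMap.containsKey(userKey);
    }

    public void put(String userKey, String userValue) {
        perKey.computeIfAbsent(currentKey, k -> new HashMap<>()).put(userKey, userValue);
    }

    /** Collects user keys across every stream key (illustration only). */
    public List<String> allUserKeys() {
        List<String> keys = new ArrayList<>();
        for (Map<String, String> userMap : perKey.values()) {
            keys.addAll(userMap.keySet());
        }
        return keys;
    }

    /** Same shape as the flatMap logic in the post; returns true if a put happened. */
    public boolean process(String streamKey, String user, String userInfo) {
        setCurrentKey(streamKey);
        if (contains(user)) {
            return false;
        }
        put(user, userInfo);
        return true;
    }

    public static void main(String[] args) {
        KeyScopedMapState state = new KeyScopedMapState();
        // Assumed cause: the same user arrives under two different stream keys.
        System.out.println(state.process("session-1", "alice", "info"));
        System.out.println(state.process("session-2", "alice", "info"));
        System.out.println(state.process("session-1", "alice", "info"));
        System.out.println(state.allUserKeys());
    }
}
```

Under this assumption, the second call puts "alice" again because the lookup runs under a different stream key, while the third call finds the entry written by the first.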