You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Bowen Li (JIRA)" <ji...@apache.org> on 2018/01/11 09:00:03 UTC
[jira] [Created] (FLINK-8411) inconsistent behavior between
HeapListState#add() and RocksDBListState#add()
Bowen Li created FLINK-8411:
-------------------------------
Summary: inconsistent behavior between HeapListState#add() and RocksDBListState#add()
Key: FLINK-8411
URL: https://issues.apache.org/jira/browse/FLINK-8411
Project: Flink
Issue Type: Bug
Affects Versions: 1.4.0
Reporter: Bowen Li
Fix For: 1.5.0, 1.4.1
You can see that HeapListState#add(null) will result in the whole state being cleared or wiped out.
{code:java}
// HeapListState
@Override
public void add(V value) {
final N namespace = currentNamespace;
if (value == null) {
clear();
return;
}
final StateTable<K, N, ArrayList<V>> map = stateTable;
ArrayList<V> list = map.get(namespace);
if (list == null) {
list = new ArrayList<>();
map.put(namespace, list);
}
list.add(value);
}
{code}
{code:java}
// RocksDBListState
@Override
public void add(V value) throws IOException {
try {
writeCurrentKeyWithGroupAndNamespace();
byte[] key = keySerializationStream.toByteArray();
keySerializationStream.reset();
DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
valueSerializer.serialize(value, out);
backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
} catch (Exception e) {
throw new RuntimeException("Error while adding data to RocksDB", e);
}
}
{code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)