You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Bowen Li (JIRA)" <ji...@apache.org> on 2018/01/11 09:00:03 UTC

[jira] [Created] (FLINK-8411) inconsistent behavior between HeapListState#add() and RocksDBListState#add()

Bowen Li created FLINK-8411:
-------------------------------

             Summary: inconsistent behavior between HeapListState#add() and RocksDBListState#add()
                 Key: FLINK-8411
                 URL: https://issues.apache.org/jira/browse/FLINK-8411
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.4.0
            Reporter: Bowen Li
             Fix For: 1.5.0, 1.4.1


You can see that HeapListState#add(null) will result in the whole state being cleared or wiped out.

{code:java}
// HeapListState
@Override
	public void add(V value) {
		final N namespace = currentNamespace;

		if (value == null) {
			clear();
			return;
		}

		final StateTable<K, N, ArrayList<V>> map = stateTable;
		ArrayList<V> list = map.get(namespace);

		if (list == null) {
			list = new ArrayList<>();
			map.put(namespace, list);
		}
		list.add(value);
	}
{code}



{code:java}
// RocksDBListState
@Override
	public void add(V value) throws IOException {
		try {
			writeCurrentKeyWithGroupAndNamespace();
			byte[] key = keySerializationStream.toByteArray();
			keySerializationStream.reset();
			DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
			valueSerializer.serialize(value, out);
			backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
		} catch (Exception e) {
			throw new RuntimeException("Error while adding data to RocksDB", e);
		}
	}
{code}




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)