You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/10/02 08:19:41 UTC

[6/7] flink git commit: [FLINK-7700] Fix RocksDB AggregatingState merging

[FLINK-7700] Fix RocksDB AggregatingState merging

Before, the merged state was not cleared.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0a7c091
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0a7c091
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0a7c091

Branch: refs/heads/release-1.3
Commit: b0a7c091856aa2a99715a9fddcc801f660414cfd
Parents: 0ffca86
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 27 13:01:55 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 28 17:26:29 2017 +0200

----------------------------------------------------------------------
 .../flink/contrib/streaming/state/RocksDBAggregatingState.java      | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7c091/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 1f306b4..c72b94e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -157,6 +157,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
 					
 					final byte[] sourceKey = keySerializationStream.toByteArray();
 					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
+					backend.db.remove(columnFamily, sourceKey);
 
 					if (valueBytes != null) {
 						ACC value = valueSerializer.deserialize(