You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/10/02 08:19:39 UTC

[4/7] flink git commit: [FLINK-7700] Fix RocksDB ReducingState merging

[FLINK-7700] Fix RocksDB ReducingState merging

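Before this fix, the source state entries that were read during a merge were
not removed from RocksDB, so the already-merged state was never cleared.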


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0bd8c06
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0bd8c06
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0bd8c06

Branch: refs/heads/release-1.3
Commit: e0bd8c060da01e5defe528964bf638868cbc13d5
Parents: 5e3d49c
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 27 12:53:08 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 28 17:26:12 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/contrib/streaming/state/RocksDBReducingState.java  | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0bd8c06/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index ccc98a7..9bc6fb9 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -140,6 +140,7 @@ public class RocksDBReducingState<K, N, V>
 
 					final byte[] sourceKey = keySerializationStream.toByteArray();
 					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
+					backend.db.remove(columnFamily, sourceKey);
 
 					if (valueBytes != null) {
 						V value = valueSerializer.deserialize(