You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2018/01/31 15:05:20 UTC
flink git commit: Revert "[FLINK-8411] [State Backends]
HeapListState#add(null) will wipe out entire list state"
Repository: flink
Updated Branches:
refs/heads/release-1.4 110b86dd2 -> 4219572a8
Revert "[FLINK-8411] [State Backends] HeapListState#add(null) will wipe out entire list state"
This reverts commit df0526172334ec619ee9a5a70006f4ad2f3e2167.
I'm reverting because while this is fixing a bug it is changing
behaviour that has been in place for a long while which might break
existing user programs.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4219572a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4219572a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4219572a
Branch: refs/heads/release-1.4
Commit: 4219572a879c3628e203637c2592b4e73388c00a
Parents: 110b86d
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Jan 30 17:56:45 2018 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Jan 31 16:04:52 2018 +0100
----------------------------------------------------------------------
.../apache/flink/contrib/streaming/state/RocksDBListState.java | 4 ----
.../java/org/apache/flink/api/common/state/AppendingState.java | 2 --
.../java/org/apache/flink/runtime/state/heap/HeapListState.java | 5 +++--
.../org/apache/flink/runtime/state/StateBackendTestBase.java | 3 ---
4 files changed, 3 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4219572a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index ccd7e64..f8ed244 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -107,10 +107,6 @@ public class RocksDBListState<K, N, V>
@Override
public void add(V value) throws IOException {
- if (value == null) {
- return;
- }
-
try {
writeCurrentKeyWithGroupAndNamespace();
byte[] key = keySerializationStream.toByteArray();
http://git-wip-us.apache.org/repos/asf/flink/blob/4219572a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
index c17a849..dd070a9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
@@ -63,8 +63,6 @@ public interface AppendingState<IN, OUT> extends State {
* Updates the operator state accessible by {@link #get()} by adding the given value
* to the list of values. The next time {@link #get()} is called (for the same state
* partition) the returned state will represent the updated list.
- *
- * If `null` is passed in, the state value will remain unchanged
*
* @param value
* The new value for the state.
http://git-wip-us.apache.org/repos/asf/flink/blob/4219572a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index 737897c..d3f67f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -66,12 +66,13 @@ public class HeapListState<K, N, V>
@Override
public void add(V value) {
+ final N namespace = currentNamespace;
+
if (value == null) {
+ clear();
return;
}
- final N namespace = currentNamespace;
-
final StateTable<K, N, ArrayList<V>> map = stateTable;
ArrayList<V> list = map.get(namespace);
http://git-wip-us.apache.org/repos/asf/flink/blob/4219572a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 3b63846..0ac607f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -1288,8 +1288,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
keyedBackend.setCurrentKey("abc");
assertNull(state.get());
- state.add(null);
- assertNull(state.get());
keyedBackend.setCurrentKey("def");
assertNull(state.get());
@@ -1314,7 +1312,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
keyedBackend.setCurrentKey("g");
state.add(3L);
state.add(2L);
- state.add(null);
state.add(1L);
keyedBackend.setCurrentKey("def");