You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2018/01/31 15:05:20 UTC

flink git commit: Revert "[FLINK-8411] [State Backends] HeapListState#add(null) will wipe out entire list state"

Repository: flink
Updated Branches:
  refs/heads/release-1.4 110b86dd2 -> 4219572a8


Revert "[FLINK-8411] [State Backends] HeapListState#add(null) will wipe out entire list state"

This reverts commit df0526172334ec619ee9a5a70006f4ad2f3e2167.

I'm reverting because, while this fixes a bug, it also changes
behaviour that has been in place for a long while, and that change
might break existing user programs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4219572a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4219572a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4219572a

Branch: refs/heads/release-1.4
Commit: 4219572a879c3628e203637c2592b4e73388c00a
Parents: 110b86d
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Jan 30 17:56:45 2018 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Jan 31 16:04:52 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/contrib/streaming/state/RocksDBListState.java  | 4 ----
 .../java/org/apache/flink/api/common/state/AppendingState.java  | 2 --
 .../java/org/apache/flink/runtime/state/heap/HeapListState.java | 5 +++--
 .../org/apache/flink/runtime/state/StateBackendTestBase.java    | 3 ---
 4 files changed, 3 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4219572a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index ccd7e64..f8ed244 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -107,10 +107,6 @@ public class RocksDBListState<K, N, V>
 
 	@Override
 	public void add(V value) throws IOException {
-		if (value == null) {
-			return;
-		}
-
 		try {
 			writeCurrentKeyWithGroupAndNamespace();
 			byte[] key = keySerializationStream.toByteArray();

http://git-wip-us.apache.org/repos/asf/flink/blob/4219572a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
index c17a849..dd070a9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
@@ -63,8 +63,6 @@ public interface AppendingState<IN, OUT> extends State {
 	 * Updates the operator state accessible by {@link #get()} by adding the given value
 	 * to the list of values. The next time {@link #get()} is called (for the same state
 	 * partition) the returned state will represent the updated list.
-	 *
-	 * If `null` is passed in, the state value will remain unchanged
 	 * 
 	 * @param value
 	 *            The new value for the state.

http://git-wip-us.apache.org/repos/asf/flink/blob/4219572a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index 737897c..d3f67f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -66,12 +66,13 @@ public class HeapListState<K, N, V>
 
 	@Override
 	public void add(V value) {
+		final N namespace = currentNamespace;
+
 		if (value == null) {
+			clear();
 			return;
 		}
 
-		final N namespace = currentNamespace;
-
 		final StateTable<K, N, ArrayList<V>> map = stateTable;
 		ArrayList<V> list = map.get(namespace);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4219572a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 3b63846..0ac607f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -1288,8 +1288,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 			keyedBackend.setCurrentKey("abc");
 			assertNull(state.get());
-			state.add(null);
-			assertNull(state.get());
 
 			keyedBackend.setCurrentKey("def");
 			assertNull(state.get());
@@ -1314,7 +1312,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			keyedBackend.setCurrentKey("g");
 			state.add(3L);
 			state.add(2L);
-			state.add(null);
 			state.add(1L);
 
 			keyedBackend.setCurrentKey("def");