You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/09/28 15:09:06 UTC

[1/8] flink git commit: [hotfix] Add checkstyle suppressions for StateBackendTestBase

Repository: flink
Updated Branches:
  refs/heads/master 3ff059299 -> 9d7bc7bd6


[hotfix] Add checkstyle suppressions for StateBackendTestBase

The commits after this one move consolidated tests to this file which
breaks the 3000 lines limit.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b661868
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b661868
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b661868

Branch: refs/heads/master
Commit: 2b6618688ac9a5fc4e9df7a5abbb3c95ed84bd71
Parents: 3ff0592
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 27 13:45:13 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 28 17:02:19 2017 +0200

----------------------------------------------------------------------
 tools/maven/suppressions-runtime.xml | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2b661868/tools/maven/suppressions-runtime.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions-runtime.xml b/tools/maven/suppressions-runtime.xml
index 74fbeac..59f3019 100644
--- a/tools/maven/suppressions-runtime.xml
+++ b/tools/maven/suppressions-runtime.xml
@@ -198,4 +198,7 @@ under the License.
 	<suppress
 		files="(.*)test[/\\](.*)runtime[/\\]zookeeper[/\\](.*)"
 		checks="AvoidStarImport"/>
+	<suppress
+		files="(.*)StateBackendTestBase.java"
+		checks="FileLength"/>
 </suppressions>


[4/8] flink git commit: [FLINK-7700] Fix RocksDB ListState merging

Posted by al...@apache.org.
[FLINK-7700] Fix RocksDB ListState merging

Before, the merged state was not cleared.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93456ff7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93456ff7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93456ff7

Branch: refs/heads/master
Commit: 93456ff7c51a7184c385ce0326a8e190ecc9348b
Parents: fc9bc2f
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 27 11:37:09 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 28 17:02:55 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/contrib/streaming/state/RocksDBListState.java  | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/93456ff7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 9d3e97e..f8ed244 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -146,6 +146,7 @@ public class RocksDBListState<K, N, V>
 
 					byte[] sourceKey = keySerializationStream.toByteArray();
 					byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
+					backend.db.delete(columnFamily, sourceKey);
 
 					if (valueBytes != null) {
 						backend.db.merge(columnFamily, writeOptions, targetKey, valueBytes);


[8/8] flink git commit: [FLINK-5619] Consolidate AggregatingState Tests in StateBackendTestBase

Posted by al...@apache.org.
[FLINK-5619] Consolidate AggregatingState Tests in StateBackendTestBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d7bc7bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d7bc7bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d7bc7bd

Branch: refs/heads/master
Commit: 9d7bc7bd66d60329dabb3cbd5d8be3a4f49e823b
Parents: ce0f83e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 27 13:02:57 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 28 17:03:54 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBAggregatingStateTest.java      | 253 ------------------
 .../runtime/state/StateBackendTestBase.java     | 206 +++++++++++++++
 .../state/heap/HeapAggregatingStateTest.java    | 256 -------------------
 3 files changed, 206 insertions(+), 509 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9d7bc7bd/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
deleted file mode 100644
index f3065ab..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.AggregateFunction;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the {@link InternalAggregatingState} implementation on top of RocksDB.
- */
-public class RocksDBAggregatingStateTest {
-
-	@Rule
-	public final TemporaryFolder tmp = new TemporaryFolder();
-
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testAddAndGet() throws Exception {
-
-		final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
-				new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
-		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
-		backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
-
-		final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-
-		try {
-			InternalAggregatingState<VoidNamespace, Long, Long> state =
-					keyedBackend.createAggregatingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
-			state.setCurrentNamespace(VoidNamespace.INSTANCE);
-
-			keyedBackend.setCurrentKey("abc");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("def");
-			assertNull(state.get());
-			state.add(17L);
-			state.add(11L);
-			assertEquals(28L, state.get().longValue());
-
-			keyedBackend.setCurrentKey("abc");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			assertNull(state.get());
-			state.add(1L);
-			state.add(2L);
-
-			keyedBackend.setCurrentKey("def");
-			assertEquals(28L, state.get().longValue());
-			state.clear();
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			state.add(3L);
-			state.add(2L);
-			state.add(1L);
-
-			keyedBackend.setCurrentKey("def");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			assertEquals(9L, state.get().longValue());
-		}
-		finally {
-			keyedBackend.close();
-			keyedBackend.dispose();
-		}
-	}
-
-	@Test
-	public void testMerging() throws Exception {
-
-		final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
-				new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
-		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		final TimeWindow win1 = new TimeWindow(1000, 2000);
-		final TimeWindow win2 = new TimeWindow(2000, 3000);
-		final TimeWindow win3 = new TimeWindow(3000, 4000);
-
-		final Long expectedResult = 165L;
-
-		final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
-		backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
-
-		final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-
-		try {
-			InternalAggregatingState<TimeWindow, Long, Long> state =
-					keyedBackend.createAggregatingState(new TimeWindow.Serializer(), stateDescr);
-
-			// populate the different namespaces
-			//  - abc spreads the values over three namespaces
-			//  - def spreads teh values over two namespaces (one empty)
-			//  - ghi is empty
-			//  - jkl has all elements already in the target namespace
-			//  - mno has all elements already in one source namespace
-
-			keyedBackend.setCurrentKey("abc");
-			state.setCurrentNamespace(win1);
-			state.add(33L);
-			state.add(55L);
-
-			state.setCurrentNamespace(win2);
-			state.add(22L);
-			state.add(11L);
-
-			state.setCurrentNamespace(win3);
-			state.add(44L);
-
-			keyedBackend.setCurrentKey("def");
-			state.setCurrentNamespace(win1);
-			state.add(11L);
-			state.add(44L);
-
-			state.setCurrentNamespace(win3);
-			state.add(22L);
-			state.add(55L);
-			state.add(33L);
-
-			keyedBackend.setCurrentKey("jkl");
-			state.setCurrentNamespace(win1);
-			state.add(11L);
-			state.add(22L);
-			state.add(33L);
-			state.add(44L);
-			state.add(55L);
-
-			keyedBackend.setCurrentKey("mno");
-			state.setCurrentNamespace(win3);
-			state.add(11L);
-			state.add(22L);
-			state.add(33L);
-			state.add(44L);
-			state.add(55L);
-
-			keyedBackend.setCurrentKey("abc");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			assertEquals(expectedResult, state.get());
-
-			keyedBackend.setCurrentKey("def");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			assertEquals(expectedResult, state.get());
-
-			keyedBackend.setCurrentKey("ghi");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("jkl");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			assertEquals(expectedResult, state.get());
-
-			keyedBackend.setCurrentKey("mno");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			assertEquals(expectedResult, state.get());
-		}
-		finally {
-			keyedBackend.close();
-			keyedBackend.dispose();
-		}
-	}
-
-	private static RocksDBKeyedStateBackend<String> createKeyedBackend(RocksDBStateBackend backend) throws Exception {
-		RocksDBKeyedStateBackend<String> keyedBackend = (RocksDBKeyedStateBackend<String>) backend.createKeyedStateBackend(
-						new DummyEnvironment("TestTask", 1, 0),
-						new JobID(),
-						"test-op",
-						StringSerializer.INSTANCE,
-						16,
-						new KeyGroupRange(2, 3),
-						mock(TaskKvStateRegistry.class));
-
-		keyedBackend.restore(null);
-
-		return keyedBackend;
-	}
-
-	//  test functions
-	// ------------------------------------------------------------------------
-
-	@SuppressWarnings("serial")
-	private static class AddingFunction implements AggregateFunction<Long, MutableLong, Long> {
-
-		@Override
-		public MutableLong createAccumulator() {
-			return new MutableLong();
-		}
-
-		@Override
-		public void add(Long value, MutableLong accumulator) {
-			accumulator.value += value;
-		}
-
-		@Override
-		public Long getResult(MutableLong accumulator) {
-			return accumulator.value;
-		}
-
-		@Override
-		public MutableLong merge(MutableLong a, MutableLong b) {
-			a.value += b.value;
-			return a;
-		}
-	}
-
-	private static final class MutableLong {
-		long value;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7bc7bd/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 5b8e8aa..ed280a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -19,8 +19,11 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
@@ -57,6 +60,7 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.heap.AbstractHeapState;
 import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
 import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
@@ -1710,6 +1714,179 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@Test
+	public void testAggregatingStateAddAndGet() throws Exception {
+
+		final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
+			new AggregatingStateDescriptor<>("my-state", new AggregatingAddingFunction(), MutableLong.class);
+		
+		AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+		try {
+			AggregatingState<Long, Long> state =
+				keyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+			state.add(17L);
+			state.add(11L);
+			assertEquals(28L, state.get().longValue());
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertNull(state.get());
+			state.add(1L);
+			state.add(2L);
+
+			keyedBackend.setCurrentKey("def");
+			assertEquals(28L, state.get().longValue());
+			state.clear();
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			state.add(3L);
+			state.add(2L);
+			state.add(1L);
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertEquals(9L, state.get().longValue());
+			state.clear();
+
+			// make sure all lists / maps are cleared
+			assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
+	public void testAggregatingStateMerging() throws Exception {
+
+		final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
+			new AggregatingStateDescriptor<>("my-state", new AggregatingAddingFunction(), MutableLong.class);
+
+		final Integer namespace1 = 1;
+		final Integer namespace2 = 2;
+		final Integer namespace3 = 3;
+
+		final Long expectedResult = 165L;
+
+		AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+		try {
+			InternalAggregatingState<Integer, Long, Long> state =
+				(InternalAggregatingState<Integer, Long, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
+
+			// populate the different namespaces
+			//  - abc spreads the values over three namespaces
+			//  - def spreads teh values over two namespaces (one empty)
+			//  - ghi is empty
+			//  - jkl has all elements already in the target namespace
+			//  - mno has all elements already in one source namespace
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			state.add(33L);
+			state.add(55L);
+
+			state.setCurrentNamespace(namespace2);
+			state.add(22L);
+			state.add(11L);
+
+			state.setCurrentNamespace(namespace3);
+			state.add(44L);
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			state.add(11L);
+			state.add(44L);
+
+			state.setCurrentNamespace(namespace3);
+			state.add(22L);
+			state.add(55L);
+			state.add(33L);
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace3);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("abc");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("def");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("ghi");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("jkl");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("mno");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			// make sure all lists / maps are cleared
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("ghi");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
 	@SuppressWarnings("unchecked,rawtypes")
 	public void testFoldingState() throws Exception {
 		CheckpointStreamFactory streamFactory = createStreamFactory();
@@ -3204,4 +3381,33 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			throw new ExpectedKryoTestException();
 		}
 	}
+
+	@SuppressWarnings("serial")
+	private static class AggregatingAddingFunction implements AggregateFunction<Long, MutableLong, Long> {
+
+		@Override
+		public MutableLong createAccumulator() {
+			return new MutableLong();
+		}
+
+		@Override
+		public void add(Long value, MutableLong accumulator) {
+			accumulator.value += value;
+		}
+
+		@Override
+		public Long getResult(MutableLong accumulator) {
+			return accumulator.value;
+		}
+
+		@Override
+		public MutableLong merge(MutableLong a, MutableLong b) {
+			a.value += b.value;
+			return a;
+		}
+	}
+
+	private static final class MutableLong {
+		long value;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7bc7bd/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
deleted file mode 100644
index cb4e403..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.AggregateFunction;
-import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-import org.junit.Test;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for the simple Java heap objects implementation of the {@link AggregatingState}.
- */
-public class HeapAggregatingStateTest extends HeapStateBackendTestBase {
-
-	@Test
-	public void testAddAndGet() throws Exception {
-
-		final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
-				new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
-		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
-
-		try {
-			InternalAggregatingState<VoidNamespace, Long, Long> state =
-					keyedBackend.createAggregatingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
-			state.setCurrentNamespace(VoidNamespace.INSTANCE);
-
-			keyedBackend.setCurrentKey("abc");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("def");
-			assertNull(state.get());
-			state.add(17L);
-			state.add(11L);
-			assertEquals(28L, state.get().longValue());
-
-			keyedBackend.setCurrentKey("abc");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			assertNull(state.get());
-			state.add(1L);
-			state.add(2L);
-
-			keyedBackend.setCurrentKey("def");
-			assertEquals(28L, state.get().longValue());
-			state.clear();
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			state.add(3L);
-			state.add(2L);
-			state.add(1L);
-
-			keyedBackend.setCurrentKey("def");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			assertEquals(9L, state.get().longValue());
-			state.clear();
-
-			// make sure all lists / maps are cleared
-
-			StateTable<String, VoidNamespace, MutableLong> stateTable =
-					((HeapAggregatingState<String, VoidNamespace, Long, MutableLong, Long>) state).stateTable;
-
-			assertTrue(stateTable.isEmpty());
-		}
-		finally {
-			keyedBackend.close();
-			keyedBackend.dispose();
-		}
-	}
-
-	@Test
-	public void testMerging() throws Exception {
-
-		final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
-				new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
-		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		final Integer namespace1 = 1;
-		final Integer namespace2 = 2;
-		final Integer namespace3 = 3;
-
-		final Long expectedResult = 165L;
-
-		final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
-
-		try {
-			InternalAggregatingState<Integer, Long, Long> state =
-					keyedBackend.createAggregatingState(IntSerializer.INSTANCE, stateDescr);
-
-			// populate the different namespaces
-			//  - abc spreads the values over three namespaces
-			//  - def spreads teh values over two namespaces (one empty)
-			//  - ghi is empty
-			//  - jkl has all elements already in the target namespace
-			//  - mno has all elements already in one source namespace
-
-			keyedBackend.setCurrentKey("abc");
-			state.setCurrentNamespace(namespace1);
-			state.add(33L);
-			state.add(55L);
-
-			state.setCurrentNamespace(namespace2);
-			state.add(22L);
-			state.add(11L);
-
-			state.setCurrentNamespace(namespace3);
-			state.add(44L);
-
-			keyedBackend.setCurrentKey("def");
-			state.setCurrentNamespace(namespace1);
-			state.add(11L);
-			state.add(44L);
-
-			state.setCurrentNamespace(namespace3);
-			state.add(22L);
-			state.add(55L);
-			state.add(33L);
-
-			keyedBackend.setCurrentKey("jkl");
-			state.setCurrentNamespace(namespace1);
-			state.add(11L);
-			state.add(22L);
-			state.add(33L);
-			state.add(44L);
-			state.add(55L);
-
-			keyedBackend.setCurrentKey("mno");
-			state.setCurrentNamespace(namespace3);
-			state.add(11L);
-			state.add(22L);
-			state.add(33L);
-			state.add(44L);
-			state.add(55L);
-
-			keyedBackend.setCurrentKey("abc");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			assertEquals(expectedResult, state.get());
-
-			keyedBackend.setCurrentKey("def");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			assertEquals(expectedResult, state.get());
-
-			keyedBackend.setCurrentKey("ghi");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("jkl");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			assertEquals(expectedResult, state.get());
-
-			keyedBackend.setCurrentKey("mno");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			assertEquals(expectedResult, state.get());
-
-			// make sure all lists / maps are cleared
-
-			keyedBackend.setCurrentKey("abc");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			keyedBackend.setCurrentKey("def");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			keyedBackend.setCurrentKey("ghi");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			keyedBackend.setCurrentKey("jkl");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			keyedBackend.setCurrentKey("mno");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			StateTable<String, Integer, MutableLong> stateTable =
-					((HeapAggregatingState<String, Integer, Long, MutableLong, Long>) state).stateTable;
-
-			assertTrue(stateTable.isEmpty());
-		}
-		finally {
-			keyedBackend.close();
-			keyedBackend.dispose();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  test functions
-	// ------------------------------------------------------------------------
-
-	@SuppressWarnings("serial")
-	private static class AddingFunction implements AggregateFunction<Long, MutableLong, Long> {
-
-		@Override
-		public MutableLong createAccumulator() {
-			return new MutableLong();
-		}
-
-		@Override
-		public void add(Long value, MutableLong accumulator) {
-			accumulator.value += value;
-		}
-
-		@Override
-		public Long getResult(MutableLong accumulator) {
-			return accumulator.value;
-		}
-
-		@Override
-		public MutableLong merge(MutableLong a, MutableLong b) {
-			a.value += b.value;
-			return a;
-		}
-	}
-
-	private static final class MutableLong {
-		long value;
-	}
-}


[5/8] flink git commit: [FLINK-5619] Consolidate ListState Tests in StateBackendTestBase

Posted by al...@apache.org.
[FLINK-5619] Consolidate ListState Tests in StateBackendTestBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f03f8324
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f03f8324
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f03f8324

Branch: refs/heads/master
Commit: f03f8324268abab8bc9605d3786f6a63158cebbf
Parents: 93456ff
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 27 11:38:13 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 28 17:02:55 2017 +0200

----------------------------------------------------------------------
 .../streaming/state/RocksDBListStateTest.java   | 236 -------------------
 .../runtime/state/StateBackendTestBase.java     | 173 ++++++++++++++
 .../runtime/state/heap/HeapListStateTest.java   | 234 ------------------
 3 files changed, 173 insertions(+), 470 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f03f8324/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
deleted file mode 100644
index c6ccd5d..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the {@link ListState} implementation on top of RocksDB.
- */
-public class RocksDBListStateTest {
-
-	@Rule
-	public final TemporaryFolder tmp = new TemporaryFolder();
-
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testAddAndGet() throws Exception {
-
-		final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
-		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
-		backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
-
-		final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-
-		try {
-			InternalListState<VoidNamespace, Long> state =
-					keyedBackend.createListState(VoidNamespaceSerializer.INSTANCE, stateDescr);
-			state.setCurrentNamespace(VoidNamespace.INSTANCE);
-
-			keyedBackend.setCurrentKey("abc");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("def");
-			assertNull(state.get());
-			state.add(17L);
-			state.add(11L);
-			assertEquals(asList(17L, 11L), state.get());
-
-			keyedBackend.setCurrentKey("abc");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			assertNull(state.get());
-			state.add(1L);
-			state.add(2L);
-
-			keyedBackend.setCurrentKey("def");
-			assertEquals(asList(17L, 11L), state.get());
-			state.clear();
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			state.add(3L);
-			state.add(2L);
-			state.add(1L);
-
-			keyedBackend.setCurrentKey("def");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			assertEquals(asList(1L, 2L, 3L, 2L, 1L), state.get());
-		}
-		finally {
-			keyedBackend.close();
-			keyedBackend.dispose();
-		}
-	}
-
-	@Test
-	public void testMerging() throws Exception {
-
-		final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
-		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		final TimeWindow win1 = new TimeWindow(1000, 2000);
-		final TimeWindow win2 = new TimeWindow(2000, 3000);
-		final TimeWindow win3 = new TimeWindow(3000, 4000);
-
-		final Set<Long> expectedResult = new HashSet<>(asList(11L, 22L, 33L, 44L, 55L));
-
-		final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
-		backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
-
-		final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-
-		try {
-			InternalListState<TimeWindow, Long> state = keyedBackend.createListState(new TimeWindow.Serializer(), stateDescr);
-
-			// populate the different namespaces
-			//  - abc spreads the values over three namespaces
-			//  - def spreads teh values over two namespaces (one empty)
-			//  - ghi is empty
-			//  - jkl has all elements already in the target namespace
-			//  - mno has all elements already in one source namespace
-
-			keyedBackend.setCurrentKey("abc");
-			state.setCurrentNamespace(win1);
-			state.add(33L);
-			state.add(55L);
-
-			state.setCurrentNamespace(win2);
-			state.add(22L);
-			state.add(11L);
-
-			state.setCurrentNamespace(win3);
-			state.add(44L);
-
-			keyedBackend.setCurrentKey("def");
-			state.setCurrentNamespace(win1);
-			state.add(11L);
-			state.add(44L);
-
-			state.setCurrentNamespace(win3);
-			state.add(22L);
-			state.add(55L);
-			state.add(33L);
-
-			keyedBackend.setCurrentKey("jkl");
-			state.setCurrentNamespace(win1);
-			state.add(11L);
-			state.add(22L);
-			state.add(33L);
-			state.add(44L);
-			state.add(55L);
-
-			keyedBackend.setCurrentKey("mno");
-			state.setCurrentNamespace(win3);
-			state.add(11L);
-			state.add(22L);
-			state.add(33L);
-			state.add(44L);
-			state.add(55L);
-
-			keyedBackend.setCurrentKey("abc");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			validateResult(state.get(), expectedResult);
-
-			keyedBackend.setCurrentKey("def");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			validateResult(state.get(), expectedResult);
-
-			keyedBackend.setCurrentKey("ghi");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("jkl");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			validateResult(state.get(), expectedResult);
-
-			keyedBackend.setCurrentKey("mno");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			validateResult(state.get(), expectedResult);
-		}
-		finally {
-			keyedBackend.close();
-			keyedBackend.dispose();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  utilities
-	// ------------------------------------------------------------------------
-
-	private static RocksDBKeyedStateBackend<String> createKeyedBackend(RocksDBStateBackend backend) throws Exception {
-		RocksDBKeyedStateBackend<String> keyedBackend = (RocksDBKeyedStateBackend<String>) backend.createKeyedStateBackend(
-				new DummyEnvironment("TestTask", 1, 0),
-				new JobID(),
-				"test-op",
-				StringSerializer.INSTANCE,
-				16,
-				new KeyGroupRange(2, 3),
-				mock(TaskKvStateRegistry.class));
-
-		keyedBackend.restore(null);
-
-		return keyedBackend;
-	}
-
-	private static <T> void validateResult(Iterable<T> values, Set<T> expected) {
-		int num = 0;
-		for (T v : values) {
-			num++;
-			assertTrue(expected.contains(v));
-		}
-
-		assertEquals(expected.size(), num);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f03f8324/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 9f8136c..f6b79b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -58,6 +58,7 @@ import org.apache.flink.runtime.state.heap.AbstractHeapState;
 import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
 import org.apache.flink.runtime.state.heap.StateTable;
 import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.types.IntValue;
@@ -93,7 +94,9 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RunnableFuture;
 import java.util.stream.Stream;
 
+import static java.util.Arrays.asList;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -1269,6 +1272,176 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@Test
+	public void testListStateAddAndGet() throws Exception {
+
+		AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+		final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
+
+		try {
+			ListState<Long> state =
+				keyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+			state.add(17L);
+			state.add(11L);
+			assertThat(state.get(), containsInAnyOrder(17L, 11L));
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertNull(state.get());
+			state.add(1L);
+			state.add(2L);
+
+			keyedBackend.setCurrentKey("def");
+			assertThat(state.get(), containsInAnyOrder(11L, 17L));
+
+			state.clear();
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			state.add(3L);
+			state.add(2L);
+			state.add(1L);
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertThat(state.get(), containsInAnyOrder(1L, 2L, 3L, 2L, 1L));
+
+			state.clear();
+
+			// make sure all lists / maps are cleared
+			assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+		} finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
+	public void testListStateMerging() throws Exception {
+
+		AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+		final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
+
+		final Integer namespace1 = 1;
+		final Integer namespace2 = 2;
+		final Integer namespace3 = 3;
+
+		try {
+			InternalListState<Integer, Long> state =
+				(InternalListState<Integer, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
+
+			// populate the different namespaces
+			//  - abc spreads the values over three namespaces
+			//  - def spreads teh values over two namespaces (one empty)
+			//  - ghi is empty
+			//  - jkl has all elements already in the target namespace
+			//  - mno has all elements already in one source namespace
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			state.add(33L);
+			state.add(55L);
+
+			state.setCurrentNamespace(namespace2);
+			state.add(22L);
+			state.add(11L);
+
+			state.setCurrentNamespace(namespace3);
+			state.add(44L);
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			state.add(11L);
+			state.add(44L);
+
+			state.setCurrentNamespace(namespace3);
+			state.add(22L);
+			state.add(55L);
+			state.add(33L);
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace3);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("abc");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertThat(state.get(), containsInAnyOrder(11L, 22L, 33L, 44L, 55L));
+
+			keyedBackend.setCurrentKey("def");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertThat(state.get(), containsInAnyOrder(11L, 22L, 33L, 44L, 55L));
+
+			keyedBackend.setCurrentKey("ghi");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("jkl");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertThat(state.get(), containsInAnyOrder(11L, 22L, 33L, 44L, 55L));
+
+			keyedBackend.setCurrentKey("mno");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertThat(state.get(), containsInAnyOrder(11L, 22L, 33L, 44L, 55L));
+
+			// make sure all lists / maps are cleared
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("ghi");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
 	@SuppressWarnings("unchecked")
 	public void testReducingState() throws Exception {
 		CheckpointStreamFactory streamFactory = createStreamFactory();

http://git-wip-us.apache.org/repos/asf/flink/blob/f03f8324/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
deleted file mode 100644
index affbf75..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for the simple Java heap objects implementation of the {@link ListState}.
- */
-public class HeapListStateTest extends HeapStateBackendTestBase {
-
-	@Test
-	public void testAddAndGet() throws Exception {
-
-		final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
-		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
-
-		try {
-			InternalListState<VoidNamespace, Long> state =
-					keyedBackend.createListState(VoidNamespaceSerializer.INSTANCE, stateDescr);
-			state.setCurrentNamespace(VoidNamespace.INSTANCE);
-
-			keyedBackend.setCurrentKey("abc");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("def");
-			assertNull(state.get());
-			state.add(17L);
-			state.add(11L);
-			assertEquals(asList(17L, 11L), state.get());
-
-			keyedBackend.setCurrentKey("abc");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			assertNull(state.get());
-			state.add(1L);
-			state.add(2L);
-
-			keyedBackend.setCurrentKey("def");
-			assertEquals(asList(17L, 11L), state.get());
-			state.clear();
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			state.add(3L);
-			state.add(2L);
-			state.add(1L);
-
-			keyedBackend.setCurrentKey("def");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			assertEquals(asList(1L, 2L, 3L, 2L, 1L), state.get());
-			state.clear();
-
-			// make sure all lists / maps are cleared
-
-			StateTable<String, VoidNamespace, ArrayList<Long>> stateTable =
-					((HeapListState<String, VoidNamespace, Long>) state).stateTable;
-
-			assertTrue(stateTable.isEmpty());
-		}
-		finally {
-			keyedBackend.close();
-			keyedBackend.dispose();
-		}
-	}
-
-	@Test
-	public void testMerging() throws Exception {
-
-		final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
-		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		final Integer namespace1 = 1;
-		final Integer namespace2 = 2;
-		final Integer namespace3 = 3;
-
-		final Set<Long> expectedResult = new HashSet<>(asList(11L, 22L, 33L, 44L, 55L));
-
-		final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
-
-		try {
-			InternalListState<Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
-
-			// populate the different namespaces
-			//  - abc spreads the values over three namespaces
-			//  - def spreads teh values over two namespaces (one empty)
-			//  - ghi is empty
-			//  - jkl has all elements already in the target namespace
-			//  - mno has all elements already in one source namespace
-
-			keyedBackend.setCurrentKey("abc");
-			state.setCurrentNamespace(namespace1);
-			state.add(33L);
-			state.add(55L);
-
-			state.setCurrentNamespace(namespace2);
-			state.add(22L);
-			state.add(11L);
-
-			state.setCurrentNamespace(namespace3);
-			state.add(44L);
-
-			keyedBackend.setCurrentKey("def");
-			state.setCurrentNamespace(namespace1);
-			state.add(11L);
-			state.add(44L);
-
-			state.setCurrentNamespace(namespace3);
-			state.add(22L);
-			state.add(55L);
-			state.add(33L);
-
-			keyedBackend.setCurrentKey("jkl");
-			state.setCurrentNamespace(namespace1);
-			state.add(11L);
-			state.add(22L);
-			state.add(33L);
-			state.add(44L);
-			state.add(55L);
-
-			keyedBackend.setCurrentKey("mno");
-			state.setCurrentNamespace(namespace3);
-			state.add(11L);
-			state.add(22L);
-			state.add(33L);
-			state.add(44L);
-			state.add(55L);
-
-			keyedBackend.setCurrentKey("abc");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			validateResult(state.get(), expectedResult);
-
-			keyedBackend.setCurrentKey("def");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			validateResult(state.get(), expectedResult);
-
-			keyedBackend.setCurrentKey("ghi");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("jkl");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			validateResult(state.get(), expectedResult);
-
-			keyedBackend.setCurrentKey("mno");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			validateResult(state.get(), expectedResult);
-
-			// make sure all lists / maps are cleared
-
-			keyedBackend.setCurrentKey("abc");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			keyedBackend.setCurrentKey("def");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			keyedBackend.setCurrentKey("ghi");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			keyedBackend.setCurrentKey("jkl");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			keyedBackend.setCurrentKey("mno");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			StateTable<String, Integer, ArrayList<Long>> stateTable =
-					((HeapListState<String, Integer, Long>) state).stateTable;
-
-			assertTrue(stateTable.isEmpty());
-			assertEquals(0, keyedBackend.numStateEntries());
-		}
-		finally {
-			keyedBackend.close();
-			keyedBackend.dispose();
-		}
-	}
-	
-	private static <T> void validateResult(Iterable<T> values, Set<T> expected) {
-		int num = 0;
-		for (T v : values) {
-			num++;
-			assertTrue(expected.contains(v));
-		}
-
-		assertEquals(expected.size(), num);
-	}
-}


[7/8] flink git commit: [FLINK-7700] Fix RocksDB ReducingState merging

Posted by al...@apache.org.
[FLINK-7700] Fix RocksDB ReducingState merging

Before, the merged state was not cleared.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce0f83e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce0f83e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce0f83e2

Branch: refs/heads/master
Commit: ce0f83e20ace522441b293f8bd82062d1adeee0e
Parents: 560e5f3
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 27 13:01:55 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 28 17:03:54 2017 +0200

----------------------------------------------------------------------
 .../flink/contrib/streaming/state/RocksDBAggregatingState.java      | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ce0f83e2/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index fc84456..8fce21c 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -157,6 +157,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
 
 					final byte[] sourceKey = keySerializationStream.toByteArray();
 					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
+					backend.db.delete(columnFamily, sourceKey);
 
 					if (valueBytes != null) {
 						ACC value = valueSerializer.deserialize(


[6/8] flink git commit: [FLINK-5619] Consolidate ReducingState Tests in StateBackendTestBase

Posted by al...@apache.org.
[FLINK-5619] Consolidate ReducingState Tests in StateBackendTestBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/560e5f3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/560e5f3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/560e5f3b

Branch: refs/heads/master
Commit: 560e5f3b4ad9ced5580553c208d341ebc0aa5a18
Parents: c691856
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 27 12:54:19 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 28 17:03:54 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBReducingStateTest.java         | 239 -------------------
 .../runtime/state/StateBackendTestBase.java     | 174 ++++++++++++++
 .../state/StateSnapshotCompressionTest.java     |   9 +-
 .../state/heap/HeapReducingStateTest.java       | 236 ------------------
 .../state/heap/HeapStateBackendTestBase.java    |   2 +-
 5 files changed, 179 insertions(+), 481 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/560e5f3b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
deleted file mode 100644
index 0733dce..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the {@link ReducingState} implementation on top of RocksDB.
- */
-public class RocksDBReducingStateTest {
-
-	@Rule
-	public final TemporaryFolder tmp = new TemporaryFolder();
-
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testAddAndGet() throws Exception {
-
-		final ReducingStateDescriptor<Long> stateDescr =
-				new ReducingStateDescriptor<>("my-state", new AddingFunction(), Long.class);
-		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
-		backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
-
-		final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-
-		try {
-			InternalReducingState<VoidNamespace, Long> state =
-					keyedBackend.createReducingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
-			state.setCurrentNamespace(VoidNamespace.INSTANCE);
-
-			keyedBackend.setCurrentKey("abc");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("def");
-			assertNull(state.get());
-			state.add(17L);
-			state.add(11L);
-			assertEquals(28L, state.get().longValue());
-
-			keyedBackend.setCurrentKey("abc");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			assertNull(state.get());
-			state.add(1L);
-			state.add(2L);
-
-			keyedBackend.setCurrentKey("def");
-			assertEquals(28L, state.get().longValue());
-			state.clear();
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			state.add(3L);
-			state.add(2L);
-			state.add(1L);
-
-			keyedBackend.setCurrentKey("def");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			assertEquals(9L, state.get().longValue());
-		}
-		finally {
-			keyedBackend.close();
-			keyedBackend.dispose();
-		}
-	}
-
-	@Test
-	public void testMerging() throws Exception {
-
-		final ReducingStateDescriptor<Long> stateDescr = new ReducingStateDescriptor<>(
-				"my-state", new AddingFunction(), Long.class);
-		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		final TimeWindow win1 = new TimeWindow(1000, 2000);
-		final TimeWindow win2 = new TimeWindow(2000, 3000);
-		final TimeWindow win3 = new TimeWindow(3000, 4000);
-
-		final Long expectedResult = 165L;
-
-		final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
-		backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
-
-		final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-
-		try {
-			final InternalReducingState<TimeWindow, Long> state =
-					keyedBackend.createReducingState(new TimeWindow.Serializer(), stateDescr);
-
-			// populate the different namespaces
-			//  - abc spreads the values over three namespaces
-			//  - def spreads teh values over two namespaces (one empty)
-			//  - ghi is empty
-			//  - jkl has all elements already in the target namespace
-			//  - mno has all elements already in one source namespace
-
-			keyedBackend.setCurrentKey("abc");
-			state.setCurrentNamespace(win1);
-			state.add(33L);
-			state.add(55L);
-
-			state.setCurrentNamespace(win2);
-			state.add(22L);
-			state.add(11L);
-
-			state.setCurrentNamespace(win3);
-			state.add(44L);
-
-			keyedBackend.setCurrentKey("def");
-			state.setCurrentNamespace(win1);
-			state.add(11L);
-			state.add(44L);
-
-			state.setCurrentNamespace(win3);
-			state.add(22L);
-			state.add(55L);
-			state.add(33L);
-
-			keyedBackend.setCurrentKey("jkl");
-			state.setCurrentNamespace(win1);
-			state.add(11L);
-			state.add(22L);
-			state.add(33L);
-			state.add(44L);
-			state.add(55L);
-
-			keyedBackend.setCurrentKey("mno");
-			state.setCurrentNamespace(win3);
-			state.add(11L);
-			state.add(22L);
-			state.add(33L);
-			state.add(44L);
-			state.add(55L);
-
-			keyedBackend.setCurrentKey("abc");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			assertEquals(expectedResult, state.get());
-
-			keyedBackend.setCurrentKey("def");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			assertEquals(expectedResult, state.get());
-
-			keyedBackend.setCurrentKey("ghi");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("jkl");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			assertEquals(expectedResult, state.get());
-
-			keyedBackend.setCurrentKey("mno");
-			state.mergeNamespaces(win1, asList(win2, win3));
-			state.setCurrentNamespace(win1);
-			assertEquals(expectedResult, state.get());
-		}
-		finally {
-			keyedBackend.close();
-			keyedBackend.dispose();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  utilities
-	// ------------------------------------------------------------------------
-
-	private static RocksDBKeyedStateBackend<String> createKeyedBackend(RocksDBStateBackend backend) throws Exception {
-		RocksDBKeyedStateBackend<String> keyedBackend = (RocksDBKeyedStateBackend<String>) backend.createKeyedStateBackend(
-				new DummyEnvironment("TestTask", 1, 0),
-				new JobID(),
-				"test-op",
-				StringSerializer.INSTANCE,
-				16,
-				new KeyGroupRange(2, 3),
-				mock(TaskKvStateRegistry.class));
-
-		keyedBackend.restore(null);
-
-		return keyedBackend;
-	}
-
-	// ------------------------------------------------------------------------
-	//  test functions
-	// ------------------------------------------------------------------------
-
-	@SuppressWarnings("serial")
-	private static class AddingFunction implements ReduceFunction<Long> {
-
-		@Override
-		public Long reduce(Long a, Long b)  {
-			return a + b;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/560e5f3b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index f6b79b1..5b8e8aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -59,6 +59,7 @@ import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
 import org.apache.flink.runtime.state.heap.StateTable;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.types.IntValue;
@@ -1536,6 +1537,179 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@Test
+	public void testReducingStateAddAndGet() throws Exception {
+
+		final ReducingStateDescriptor<Long> stateDescr =
+			new ReducingStateDescriptor<>("my-state", (a, b) -> a + b, Long.class);
+
+		AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+		try {
+			ReducingState<Long> state =
+				keyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+			state.add(17L);
+			state.add(11L);
+			assertEquals(28L, state.get().longValue());
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertNull(state.get());
+			state.add(1L);
+			state.add(2L);
+
+			keyedBackend.setCurrentKey("def");
+			assertEquals(28L, state.get().longValue());
+			state.clear();
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			state.add(3L);
+			state.add(2L);
+			state.add(1L);
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertEquals(9L, state.get().longValue());
+			state.clear();
+
+			// make sure all lists / maps are cleared
+			assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
+	public void testReducingStateMerging() throws Exception {
+
+		final ReducingStateDescriptor<Long> stateDescr =
+			new ReducingStateDescriptor<>("my-state", (a, b) -> a + b, Long.class);
+
+		final Integer namespace1 = 1;
+		final Integer namespace2 = 2;
+		final Integer namespace3 = 3;
+
+		final Long expectedResult = 165L;
+
+		AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+		try {
+			final InternalReducingState<Integer, Long> state =
+				(InternalReducingState<Integer, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
+
+			// populate the different namespaces
+			//  - abc spreads the values over three namespaces
+			//  - def spreads teh values over two namespaces (one empty)
+			//  - ghi is empty
+			//  - jkl has all elements already in the target namespace
+			//  - mno has all elements already in one source namespace
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			state.add(33L);
+			state.add(55L);
+
+			state.setCurrentNamespace(namespace2);
+			state.add(22L);
+			state.add(11L);
+
+			state.setCurrentNamespace(namespace3);
+			state.add(44L);
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			state.add(11L);
+			state.add(44L);
+
+			state.setCurrentNamespace(namespace3);
+			state.add(22L);
+			state.add(55L);
+			state.add(33L);
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace3);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("abc");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("def");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("ghi");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("jkl");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("mno");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			// make sure all lists / maps are cleared
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("ghi");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
 	@SuppressWarnings("unchecked,rawtypes")
 	public void testFoldingState() throws Exception {
 		CheckpointStreamFactory streamFactory = createStreamFactory();

http://git-wip-us.apache.org/repos/asf/flink/blob/560e5f3b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
index 63d2453..1aa6f63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
-import org.apache.flink.runtime.state.heap.HeapReducingStateTest;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.util.TestLogger;
@@ -49,7 +48,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
 		AbstractKeyedStateBackend<String> stateBackend = new HeapKeyedStateBackend<>(
 			mock(TaskKvStateRegistry.class),
 			StringSerializer.INSTANCE,
-			HeapReducingStateTest.class.getClassLoader(),
+			StateSnapshotCompressionTest.class.getClassLoader(),
 			16,
 			new KeyGroupRange(0, 15),
 			true,
@@ -70,7 +69,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
 		stateBackend = new HeapKeyedStateBackend<>(
 			mock(TaskKvStateRegistry.class),
 			StringSerializer.INSTANCE,
-			HeapReducingStateTest.class.getClassLoader(),
+			StateSnapshotCompressionTest.class.getClassLoader(),
 			16,
 			new KeyGroupRange(0, 15),
 			true,
@@ -109,7 +108,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
 		AbstractKeyedStateBackend<String> stateBackend = new HeapKeyedStateBackend<>(
 			mock(TaskKvStateRegistry.class),
 			StringSerializer.INSTANCE,
-			HeapReducingStateTest.class.getClassLoader(),
+			StateSnapshotCompressionTest.class.getClassLoader(),
 			16,
 			new KeyGroupRange(0, 15),
 			true,
@@ -150,7 +149,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
 		stateBackend = new HeapKeyedStateBackend<>(
 			mock(TaskKvStateRegistry.class),
 			StringSerializer.INSTANCE,
-			HeapReducingStateTest.class.getClassLoader(),
+			StateSnapshotCompressionTest.class.getClassLoader(),
 			16,
 			new KeyGroupRange(0, 15),
 			true,

http://git-wip-us.apache.org/repos/asf/flink/blob/560e5f3b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
deleted file mode 100644
index 928eaec..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-import org.junit.Test;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for the simple Java heap objects implementation of the {@link ReducingState}.
- */
-public class HeapReducingStateTest extends HeapStateBackendTestBase {
-
-	@Test
-	public void testAddAndGet() throws Exception {
-
-		final ReducingStateDescriptor<Long> stateDescr =
-				new ReducingStateDescriptor<>("my-state", new AddingFunction(), Long.class);
-		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
-
-		try {
-			InternalReducingState<VoidNamespace, Long> state =
-					keyedBackend.createReducingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
-			state.setCurrentNamespace(VoidNamespace.INSTANCE);
-
-			keyedBackend.setCurrentKey("abc");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("def");
-			assertNull(state.get());
-			state.add(17L);
-			state.add(11L);
-			assertEquals(28L, state.get().longValue());
-
-			keyedBackend.setCurrentKey("abc");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			assertNull(state.get());
-			state.add(1L);
-			state.add(2L);
-
-			keyedBackend.setCurrentKey("def");
-			assertEquals(28L, state.get().longValue());
-			state.clear();
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			state.add(3L);
-			state.add(2L);
-			state.add(1L);
-
-			keyedBackend.setCurrentKey("def");
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("g");
-			assertEquals(9L, state.get().longValue());
-			state.clear();
-
-			// make sure all lists / maps are cleared
-
-			StateTable<String, VoidNamespace, Long> stateTable =
-					((HeapReducingState<String, VoidNamespace, Long>) state).stateTable;
-
-			assertTrue(stateTable.isEmpty());
-		}
-		finally {
-			keyedBackend.close();
-			keyedBackend.dispose();
-		}
-	}
-
-	@Test
-	public void testMerging() throws Exception {
-
-		final ReducingStateDescriptor<Long> stateDescr = new ReducingStateDescriptor<>(
-				"my-state", new AddingFunction(), Long.class);
-		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		final Integer namespace1 = 1;
-		final Integer namespace2 = 2;
-		final Integer namespace3 = 3;
-
-		final Long expectedResult = 165L;
-
-		final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
-
-		try {
-			final InternalReducingState<Integer, Long> state =
-					keyedBackend.createReducingState(IntSerializer.INSTANCE, stateDescr);
-
-			// populate the different namespaces
-			//  - abc spreads the values over three namespaces
-			//  - def spreads teh values over two namespaces (one empty)
-			//  - ghi is empty
-			//  - jkl has all elements already in the target namespace
-			//  - mno has all elements already in one source namespace
-
-			keyedBackend.setCurrentKey("abc");
-			state.setCurrentNamespace(namespace1);
-			state.add(33L);
-			state.add(55L);
-
-			state.setCurrentNamespace(namespace2);
-			state.add(22L);
-			state.add(11L);
-
-			state.setCurrentNamespace(namespace3);
-			state.add(44L);
-
-			keyedBackend.setCurrentKey("def");
-			state.setCurrentNamespace(namespace1);
-			state.add(11L);
-			state.add(44L);
-
-			state.setCurrentNamespace(namespace3);
-			state.add(22L);
-			state.add(55L);
-			state.add(33L);
-
-			keyedBackend.setCurrentKey("jkl");
-			state.setCurrentNamespace(namespace1);
-			state.add(11L);
-			state.add(22L);
-			state.add(33L);
-			state.add(44L);
-			state.add(55L);
-
-			keyedBackend.setCurrentKey("mno");
-			state.setCurrentNamespace(namespace3);
-			state.add(11L);
-			state.add(22L);
-			state.add(33L);
-			state.add(44L);
-			state.add(55L);
-
-			keyedBackend.setCurrentKey("abc");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			assertEquals(expectedResult, state.get());
-
-			keyedBackend.setCurrentKey("def");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			assertEquals(expectedResult, state.get());
-
-			keyedBackend.setCurrentKey("ghi");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			assertNull(state.get());
-
-			keyedBackend.setCurrentKey("jkl");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			assertEquals(expectedResult, state.get());
-
-			keyedBackend.setCurrentKey("mno");
-			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
-			state.setCurrentNamespace(namespace1);
-			assertEquals(expectedResult, state.get());
-
-			// make sure all lists / maps are cleared
-
-			keyedBackend.setCurrentKey("abc");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			keyedBackend.setCurrentKey("def");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			keyedBackend.setCurrentKey("ghi");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			keyedBackend.setCurrentKey("jkl");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			keyedBackend.setCurrentKey("mno");
-			state.setCurrentNamespace(namespace1);
-			state.clear();
-
-			StateTable<String, Integer, Long> stateTable =
-					((HeapReducingState<String, Integer, Long>) state).stateTable;
-
-			assertTrue(stateTable.isEmpty());
-		}
-		finally {
-			keyedBackend.close();
-			keyedBackend.dispose();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  test functions
-	// ------------------------------------------------------------------------
-
-	@SuppressWarnings("serial")
-	private static class AddingFunction implements ReduceFunction<Long> {
-
-		@Override
-		public Long reduce(Long a, Long b)  {
-			return a + b;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/560e5f3b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
index 2136304..b10c2c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
@@ -50,7 +50,7 @@ public abstract class HeapStateBackendTestBase {
 		return new HeapKeyedStateBackend<>(
 			mock(TaskKvStateRegistry.class),
 			keySerializer,
-			HeapReducingStateTest.class.getClassLoader(),
+			HeapStateBackendTestBase.class.getClassLoader(),
 			16,
 			new KeyGroupRange(0, 15),
 			async,


[3/8] flink git commit: [FLINK-7700] Fix RocksDB ReducingState merging

Posted by al...@apache.org.
[FLINK-7700] Fix RocksDB ReducingState merging

Before, the merged state was not cleared.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6918564
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6918564
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6918564

Branch: refs/heads/master
Commit: c6918564f7c40730791b9d35d3703e3ba9fee385
Parents: f03f832
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 27 12:53:08 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 28 17:02:55 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/contrib/streaming/state/RocksDBReducingState.java  | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c6918564/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index b5fe95f..b4c3f51 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -140,6 +140,7 @@ public class RocksDBReducingState<K, N, V>
 
 					final byte[] sourceKey = keySerializationStream.toByteArray();
 					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
+					backend.db.delete(columnFamily, sourceKey);
 
 					if (valueBytes != null) {
 						V value = valueSerializer.deserialize(


[2/8] flink git commit: [FLINK-5619] Add numStateEntries() method for all keyed backends

Posted by al...@apache.org.
[FLINK-5619] Add numStateEntries() method for all keyed backends

This also adds a test for this in StateBackendTestBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc9bc2fc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc9bc2fc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc9bc2fc

Branch: refs/heads/master
Commit: fc9bc2fc6d4a0a8ed0f14d37d397efab52381ae5
Parents: 2b66186
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Nov 4 10:17:55 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 28 17:02:54 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 21 +++++++++++
 .../state/AbstractKeyedStateBackend.java        | 11 +++++-
 .../state/heap/HeapKeyedStateBackend.java       |  1 +
 .../runtime/state/MemoryStateBackendTest.java   | 38 -------------------
 .../runtime/state/StateBackendTestBase.java     | 39 +++++++++++++++++++-
 .../runtime/state/heap/HeapListStateTest.java   |  1 +
 6 files changed, 69 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fc9bc2fc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f6ed87d..8236e5b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
@@ -2000,6 +2001,26 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return true;
 	}
 
+	@VisibleForTesting
+	@SuppressWarnings("unchecked")
+	@Override
+	public int numStateEntries() {
+		int count = 0;
+
+		for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column : kvStateInformation.values()) {
+			try (RocksIterator rocksIterator = db.newIterator(column.f0)) {
+				rocksIterator.seekToFirst();
+
+				while (rocksIterator.isValid()) {
+					count++;
+					rocksIterator.next();
+				}
+			}
+		}
+
+		return count;
+	}
+
 	private static class RocksIteratorWrapper<K> implements Iterator<K> {
 		private final RocksIterator iterator;
 		private final String state;

http://git-wip-us.apache.org/repos/asf/flink/blob/fc9bc2fc/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 30ca22e..fea537b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -111,7 +111,7 @@ public abstract class AbstractKeyedStateBackend<K>
 		KeyGroupRange keyGroupRange,
 		ExecutionConfig executionConfig) {
 
-		this.kvStateRegistry = kvStateRegistry;//Preconditions.checkNotNull(kvStateRegistry);
+		this.kvStateRegistry = kvStateRegistry; //Preconditions.checkNotNull(kvStateRegistry);
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
 		this.numberOfKeyGroups = Preconditions.checkNotNull(numberOfKeyGroups);
 		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
@@ -335,7 +335,7 @@ public abstract class AbstractKeyedStateBackend<K>
 			public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
 				return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
 			}
-			
+
 			@Override
 			public <UK, UV> MapState<UK, UV> createMapState(MapStateDescriptor<UK, UV> stateDesc) throws Exception {
 				return AbstractKeyedStateBackend.this.createMapState(namespaceSerializer, stateDesc);
@@ -413,4 +413,11 @@ public abstract class AbstractKeyedStateBackend<K>
 	public StreamCompressionDecorator getKeyGroupCompressionDecorator() {
 		return keyGroupCompressionDecorator;
 	}
+
+	/**
+	 * Returns the total number of state entries across all keys/namespaces.
+	 */
+	@VisibleForTesting
+	public abstract int numStateEntries();
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fc9bc2fc/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 28c623f..5255b7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -599,6 +599,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 */
 	@VisibleForTesting
 	@SuppressWarnings("unchecked")
+	@Override
 	public int numStateEntries() {
 		int sum = 0;
 		for (StateTable<K, ?, ?> stateTable : stateTables.values()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fc9bc2fc/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index f1f0406..ed6bef4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -91,44 +91,6 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
 	public void testMapStateRestoreWithWrongSerializers() {}
 
 	@Test
-	@SuppressWarnings("unchecked")
-	public void testNumStateEntries() throws Exception {
-		KeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
-
-		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		HeapKeyedStateBackend<Integer> heapBackend = (HeapKeyedStateBackend<Integer>) backend;
-
-		assertEquals(0, heapBackend.numStateEntries());
-
-		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-
-		backend.setCurrentKey(0);
-		state.update("hello");
-		state.update("ciao");
-
-		assertEquals(1, heapBackend.numStateEntries());
-
-		backend.setCurrentKey(42);
-		state.update("foo");
-
-		assertEquals(2, heapBackend.numStateEntries());
-
-		backend.setCurrentKey(0);
-		state.clear();
-
-		assertEquals(1, heapBackend.numStateEntries());
-
-		backend.setCurrentKey(42);
-		state.clear();
-
-		assertEquals(0, heapBackend.numStateEntries());
-
-		backend.dispose();
-	}
-
-	@Test
 	public void testOversizedState() {
 		try {
 			MemoryStateBackend backend = new MemoryStateBackend(10);

http://git-wip-us.apache.org/repos/asf/flink/blob/fc9bc2fc/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 7dd652c..9f8136c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -494,7 +494,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		// ====================================== restore snapshot  ======================================
 
 		env.getExecutionConfig().registerKryoType(TestPojo.class);
-		
+
 		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
 
 		snapshot.discardState();
@@ -864,7 +864,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		// this is only available after the backend initialized the serializer
 		TypeSerializer<String> valueSerializer = kvId.getSerializer();
-		
+
 		// some modifications to the state
 		backend.setCurrentKey(1);
 		assertNull(state.value());
@@ -2348,6 +2348,41 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		}
 	}
 
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testNumStateEntries() throws Exception {
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
+
+		assertEquals(0, backend.numStateEntries());
+
+		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		backend.setCurrentKey(0);
+		state.update("hello");
+		state.update("ciao");
+
+		assertEquals(1, backend.numStateEntries());
+
+		backend.setCurrentKey(42);
+		state.update("foo");
+
+		assertEquals(2, backend.numStateEntries());
+
+		backend.setCurrentKey(0);
+		state.clear();
+
+		assertEquals(1, backend.numStateEntries());
+
+		backend.setCurrentKey(42);
+		state.clear();
+
+		assertEquals(0, backend.numStateEntries());
+
+		backend.dispose();
+	}
+
 	private static class AppendingReduce implements ReduceFunction<String> {
 		@Override
 		public String reduce(String value1, String value2) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/fc9bc2fc/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
index 7705c19..affbf75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -214,6 +214,7 @@ public class HeapListStateTest extends HeapStateBackendTestBase {
 					((HeapListState<String, Integer, Long>) state).stateTable;
 
 			assertTrue(stateTable.isEmpty());
+			assertEquals(0, keyedBackend.numStateEntries());
 		}
 		finally {
 			keyedBackend.close();