You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/09/28 15:09:07 UTC

[2/8] flink git commit: [FLINK-5619] Add numStateEntries() method for all keyed backends

[FLINK-5619] Add numStateEntries() method for all keyed backends

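This also moves testNumStateEntries() from MemoryStateBackendTest into StateBackendTestBase, so the test is shared by the backend tests that extend that base class.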


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc9bc2fc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc9bc2fc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc9bc2fc

Branch: refs/heads/master
Commit: fc9bc2fc6d4a0a8ed0f14d37d397efab52381ae5
Parents: 2b66186
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Nov 4 10:17:55 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 28 17:02:54 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 21 +++++++++++
 .../state/AbstractKeyedStateBackend.java        | 11 +++++-
 .../state/heap/HeapKeyedStateBackend.java       |  1 +
 .../runtime/state/MemoryStateBackendTest.java   | 38 -------------------
 .../runtime/state/StateBackendTestBase.java     | 39 +++++++++++++++++++-
 .../runtime/state/heap/HeapListStateTest.java   |  1 +
 6 files changed, 69 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fc9bc2fc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f6ed87d..8236e5b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
@@ -2000,6 +2001,26 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return true;
 	}
 
+	@VisibleForTesting
+	@SuppressWarnings("unchecked")
+	@Override
+	public int numStateEntries() {
+		int count = 0;
+
+		for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column : kvStateInformation.values()) {
+			try (RocksIterator rocksIterator = db.newIterator(column.f0)) {
+				rocksIterator.seekToFirst();
+
+				while (rocksIterator.isValid()) {
+					count++;
+					rocksIterator.next();
+				}
+			}
+		}
+
+		return count;
+	}
+
 	private static class RocksIteratorWrapper<K> implements Iterator<K> {
 		private final RocksIterator iterator;
 		private final String state;

http://git-wip-us.apache.org/repos/asf/flink/blob/fc9bc2fc/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 30ca22e..fea537b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -111,7 +111,7 @@ public abstract class AbstractKeyedStateBackend<K>
 		KeyGroupRange keyGroupRange,
 		ExecutionConfig executionConfig) {
 
-		this.kvStateRegistry = kvStateRegistry;//Preconditions.checkNotNull(kvStateRegistry);
+		this.kvStateRegistry = kvStateRegistry; //Preconditions.checkNotNull(kvStateRegistry);
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
 		this.numberOfKeyGroups = Preconditions.checkNotNull(numberOfKeyGroups);
 		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
@@ -335,7 +335,7 @@ public abstract class AbstractKeyedStateBackend<K>
 			public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
 				return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
 			}
-			
+
 			@Override
 			public <UK, UV> MapState<UK, UV> createMapState(MapStateDescriptor<UK, UV> stateDesc) throws Exception {
 				return AbstractKeyedStateBackend.this.createMapState(namespaceSerializer, stateDesc);
@@ -413,4 +413,11 @@ public abstract class AbstractKeyedStateBackend<K>
 	public StreamCompressionDecorator getKeyGroupCompressionDecorator() {
 		return keyGroupCompressionDecorator;
 	}
+
+	/**
+	 * Returns the total number of state entries across all keys/namespaces.
+	 */
+	@VisibleForTesting
+	public abstract int numStateEntries();
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fc9bc2fc/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 28c623f..5255b7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -599,6 +599,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 */
 	@VisibleForTesting
 	@SuppressWarnings("unchecked")
+	@Override
 	public int numStateEntries() {
 		int sum = 0;
 		for (StateTable<K, ?, ?> stateTable : stateTables.values()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fc9bc2fc/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index f1f0406..ed6bef4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -91,44 +91,6 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
 	public void testMapStateRestoreWithWrongSerializers() {}
 
 	@Test
-	@SuppressWarnings("unchecked")
-	public void testNumStateEntries() throws Exception {
-		KeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
-
-		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		HeapKeyedStateBackend<Integer> heapBackend = (HeapKeyedStateBackend<Integer>) backend;
-
-		assertEquals(0, heapBackend.numStateEntries());
-
-		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-
-		backend.setCurrentKey(0);
-		state.update("hello");
-		state.update("ciao");
-
-		assertEquals(1, heapBackend.numStateEntries());
-
-		backend.setCurrentKey(42);
-		state.update("foo");
-
-		assertEquals(2, heapBackend.numStateEntries());
-
-		backend.setCurrentKey(0);
-		state.clear();
-
-		assertEquals(1, heapBackend.numStateEntries());
-
-		backend.setCurrentKey(42);
-		state.clear();
-
-		assertEquals(0, heapBackend.numStateEntries());
-
-		backend.dispose();
-	}
-
-	@Test
 	public void testOversizedState() {
 		try {
 			MemoryStateBackend backend = new MemoryStateBackend(10);

http://git-wip-us.apache.org/repos/asf/flink/blob/fc9bc2fc/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 7dd652c..9f8136c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -494,7 +494,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		// ====================================== restore snapshot  ======================================
 
 		env.getExecutionConfig().registerKryoType(TestPojo.class);
-		
+
 		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
 
 		snapshot.discardState();
@@ -864,7 +864,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		// this is only available after the backend initialized the serializer
 		TypeSerializer<String> valueSerializer = kvId.getSerializer();
-		
+
 		// some modifications to the state
 		backend.setCurrentKey(1);
 		assertNull(state.value());
@@ -2348,6 +2348,41 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		}
 	}
 
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testNumStateEntries() throws Exception {
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
+
+		assertEquals(0, backend.numStateEntries());
+
+		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		backend.setCurrentKey(0);
+		state.update("hello");
+		state.update("ciao");
+
+		assertEquals(1, backend.numStateEntries());
+
+		backend.setCurrentKey(42);
+		state.update("foo");
+
+		assertEquals(2, backend.numStateEntries());
+
+		backend.setCurrentKey(0);
+		state.clear();
+
+		assertEquals(1, backend.numStateEntries());
+
+		backend.setCurrentKey(42);
+		state.clear();
+
+		assertEquals(0, backend.numStateEntries());
+
+		backend.dispose();
+	}
+
 	private static class AppendingReduce implements ReduceFunction<String> {
 		@Override
 		public String reduce(String value1, String value2) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/fc9bc2fc/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
index 7705c19..affbf75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -214,6 +214,7 @@ public class HeapListStateTest extends HeapStateBackendTestBase {
 					((HeapListState<String, Integer, Long>) state).stateTable;
 
 			assertTrue(stateTable.isEmpty());
+			assertEquals(0, keyedBackend.numStateEntries());
 		}
 		finally {
 			keyedBackend.close();