You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/09/28 15:09:07 UTC
[2/8] flink git commit: [FLINK-5619] Add numStateEntries() method for all keyed backends
[FLINK-5619] Add numStateEntries() method for all keyed backends
This also adds a test for the new method to StateBackendTestBase, replacing the heap-only test that previously lived in MemoryStateBackendTest.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc9bc2fc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc9bc2fc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc9bc2fc
Branch: refs/heads/master
Commit: fc9bc2fc6d4a0a8ed0f14d37d397efab52381ae5
Parents: 2b66186
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Nov 4 10:17:55 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 28 17:02:54 2017 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 21 +++++++++++
.../state/AbstractKeyedStateBackend.java | 11 +++++-
.../state/heap/HeapKeyedStateBackend.java | 1 +
.../runtime/state/MemoryStateBackendTest.java | 38 -------------------
.../runtime/state/StateBackendTestBase.java | 39 +++++++++++++++++++-
.../runtime/state/heap/HeapListStateTest.java | 1 +
6 files changed, 69 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fc9bc2fc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f6ed87d..8236e5b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -17,6 +17,7 @@
package org.apache.flink.contrib.streaming.state;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
@@ -2000,6 +2001,26 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return true;
}
+ @VisibleForTesting
+ @SuppressWarnings("unchecked")
+ @Override
+ public int numStateEntries() {
+ int count = 0;
+
+ for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column : kvStateInformation.values()) {
+ try (RocksIterator rocksIterator = db.newIterator(column.f0)) {
+ rocksIterator.seekToFirst();
+
+ while (rocksIterator.isValid()) {
+ count++;
+ rocksIterator.next();
+ }
+ }
+ }
+
+ return count;
+ }
+
private static class RocksIteratorWrapper<K> implements Iterator<K> {
private final RocksIterator iterator;
private final String state;
http://git-wip-us.apache.org/repos/asf/flink/blob/fc9bc2fc/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 30ca22e..fea537b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -111,7 +111,7 @@ public abstract class AbstractKeyedStateBackend<K>
KeyGroupRange keyGroupRange,
ExecutionConfig executionConfig) {
- this.kvStateRegistry = kvStateRegistry;//Preconditions.checkNotNull(kvStateRegistry);
+ this.kvStateRegistry = kvStateRegistry; //Preconditions.checkNotNull(kvStateRegistry);
this.keySerializer = Preconditions.checkNotNull(keySerializer);
this.numberOfKeyGroups = Preconditions.checkNotNull(numberOfKeyGroups);
this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
@@ -335,7 +335,7 @@ public abstract class AbstractKeyedStateBackend<K>
public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
}
-
+
@Override
public <UK, UV> MapState<UK, UV> createMapState(MapStateDescriptor<UK, UV> stateDesc) throws Exception {
return AbstractKeyedStateBackend.this.createMapState(namespaceSerializer, stateDesc);
@@ -413,4 +413,11 @@ public abstract class AbstractKeyedStateBackend<K>
public StreamCompressionDecorator getKeyGroupCompressionDecorator() {
return keyGroupCompressionDecorator;
}
+
+ /**
+ * Returns the total number of state entries across all keys/namespaces.
+ */
+ @VisibleForTesting
+ public abstract int numStateEntries();
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc9bc2fc/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 28c623f..5255b7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -599,6 +599,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
*/
@VisibleForTesting
@SuppressWarnings("unchecked")
+ @Override
public int numStateEntries() {
int sum = 0;
for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/fc9bc2fc/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index f1f0406..ed6bef4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -91,44 +91,6 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
public void testMapStateRestoreWithWrongSerializers() {}
@Test
- @SuppressWarnings("unchecked")
- public void testNumStateEntries() throws Exception {
- KeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
-
- ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
- kvId.initializeSerializerUnlessSet(new ExecutionConfig());
-
- HeapKeyedStateBackend<Integer> heapBackend = (HeapKeyedStateBackend<Integer>) backend;
-
- assertEquals(0, heapBackend.numStateEntries());
-
- ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-
- backend.setCurrentKey(0);
- state.update("hello");
- state.update("ciao");
-
- assertEquals(1, heapBackend.numStateEntries());
-
- backend.setCurrentKey(42);
- state.update("foo");
-
- assertEquals(2, heapBackend.numStateEntries());
-
- backend.setCurrentKey(0);
- state.clear();
-
- assertEquals(1, heapBackend.numStateEntries());
-
- backend.setCurrentKey(42);
- state.clear();
-
- assertEquals(0, heapBackend.numStateEntries());
-
- backend.dispose();
- }
-
- @Test
public void testOversizedState() {
try {
MemoryStateBackend backend = new MemoryStateBackend(10);
http://git-wip-us.apache.org/repos/asf/flink/blob/fc9bc2fc/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 7dd652c..9f8136c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -494,7 +494,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
// ====================================== restore snapshot ======================================
env.getExecutionConfig().registerKryoType(TestPojo.class);
-
+
backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
snapshot.discardState();
@@ -864,7 +864,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
// this is only available after the backend initialized the serializer
TypeSerializer<String> valueSerializer = kvId.getSerializer();
-
+
// some modifications to the state
backend.setCurrentKey(1);
assertNull(state.value());
@@ -2348,6 +2348,41 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testNumStateEntries() throws Exception {
+ AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+ ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
+
+ assertEquals(0, backend.numStateEntries());
+
+ ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+ backend.setCurrentKey(0);
+ state.update("hello");
+ state.update("ciao");
+
+ assertEquals(1, backend.numStateEntries());
+
+ backend.setCurrentKey(42);
+ state.update("foo");
+
+ assertEquals(2, backend.numStateEntries());
+
+ backend.setCurrentKey(0);
+ state.clear();
+
+ assertEquals(1, backend.numStateEntries());
+
+ backend.setCurrentKey(42);
+ state.clear();
+
+ assertEquals(0, backend.numStateEntries());
+
+ backend.dispose();
+ }
+
private static class AppendingReduce implements ReduceFunction<String> {
@Override
public String reduce(String value1, String value2) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/fc9bc2fc/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
index 7705c19..affbf75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -214,6 +214,7 @@ public class HeapListStateTest extends HeapStateBackendTestBase {
((HeapListState<String, Integer, Long>) state).stateTable;
assertTrue(stateTable.isEmpty());
+ assertEquals(0, keyedBackend.numStateEntries());
}
finally {
keyedBackend.close();