You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/10/02 08:19:36 UTC
[1/7] flink git commit: [FLINK-5619] Add numStateEntries() method for
all keyed backends
Repository: flink
Updated Branches:
refs/heads/release-1.3 566b7471d -> 43d76f008
[FLINK-5619] Add numStateEntries() method for all keyed backends
This also adds a corresponding test to StateBackendTestBase.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64858a78
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64858a78
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64858a78
Branch: refs/heads/release-1.3
Commit: 64858a7881d22fb665099e1b2cd8363e447ed572
Parents: 566b747
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Nov 4 10:17:55 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 28 17:12:30 2017 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 21 +++++++++++
.../state/AbstractKeyedStateBackend.java | 11 +++++-
.../state/heap/HeapKeyedStateBackend.java | 1 +
.../runtime/state/MemoryStateBackendTest.java | 38 -------------------
.../runtime/state/StateBackendTestBase.java | 39 +++++++++++++++++++-
.../runtime/state/heap/HeapListStateTest.java | 1 +
6 files changed, 69 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/64858a78/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 7e0910e..14ed0f9 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -17,6 +17,7 @@
package org.apache.flink.contrib.streaming.state;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
@@ -1996,4 +1997,24 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
public boolean supportsAsynchronousSnapshots() {
return true;
}
+
+ @VisibleForTesting
+ @SuppressWarnings("unchecked")
+ @Override
+ public int numStateEntries() {
+ int count = 0;
+
+ for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column : kvStateInformation.values()) {
+ try (RocksIterator rocksIterator = db.newIterator(column.f0)) {
+ rocksIterator.seekToFirst();
+
+ while (rocksIterator.isValid()) {
+ count++;
+ rocksIterator.next();
+ }
+ }
+ }
+
+ return count;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/64858a78/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 2b225df..1c51f39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -106,7 +106,7 @@ public abstract class AbstractKeyedStateBackend<K>
KeyGroupRange keyGroupRange,
ExecutionConfig executionConfig) {
- this.kvStateRegistry = kvStateRegistry;//Preconditions.checkNotNull(kvStateRegistry);
+ this.kvStateRegistry = kvStateRegistry; //Preconditions.checkNotNull(kvStateRegistry);
this.keySerializer = Preconditions.checkNotNull(keySerializer);
this.numberOfKeyGroups = Preconditions.checkNotNull(numberOfKeyGroups);
this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
@@ -321,7 +321,7 @@ public abstract class AbstractKeyedStateBackend<K>
public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
}
-
+
@Override
public <UK, UV> MapState<UK, UV> createMapState(MapStateDescriptor<UK, UV> stateDesc) throws Exception {
return AbstractKeyedStateBackend.this.createMapState(namespaceSerializer, stateDesc);
@@ -394,4 +394,11 @@ public abstract class AbstractKeyedStateBackend<K>
public boolean supportsAsynchronousSnapshots() {
return false;
}
+
+ /**
+ * Returns the total number of state entries across all keys/namespaces.
+ */
+ @VisibleForTesting
+ public abstract int numStateEntries();
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/64858a78/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 9bb5beb..003e69a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -589,6 +589,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
*/
@VisibleForTesting
@SuppressWarnings("unchecked")
+ @Override
public int numStateEntries() {
int sum = 0;
for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/64858a78/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index f1f0406..ed6bef4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -91,44 +91,6 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
public void testMapStateRestoreWithWrongSerializers() {}
@Test
- @SuppressWarnings("unchecked")
- public void testNumStateEntries() throws Exception {
- KeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
-
- ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
- kvId.initializeSerializerUnlessSet(new ExecutionConfig());
-
- HeapKeyedStateBackend<Integer> heapBackend = (HeapKeyedStateBackend<Integer>) backend;
-
- assertEquals(0, heapBackend.numStateEntries());
-
- ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-
- backend.setCurrentKey(0);
- state.update("hello");
- state.update("ciao");
-
- assertEquals(1, heapBackend.numStateEntries());
-
- backend.setCurrentKey(42);
- state.update("foo");
-
- assertEquals(2, heapBackend.numStateEntries());
-
- backend.setCurrentKey(0);
- state.clear();
-
- assertEquals(1, heapBackend.numStateEntries());
-
- backend.setCurrentKey(42);
- state.clear();
-
- assertEquals(0, heapBackend.numStateEntries());
-
- backend.dispose();
- }
-
- @Test
public void testOversizedState() {
try {
MemoryStateBackend backend = new MemoryStateBackend(10);
http://git-wip-us.apache.org/repos/asf/flink/blob/64858a78/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index f08ad2d..57d8b0f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -455,7 +455,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
// ====================================== restore snapshot ======================================
env.getExecutionConfig().registerKryoType(TestPojo.class);
-
+
backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
snapshot.discardState();
@@ -804,7 +804,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
// this is only available after the backend initialized the serializer
TypeSerializer<String> valueSerializer = kvId.getSerializer();
-
+
// some modifications to the state
backend.setCurrentKey(1);
assertNull(state.value());
@@ -2288,6 +2288,41 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testNumStateEntries() throws Exception {
+ AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+ ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
+
+ assertEquals(0, backend.numStateEntries());
+
+ ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+ backend.setCurrentKey(0);
+ state.update("hello");
+ state.update("ciao");
+
+ assertEquals(1, backend.numStateEntries());
+
+ backend.setCurrentKey(42);
+ state.update("foo");
+
+ assertEquals(2, backend.numStateEntries());
+
+ backend.setCurrentKey(0);
+ state.clear();
+
+ assertEquals(1, backend.numStateEntries());
+
+ backend.setCurrentKey(42);
+ state.clear();
+
+ assertEquals(0, backend.numStateEntries());
+
+ backend.dispose();
+ }
+
private static class AppendingReduce implements ReduceFunction<String> {
@Override
public String reduce(String value1, String value2) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/64858a78/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
index 7705c19..affbf75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -214,6 +214,7 @@ public class HeapListStateTest extends HeapStateBackendTestBase {
((HeapListState<String, Integer, Long>) state).stateTable;
assertTrue(stateTable.isEmpty());
+ assertEquals(0, keyedBackend.numStateEntries());
}
finally {
keyedBackend.close();
[2/7] flink git commit: [FLINK-7700] Fix RocksDB ListState merging
Posted by al...@apache.org.
[FLINK-7700] Fix RocksDB ListState merging
Previously, the source state was not removed after being merged into the target.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31cd6db6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31cd6db6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31cd6db6
Branch: refs/heads/release-1.3
Commit: 31cd6db6a7d2f7d78aa05f6ff5fc82abbc79d042
Parents: 64858a7
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 27 11:37:09 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 28 17:26:11 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/contrib/streaming/state/RocksDBListState.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/31cd6db6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index a8b20d1..486fa62 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -146,6 +146,7 @@ public class RocksDBListState<K, N, V>
byte[] sourceKey = keySerializationStream.toByteArray();
byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
+ backend.db.remove(columnFamily, sourceKey);
if (valueBytes != null) {
backend.db.merge(columnFamily, writeOptions, targetKey, valueBytes);
[6/7] flink git commit: [FLINK-7700] Fix RocksDB AggregatingState
merging
Posted by al...@apache.org.
[FLINK-7700] Fix RocksDB AggregatingState merging
Previously, the source state was not removed after being merged.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0a7c091
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0a7c091
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0a7c091
Branch: refs/heads/release-1.3
Commit: b0a7c091856aa2a99715a9fddcc801f660414cfd
Parents: 0ffca86
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 27 13:01:55 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 28 17:26:29 2017 +0200
----------------------------------------------------------------------
.../flink/contrib/streaming/state/RocksDBAggregatingState.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7c091/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 1f306b4..c72b94e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -157,6 +157,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
final byte[] sourceKey = keySerializationStream.toByteArray();
final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
+ backend.db.remove(columnFamily, sourceKey);
if (valueBytes != null) {
ACC value = valueSerializer.deserialize(
[4/7] flink git commit: [FLINK-7700] Fix RocksDB ReducingState merging
Posted by al...@apache.org.
[FLINK-7700] Fix RocksDB ReducingState merging
Previously, the source state was not removed after being merged.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0bd8c06
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0bd8c06
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0bd8c06
Branch: refs/heads/release-1.3
Commit: e0bd8c060da01e5defe528964bf638868cbc13d5
Parents: 5e3d49c
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 27 12:53:08 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 28 17:26:12 2017 +0200
----------------------------------------------------------------------
.../apache/flink/contrib/streaming/state/RocksDBReducingState.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e0bd8c06/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index ccc98a7..9bc6fb9 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -140,6 +140,7 @@ public class RocksDBReducingState<K, N, V>
final byte[] sourceKey = keySerializationStream.toByteArray();
final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
+ backend.db.remove(columnFamily, sourceKey);
if (valueBytes != null) {
V value = valueSerializer.deserialize(
[3/7] flink git commit: [FLINK-5619] Consolidate ReducingState Tests
in StateBackendTestBase
Posted by al...@apache.org.
[FLINK-5619] Consolidate ReducingState Tests in StateBackendTestBase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ffca863
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ffca863
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ffca863
Branch: refs/heads/release-1.3
Commit: 0ffca863a4b189675d2f15ee61f9d20fe49e85f0
Parents: e0bd8c0
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 27 12:54:19 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 28 17:26:12 2017 +0200
----------------------------------------------------------------------
.../state/RocksDBReducingStateTest.java | 239 -------------------
.../runtime/state/StateBackendTestBase.java | 184 ++++++++++++++
.../state/heap/HeapReducingStateTest.java | 236 ------------------
.../state/heap/HeapStateBackendTestBase.java | 4 +-
4 files changed, 186 insertions(+), 477 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0ffca863/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
deleted file mode 100644
index a8b4535..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the {@link ReducingState} implementation on top of RocksDB.
- */
-public class RocksDBReducingStateTest {
-
- @Rule
- public final TemporaryFolder tmp = new TemporaryFolder();
-
- // ------------------------------------------------------------------------
-
- @Test
- public void testAddAndGet() throws Exception {
-
- final ReducingStateDescriptor<Long> stateDescr =
- new ReducingStateDescriptor<>("my-state", new AddingFunction(), Long.class);
- stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
- final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
- backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
-
- final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-
- try {
- InternalReducingState<VoidNamespace, Long> state =
- keyedBackend.createReducingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
- state.setCurrentNamespace(VoidNamespace.INSTANCE);
-
- keyedBackend.setCurrentKey("abc");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("def");
- assertNull(state.get());
- state.add(17L);
- state.add(11L);
- assertEquals(28L, state.get().longValue());
-
- keyedBackend.setCurrentKey("abc");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- assertNull(state.get());
- state.add(1L);
- state.add(2L);
-
- keyedBackend.setCurrentKey("def");
- assertEquals(28L, state.get().longValue());
- state.clear();
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- state.add(3L);
- state.add(2L);
- state.add(1L);
-
- keyedBackend.setCurrentKey("def");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- assertEquals(9L, state.get().longValue());
- }
- finally {
- keyedBackend.close();
- keyedBackend.dispose();
- }
- }
-
- @Test
- public void testMerging() throws Exception {
-
- final ReducingStateDescriptor<Long> stateDescr = new ReducingStateDescriptor<>(
- "my-state", new AddingFunction(), Long.class);
- stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
- final TimeWindow win1 = new TimeWindow(1000, 2000);
- final TimeWindow win2 = new TimeWindow(2000, 3000);
- final TimeWindow win3 = new TimeWindow(3000, 4000);
-
- final Long expectedResult = 165L;
-
- final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
- backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
-
- final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-
- try {
- final InternalReducingState<TimeWindow, Long> state =
- keyedBackend.createReducingState(new TimeWindow.Serializer(), stateDescr);
-
- // populate the different namespaces
- // - abc spreads the values over three namespaces
- // - def spreads teh values over two namespaces (one empty)
- // - ghi is empty
- // - jkl has all elements already in the target namespace
- // - mno has all elements already in one source namespace
-
- keyedBackend.setCurrentKey("abc");
- state.setCurrentNamespace(win1);
- state.add(33L);
- state.add(55L);
-
- state.setCurrentNamespace(win2);
- state.add(22L);
- state.add(11L);
-
- state.setCurrentNamespace(win3);
- state.add(44L);
-
- keyedBackend.setCurrentKey("def");
- state.setCurrentNamespace(win1);
- state.add(11L);
- state.add(44L);
-
- state.setCurrentNamespace(win3);
- state.add(22L);
- state.add(55L);
- state.add(33L);
-
- keyedBackend.setCurrentKey("jkl");
- state.setCurrentNamespace(win1);
- state.add(11L);
- state.add(22L);
- state.add(33L);
- state.add(44L);
- state.add(55L);
-
- keyedBackend.setCurrentKey("mno");
- state.setCurrentNamespace(win3);
- state.add(11L);
- state.add(22L);
- state.add(33L);
- state.add(44L);
- state.add(55L);
-
- keyedBackend.setCurrentKey("abc");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- assertEquals(expectedResult, state.get());
-
- keyedBackend.setCurrentKey("def");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- assertEquals(expectedResult, state.get());
-
- keyedBackend.setCurrentKey("ghi");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("jkl");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- assertEquals(expectedResult, state.get());
-
- keyedBackend.setCurrentKey("mno");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- assertEquals(expectedResult, state.get());
- }
- finally {
- keyedBackend.close();
- keyedBackend.dispose();
- }
- }
-
- // ------------------------------------------------------------------------
- // utilities
- // ------------------------------------------------------------------------
-
- private static RocksDBKeyedStateBackend<String> createKeyedBackend(RocksDBStateBackend backend) throws Exception {
- RocksDBKeyedStateBackend<String> keyedBackend = (RocksDBKeyedStateBackend<String>) backend.createKeyedStateBackend(
- new DummyEnvironment("TestTask", 1, 0),
- new JobID(),
- "test-op",
- StringSerializer.INSTANCE,
- 16,
- new KeyGroupRange(2, 3),
- mock(TaskKvStateRegistry.class));
-
- keyedBackend.restore(null);
-
- return keyedBackend;
- }
-
- // ------------------------------------------------------------------------
- // test functions
- // ------------------------------------------------------------------------
-
- @SuppressWarnings("serial")
- private static class AddingFunction implements ReduceFunction<Long> {
-
- @Override
- public Long reduce(Long a, Long b) {
- return a + b;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ffca863/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 8e0f5eb..b47ae37 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.types.IntValue;
@@ -1476,6 +1477,189 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
@Test
+ public void testReducingStateAddAndGet() throws Exception {
+
+ final ReducingStateDescriptor<Long> stateDescr =
+ new ReducingStateDescriptor<>("my-state", new ReduceFunction<Long>() {
+ @Override
+ public Long reduce(Long a, Long b) throws Exception {
+ return a + b;
+ }
+ }, Long.class);
+
+ AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+ try {
+ ReducingState<Long> state =
+ keyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);
+
+ keyedBackend.setCurrentKey("abc");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("def");
+ assertNull(state.get());
+ state.add(17L);
+ state.add(11L);
+ assertEquals(28L, state.get().longValue());
+
+ keyedBackend.setCurrentKey("abc");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ assertNull(state.get());
+ state.add(1L);
+ state.add(2L);
+
+ keyedBackend.setCurrentKey("def");
+ assertEquals(28L, state.get().longValue());
+ state.clear();
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ state.add(3L);
+ state.add(2L);
+ state.add(1L);
+
+ keyedBackend.setCurrentKey("def");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ assertEquals(9L, state.get().longValue());
+ state.clear();
+
+ // make sure all lists / maps are cleared
+ assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+ }
+ finally {
+ keyedBackend.close();
+ keyedBackend.dispose();
+ }
+ }
+
+ @Test
+ public void testReducingStateMerging() throws Exception {
+
+ final ReducingStateDescriptor<Long> stateDescr =
+ new ReducingStateDescriptor<>("my-state", new ReduceFunction<Long>() {
+ @Override
+ public Long reduce(Long a, Long b) throws Exception {
+ return a + b;
+ }
+ }, Long.class);
+
+ final Integer namespace1 = 1;
+ final Integer namespace2 = 2;
+ final Integer namespace3 = 3;
+
+ final Long expectedResult = 165L;
+
+ AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+ try {
+ final InternalReducingState<Integer, Long> state =
+ (InternalReducingState<Integer, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
+
+ // populate the different namespaces
+ // - abc spreads the values over three namespaces
+ // - def spreads the values over two namespaces (one empty)
+ // - ghi is empty
+ // - jkl has all elements already in the target namespace
+ // - mno has all elements already in one source namespace
+
+ keyedBackend.setCurrentKey("abc");
+ state.setCurrentNamespace(namespace1);
+ state.add(33L);
+ state.add(55L);
+
+ state.setCurrentNamespace(namespace2);
+ state.add(22L);
+ state.add(11L);
+
+ state.setCurrentNamespace(namespace3);
+ state.add(44L);
+
+ keyedBackend.setCurrentKey("def");
+ state.setCurrentNamespace(namespace1);
+ state.add(11L);
+ state.add(44L);
+
+ state.setCurrentNamespace(namespace3);
+ state.add(22L);
+ state.add(55L);
+ state.add(33L);
+
+ keyedBackend.setCurrentKey("jkl");
+ state.setCurrentNamespace(namespace1);
+ state.add(11L);
+ state.add(22L);
+ state.add(33L);
+ state.add(44L);
+ state.add(55L);
+
+ keyedBackend.setCurrentKey("mno");
+ state.setCurrentNamespace(namespace3);
+ state.add(11L);
+ state.add(22L);
+ state.add(33L);
+ state.add(44L);
+ state.add(55L);
+
+ keyedBackend.setCurrentKey("abc");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("def");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("ghi");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("jkl");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("mno");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ // make sure all lists / maps are cleared
+
+ keyedBackend.setCurrentKey("abc");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("def");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("ghi");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("jkl");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("mno");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+ }
+ finally {
+ keyedBackend.close();
+ keyedBackend.dispose();
+ }
+ }
+
+ @Test
@SuppressWarnings("unchecked,rawtypes")
public void testFoldingState() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
http://git-wip-us.apache.org/repos/asf/flink/blob/0ffca863/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
deleted file mode 100644
index 928eaec..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-import org.junit.Test;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for the simple Java heap objects implementation of the {@link ReducingState}.
- */
-public class HeapReducingStateTest extends HeapStateBackendTestBase {
-
- @Test
- public void testAddAndGet() throws Exception {
-
- final ReducingStateDescriptor<Long> stateDescr =
- new ReducingStateDescriptor<>("my-state", new AddingFunction(), Long.class);
- stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
- final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
-
- try {
- InternalReducingState<VoidNamespace, Long> state =
- keyedBackend.createReducingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
- state.setCurrentNamespace(VoidNamespace.INSTANCE);
-
- keyedBackend.setCurrentKey("abc");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("def");
- assertNull(state.get());
- state.add(17L);
- state.add(11L);
- assertEquals(28L, state.get().longValue());
-
- keyedBackend.setCurrentKey("abc");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- assertNull(state.get());
- state.add(1L);
- state.add(2L);
-
- keyedBackend.setCurrentKey("def");
- assertEquals(28L, state.get().longValue());
- state.clear();
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- state.add(3L);
- state.add(2L);
- state.add(1L);
-
- keyedBackend.setCurrentKey("def");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- assertEquals(9L, state.get().longValue());
- state.clear();
-
- // make sure all lists / maps are cleared
-
- StateTable<String, VoidNamespace, Long> stateTable =
- ((HeapReducingState<String, VoidNamespace, Long>) state).stateTable;
-
- assertTrue(stateTable.isEmpty());
- }
- finally {
- keyedBackend.close();
- keyedBackend.dispose();
- }
- }
-
- @Test
- public void testMerging() throws Exception {
-
- final ReducingStateDescriptor<Long> stateDescr = new ReducingStateDescriptor<>(
- "my-state", new AddingFunction(), Long.class);
- stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
- final Integer namespace1 = 1;
- final Integer namespace2 = 2;
- final Integer namespace3 = 3;
-
- final Long expectedResult = 165L;
-
- final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
-
- try {
- final InternalReducingState<Integer, Long> state =
- keyedBackend.createReducingState(IntSerializer.INSTANCE, stateDescr);
-
- // populate the different namespaces
- // - abc spreads the values over three namespaces
- // - def spreads teh values over two namespaces (one empty)
- // - ghi is empty
- // - jkl has all elements already in the target namespace
- // - mno has all elements already in one source namespace
-
- keyedBackend.setCurrentKey("abc");
- state.setCurrentNamespace(namespace1);
- state.add(33L);
- state.add(55L);
-
- state.setCurrentNamespace(namespace2);
- state.add(22L);
- state.add(11L);
-
- state.setCurrentNamespace(namespace3);
- state.add(44L);
-
- keyedBackend.setCurrentKey("def");
- state.setCurrentNamespace(namespace1);
- state.add(11L);
- state.add(44L);
-
- state.setCurrentNamespace(namespace3);
- state.add(22L);
- state.add(55L);
- state.add(33L);
-
- keyedBackend.setCurrentKey("jkl");
- state.setCurrentNamespace(namespace1);
- state.add(11L);
- state.add(22L);
- state.add(33L);
- state.add(44L);
- state.add(55L);
-
- keyedBackend.setCurrentKey("mno");
- state.setCurrentNamespace(namespace3);
- state.add(11L);
- state.add(22L);
- state.add(33L);
- state.add(44L);
- state.add(55L);
-
- keyedBackend.setCurrentKey("abc");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- assertEquals(expectedResult, state.get());
-
- keyedBackend.setCurrentKey("def");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- assertEquals(expectedResult, state.get());
-
- keyedBackend.setCurrentKey("ghi");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("jkl");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- assertEquals(expectedResult, state.get());
-
- keyedBackend.setCurrentKey("mno");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- assertEquals(expectedResult, state.get());
-
- // make sure all lists / maps are cleared
-
- keyedBackend.setCurrentKey("abc");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- keyedBackend.setCurrentKey("def");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- keyedBackend.setCurrentKey("ghi");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- keyedBackend.setCurrentKey("jkl");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- keyedBackend.setCurrentKey("mno");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- StateTable<String, Integer, Long> stateTable =
- ((HeapReducingState<String, Integer, Long>) state).stateTable;
-
- assertTrue(stateTable.isEmpty());
- }
- finally {
- keyedBackend.close();
- keyedBackend.dispose();
- }
- }
-
- // ------------------------------------------------------------------------
- // test functions
- // ------------------------------------------------------------------------
-
- @SuppressWarnings("serial")
- private static class AddingFunction implements ReduceFunction<Long> {
-
- @Override
- public Long reduce(Long a, Long b) {
- return a + b;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ffca863/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
index e6adef8..75bd2b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
@@ -45,10 +45,10 @@ public abstract class HeapStateBackendTestBase {
return new HeapKeyedStateBackend<>(
mock(TaskKvStateRegistry.class),
StringSerializer.INSTANCE,
- HeapReducingStateTest.class.getClassLoader(),
+ HeapStateBackendTestBase.class.getClassLoader(),
16,
new KeyGroupRange(0, 15),
async,
new ExecutionConfig());
}
-}
\ No newline at end of file
+}
[7/7] flink git commit: [FLINK-5619] Consolidate AggregatingState
Tests in StateBackendTestBase
Posted by al...@apache.org.
[FLINK-5619] Consolidate AggregatingState Tests in StateBackendTestBase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/43d76f00
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/43d76f00
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/43d76f00
Branch: refs/heads/release-1.3
Commit: 43d76f008e21d1269ea42d4d5b2ea7f40f6076e9
Parents: b0a7c09
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 27 13:02:57 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 2 08:39:55 2017 +0200
----------------------------------------------------------------------
.../state/RocksDBAggregatingStateTest.java | 252 ------------------
.../runtime/state/StateBackendTestBase.java | 206 +++++++++++++++
.../state/heap/HeapAggregatingStateTest.java | 256 -------------------
3 files changed, 206 insertions(+), 508 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/43d76f00/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
deleted file mode 100644
index 1b65466..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.AggregateFunction;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import static java.util.Arrays.asList;
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
-
-/**
- * Tests for the {@link InternalAggregatingState} implementation on top of RocksDB.
- */
-public class RocksDBAggregatingStateTest {
-
- @Rule
- public final TemporaryFolder tmp = new TemporaryFolder();
-
- // ------------------------------------------------------------------------
-
- @Test
- public void testAddAndGet() throws Exception {
-
- final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
- new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
- stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
- final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
- backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
-
- final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-
- try {
- InternalAggregatingState<VoidNamespace, Long, Long> state =
- keyedBackend.createAggregatingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
- state.setCurrentNamespace(VoidNamespace.INSTANCE);
-
- keyedBackend.setCurrentKey("abc");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("def");
- assertNull(state.get());
- state.add(17L);
- state.add(11L);
- assertEquals(28L, state.get().longValue());
-
- keyedBackend.setCurrentKey("abc");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- assertNull(state.get());
- state.add(1L);
- state.add(2L);
-
- keyedBackend.setCurrentKey("def");
- assertEquals(28L, state.get().longValue());
- state.clear();
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- state.add(3L);
- state.add(2L);
- state.add(1L);
-
- keyedBackend.setCurrentKey("def");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- assertEquals(9L, state.get().longValue());
- }
- finally {
- keyedBackend.close();
- keyedBackend.dispose();
- }
- }
-
- @Test
- public void testMerging() throws Exception {
-
- final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
- new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
- stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
- final TimeWindow win1 = new TimeWindow(1000, 2000);
- final TimeWindow win2 = new TimeWindow(2000, 3000);
- final TimeWindow win3 = new TimeWindow(3000, 4000);
-
- final Long expectedResult = 165L;
-
- final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
- backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
-
- final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-
- try {
- InternalAggregatingState<TimeWindow, Long, Long> state =
- keyedBackend.createAggregatingState(new TimeWindow.Serializer(), stateDescr);
-
- // populate the different namespaces
- // - abc spreads the values over three namespaces
- // - def spreads teh values over two namespaces (one empty)
- // - ghi is empty
- // - jkl has all elements already in the target namespace
- // - mno has all elements already in one source namespace
-
- keyedBackend.setCurrentKey("abc");
- state.setCurrentNamespace(win1);
- state.add(33L);
- state.add(55L);
-
- state.setCurrentNamespace(win2);
- state.add(22L);
- state.add(11L);
-
- state.setCurrentNamespace(win3);
- state.add(44L);
-
- keyedBackend.setCurrentKey("def");
- state.setCurrentNamespace(win1);
- state.add(11L);
- state.add(44L);
-
- state.setCurrentNamespace(win3);
- state.add(22L);
- state.add(55L);
- state.add(33L);
-
- keyedBackend.setCurrentKey("jkl");
- state.setCurrentNamespace(win1);
- state.add(11L);
- state.add(22L);
- state.add(33L);
- state.add(44L);
- state.add(55L);
-
- keyedBackend.setCurrentKey("mno");
- state.setCurrentNamespace(win3);
- state.add(11L);
- state.add(22L);
- state.add(33L);
- state.add(44L);
- state.add(55L);
-
- keyedBackend.setCurrentKey("abc");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- assertEquals(expectedResult, state.get());
-
- keyedBackend.setCurrentKey("def");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- assertEquals(expectedResult, state.get());
-
- keyedBackend.setCurrentKey("ghi");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("jkl");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- assertEquals(expectedResult, state.get());
-
- keyedBackend.setCurrentKey("mno");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- assertEquals(expectedResult, state.get());
- }
- finally {
- keyedBackend.close();
- keyedBackend.dispose();
- }
- }
-
- private static RocksDBKeyedStateBackend<String> createKeyedBackend(RocksDBStateBackend backend) throws Exception {
- RocksDBKeyedStateBackend<String> keyedBackend = (RocksDBKeyedStateBackend<String>) backend.createKeyedStateBackend(
- new DummyEnvironment("TestTask", 1, 0),
- new JobID(),
- "test-op",
- StringSerializer.INSTANCE,
- 16,
- new KeyGroupRange(2, 3),
- mock(TaskKvStateRegistry.class));
-
- keyedBackend.restore(null);
-
- return keyedBackend;
- }
-
- // test functions
- // ------------------------------------------------------------------------
-
- @SuppressWarnings("serial")
- private static class AddingFunction implements AggregateFunction<Long, MutableLong, Long> {
-
- @Override
- public MutableLong createAccumulator() {
- return new MutableLong();
- }
-
- @Override
- public void add(Long value, MutableLong accumulator) {
- accumulator.value += value;
- }
-
- @Override
- public Long getResult(MutableLong accumulator) {
- return accumulator.value;
- }
-
- @Override
- public MutableLong merge(MutableLong a, MutableLong b) {
- a.value += b.value;
- return a;
- }
- }
-
- private static final class MutableLong {
- long value;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/43d76f00/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index b47ae37..36286fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -24,8 +24,11 @@ import com.esotericsoftware.kryo.io.Output;
import com.google.common.base.Joiner;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
@@ -62,6 +65,7 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.heap.AbstractHeapState;
import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
@@ -1660,6 +1664,179 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
@Test
+ public void testAggregatingStateAddAndGet() throws Exception {
+
+ final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
+ new AggregatingStateDescriptor<>("my-state", new AggregatingAddingFunction(), MutableLong.class);
+
+ AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+ try {
+ AggregatingState<Long, Long> state =
+ keyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);
+
+ keyedBackend.setCurrentKey("abc");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("def");
+ assertNull(state.get());
+ state.add(17L);
+ state.add(11L);
+ assertEquals(28L, state.get().longValue());
+
+ keyedBackend.setCurrentKey("abc");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ assertNull(state.get());
+ state.add(1L);
+ state.add(2L);
+
+ keyedBackend.setCurrentKey("def");
+ assertEquals(28L, state.get().longValue());
+ state.clear();
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ state.add(3L);
+ state.add(2L);
+ state.add(1L);
+
+ keyedBackend.setCurrentKey("def");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ assertEquals(9L, state.get().longValue());
+ state.clear();
+
+ // make sure all state entries are cleared
+ assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+ }
+ finally {
+ keyedBackend.close();
+ keyedBackend.dispose();
+ }
+ }
+
+ @Test
+ public void testAggregatingStateMerging() throws Exception {
+
+ final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
+ new AggregatingStateDescriptor<>("my-state", new AggregatingAddingFunction(), MutableLong.class);
+
+ final Integer namespace1 = 1;
+ final Integer namespace2 = 2;
+ final Integer namespace3 = 3;
+
+ final Long expectedResult = 165L;
+
+ AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+ try {
+ InternalAggregatingState<Integer, Long, Long> state =
+ (InternalAggregatingState<Integer, Long, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
+
+ // populate the different namespaces
+ // - abc spreads the values over three namespaces
+ // - def spreads the values over two namespaces (one empty)
+ // - ghi is empty
+ // - jkl has all elements already in the target namespace
+ // - mno has all elements already in one source namespace
+
+ keyedBackend.setCurrentKey("abc");
+ state.setCurrentNamespace(namespace1);
+ state.add(33L);
+ state.add(55L);
+
+ state.setCurrentNamespace(namespace2);
+ state.add(22L);
+ state.add(11L);
+
+ state.setCurrentNamespace(namespace3);
+ state.add(44L);
+
+ keyedBackend.setCurrentKey("def");
+ state.setCurrentNamespace(namespace1);
+ state.add(11L);
+ state.add(44L);
+
+ state.setCurrentNamespace(namespace3);
+ state.add(22L);
+ state.add(55L);
+ state.add(33L);
+
+ keyedBackend.setCurrentKey("jkl");
+ state.setCurrentNamespace(namespace1);
+ state.add(11L);
+ state.add(22L);
+ state.add(33L);
+ state.add(44L);
+ state.add(55L);
+
+ keyedBackend.setCurrentKey("mno");
+ state.setCurrentNamespace(namespace3);
+ state.add(11L);
+ state.add(22L);
+ state.add(33L);
+ state.add(44L);
+ state.add(55L);
+
+ keyedBackend.setCurrentKey("abc");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("def");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("ghi");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("jkl");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("mno");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ // make sure all state entries are cleared
+
+ keyedBackend.setCurrentKey("abc");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("def");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("ghi");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("jkl");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("mno");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+ }
+ finally {
+ keyedBackend.close();
+ keyedBackend.dispose();
+ }
+ }
+
+ @Test
@SuppressWarnings("unchecked,rawtypes")
public void testFoldingState() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
@@ -3160,4 +3337,33 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
throw new ExpectedKryoTestException();
}
}
+
+ @SuppressWarnings("serial")
+ private static class AggregatingAddingFunction implements AggregateFunction<Long, MutableLong, Long> {
+
+ @Override
+ public MutableLong createAccumulator() {
+ return new MutableLong();
+ }
+
+ @Override
+ public void add(Long value, MutableLong accumulator) {
+ accumulator.value += value;
+ }
+
+ @Override
+ public Long getResult(MutableLong accumulator) {
+ return accumulator.value;
+ }
+
+ @Override
+ public MutableLong merge(MutableLong a, MutableLong b) {
+ a.value += b.value;
+ return a;
+ }
+ }
+
+ private static final class MutableLong {
+ long value;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/43d76f00/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
deleted file mode 100644
index cb4e403..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.AggregateFunction;
-import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-import org.junit.Test;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for the simple Java heap objects implementation of the {@link AggregatingState}.
- */
-public class HeapAggregatingStateTest extends HeapStateBackendTestBase {
-
- @Test
- public void testAddAndGet() throws Exception {
-
- final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
- new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
- stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
- final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
-
- try {
- InternalAggregatingState<VoidNamespace, Long, Long> state =
- keyedBackend.createAggregatingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
- state.setCurrentNamespace(VoidNamespace.INSTANCE);
-
- keyedBackend.setCurrentKey("abc");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("def");
- assertNull(state.get());
- state.add(17L);
- state.add(11L);
- assertEquals(28L, state.get().longValue());
-
- keyedBackend.setCurrentKey("abc");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- assertNull(state.get());
- state.add(1L);
- state.add(2L);
-
- keyedBackend.setCurrentKey("def");
- assertEquals(28L, state.get().longValue());
- state.clear();
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- state.add(3L);
- state.add(2L);
- state.add(1L);
-
- keyedBackend.setCurrentKey("def");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- assertEquals(9L, state.get().longValue());
- state.clear();
-
- // make sure all lists / maps are cleared
-
- StateTable<String, VoidNamespace, MutableLong> stateTable =
- ((HeapAggregatingState<String, VoidNamespace, Long, MutableLong, Long>) state).stateTable;
-
- assertTrue(stateTable.isEmpty());
- }
- finally {
- keyedBackend.close();
- keyedBackend.dispose();
- }
- }
-
- @Test
- public void testMerging() throws Exception {
-
- final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
- new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
- stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
- final Integer namespace1 = 1;
- final Integer namespace2 = 2;
- final Integer namespace3 = 3;
-
- final Long expectedResult = 165L;
-
- final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
-
- try {
- InternalAggregatingState<Integer, Long, Long> state =
- keyedBackend.createAggregatingState(IntSerializer.INSTANCE, stateDescr);
-
- // populate the different namespaces
- // - abc spreads the values over three namespaces
- // - def spreads teh values over two namespaces (one empty)
- // - ghi is empty
- // - jkl has all elements already in the target namespace
- // - mno has all elements already in one source namespace
-
- keyedBackend.setCurrentKey("abc");
- state.setCurrentNamespace(namespace1);
- state.add(33L);
- state.add(55L);
-
- state.setCurrentNamespace(namespace2);
- state.add(22L);
- state.add(11L);
-
- state.setCurrentNamespace(namespace3);
- state.add(44L);
-
- keyedBackend.setCurrentKey("def");
- state.setCurrentNamespace(namespace1);
- state.add(11L);
- state.add(44L);
-
- state.setCurrentNamespace(namespace3);
- state.add(22L);
- state.add(55L);
- state.add(33L);
-
- keyedBackend.setCurrentKey("jkl");
- state.setCurrentNamespace(namespace1);
- state.add(11L);
- state.add(22L);
- state.add(33L);
- state.add(44L);
- state.add(55L);
-
- keyedBackend.setCurrentKey("mno");
- state.setCurrentNamespace(namespace3);
- state.add(11L);
- state.add(22L);
- state.add(33L);
- state.add(44L);
- state.add(55L);
-
- keyedBackend.setCurrentKey("abc");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- assertEquals(expectedResult, state.get());
-
- keyedBackend.setCurrentKey("def");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- assertEquals(expectedResult, state.get());
-
- keyedBackend.setCurrentKey("ghi");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("jkl");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- assertEquals(expectedResult, state.get());
-
- keyedBackend.setCurrentKey("mno");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- assertEquals(expectedResult, state.get());
-
- // make sure all lists / maps are cleared
-
- keyedBackend.setCurrentKey("abc");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- keyedBackend.setCurrentKey("def");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- keyedBackend.setCurrentKey("ghi");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- keyedBackend.setCurrentKey("jkl");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- keyedBackend.setCurrentKey("mno");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- StateTable<String, Integer, MutableLong> stateTable =
- ((HeapAggregatingState<String, Integer, Long, MutableLong, Long>) state).stateTable;
-
- assertTrue(stateTable.isEmpty());
- }
- finally {
- keyedBackend.close();
- keyedBackend.dispose();
- }
- }
-
- // ------------------------------------------------------------------------
- // test functions
- // ------------------------------------------------------------------------
-
- @SuppressWarnings("serial")
- private static class AddingFunction implements AggregateFunction<Long, MutableLong, Long> {
-
- @Override
- public MutableLong createAccumulator() {
- return new MutableLong();
- }
-
- @Override
- public void add(Long value, MutableLong accumulator) {
- accumulator.value += value;
- }
-
- @Override
- public Long getResult(MutableLong accumulator) {
- return accumulator.value;
- }
-
- @Override
- public MutableLong merge(MutableLong a, MutableLong b) {
- a.value += b.value;
- return a;
- }
- }
-
- private static final class MutableLong {
- long value;
- }
-}
[5/7] flink git commit: [FLINK-5619] Consolidate ListState Tests in
StateBackendTestBase
Posted by al...@apache.org.
[FLINK-5619] Consolidate ListState Tests in StateBackendTestBase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e3d49cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e3d49cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e3d49cc
Branch: refs/heads/release-1.3
Commit: 5e3d49cc05591bf396bfcf9e578f8e30ce436114
Parents: 31cd6db
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 27 11:38:13 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 28 17:26:12 2017 +0200
----------------------------------------------------------------------
.../streaming/state/RocksDBListStateTest.java | 236 -------------------
.../runtime/state/StateBackendTestBase.java | 173 ++++++++++++++
.../runtime/state/heap/HeapListStateTest.java | 234 ------------------
3 files changed, 173 insertions(+), 470 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5e3d49cc/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
deleted file mode 100644
index e7efcfa..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the {@link ListState} implementation on top of RocksDB.
- */
-public class RocksDBListStateTest {
-
- @Rule
- public final TemporaryFolder tmp = new TemporaryFolder();
-
- // ------------------------------------------------------------------------
-
- @Test
- public void testAddAndGet() throws Exception {
-
- final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
- stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
- final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
- backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
-
- final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-
- try {
- InternalListState<VoidNamespace, Long> state =
- keyedBackend.createListState(VoidNamespaceSerializer.INSTANCE, stateDescr);
- state.setCurrentNamespace(VoidNamespace.INSTANCE);
-
- keyedBackend.setCurrentKey("abc");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("def");
- assertNull(state.get());
- state.add(17L);
- state.add(11L);
- assertEquals(asList(17L, 11L), state.get());
-
- keyedBackend.setCurrentKey("abc");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- assertNull(state.get());
- state.add(1L);
- state.add(2L);
-
- keyedBackend.setCurrentKey("def");
- assertEquals(asList(17L, 11L), state.get());
- state.clear();
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- state.add(3L);
- state.add(2L);
- state.add(1L);
-
- keyedBackend.setCurrentKey("def");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- assertEquals(asList(1L, 2L, 3L, 2L, 1L), state.get());
- }
- finally {
- keyedBackend.close();
- keyedBackend.dispose();
- }
- }
-
- @Test
- public void testMerging() throws Exception {
-
- final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
- stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
- final TimeWindow win1 = new TimeWindow(1000, 2000);
- final TimeWindow win2 = new TimeWindow(2000, 3000);
- final TimeWindow win3 = new TimeWindow(3000, 4000);
-
- final Set<Long> expectedResult = new HashSet<>(asList(11L, 22L, 33L, 44L, 55L));
-
- final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
- backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
-
- final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-
- try {
- InternalListState<TimeWindow, Long> state = keyedBackend.createListState(new TimeWindow.Serializer(), stateDescr);
-
- // populate the different namespaces
- // - abc spreads the values over three namespaces
- // - def spreads teh values over two namespaces (one empty)
- // - ghi is empty
- // - jkl has all elements already in the target namespace
- // - mno has all elements already in one source namespace
-
- keyedBackend.setCurrentKey("abc");
- state.setCurrentNamespace(win1);
- state.add(33L);
- state.add(55L);
-
- state.setCurrentNamespace(win2);
- state.add(22L);
- state.add(11L);
-
- state.setCurrentNamespace(win3);
- state.add(44L);
-
- keyedBackend.setCurrentKey("def");
- state.setCurrentNamespace(win1);
- state.add(11L);
- state.add(44L);
-
- state.setCurrentNamespace(win3);
- state.add(22L);
- state.add(55L);
- state.add(33L);
-
- keyedBackend.setCurrentKey("jkl");
- state.setCurrentNamespace(win1);
- state.add(11L);
- state.add(22L);
- state.add(33L);
- state.add(44L);
- state.add(55L);
-
- keyedBackend.setCurrentKey("mno");
- state.setCurrentNamespace(win3);
- state.add(11L);
- state.add(22L);
- state.add(33L);
- state.add(44L);
- state.add(55L);
-
- keyedBackend.setCurrentKey("abc");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- validateResult(state.get(), expectedResult);
-
- keyedBackend.setCurrentKey("def");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- validateResult(state.get(), expectedResult);
-
- keyedBackend.setCurrentKey("ghi");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("jkl");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- validateResult(state.get(), expectedResult);
-
- keyedBackend.setCurrentKey("mno");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- validateResult(state.get(), expectedResult);
- }
- finally {
- keyedBackend.close();
- keyedBackend.dispose();
- }
- }
-
- // ------------------------------------------------------------------------
- // utilities
- // ------------------------------------------------------------------------
-
- private static RocksDBKeyedStateBackend<String> createKeyedBackend(RocksDBStateBackend backend) throws Exception {
- RocksDBKeyedStateBackend<String> keyedBackend = (RocksDBKeyedStateBackend<String>) backend.createKeyedStateBackend(
- new DummyEnvironment("TestTask", 1, 0),
- new JobID(),
- "test-op",
- StringSerializer.INSTANCE,
- 16,
- new KeyGroupRange(2, 3),
- mock(TaskKvStateRegistry.class));
-
- keyedBackend.restore(null);
-
- return keyedBackend;
- }
-
- private static <T> void validateResult(Iterable<T> values, Set<T> expected) {
- int num = 0;
- for (T v : values) {
- num++;
- assertTrue(expected.contains(v));
- }
-
- assertEquals(expected.size(), num);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5e3d49cc/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 57d8b0f..8e0f5eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -63,6 +63,7 @@ import org.apache.flink.runtime.state.heap.AbstractHeapState;
import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.types.IntValue;
@@ -89,7 +90,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
+import static java.util.Arrays.asList;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -1209,6 +1212,176 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
@Test
+ public void testListStateAddAndGet() throws Exception {
+
+ AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+ final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
+
+ try {
+ ListState<Long> state =
+ keyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);
+
+ keyedBackend.setCurrentKey("abc");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("def");
+ assertNull(state.get());
+ state.add(17L);
+ state.add(11L);
+ assertThat(state.get(), containsInAnyOrder(17L, 11L));
+
+ keyedBackend.setCurrentKey("abc");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ assertNull(state.get());
+ state.add(1L);
+ state.add(2L);
+
+ keyedBackend.setCurrentKey("def");
+ assertThat(state.get(), containsInAnyOrder(11L, 17L));
+
+ state.clear();
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ state.add(3L);
+ state.add(2L);
+ state.add(1L);
+
+ keyedBackend.setCurrentKey("def");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ assertThat(state.get(), containsInAnyOrder(1L, 2L, 3L, 2L, 1L));
+
+ state.clear();
+
+ // make sure all lists / maps are cleared
+ assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+ } finally {
+ keyedBackend.close();
+ keyedBackend.dispose();
+ }
+ }
+
+ @Test
+ public void testListStateMerging() throws Exception {
+
+ AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+ final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
+
+ final Integer namespace1 = 1;
+ final Integer namespace2 = 2;
+ final Integer namespace3 = 3;
+
+ try {
+ InternalListState<Integer, Long> state =
+ (InternalListState<Integer, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
+
+ // populate the different namespaces
+ // - abc spreads the values over three namespaces
+ // - def spreads the values over two namespaces (one empty)
+ // - ghi is empty
+ // - jkl has all elements already in the target namespace
+ // - mno has all elements already in one source namespace
+
+ keyedBackend.setCurrentKey("abc");
+ state.setCurrentNamespace(namespace1);
+ state.add(33L);
+ state.add(55L);
+
+ state.setCurrentNamespace(namespace2);
+ state.add(22L);
+ state.add(11L);
+
+ state.setCurrentNamespace(namespace3);
+ state.add(44L);
+
+ keyedBackend.setCurrentKey("def");
+ state.setCurrentNamespace(namespace1);
+ state.add(11L);
+ state.add(44L);
+
+ state.setCurrentNamespace(namespace3);
+ state.add(22L);
+ state.add(55L);
+ state.add(33L);
+
+ keyedBackend.setCurrentKey("jkl");
+ state.setCurrentNamespace(namespace1);
+ state.add(11L);
+ state.add(22L);
+ state.add(33L);
+ state.add(44L);
+ state.add(55L);
+
+ keyedBackend.setCurrentKey("mno");
+ state.setCurrentNamespace(namespace3);
+ state.add(11L);
+ state.add(22L);
+ state.add(33L);
+ state.add(44L);
+ state.add(55L);
+
+ keyedBackend.setCurrentKey("abc");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertThat(state.get(), containsInAnyOrder(11L, 22L, 33L, 44L, 55L));
+
+ keyedBackend.setCurrentKey("def");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertThat(state.get(), containsInAnyOrder(11L, 22L, 33L, 44L, 55L));
+
+ keyedBackend.setCurrentKey("ghi");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("jkl");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertThat(state.get(), containsInAnyOrder(11L, 22L, 33L, 44L, 55L));
+
+ keyedBackend.setCurrentKey("mno");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertThat(state.get(), containsInAnyOrder(11L, 22L, 33L, 44L, 55L));
+
+ // make sure all lists / maps are cleared
+
+ keyedBackend.setCurrentKey("abc");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("def");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("ghi");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("jkl");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("mno");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+ }
+ finally {
+ keyedBackend.close();
+ keyedBackend.dispose();
+ }
+ }
+
+ @Test
@SuppressWarnings("unchecked")
public void testReducingState() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
http://git-wip-us.apache.org/repos/asf/flink/blob/5e3d49cc/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
deleted file mode 100644
index affbf75..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for the simple Java heap objects implementation of the {@link ListState}.
- */
-public class HeapListStateTest extends HeapStateBackendTestBase {
-
- @Test
- public void testAddAndGet() throws Exception {
-
- final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
- stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
- final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
-
- try {
- InternalListState<VoidNamespace, Long> state =
- keyedBackend.createListState(VoidNamespaceSerializer.INSTANCE, stateDescr);
- state.setCurrentNamespace(VoidNamespace.INSTANCE);
-
- keyedBackend.setCurrentKey("abc");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("def");
- assertNull(state.get());
- state.add(17L);
- state.add(11L);
- assertEquals(asList(17L, 11L), state.get());
-
- keyedBackend.setCurrentKey("abc");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- assertNull(state.get());
- state.add(1L);
- state.add(2L);
-
- keyedBackend.setCurrentKey("def");
- assertEquals(asList(17L, 11L), state.get());
- state.clear();
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- state.add(3L);
- state.add(2L);
- state.add(1L);
-
- keyedBackend.setCurrentKey("def");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- assertEquals(asList(1L, 2L, 3L, 2L, 1L), state.get());
- state.clear();
-
- // make sure all lists / maps are cleared
-
- StateTable<String, VoidNamespace, ArrayList<Long>> stateTable =
- ((HeapListState<String, VoidNamespace, Long>) state).stateTable;
-
- assertTrue(stateTable.isEmpty());
- }
- finally {
- keyedBackend.close();
- keyedBackend.dispose();
- }
- }
-
- @Test
- public void testMerging() throws Exception {
-
- final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
- stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
- final Integer namespace1 = 1;
- final Integer namespace2 = 2;
- final Integer namespace3 = 3;
-
- final Set<Long> expectedResult = new HashSet<>(asList(11L, 22L, 33L, 44L, 55L));
-
- final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
-
- try {
- InternalListState<Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
-
- // populate the different namespaces
- // - abc spreads the values over three namespaces
- // - def spreads teh values over two namespaces (one empty)
- // - ghi is empty
- // - jkl has all elements already in the target namespace
- // - mno has all elements already in one source namespace
-
- keyedBackend.setCurrentKey("abc");
- state.setCurrentNamespace(namespace1);
- state.add(33L);
- state.add(55L);
-
- state.setCurrentNamespace(namespace2);
- state.add(22L);
- state.add(11L);
-
- state.setCurrentNamespace(namespace3);
- state.add(44L);
-
- keyedBackend.setCurrentKey("def");
- state.setCurrentNamespace(namespace1);
- state.add(11L);
- state.add(44L);
-
- state.setCurrentNamespace(namespace3);
- state.add(22L);
- state.add(55L);
- state.add(33L);
-
- keyedBackend.setCurrentKey("jkl");
- state.setCurrentNamespace(namespace1);
- state.add(11L);
- state.add(22L);
- state.add(33L);
- state.add(44L);
- state.add(55L);
-
- keyedBackend.setCurrentKey("mno");
- state.setCurrentNamespace(namespace3);
- state.add(11L);
- state.add(22L);
- state.add(33L);
- state.add(44L);
- state.add(55L);
-
- keyedBackend.setCurrentKey("abc");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- validateResult(state.get(), expectedResult);
-
- keyedBackend.setCurrentKey("def");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- validateResult(state.get(), expectedResult);
-
- keyedBackend.setCurrentKey("ghi");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("jkl");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- validateResult(state.get(), expectedResult);
-
- keyedBackend.setCurrentKey("mno");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- validateResult(state.get(), expectedResult);
-
- // make sure all lists / maps are cleared
-
- keyedBackend.setCurrentKey("abc");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- keyedBackend.setCurrentKey("def");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- keyedBackend.setCurrentKey("ghi");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- keyedBackend.setCurrentKey("jkl");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- keyedBackend.setCurrentKey("mno");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- StateTable<String, Integer, ArrayList<Long>> stateTable =
- ((HeapListState<String, Integer, Long>) state).stateTable;
-
- assertTrue(stateTable.isEmpty());
- assertEquals(0, keyedBackend.numStateEntries());
- }
- finally {
- keyedBackend.close();
- keyedBackend.dispose();
- }
- }
-
- private static <T> void validateResult(Iterable<T> values, Set<T> expected) {
- int num = 0;
- for (T v : values) {
- num++;
- assertTrue(expected.contains(v));
- }
-
- assertEquals(expected.size(), num);
- }
-}