You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/09/28 15:09:13 UTC
[8/8] flink git commit: [FLINK-5619] Consolidate AggregatingState Tests in StateBackendTestBase
[FLINK-5619] Consolidate AggregatingState Tests in StateBackendTestBase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d7bc7bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d7bc7bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d7bc7bd
Branch: refs/heads/master
Commit: 9d7bc7bd66d60329dabb3cbd5d8be3a4f49e823b
Parents: ce0f83e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 27 13:02:57 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 28 17:03:54 2017 +0200
----------------------------------------------------------------------
.../state/RocksDBAggregatingStateTest.java | 253 ------------------
.../runtime/state/StateBackendTestBase.java | 206 +++++++++++++++
.../state/heap/HeapAggregatingStateTest.java | 256 -------------------
3 files changed, 206 insertions(+), 509 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7bc7bd/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
deleted file mode 100644
index f3065ab..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.AggregateFunction;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the {@link InternalAggregatingState} implementation on top of RocksDB.
- */
-public class RocksDBAggregatingStateTest {
-
- @Rule
- public final TemporaryFolder tmp = new TemporaryFolder();
-
- // ------------------------------------------------------------------------
-
- @Test
- public void testAddAndGet() throws Exception {
-
- final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
- new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
- stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
- final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
- backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
-
- final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-
- try {
- InternalAggregatingState<VoidNamespace, Long, Long> state =
- keyedBackend.createAggregatingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
- state.setCurrentNamespace(VoidNamespace.INSTANCE);
-
- keyedBackend.setCurrentKey("abc");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("def");
- assertNull(state.get());
- state.add(17L);
- state.add(11L);
- assertEquals(28L, state.get().longValue());
-
- keyedBackend.setCurrentKey("abc");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- assertNull(state.get());
- state.add(1L);
- state.add(2L);
-
- keyedBackend.setCurrentKey("def");
- assertEquals(28L, state.get().longValue());
- state.clear();
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- state.add(3L);
- state.add(2L);
- state.add(1L);
-
- keyedBackend.setCurrentKey("def");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- assertEquals(9L, state.get().longValue());
- }
- finally {
- keyedBackend.close();
- keyedBackend.dispose();
- }
- }
-
- @Test
- public void testMerging() throws Exception {
-
- final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
- new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
- stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
- final TimeWindow win1 = new TimeWindow(1000, 2000);
- final TimeWindow win2 = new TimeWindow(2000, 3000);
- final TimeWindow win3 = new TimeWindow(3000, 4000);
-
- final Long expectedResult = 165L;
-
- final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
- backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
-
- final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
-
- try {
- InternalAggregatingState<TimeWindow, Long, Long> state =
- keyedBackend.createAggregatingState(new TimeWindow.Serializer(), stateDescr);
-
- // populate the different namespaces
- // - abc spreads the values over three namespaces
- // - def spreads teh values over two namespaces (one empty)
- // - ghi is empty
- // - jkl has all elements already in the target namespace
- // - mno has all elements already in one source namespace
-
- keyedBackend.setCurrentKey("abc");
- state.setCurrentNamespace(win1);
- state.add(33L);
- state.add(55L);
-
- state.setCurrentNamespace(win2);
- state.add(22L);
- state.add(11L);
-
- state.setCurrentNamespace(win3);
- state.add(44L);
-
- keyedBackend.setCurrentKey("def");
- state.setCurrentNamespace(win1);
- state.add(11L);
- state.add(44L);
-
- state.setCurrentNamespace(win3);
- state.add(22L);
- state.add(55L);
- state.add(33L);
-
- keyedBackend.setCurrentKey("jkl");
- state.setCurrentNamespace(win1);
- state.add(11L);
- state.add(22L);
- state.add(33L);
- state.add(44L);
- state.add(55L);
-
- keyedBackend.setCurrentKey("mno");
- state.setCurrentNamespace(win3);
- state.add(11L);
- state.add(22L);
- state.add(33L);
- state.add(44L);
- state.add(55L);
-
- keyedBackend.setCurrentKey("abc");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- assertEquals(expectedResult, state.get());
-
- keyedBackend.setCurrentKey("def");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- assertEquals(expectedResult, state.get());
-
- keyedBackend.setCurrentKey("ghi");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("jkl");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- assertEquals(expectedResult, state.get());
-
- keyedBackend.setCurrentKey("mno");
- state.mergeNamespaces(win1, asList(win2, win3));
- state.setCurrentNamespace(win1);
- assertEquals(expectedResult, state.get());
- }
- finally {
- keyedBackend.close();
- keyedBackend.dispose();
- }
- }
-
- private static RocksDBKeyedStateBackend<String> createKeyedBackend(RocksDBStateBackend backend) throws Exception {
- RocksDBKeyedStateBackend<String> keyedBackend = (RocksDBKeyedStateBackend<String>) backend.createKeyedStateBackend(
- new DummyEnvironment("TestTask", 1, 0),
- new JobID(),
- "test-op",
- StringSerializer.INSTANCE,
- 16,
- new KeyGroupRange(2, 3),
- mock(TaskKvStateRegistry.class));
-
- keyedBackend.restore(null);
-
- return keyedBackend;
- }
-
- // test functions
- // ------------------------------------------------------------------------
-
- @SuppressWarnings("serial")
- private static class AddingFunction implements AggregateFunction<Long, MutableLong, Long> {
-
- @Override
- public MutableLong createAccumulator() {
- return new MutableLong();
- }
-
- @Override
- public void add(Long value, MutableLong accumulator) {
- accumulator.value += value;
- }
-
- @Override
- public Long getResult(MutableLong accumulator) {
- return accumulator.value;
- }
-
- @Override
- public MutableLong merge(MutableLong a, MutableLong b) {
- a.value += b.value;
- return a;
- }
- }
-
- private static final class MutableLong {
- long value;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7bc7bd/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 5b8e8aa..ed280a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -19,8 +19,11 @@
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
@@ -57,6 +60,7 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.heap.AbstractHeapState;
import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
@@ -1710,6 +1714,179 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
@Test
+ public void testAggregatingStateAddAndGet() throws Exception {
+
+ final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
+ new AggregatingStateDescriptor<>("my-state", new AggregatingAddingFunction(), MutableLong.class);
+
+ AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+ try {
+ AggregatingState<Long, Long> state =
+ keyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);
+
+ keyedBackend.setCurrentKey("abc");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("def");
+ assertNull(state.get());
+ state.add(17L);
+ state.add(11L);
+ assertEquals(28L, state.get().longValue());
+
+ keyedBackend.setCurrentKey("abc");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ assertNull(state.get());
+ state.add(1L);
+ state.add(2L);
+
+ keyedBackend.setCurrentKey("def");
+ assertEquals(28L, state.get().longValue());
+ state.clear();
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ state.add(3L);
+ state.add(2L);
+ state.add(1L);
+
+ keyedBackend.setCurrentKey("def");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ assertEquals(9L, state.get().longValue());
+ state.clear();
+
+ // make sure all lists / maps are cleared
+ assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+ }
+ finally {
+ keyedBackend.close();
+ keyedBackend.dispose();
+ }
+ }
+
+ @Test
+ public void testAggregatingStateMerging() throws Exception {
+
+ final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
+ new AggregatingStateDescriptor<>("my-state", new AggregatingAddingFunction(), MutableLong.class);
+
+ final Integer namespace1 = 1;
+ final Integer namespace2 = 2;
+ final Integer namespace3 = 3;
+
+ final Long expectedResult = 165L;
+
+ AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
+
+ try {
+ InternalAggregatingState<Integer, Long, Long> state =
+ (InternalAggregatingState<Integer, Long, Long>) keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);
+
+ // populate the different namespaces
+ // - abc spreads the values over three namespaces
+ // - def spreads the values over two namespaces (one empty)
+ // - ghi is empty
+ // - jkl has all elements already in the target namespace
+ // - mno has all elements already in one source namespace
+
+ keyedBackend.setCurrentKey("abc");
+ state.setCurrentNamespace(namespace1);
+ state.add(33L);
+ state.add(55L);
+
+ state.setCurrentNamespace(namespace2);
+ state.add(22L);
+ state.add(11L);
+
+ state.setCurrentNamespace(namespace3);
+ state.add(44L);
+
+ keyedBackend.setCurrentKey("def");
+ state.setCurrentNamespace(namespace1);
+ state.add(11L);
+ state.add(44L);
+
+ state.setCurrentNamespace(namespace3);
+ state.add(22L);
+ state.add(55L);
+ state.add(33L);
+
+ keyedBackend.setCurrentKey("jkl");
+ state.setCurrentNamespace(namespace1);
+ state.add(11L);
+ state.add(22L);
+ state.add(33L);
+ state.add(44L);
+ state.add(55L);
+
+ keyedBackend.setCurrentKey("mno");
+ state.setCurrentNamespace(namespace3);
+ state.add(11L);
+ state.add(22L);
+ state.add(33L);
+ state.add(44L);
+ state.add(55L);
+
+ keyedBackend.setCurrentKey("abc");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("def");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("ghi");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("jkl");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("mno");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ // make sure all lists / maps are cleared
+
+ keyedBackend.setCurrentKey("abc");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("def");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("ghi");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("jkl");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("mno");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ assertThat("State backend is not empty.", keyedBackend.numStateEntries(), is(0));
+ }
+ finally {
+ keyedBackend.close();
+ keyedBackend.dispose();
+ }
+ }
+
+ @Test
@SuppressWarnings("unchecked,rawtypes")
public void testFoldingState() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
@@ -3204,4 +3381,33 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
throw new ExpectedKryoTestException();
}
}
+
+ @SuppressWarnings("serial")
+ private static class AggregatingAddingFunction implements AggregateFunction<Long, MutableLong, Long> {
+
+ @Override
+ public MutableLong createAccumulator() {
+ return new MutableLong();
+ }
+
+ @Override
+ public void add(Long value, MutableLong accumulator) {
+ accumulator.value += value;
+ }
+
+ @Override
+ public Long getResult(MutableLong accumulator) {
+ return accumulator.value;
+ }
+
+ @Override
+ public MutableLong merge(MutableLong a, MutableLong b) {
+ a.value += b.value;
+ return a;
+ }
+ }
+
+ private static final class MutableLong {
+ long value;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7bc7bd/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
deleted file mode 100644
index cb4e403..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.AggregateFunction;
-import org.apache.flink.api.common.state.AggregatingState;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-import org.junit.Test;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for the simple Java heap objects implementation of the {@link AggregatingState}.
- */
-public class HeapAggregatingStateTest extends HeapStateBackendTestBase {
-
- @Test
- public void testAddAndGet() throws Exception {
-
- final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
- new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
- stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
- final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
-
- try {
- InternalAggregatingState<VoidNamespace, Long, Long> state =
- keyedBackend.createAggregatingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
- state.setCurrentNamespace(VoidNamespace.INSTANCE);
-
- keyedBackend.setCurrentKey("abc");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("def");
- assertNull(state.get());
- state.add(17L);
- state.add(11L);
- assertEquals(28L, state.get().longValue());
-
- keyedBackend.setCurrentKey("abc");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- assertNull(state.get());
- state.add(1L);
- state.add(2L);
-
- keyedBackend.setCurrentKey("def");
- assertEquals(28L, state.get().longValue());
- state.clear();
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- state.add(3L);
- state.add(2L);
- state.add(1L);
-
- keyedBackend.setCurrentKey("def");
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("g");
- assertEquals(9L, state.get().longValue());
- state.clear();
-
- // make sure all lists / maps are cleared
-
- StateTable<String, VoidNamespace, MutableLong> stateTable =
- ((HeapAggregatingState<String, VoidNamespace, Long, MutableLong, Long>) state).stateTable;
-
- assertTrue(stateTable.isEmpty());
- }
- finally {
- keyedBackend.close();
- keyedBackend.dispose();
- }
- }
-
- @Test
- public void testMerging() throws Exception {
-
- final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
- new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
- stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
- final Integer namespace1 = 1;
- final Integer namespace2 = 2;
- final Integer namespace3 = 3;
-
- final Long expectedResult = 165L;
-
- final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
-
- try {
- InternalAggregatingState<Integer, Long, Long> state =
- keyedBackend.createAggregatingState(IntSerializer.INSTANCE, stateDescr);
-
- // populate the different namespaces
- // - abc spreads the values over three namespaces
- // - def spreads teh values over two namespaces (one empty)
- // - ghi is empty
- // - jkl has all elements already in the target namespace
- // - mno has all elements already in one source namespace
-
- keyedBackend.setCurrentKey("abc");
- state.setCurrentNamespace(namespace1);
- state.add(33L);
- state.add(55L);
-
- state.setCurrentNamespace(namespace2);
- state.add(22L);
- state.add(11L);
-
- state.setCurrentNamespace(namespace3);
- state.add(44L);
-
- keyedBackend.setCurrentKey("def");
- state.setCurrentNamespace(namespace1);
- state.add(11L);
- state.add(44L);
-
- state.setCurrentNamespace(namespace3);
- state.add(22L);
- state.add(55L);
- state.add(33L);
-
- keyedBackend.setCurrentKey("jkl");
- state.setCurrentNamespace(namespace1);
- state.add(11L);
- state.add(22L);
- state.add(33L);
- state.add(44L);
- state.add(55L);
-
- keyedBackend.setCurrentKey("mno");
- state.setCurrentNamespace(namespace3);
- state.add(11L);
- state.add(22L);
- state.add(33L);
- state.add(44L);
- state.add(55L);
-
- keyedBackend.setCurrentKey("abc");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- assertEquals(expectedResult, state.get());
-
- keyedBackend.setCurrentKey("def");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- assertEquals(expectedResult, state.get());
-
- keyedBackend.setCurrentKey("ghi");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- assertNull(state.get());
-
- keyedBackend.setCurrentKey("jkl");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- assertEquals(expectedResult, state.get());
-
- keyedBackend.setCurrentKey("mno");
- state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
- state.setCurrentNamespace(namespace1);
- assertEquals(expectedResult, state.get());
-
- // make sure all lists / maps are cleared
-
- keyedBackend.setCurrentKey("abc");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- keyedBackend.setCurrentKey("def");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- keyedBackend.setCurrentKey("ghi");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- keyedBackend.setCurrentKey("jkl");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- keyedBackend.setCurrentKey("mno");
- state.setCurrentNamespace(namespace1);
- state.clear();
-
- StateTable<String, Integer, MutableLong> stateTable =
- ((HeapAggregatingState<String, Integer, Long, MutableLong, Long>) state).stateTable;
-
- assertTrue(stateTable.isEmpty());
- }
- finally {
- keyedBackend.close();
- keyedBackend.dispose();
- }
- }
-
- // ------------------------------------------------------------------------
- // test functions
- // ------------------------------------------------------------------------
-
- @SuppressWarnings("serial")
- private static class AddingFunction implements AggregateFunction<Long, MutableLong, Long> {
-
- @Override
- public MutableLong createAccumulator() {
- return new MutableLong();
- }
-
- @Override
- public void add(Long value, MutableLong accumulator) {
- accumulator.value += value;
- }
-
- @Override
- public Long getResult(MutableLong accumulator) {
- return accumulator.value;
- }
-
- @Override
- public MutableLong merge(MutableLong a, MutableLong b) {
- a.value += b.value;
- return a;
- }
- }
-
- private static final class MutableLong {
- long value;
- }
-}