You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/06/12 20:36:00 UTC
[1/2] beam git commit: [BEAM-2423] Abstract StateInternalsTest for the different state internals
Repository: beam
Updated Branches:
refs/heads/master 1597f3ca6 -> f9d51aa5c
[BEAM-2423] Abstract StateInternalsTest for the different state internals
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8362bdb9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8362bdb9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8362bdb9
Branch: refs/heads/master
Commit: 8362bdb9cd35cc02ed179b3a64fd72f1264a99be
Parents: 1597f3c
Author: JingsongLi <lz...@aliyun.com>
Authored: Thu Jun 8 01:31:34 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jun 12 11:51:29 2017 +0200
----------------------------------------------------------------------
pom.xml | 7 +
.../core/InMemoryStateInternalsTest.java | 555 ++----------------
.../beam/runners/core/StateInternalsTest.java | 573 +++++++++++++++++++
runners/flink/pom.xml | 8 +
.../streaming/FlinkStateInternalsTest.java | 348 +----------
5 files changed, 641 insertions(+), 850 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8362bdb9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 805a8d6..9373a40 100644
--- a/pom.xml
+++ b/pom.xml
@@ -511,6 +511,13 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-core-java</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/8362bdb9/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index b526305..335c2f8 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -17,545 +17,58 @@
*/
package org.apache.beam.runners.core;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasItems;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Objects;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.CombiningState;
-import org.apache.beam.sdk.state.GroupingState;
-import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.apache.beam.sdk.state.SetState;
-import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.state.State;
import org.hamcrest.Matchers;
-import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.junit.runners.Suite;
/**
- * Tests for {@link InMemoryStateInternals}.
+ * Tests for {@link InMemoryStateInternals}. This is based on {@link StateInternalsTest}.
*/
-@RunWith(JUnit4.class)
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ InMemoryStateInternalsTest.StandardStateInternalsTests.class,
+ InMemoryStateInternalsTest.OtherTests.class
+})
public class InMemoryStateInternalsTest {
- private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));
- private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
- private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
- private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
- private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
- StateTags.value("stringValue", StringUtf8Coder.of());
- private static final StateTag<CombiningState<Integer, int[], Integer>>
- SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
- "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
- private static final StateTag<BagState<String>> STRING_BAG_ADDR =
- StateTags.bag("stringBag", StringUtf8Coder.of());
- private static final StateTag<SetState<String>> STRING_SET_ADDR =
- StateTags.set("stringSet", StringUtf8Coder.of());
- private static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR =
- StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of());
- private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
- StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
- private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
- StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
- private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
- StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
-
- InMemoryStateInternals<String> underTest = InMemoryStateInternals.forKey("dummyKey");
-
- @Test
- public void testValue() throws Exception {
- ValueState<String> value = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), Matchers.sameInstance(value));
- assertThat(
- underTest.state(NAMESPACE_2, STRING_VALUE_ADDR),
- Matchers.not(Matchers.sameInstance(value)));
-
- assertThat(value.read(), Matchers.nullValue());
- value.write("hello");
- assertThat(value.read(), equalTo("hello"));
- value.write("world");
- assertThat(value.read(), equalTo("world"));
-
- value.clear();
- assertThat(value.read(), Matchers.nullValue());
- assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), Matchers.sameInstance(value));
- }
-
- @Test
- public void testBag() throws Exception {
- BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_BAG_ADDR)));
- assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_BAG_ADDR))));
-
- assertThat(value.read(), Matchers.emptyIterable());
- value.add("hello");
- assertThat(value.read(), containsInAnyOrder("hello"));
-
- value.add("world");
- assertThat(value.read(), containsInAnyOrder("hello", "world"));
-
- value.clear();
- assertThat(value.read(), Matchers.emptyIterable());
- assertThat(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), Matchers.sameInstance(value));
- }
-
- @Test
- public void testBagIsEmpty() throws Exception {
- BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add("hello");
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
- @Test
- public void testMergeBagIntoSource() throws Exception {
- BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
- BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-
- bag1.add("Hello");
- bag2.add("World");
- bag1.add("!");
-
- StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
-
- // Reading the merged bag gets both the contents
- assertThat(bag1.read(), containsInAnyOrder("Hello", "World", "!"));
- assertThat(bag2.read(), Matchers.emptyIterable());
- }
-
- @Test
- public void testMergeBagIntoNewNamespace() throws Exception {
- BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
- BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
- BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
-
- bag1.add("Hello");
- bag2.add("World");
- bag1.add("!");
-
- StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
-
- // Reading the merged bag gets both the contents
- assertThat(bag3.read(), containsInAnyOrder("Hello", "World", "!"));
- assertThat(bag1.read(), Matchers.emptyIterable());
- assertThat(bag2.read(), Matchers.emptyIterable());
- }
-
- @Test
- public void testSet() throws Exception {
- SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_SET_ADDR)));
- assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_SET_ADDR))));
-
- // empty
- assertThat(value.read(), Matchers.emptyIterable());
- assertFalse(value.contains("A").read());
-
- // add
- value.add("A");
- value.add("B");
- value.add("A");
- assertFalse(value.addIfAbsent("B").read());
- assertThat(value.read(), containsInAnyOrder("A", "B"));
-
- // remove
- value.remove("A");
- assertThat(value.read(), containsInAnyOrder("B"));
- value.remove("C");
- assertThat(value.read(), containsInAnyOrder("B"));
-
- // contains
- assertFalse(value.contains("A").read());
- assertTrue(value.contains("B").read());
- value.add("C");
- value.add("D");
-
- // readLater
- assertThat(value.readLater().read(), containsInAnyOrder("B", "C", "D"));
- SetState<String> later = value.readLater();
- assertThat(later.read(), hasItems("C", "D"));
- assertFalse(later.contains("A").read());
-
- // clear
- value.clear();
- assertThat(value.read(), Matchers.emptyIterable());
- assertThat(underTest.state(NAMESPACE_1, STRING_SET_ADDR), Matchers.sameInstance(value));
-
- }
-
- @Test
- public void testSetIsEmpty() throws Exception {
- SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add("hello");
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
- @Test
- public void testMergeSetIntoSource() throws Exception {
- SetState<String> set1 = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
- SetState<String> set2 = underTest.state(NAMESPACE_2, STRING_SET_ADDR);
-
- set1.add("Hello");
- set2.add("Hello");
- set2.add("World");
- set1.add("!");
-
- StateMerging.mergeSets(Arrays.asList(set1, set2), set1);
-
- // Reading the merged set gets both the contents
- assertThat(set1.read(), containsInAnyOrder("Hello", "World", "!"));
- assertThat(set2.read(), Matchers.emptyIterable());
- }
-
- @Test
- public void testMergeSetIntoNewNamespace() throws Exception {
- SetState<String> set1 = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
- SetState<String> set2 = underTest.state(NAMESPACE_2, STRING_SET_ADDR);
- SetState<String> set3 = underTest.state(NAMESPACE_3, STRING_SET_ADDR);
-
- set1.add("Hello");
- set2.add("Hello");
- set2.add("World");
- set1.add("!");
-
- StateMerging.mergeSets(Arrays.asList(set1, set2, set3), set3);
-
- // Reading the merged set gets both the contents
- assertThat(set3.read(), containsInAnyOrder("Hello", "World", "!"));
- assertThat(set1.read(), Matchers.emptyIterable());
- assertThat(set2.read(), Matchers.emptyIterable());
- }
-
- // for testMap
- private static class MapEntry<K, V> implements Map.Entry<K, V> {
- private K key;
- private V value;
-
- private MapEntry(K key, V value) {
- this.key = key;
- this.value = value;
- }
-
- static <K, V> Map.Entry<K, V> of(K k, V v) {
- return new MapEntry<>(k, v);
+ /**
+ * Runs the shared {@link StateInternalsTest} cases against {@link InMemoryStateInternals}.
+ */
+ @RunWith(JUnit4.class)
+ public static class StandardStateInternalsTests extends StateInternalsTest {
+ @Override
+ protected StateInternals createStateInternals() {
+ return new InMemoryStateInternals<>("dummyKey");
}
+ }
- public final K getKey() {
- return key;
- }
- public final V getValue() {
- return value;
- }
+ /**
+ * Tests specific to {@link InMemoryStateInternals}: repeated lookups return the same instance.
+ */
+ @RunWith(JUnit4.class)
+ public static class OtherTests {
- public final String toString() {
- return key + "=" + value;
- }
+ StateInternals underTest = new InMemoryStateInternals<>("dummyKey");
- public final int hashCode() {
- return Objects.hashCode(key) ^ Objects.hashCode(value);
+ @Test
+ public void testSameInstance() {
+ assertSameInstance(StateInternalsTest.STRING_VALUE_ADDR);
+ assertSameInstance(StateInternalsTest.SUM_INTEGER_ADDR);
+ assertSameInstance(StateInternalsTest.STRING_BAG_ADDR);
+ assertSameInstance(StateInternalsTest.STRING_SET_ADDR);
+ assertSameInstance(StateInternalsTest.STRING_MAP_ADDR);
+ assertSameInstance(StateInternalsTest.WATERMARK_EARLIEST_ADDR);
}
- public final V setValue(V newValue) {
- V oldValue = value;
- value = newValue;
- return oldValue;
+ private <T extends State> void assertSameInstance(StateTag<T> address) {
+ assertThat(underTest.state(StateInternalsTest.NAMESPACE_1, address),
+ Matchers.sameInstance(underTest.state(StateInternalsTest.NAMESPACE_1, address)));
}
-
- public final boolean equals(Object o) {
- if (o == this) {
- return true;
- }
- if (o instanceof Map.Entry) {
- Map.Entry<?, ?> e = (Map.Entry<?, ?>) o;
- if (Objects.equals(key, e.getKey())
- && Objects.equals(value, e.getValue())) {
- return true;
- }
- }
- return false;
- }
- }
-
- @Test
- public void testMap() throws Exception {
- MapState<String, Integer> value = underTest.state(NAMESPACE_1, STRING_MAP_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_MAP_ADDR)));
- assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_MAP_ADDR))));
-
- // put
- assertThat(value.entries().read(), Matchers.emptyIterable());
- value.put("A", 1);
- value.put("B", 2);
- value.put("A", 11);
- assertThat(value.putIfAbsent("B", 22).read(), equalTo(2));
- assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("A", 11),
- MapEntry.of("B", 2)));
-
- // remove
- value.remove("A");
- assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2)));
- value.remove("C");
- assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2)));
-
- // get
- assertNull(value.get("A").read());
- assertThat(value.get("B").read(), equalTo(2));
- value.put("C", 3);
- value.put("D", 4);
- assertThat(value.get("C").read(), equalTo(3));
-
- // iterate
- value.put("E", 5);
- value.remove("C");
- assertThat(value.keys().read(), containsInAnyOrder("B", "D", "E"));
- assertThat(value.values().read(), containsInAnyOrder(2, 4, 5));
- assertThat(
- value.entries().read(),
- containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5)));
-
- // readLater
- assertThat(value.get("B").readLater().read(), equalTo(2));
- assertNull(value.get("A").readLater().read());
- assertThat(
- value.entries().readLater().read(),
- containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5)));
-
- // clear
- value.clear();
- assertThat(value.entries().read(), Matchers.emptyIterable());
- assertThat(underTest.state(NAMESPACE_1, STRING_MAP_ADDR), Matchers.sameInstance(value));
}
- @Test
- public void testCombiningValue() throws Exception {
- GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
-
- assertThat(value.read(), equalTo(0));
- value.add(2);
- assertThat(value.read(), equalTo(2));
-
- value.add(3);
- assertThat(value.read(), equalTo(5));
-
- value.clear();
- assertThat(value.read(), equalTo(0));
- assertThat(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), Matchers.sameInstance(value));
- }
-
- @Test
- public void testCombiningIsEmpty() throws Exception {
- GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add(5);
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
- @Test
- public void testMergeCombiningValueIntoSource() throws Exception {
- CombiningState<Integer, int[], Integer> value1 =
- underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- CombiningState<Integer, int[], Integer> value2 =
- underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-
- value1.add(5);
- value2.add(10);
- value1.add(6);
-
- assertThat(value1.read(), equalTo(11));
- assertThat(value2.read(), equalTo(10));
-
- // Merging clears the old values and updates the result value.
- StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
-
- assertThat(value1.read(), equalTo(21));
- assertThat(value2.read(), equalTo(0));
- }
-
- @Test
- public void testMergeCombiningValueIntoNewNamespace() throws Exception {
- CombiningState<Integer, int[], Integer> value1 =
- underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- CombiningState<Integer, int[], Integer> value2 =
- underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
- CombiningState<Integer, int[], Integer> value3 =
- underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
-
- value1.add(5);
- value2.add(10);
- value1.add(6);
-
- StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
-
- // Merging clears the old values and updates the result value.
- assertThat(value1.read(), equalTo(0));
- assertThat(value2.read(), equalTo(0));
- assertThat(value3.read(), equalTo(21));
- }
-
- @Test
- public void testWatermarkEarliestState() throws Exception {
- WatermarkHoldState value =
- underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR)));
-
- assertThat(value.read(), Matchers.nullValue());
- value.add(new Instant(2000));
- assertThat(value.read(), equalTo(new Instant(2000)));
-
- value.add(new Instant(3000));
- assertThat(value.read(), equalTo(new Instant(2000)));
-
- value.add(new Instant(1000));
- assertThat(value.read(), equalTo(new Instant(1000)));
-
- value.clear();
- assertThat(value.read(), equalTo(null));
- assertThat(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), Matchers.sameInstance(value));
- }
-
- @Test
- public void testWatermarkLatestState() throws Exception {
- WatermarkHoldState value =
- underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));
-
- assertThat(value.read(), Matchers.nullValue());
- value.add(new Instant(2000));
- assertThat(value.read(), equalTo(new Instant(2000)));
-
- value.add(new Instant(3000));
- assertThat(value.read(), equalTo(new Instant(3000)));
-
- value.add(new Instant(1000));
- assertThat(value.read(), equalTo(new Instant(3000)));
-
- value.clear();
- assertThat(value.read(), equalTo(null));
- assertThat(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), Matchers.sameInstance(value));
- }
-
- @Test
- public void testWatermarkEndOfWindowState() throws Exception {
- WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));
-
- assertThat(value.read(), Matchers.nullValue());
- value.add(new Instant(2000));
- assertThat(value.read(), equalTo(new Instant(2000)));
-
- value.clear();
- assertThat(value.read(), equalTo(null));
- assertThat(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), Matchers.sameInstance(value));
- }
-
- @Test
- public void testWatermarkStateIsEmpty() throws Exception {
- WatermarkHoldState value =
- underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add(new Instant(1000));
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
- @Test
- public void testMergeEarliestWatermarkIntoSource() throws Exception {
- WatermarkHoldState value1 =
- underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
- WatermarkHoldState value2 =
- underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
-
- value1.add(new Instant(3000));
- value2.add(new Instant(5000));
- value1.add(new Instant(4000));
- value2.add(new Instant(2000));
-
- // Merging clears the old values and updates the merged value.
- StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1);
-
- assertThat(value1.read(), equalTo(new Instant(2000)));
- assertThat(value2.read(), equalTo(null));
- }
-
- @Test
- public void testMergeLatestWatermarkIntoSource() throws Exception {
- WatermarkHoldState value1 =
- underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
- WatermarkHoldState value2 =
- underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
- WatermarkHoldState value3 =
- underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
-
- value1.add(new Instant(3000));
- value2.add(new Instant(5000));
- value1.add(new Instant(4000));
- value2.add(new Instant(2000));
-
- // Merging clears the old values and updates the result value.
- StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1);
-
- // Merging clears the old values and updates the result value.
- assertThat(value3.read(), equalTo(new Instant(5000)));
- assertThat(value1.read(), equalTo(null));
- assertThat(value2.read(), equalTo(null));
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8362bdb9/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
new file mode 100644
index 0000000..bf3156a
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateInternalsTest.java
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.GroupingState;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.apache.beam.sdk.state.SetState;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link StateInternals}.
+ */
+public abstract class StateInternalsTest {
+
+ private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));
+ static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
+ private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
+ private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
+
+ static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
+ StateTags.value("stringValue", StringUtf8Coder.of());
+ static final StateTag<CombiningState<Integer, int[], Integer>>
+ SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
+ "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
+ static final StateTag<BagState<String>> STRING_BAG_ADDR =
+ StateTags.bag("stringBag", StringUtf8Coder.of());
+ static final StateTag<SetState<String>> STRING_SET_ADDR =
+ StateTags.set("stringSet", StringUtf8Coder.of());
+ static final StateTag<MapState<String, Integer>> STRING_MAP_ADDR =
+ StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of());
+ static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
+ private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
+ private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
+
+ private StateInternals underTest;
+
+ @Before
+ public void setUp() {
+ this.underTest = createStateInternals();
+ }
+
+ protected abstract StateInternals createStateInternals();
+
+ @Test
+ public void testValue() throws Exception {
+ ValueState<String> value = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR);
+
+ // The same namespace and tag yield an equal state; a different namespace does not.
+ assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), equalTo(value));
+ assertThat(
+ underTest.state(NAMESPACE_2, STRING_VALUE_ADDR),
+ Matchers.not(equalTo(value)));
+
+ assertThat(value.read(), Matchers.nullValue());
+ value.write("hello");
+ assertThat(value.read(), equalTo("hello"));
+ value.write("world");
+ assertThat(value.read(), equalTo("world"));
+
+ value.clear();
+ assertThat(value.read(), Matchers.nullValue());
+ assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), equalTo(value));
+ }
+
+ @Test
+ public void testBag() throws Exception {
+ BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+ // The same namespace and tag yield an equal state; a different namespace does not.
+ assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_BAG_ADDR)));
+ assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_BAG_ADDR))));
+
+ assertThat(value.read(), Matchers.emptyIterable());
+ value.add("hello");
+ assertThat(value.read(), containsInAnyOrder("hello"));
+
+ value.add("world");
+ assertThat(value.read(), containsInAnyOrder("hello", "world"));
+
+ value.clear();
+ assertThat(value.read(), Matchers.emptyIterable());
+ assertThat(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), equalTo(value));
+ }
+
+ @Test
+ public void testBagIsEmpty() throws Exception {
+ BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+ assertThat(value.isEmpty().read(), Matchers.is(true));
+ ReadableState<Boolean> readFuture = value.isEmpty();
+ value.add("hello");
+ assertThat(readFuture.read(), Matchers.is(false));
+
+ value.clear();
+ assertThat(readFuture.read(), Matchers.is(true));
+ }
+
+ @Test
+ public void testMergeBagIntoSource() throws Exception {
+ BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+ BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+
+ bag1.add("Hello");
+ bag2.add("World");
+ bag1.add("!");
+
+ StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
+
+ // After merging into bag1 it holds the elements of both bags, and bag2 is left empty.
+ assertThat(bag1.read(), containsInAnyOrder("Hello", "World", "!"));
+ assertThat(bag2.read(), Matchers.emptyIterable());
+ }
+
+ @Test
+ public void testMergeBagIntoNewNamespace() throws Exception {
+ BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+ BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+ BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
+
+ bag1.add("Hello");
+ bag2.add("World");
+ bag1.add("!");
+
+ StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
+
+ // After merging into bag3 it holds every element, and both source bags are left empty.
+ assertThat(bag3.read(), containsInAnyOrder("Hello", "World", "!"));
+ assertThat(bag1.read(), Matchers.emptyIterable());
+ assertThat(bag2.read(), Matchers.emptyIterable());
+ }
+
+ @Test
+ public void testSet() throws Exception {
+
+ SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
+
+ // The same namespace and tag yield an equal state; a different namespace does not.
+ assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_SET_ADDR)));
+ assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_SET_ADDR))));
+
+ // empty: a fresh set reads as empty and contains nothing
+ assertThat(value.read(), Matchers.emptyIterable());
+ assertFalse(value.contains("A").read());
+
+ // add: duplicates are ignored, and addIfAbsent reports false for an existing element
+ value.add("A");
+ value.add("B");
+ value.add("A");
+ assertFalse(value.addIfAbsent("B").read());
+ assertThat(value.read(), containsInAnyOrder("A", "B"));
+
+ // remove: removing an absent element ("C") leaves the set unchanged
+ value.remove("A");
+ assertThat(value.read(), containsInAnyOrder("B"));
+ value.remove("C");
+ assertThat(value.read(), containsInAnyOrder("B"));
+
+ // contains: false for the removed "A", true for the remaining "B"
+ assertFalse(value.contains("A").read());
+ assertTrue(value.contains("B").read());
+ value.add("C");
+ value.add("D");
+
+ // readLater: the state returned by readLater() reads the same contents
+ assertThat(value.readLater().read(), containsInAnyOrder("B", "C", "D"));
+ SetState<String> later = value.readLater();
+ assertThat(later.read(), hasItems("C", "D"));
+ assertFalse(later.contains("A").read());
+
+ // clear: the set is empty again and the same namespace and tag still yield an equal state
+ value.clear();
+ assertThat(value.read(), Matchers.emptyIterable());
+ assertThat(underTest.state(NAMESPACE_1, STRING_SET_ADDR), equalTo(value));
+
+ }
+
+ @Test
+ public void testSetIsEmpty() throws Exception {
+
+ SetState<String> value = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
+
+ assertThat(value.isEmpty().read(), Matchers.is(true));
+ ReadableState<Boolean> readFuture = value.isEmpty();
+ value.add("hello");
+ assertThat(readFuture.read(), Matchers.is(false));
+
+ value.clear();
+ assertThat(readFuture.read(), Matchers.is(true));
+ }
+
+ @Test
+ public void testMergeSetIntoSource() throws Exception {
+
+ SetState<String> set1 = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
+ SetState<String> set2 = underTest.state(NAMESPACE_2, STRING_SET_ADDR);
+
+ set1.add("Hello");
+ set2.add("Hello");
+ set2.add("World");
+ set1.add("!");
+
+ StateMerging.mergeSets(Arrays.asList(set1, set2), set1);
+
+ // Reading the merged set yields the union of both sets' contents
+ assertThat(set1.read(), containsInAnyOrder("Hello", "World", "!"));
+ assertThat(set2.read(), Matchers.emptyIterable());
+ }
+
+ @Test
+ public void testMergeSetIntoNewNamespace() throws Exception {
+
+ SetState<String> set1 = underTest.state(NAMESPACE_1, STRING_SET_ADDR);
+ SetState<String> set2 = underTest.state(NAMESPACE_2, STRING_SET_ADDR);
+ SetState<String> set3 = underTest.state(NAMESPACE_3, STRING_SET_ADDR);
+
+ set1.add("Hello");
+ set2.add("Hello");
+ set2.add("World");
+ set1.add("!");
+
+ StateMerging.mergeSets(Arrays.asList(set1, set2, set3), set3);
+
+ // Reading the merged set yields the union of the source sets' contents
+ assertThat(set3.read(), containsInAnyOrder("Hello", "World", "!"));
+ assertThat(set1.read(), Matchers.emptyIterable());
+ assertThat(set2.read(), Matchers.emptyIterable());
+ }
+
+ // for testMap
+ private static class MapEntry<K, V> implements Map.Entry<K, V> {
+ private K key;
+ private V value;
+
+ private MapEntry(K key, V value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ static <K, V> Map.Entry<K, V> of(K k, V v) {
+ return new MapEntry<>(k, v);
+ }
+
+ public final K getKey() {
+ return key;
+ }
+ public final V getValue() {
+ return value;
+ }
+
+ public final String toString() {
+ return key + "=" + value;
+ }
+
+ public final int hashCode() {
+ return Objects.hashCode(key) ^ Objects.hashCode(value);
+ }
+
+ public final V setValue(V newValue) {
+ V oldValue = value;
+ value = newValue;
+ return oldValue;
+ }
+
+ public final boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (o instanceof Map.Entry) {
+ Map.Entry<?, ?> e = (Map.Entry<?, ?>) o;
+ if (Objects.equals(key, e.getKey())
+ && Objects.equals(value, e.getValue())) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
+ @Test
+ public void testMap() throws Exception {
+
+ MapState<String, Integer> value = underTest.state(NAMESPACE_1, STRING_MAP_ADDR);
+
+ // State instances are cached, but depend on the namespace.
+ assertThat(value, equalTo(underTest.state(NAMESPACE_1, STRING_MAP_ADDR)));
+ assertThat(value, not(equalTo(underTest.state(NAMESPACE_2, STRING_MAP_ADDR))));
+
+ // put
+ assertThat(value.entries().read(), Matchers.emptyIterable());
+ value.put("A", 1);
+ value.put("B", 2);
+ value.put("A", 11);
+ assertThat(value.putIfAbsent("B", 22).read(), equalTo(2));
+ assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("A", 11),
+ MapEntry.of("B", 2)));
+
+ // remove
+ value.remove("A");
+ assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2)));
+ value.remove("C");
+ assertThat(value.entries().read(), containsInAnyOrder(MapEntry.of("B", 2)));
+
+ // get
+ assertNull(value.get("A").read());
+ assertThat(value.get("B").read(), equalTo(2));
+ value.put("C", 3);
+ value.put("D", 4);
+ assertThat(value.get("C").read(), equalTo(3));
+
+ // iterate
+ value.put("E", 5);
+ value.remove("C");
+ assertThat(value.keys().read(), containsInAnyOrder("B", "D", "E"));
+ assertThat(value.values().read(), containsInAnyOrder(2, 4, 5));
+ assertThat(
+ value.entries().read(),
+ containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5)));
+
+ // readLater
+ assertThat(value.get("B").readLater().read(), equalTo(2));
+ assertNull(value.get("A").readLater().read());
+ assertThat(
+ value.entries().readLater().read(),
+ containsInAnyOrder(MapEntry.of("B", 2), MapEntry.of("D", 4), MapEntry.of("E", 5)));
+
+ // clear
+ value.clear();
+ assertThat(value.entries().read(), Matchers.emptyIterable());
+ assertThat(underTest.state(NAMESPACE_1, STRING_MAP_ADDR), equalTo(value));
+ }
+
+ @Test
+ public void testCombiningValue() throws Exception {
+
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+
+ // State instances are cached, but depend on the namespace.
+ assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
+ assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
+
+ assertThat(value.read(), equalTo(0));
+ value.add(2);
+ assertThat(value.read(), equalTo(2));
+
+ value.add(3);
+ assertThat(value.read(), equalTo(5));
+
+ value.clear();
+ assertThat(value.read(), equalTo(0));
+ assertThat(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), equalTo(value));
+ }
+
+ @Test
+ public void testCombiningIsEmpty() throws Exception {
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+
+ assertThat(value.isEmpty().read(), Matchers.is(true));
+ ReadableState<Boolean> readFuture = value.isEmpty();
+ value.add(5);
+ assertThat(readFuture.read(), Matchers.is(false));
+
+ value.clear();
+ assertThat(readFuture.read(), Matchers.is(true));
+ }
+
+ @Test
+ public void testMergeCombiningValueIntoSource() throws Exception {
+ CombiningState<Integer, int[], Integer> value1 =
+ underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ CombiningState<Integer, int[], Integer> value2 =
+ underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+
+ value1.add(5);
+ value2.add(10);
+ value1.add(6);
+
+ assertThat(value1.read(), equalTo(11));
+ assertThat(value2.read(), equalTo(10));
+
+ // Merging clears the old values and updates the result value.
+ StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
+
+ assertThat(value1.read(), equalTo(21));
+ assertThat(value2.read(), equalTo(0));
+ }
+
+ @Test
+ public void testMergeCombiningValueIntoNewNamespace() throws Exception {
+ CombiningState<Integer, int[], Integer> value1 =
+ underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ CombiningState<Integer, int[], Integer> value2 =
+ underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+ CombiningState<Integer, int[], Integer> value3 =
+ underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
+
+ value1.add(5);
+ value2.add(10);
+ value1.add(6);
+
+ StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
+
+ // Merging clears the old values and updates the result value.
+ assertThat(value1.read(), equalTo(0));
+ assertThat(value2.read(), equalTo(0));
+ assertThat(value3.read(), equalTo(21));
+ }
+
+ @Test
+ public void testWatermarkEarliestState() throws Exception {
+ WatermarkHoldState value =
+ underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+ // State instances are cached, but depend on the namespace.
+ assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
+ assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR)));
+
+ assertThat(value.read(), Matchers.nullValue());
+ value.add(new Instant(2000));
+ assertThat(value.read(), equalTo(new Instant(2000)));
+
+ value.add(new Instant(3000));
+ assertThat(value.read(), equalTo(new Instant(2000)));
+
+ value.add(new Instant(1000));
+ assertThat(value.read(), equalTo(new Instant(1000)));
+
+ value.clear();
+ assertThat(value.read(), equalTo(null));
+ assertThat(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), equalTo(value));
+ }
+
+ @Test
+ public void testWatermarkLatestState() throws Exception {
+ WatermarkHoldState value =
+ underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+
+ // State instances are cached, but depend on the namespace.
+ assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
+ assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));
+
+ assertThat(value.read(), Matchers.nullValue());
+ value.add(new Instant(2000));
+ assertThat(value.read(), equalTo(new Instant(2000)));
+
+ value.add(new Instant(3000));
+ assertThat(value.read(), equalTo(new Instant(3000)));
+
+ value.add(new Instant(1000));
+ assertThat(value.read(), equalTo(new Instant(3000)));
+
+ value.clear();
+ assertThat(value.read(), equalTo(null));
+ assertThat(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), equalTo(value));
+ }
+
+ @Test
+ public void testWatermarkEndOfWindowState() throws Exception {
+ WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+
+ // State instances are cached, but depend on the namespace.
+ assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
+ assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));
+
+ assertThat(value.read(), Matchers.nullValue());
+ value.add(new Instant(2000));
+ assertThat(value.read(), equalTo(new Instant(2000)));
+
+ value.clear();
+ assertThat(value.read(), equalTo(null));
+ assertThat(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), equalTo(value));
+ }
+
+ @Test
+ public void testWatermarkStateIsEmpty() throws Exception {
+ WatermarkHoldState value =
+ underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+ assertThat(value.isEmpty().read(), Matchers.is(true));
+ ReadableState<Boolean> readFuture = value.isEmpty();
+ value.add(new Instant(1000));
+ assertThat(readFuture.read(), Matchers.is(false));
+
+ value.clear();
+ assertThat(readFuture.read(), Matchers.is(true));
+ }
+
+ @Test
+ public void testMergeEarliestWatermarkIntoSource() throws Exception {
+ WatermarkHoldState value1 =
+ underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+ WatermarkHoldState value2 =
+ underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
+
+ value1.add(new Instant(3000));
+ value2.add(new Instant(5000));
+ value1.add(new Instant(4000));
+ value2.add(new Instant(2000));
+
+ // Merging clears the old values and updates the merged value.
+ StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1);
+
+ assertThat(value1.read(), equalTo(new Instant(2000)));
+ assertThat(value2.read(), equalTo(null));
+ }
+
+ @Test
+ public void testMergeLatestWatermarkIntoSource() throws Exception {
+ WatermarkHoldState value1 =
+ underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+ WatermarkHoldState value2 =
+ underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
+ WatermarkHoldState value3 =
+ underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
+
+ value1.add(new Instant(3000));
+ value2.add(new Instant(5000));
+ value1.add(new Instant(4000));
+ value2.add(new Instant(2000));
+
+ // Merge the two source holds into a third, separate result hold.
+ StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1);
+
+ // Merging clears the old values and updates the result value.
+ assertThat(value3.read(), equalTo(new Instant(5000)));
+ assertThat(value1.read(), equalTo(null));
+ assertThat(value2.read(), equalTo(null));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8362bdb9/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index c4c6b55..a5b8203 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -381,5 +381,13 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-core-java</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/8362bdb9/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 35d2b78..e7564ec 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -17,31 +17,11 @@
*/
package org.apache.beam.runners.flink.streaming;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertThat;
-
import java.nio.ByteBuffer;
-import java.util.Arrays;
-import org.apache.beam.runners.core.StateMerging;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaceForTest;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsTest;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.CombiningState;
-import org.apache.beam.sdk.state.GroupingState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
@@ -52,42 +32,17 @@ import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
- * Tests for {@link FlinkStateInternals}. This is based on the tests for
- * {@code InMemoryStateInternals}.
+ * Tests for {@link FlinkStateInternals}. This is based on {@link StateInternalsTest}.
*/
@RunWith(JUnit4.class)
-public class FlinkStateInternalsTest {
- private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));
- private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
- private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
- private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
-
- private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
- StateTags.value("stringValue", StringUtf8Coder.of());
- private static final StateTag<CombiningState<Integer, int[], Integer>>
- SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
- "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
- private static final StateTag<BagState<String>> STRING_BAG_ADDR =
- StateTags.bag("stringBag", StringUtf8Coder.of());
- private static final StateTag<WatermarkHoldState> WATERMARK_EARLIEST_ADDR =
- StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
- private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
- StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
- private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
- StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
-
- FlinkStateInternals<String> underTest;
+public class FlinkStateInternalsTest extends StateInternalsTest {
- @Before
- public void initStateInternals() {
+ @Override
+ protected StateInternals createStateInternals() {
MemoryStateBackend backend = new MemoryStateBackend();
try {
AbstractKeyedStateBackend<ByteBuffer> keyedStateBackend = backend.createKeyedStateBackend(
@@ -98,296 +53,31 @@ public class FlinkStateInternalsTest {
1,
new KeyGroupRange(0, 0),
new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
- underTest = new FlinkStateInternals<>(keyedStateBackend, StringUtf8Coder.of());
keyedStateBackend.setCurrentKey(
ByteBuffer.wrap(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Hello")));
+
+ return new FlinkStateInternals<>(keyedStateBackend, StringUtf8Coder.of());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- @Test
- public void testValue() throws Exception {
- ValueState<String> value = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR);
-
- assertEquals(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
- assertNotEquals(
- underTest.state(NAMESPACE_2, STRING_VALUE_ADDR),
- value);
-
- assertThat(value.read(), Matchers.nullValue());
- value.write("hello");
- assertThat(value.read(), Matchers.equalTo("hello"));
- value.write("world");
- assertThat(value.read(), Matchers.equalTo("world"));
-
- value.clear();
- assertThat(value.read(), Matchers.nullValue());
- assertEquals(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
-
- }
-
- @Test
- public void testBag() throws Exception {
- BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
- assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
-
- assertThat(value.read(), Matchers.emptyIterable());
- value.add("hello");
- assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
-
- value.add("world");
- assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
-
- value.clear();
- assertThat(value.read(), Matchers.emptyIterable());
- assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
-
- }
-
- @Test
- public void testBagIsEmpty() throws Exception {
- BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add("hello");
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
- @Test
- public void testMergeBagIntoSource() throws Exception {
- BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
- BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-
- bag1.add("Hello");
- bag2.add("World");
- bag1.add("!");
-
- StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
-
- // Reading the merged bag gets both the contents
- assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
- assertThat(bag2.read(), Matchers.emptyIterable());
- }
-
- @Test
- public void testMergeBagIntoNewNamespace() throws Exception {
- BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
- BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
- BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
-
- bag1.add("Hello");
- bag2.add("World");
- bag1.add("!");
-
- StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
-
- // Reading the merged bag gets both the contents
- assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
- assertThat(bag1.read(), Matchers.emptyIterable());
- assertThat(bag2.read(), Matchers.emptyIterable());
- }
-
- @Test
- public void testCombiningValue() throws Exception {
- GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
-
- assertThat(value.read(), Matchers.equalTo(0));
- value.add(2);
- assertThat(value.read(), Matchers.equalTo(2));
-
- value.add(3);
- assertThat(value.read(), Matchers.equalTo(5));
-
- value.clear();
- assertThat(value.read(), Matchers.equalTo(0));
- assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value);
- }
-
- @Test
- public void testCombiningIsEmpty() throws Exception {
- GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add(5);
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
- @Test
- public void testMergeCombiningValueIntoSource() throws Exception {
- CombiningState<Integer, int[], Integer> value1 =
- underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- CombiningState<Integer, int[], Integer> value2 =
- underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-
- value1.add(5);
- value2.add(10);
- value1.add(6);
-
- assertThat(value1.read(), Matchers.equalTo(11));
- assertThat(value2.read(), Matchers.equalTo(10));
-
- // Merging clears the old values and updates the result value.
- StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
-
- assertThat(value1.read(), Matchers.equalTo(21));
- assertThat(value2.read(), Matchers.equalTo(0));
- }
-
- @Test
- public void testMergeCombiningValueIntoNewNamespace() throws Exception {
- CombiningState<Integer, int[], Integer> value1 =
- underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- CombiningState<Integer, int[], Integer> value2 =
- underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
- CombiningState<Integer, int[], Integer> value3 =
- underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
-
- value1.add(5);
- value2.add(10);
- value1.add(6);
-
- StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
-
- // Merging clears the old values and updates the result value.
- assertThat(value1.read(), Matchers.equalTo(0));
- assertThat(value2.read(), Matchers.equalTo(0));
- assertThat(value3.read(), Matchers.equalTo(21));
- }
-
- @Test
- public void testWatermarkEarliestState() throws Exception {
- WatermarkHoldState value =
- underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR)));
-
- assertThat(value.read(), Matchers.nullValue());
- value.add(new Instant(2000));
- assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+ ///////////////////////// Unsupported tests \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
- value.add(new Instant(3000));
- assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+ @Override
+ public void testSet() {}
- value.add(new Instant(1000));
- assertThat(value.read(), Matchers.equalTo(new Instant(1000)));
+ @Override
+ public void testSetIsEmpty() {}
- value.clear();
- assertThat(value.read(), Matchers.equalTo(null));
- assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value);
- }
-
- @Test
- public void testWatermarkLatestState() throws Exception {
- WatermarkHoldState value =
- underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));
+ @Override
+ public void testMergeSetIntoSource() {}
- assertThat(value.read(), Matchers.nullValue());
- value.add(new Instant(2000));
- assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+ @Override
+ public void testMergeSetIntoNewNamespace() {}
- value.add(new Instant(3000));
- assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
+ @Override
+ public void testMap() {}
- value.add(new Instant(1000));
- assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
-
- value.clear();
- assertThat(value.read(), Matchers.equalTo(null));
- assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value);
- }
-
- @Test
- public void testWatermarkEndOfWindowState() throws Exception {
- WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));
-
- assertThat(value.read(), Matchers.nullValue());
- value.add(new Instant(2000));
- assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
- value.clear();
- assertThat(value.read(), Matchers.equalTo(null));
- assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value);
- }
-
- @Test
- public void testWatermarkStateIsEmpty() throws Exception {
- WatermarkHoldState value =
- underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add(new Instant(1000));
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
- @Test
- public void testMergeEarliestWatermarkIntoSource() throws Exception {
- WatermarkHoldState value1 =
- underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
- WatermarkHoldState value2 =
- underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
-
- value1.add(new Instant(3000));
- value2.add(new Instant(5000));
- value1.add(new Instant(4000));
- value2.add(new Instant(2000));
-
- // Merging clears the old values and updates the merged value.
- StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1);
-
- assertThat(value1.read(), Matchers.equalTo(new Instant(2000)));
- assertThat(value2.read(), Matchers.equalTo(null));
- }
-
- @Test
- public void testMergeLatestWatermarkIntoSource() throws Exception {
- WatermarkHoldState value1 =
- underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
- WatermarkHoldState value2 =
- underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
- WatermarkHoldState value3 =
- underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
-
- value1.add(new Instant(3000));
- value2.add(new Instant(5000));
- value1.add(new Instant(4000));
- value2.add(new Instant(2000));
-
- // Merging clears the old values and updates the result value.
- StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1);
-
- // Merging clears the old values and updates the result value.
- assertThat(value3.read(), Matchers.equalTo(new Instant(5000)));
- assertThat(value1.read(), Matchers.equalTo(null));
- assertThat(value2.read(), Matchers.equalTo(null));
- }
}
[2/2] beam git commit: This closes #3313
Posted by al...@apache.org.
This closes #3313
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f9d51aa5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f9d51aa5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f9d51aa5
Branch: refs/heads/master
Commit: f9d51aa5cc718c44c82ec4773dbd16a57e7d394e
Parents: 1597f3c 8362bdb
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jun 12 22:35:35 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jun 12 22:35:35 2017 +0200
----------------------------------------------------------------------
pom.xml | 7 +
.../core/InMemoryStateInternalsTest.java | 555 ++----------------
.../beam/runners/core/StateInternalsTest.java | 573 +++++++++++++++++++
runners/flink/pom.xml | 8 +
.../streaming/FlinkStateInternalsTest.java | 348 +----------
5 files changed, 641 insertions(+), 850 deletions(-)
----------------------------------------------------------------------