You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/30 15:44:30 UTC
[1/2] beam git commit: BEAM-2022 fix triggering for processing time timers
Repository: beam
Updated Branches:
refs/heads/master fc55d2f81 -> 202aae9d3
BEAM-2022 fix triggering for processing time timers
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eb860388
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eb860388
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eb860388
Branch: refs/heads/master
Commit: eb860388a8626837655e82171c8480421384e419
Parents: 2b6cb8c
Author: Thomas Weise <th...@apache.org>
Authored: Sat Apr 29 01:17:22 2017 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Sat Apr 29 01:17:22 2017 -0700
----------------------------------------------------------------------
.../operators/ApexGroupByKeyOperator.java | 41 ++-
.../operators/ApexTimerInternals.java | 155 +++++---
.../translation/ApexStateInternalsTest.java | 368 -------------------
.../operators/ApexTimerInternalsTest.java | 78 +++-
.../utils/ApexStateInternalsTest.java | 367 ++++++++++++++++++
5 files changed, 567 insertions(+), 442 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/eb860388/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index f8b6653..3c9f5ab 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -25,11 +25,9 @@ import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.netlet.util.Slice;
import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.base.Throwables;
-import com.google.common.collect.Multimap;
import java.util.Collection;
import java.util.Collections;
import org.apache.beam.runners.apex.ApexPipelineOptions;
@@ -41,6 +39,7 @@ import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
@@ -49,8 +48,8 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.NullSideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
@@ -68,7 +67,8 @@ import org.slf4j.LoggerFactory;
* @param <K> key type
* @param <V> value type
*/
-public class ApexGroupByKeyOperator<K, V> implements Operator {
+public class ApexGroupByKeyOperator<K, V> implements Operator,
+ ApexTimerInternals.TimerProcessor<K> {
private static final Logger LOG = LoggerFactory.getLogger(ApexGroupByKeyOperator.class);
private boolean traceTuples = true;
@@ -106,7 +106,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
}
processElement(t.getValue());
} catch (Exception e) {
- Throwables.propagateIfPossible(e);
+ Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
@@ -143,6 +143,8 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
@Override
public void endWindow() {
+ timerInternals.fireReadyTimers(timerInternals.currentProcessingTime().getMillis(),
+ this, TimeDomain.PROCESSING_TIME);
}
@Override
@@ -195,7 +197,6 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
serializedOptions.get());
}
-
private void processElement(WindowedValue<KV<K, V>> windowedValue) throws Exception {
final KV<K, V> kv = windowedValue.getValue();
final WindowedValue<V> updatedWindowedValue = WindowedValue.of(kv.getValue(),
@@ -209,19 +210,23 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
reduceFnRunner.persist();
}
- private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception {
- this.inputWatermark = new Instant(mark.getTimestamp());
- Multimap<Slice, TimerInternals.TimerData> timers = timerInternals.getTimersReadyToProcess(
- mark.getTimestamp());
- if (!timers.isEmpty()) {
- for (Slice keyBytes : timers.keySet()) {
- K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.buffer);
- timerInternals.setContext(key, keyCoder, inputWatermark);
- ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner = newReduceFnRunner(key);
- reduceFnRunner.onTimers(timers.get(keyBytes));
- reduceFnRunner.persist();
- }
+ @Override
+ public void fireTimer(K key, Collection<TimerData> timerData) {
+ timerInternals.setContext(key, keyCoder, inputWatermark);
+ ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner = newReduceFnRunner(key);
+ try {
+ reduceFnRunner.onTimers(timerData);
+ } catch (Exception e) {
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
}
+ reduceFnRunner.persist();
+ }
+
+ private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
+ this.inputWatermark = new Instant(mark.getTimestamp());
+ timerInternals.fireReadyTimers(this.inputWatermark.getMillis(),
+ this, TimeDomain.EVENT_TIME);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/eb860388/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
index b142095..15ccbee 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
@@ -20,9 +20,12 @@ package org.apache.beam.runners.apex.translation.operators;
import com.datatorrent.netlet.util.Slice;
import com.esotericsoftware.kryo.DefaultSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ComparisonChain;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import java.io.Serializable;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -47,14 +50,16 @@ import org.joda.time.Instant;
@DefaultSerializer(JavaSerializer.class)
class ApexTimerInternals<K> implements TimerInternals, Serializable {
- private Map<Slice, Set<Slice>> activeTimers = new HashMap<>();
- private TimerDataCoder timerDataCoder;
+ private final TimerSet eventTimeTimeTimers;
+ private final TimerSet processingTimeTimers;
+
private transient K currentKey;
private transient Instant currentInputWatermark;
private transient Coder<K> keyCoder;
public ApexTimerInternals(TimerDataCoder timerDataCoder) {
- this.timerDataCoder = timerDataCoder;
+ this.eventTimeTimeTimers = new TimerSet(timerDataCoder);
+ this.processingTimeTimers = new TimerSet(timerDataCoder);
}
public void setContext(K key, Coder<K> keyCoder, Instant inputWatermark) {
@@ -63,31 +68,37 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
this.currentInputWatermark = inputWatermark;
}
+ @VisibleForTesting
+ protected TimerSet getTimerSet(TimeDomain domain) {
+ return (domain == TimeDomain.EVENT_TIME) ? eventTimeTimeTimers : processingTimeTimers;
+ }
+
@Override
public void setTimer(StateNamespace namespace, String timerId, Instant target,
TimeDomain timeDomain) {
TimerData timerData = TimerData.of(timerId, namespace, target, timeDomain);
- registerActiveTimer(currentKey, timerData);
+ setTimer(timerData);
}
@Override
public void setTimer(TimerData timerData) {
- registerActiveTimer(currentKey, timerData);
+ getTimerSet(timerData.getDomain()).addTimer(getKeyBytes(this.currentKey), timerData);
}
@Override
public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
- throw new UnsupportedOperationException();
+ getTimerSet(timeDomain).deleteTimer(getKeyBytes(this.currentKey), namespace, timerId);
}
@Override
public void deleteTimer(StateNamespace namespace, String timerId) {
- throw new UnsupportedOperationException();
+ this.eventTimeTimeTimers.deleteTimer(getKeyBytes(this.currentKey), namespace, timerId);
+ this.processingTimeTimers.deleteTimer(getKeyBytes(this.currentKey), namespace, timerId);
}
@Override
public void deleteTimer(TimerData timerKey) {
- unregisterActiveTimer(currentKey, timerKey);
+ getTimerSet(timerKey.getDomain()).deleteTimer(getKeyBytes(this.currentKey), timerKey);
}
@Override
@@ -102,7 +113,7 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
@Override
public Instant currentInputWatermarkTime() {
- return new Instant(currentInputWatermark);
+ return currentInputWatermark;
}
@Override
@@ -110,14 +121,17 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
return null;
}
+ public interface TimerProcessor<K> {
+ void fireTimer(K key, Collection<TimerData> timerData);
+ }
+
/**
- * Returns the list of timers that are ready to fire. These are the timers
- * that are registered to be triggered at a time before the current watermark.
- * We keep these timers in a Set, so that they are deduplicated, as the same
- * timer can be registered multiple times.
+ * Fire the timers that are ready. These are the timers
+ * that are registered to be triggered at a time before the current time.
*/
- public Multimap<Slice, TimerInternals.TimerData> getTimersReadyToProcess(
- long currentWatermark) {
+ public void fireReadyTimers(long currentTime,
+ TimerProcessor<K> timerProcessor, TimeDomain timeDomain) {
+ TimerSet timers = getTimerSet(timeDomain);
// we keep the timers to return in a different list and launch them later
// because we cannot prevent a trigger from registering another timer,
@@ -125,16 +139,16 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
Multimap<Slice, TimerInternals.TimerData> toFire = HashMultimap.create();
Iterator<Map.Entry<Slice, Set<Slice>>> it =
- activeTimers.entrySet().iterator();
+ timers.activeTimers.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Slice, Set<Slice>> keyWithTimers = it.next();
Iterator<Slice> timerIt = keyWithTimers.getValue().iterator();
while (timerIt.hasNext()) {
try {
- TimerData timerData = CoderUtils.decodeFromByteArray(timerDataCoder,
+ TimerData timerData = CoderUtils.decodeFromByteArray(timers.timerDataCoder,
timerIt.next().buffer);
- if (timerData.getTimestamp().isBefore(currentWatermark)) {
+ if (timerData.getTimestamp().isBefore(currentTime)) {
toFire.put(keyWithTimers.getKey(), timerData);
timerIt.remove();
}
@@ -147,55 +161,106 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
it.remove();
}
}
- return toFire;
- }
- private void registerActiveTimer(K key, TimerData timer) {
- final Slice keyBytes;
- try {
- keyBytes = new Slice(CoderUtils.encodeToByteArray(keyCoder, key));
- } catch (CoderException e) {
- throw new RuntimeException(e);
- }
- Set<Slice> timersForKey = activeTimers.get(keyBytes);
- if (timersForKey == null) {
- timersForKey = new HashSet<>();
+ // fire ready timers
+ if (!toFire.isEmpty()) {
+ for (Slice keyBytes : toFire.keySet()) {
+ try {
+ K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.buffer);
+ timerProcessor.fireTimer(key, toFire.get(keyBytes));
+ } catch (CoderException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
+ }
+ private Slice getKeyBytes(K key) {
try {
- Slice timerBytes = new Slice(CoderUtils.encodeToByteArray(timerDataCoder, timer));
- timersForKey.add(timerBytes);
+ return new Slice(CoderUtils.encodeToByteArray(keyCoder, key));
} catch (CoderException e) {
throw new RuntimeException(e);
}
-
- activeTimers.put(keyBytes, timersForKey);
}
- private void unregisterActiveTimer(K key, TimerData timer) {
- final Slice keyBytes;
- try {
- keyBytes = new Slice(CoderUtils.encodeToByteArray(keyCoder, key));
- } catch (CoderException e) {
- throw new RuntimeException(e);
+ protected static class TimerSet implements Serializable {
+ private final Map<Slice, Set<Slice>> activeTimers = new HashMap<>();
+ private final TimerDataCoder timerDataCoder;
+
+ protected TimerSet(TimerDataCoder timerDataCoder) {
+ this.timerDataCoder = timerDataCoder;
}
- Set<Slice> timersForKey = activeTimers.get(keyBytes);
- if (timersForKey != null) {
+ public void addTimer(Slice keyBytes, TimerData timer) {
+ Set<Slice> timersForKey = activeTimers.get(keyBytes);
+ if (timersForKey == null) {
+ timersForKey = new HashSet<>();
+ }
+
try {
Slice timerBytes = new Slice(CoderUtils.encodeToByteArray(timerDataCoder, timer));
timersForKey.add(timerBytes);
- timersForKey.remove(timerBytes);
} catch (CoderException e) {
throw new RuntimeException(e);
}
+ activeTimers.put(keyBytes, timersForKey);
+ }
+
+ public void deleteTimer(Slice keyBytes, StateNamespace namespace, String timerId) {
+ Set<Slice> timersForKey = activeTimers.get(keyBytes);
+ if (timersForKey == null) {
+ return;
+ }
+
+ Iterator<Slice> timerIt = timersForKey.iterator();
+ while (timerIt.hasNext()) {
+ try {
+ TimerData timerData = CoderUtils.decodeFromByteArray(timerDataCoder,
+ timerIt.next().buffer);
+ ComparisonChain chain =
+ ComparisonChain.start().compare(timerData.getTimerId(), timerId);
+ if (chain.result() == 0 && !timerData.getNamespace().equals(namespace)) {
+ // Obtaining the stringKey may be expensive; only do so if required
+ chain = chain.compare(timerData.getNamespace().stringKey(), namespace.stringKey());
+ }
+ if (chain.result() == 0) {
+ timerIt.remove();
+ }
+ } catch (CoderException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
if (timersForKey.isEmpty()) {
activeTimers.remove(keyBytes);
- } else {
- activeTimers.put(keyBytes, timersForKey);
}
}
+
+ public void deleteTimer(Slice keyBytes, TimerData timerKey) {
+ Set<Slice> timersForKey = activeTimers.get(keyBytes);
+ if (timersForKey != null) {
+ try {
+ Slice timerBytes = new Slice(CoderUtils.encodeToByteArray(timerDataCoder, timerKey));
+ timersForKey.add(timerBytes);
+ timersForKey.remove(timerBytes);
+ } catch (CoderException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (timersForKey.isEmpty()) {
+ activeTimers.remove(keyBytes);
+ } else {
+ activeTimers.put(keyBytes, timersForKey);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ protected Map<Slice, Set<Slice>> getMap() {
+ return activeTimers;
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/eb860388/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java
deleted file mode 100644
index 091fe3b..0000000
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translation;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-
-import com.datatorrent.lib.util.KryoCloneUtils;
-import java.util.Arrays;
-import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
-import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
-import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateInternalsFactory;
-import org.apache.beam.runners.core.StateMerging;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaceForTest;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.GroupingState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Tests for {@link ApexStateInternals}. This is based on the tests for
- * {@code InMemoryStateInternals}.
- */
-public class ApexStateInternalsTest {
- private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));
- private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
- private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
- private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
-
- private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
- StateTags.value("stringValue", StringUtf8Coder.of());
- private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
- SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
- "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
- private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
- StateTags.bag("stringBag", StringUtf8Coder.of());
- private static final StateTag<Object, WatermarkHoldState>
- WATERMARK_EARLIEST_ADDR =
- StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
- StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
- private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
- StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
-
- private ApexStateInternals<String> underTest;
-
- @Before
- public void initStateInternals() {
- underTest = new ApexStateInternals.ApexStateBackend()
- .newStateInternalsFactory(StringUtf8Coder.of())
- .stateInternalsForKey((String) null);
- }
-
- @Test
- public void testBag() throws Exception {
- BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
- assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
-
- assertThat(value.read(), Matchers.emptyIterable());
- value.add("hello");
- assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
-
- value.add("world");
- assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
-
- value.clear();
- assertThat(value.read(), Matchers.emptyIterable());
- assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
-
- }
-
- @Test
- public void testBagIsEmpty() throws Exception {
- BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add("hello");
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
- @Test
- public void testMergeBagIntoSource() throws Exception {
- BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
- BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-
- bag1.add("Hello");
- bag2.add("World");
- bag1.add("!");
-
- StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
-
- // Reading the merged bag gets both the contents
- assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
- assertThat(bag2.read(), Matchers.emptyIterable());
- }
-
- @Test
- public void testMergeBagIntoNewNamespace() throws Exception {
- BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
- BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
- BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
-
- bag1.add("Hello");
- bag2.add("World");
- bag1.add("!");
-
- StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
-
- // Reading the merged bag gets both the contents
- assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
- assertThat(bag1.read(), Matchers.emptyIterable());
- assertThat(bag2.read(), Matchers.emptyIterable());
- }
-
- @Test
- public void testCombiningValue() throws Exception {
- GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
-
- assertThat(value.read(), Matchers.equalTo(0));
- value.add(2);
- assertThat(value.read(), Matchers.equalTo(2));
-
- value.add(3);
- assertThat(value.read(), Matchers.equalTo(5));
-
- value.clear();
- assertThat(value.read(), Matchers.equalTo(0));
- assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value);
- }
-
- @Test
- public void testCombiningIsEmpty() throws Exception {
- GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add(5);
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
- @Test
- public void testMergeCombiningValueIntoSource() throws Exception {
- CombiningState<Integer, int[], Integer> value1 =
- underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- CombiningState<Integer, int[], Integer> value2 =
- underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-
- value1.add(5);
- value2.add(10);
- value1.add(6);
-
- assertThat(value1.read(), Matchers.equalTo(11));
- assertThat(value2.read(), Matchers.equalTo(10));
-
- // Merging clears the old values and updates the result value.
- StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
-
- assertThat(value1.read(), Matchers.equalTo(21));
- assertThat(value2.read(), Matchers.equalTo(0));
- }
-
- @Test
- public void testMergeCombiningValueIntoNewNamespace() throws Exception {
- CombiningState<Integer, int[], Integer> value1 =
- underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- CombiningState<Integer, int[], Integer> value2 =
- underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
- CombiningState<Integer, int[], Integer> value3 =
- underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
-
- value1.add(5);
- value2.add(10);
- value1.add(6);
-
- StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
-
- // Merging clears the old values and updates the result value.
- assertThat(value1.read(), Matchers.equalTo(0));
- assertThat(value2.read(), Matchers.equalTo(0));
- assertThat(value3.read(), Matchers.equalTo(21));
- }
-
- @Test
- public void testWatermarkEarliestState() throws Exception {
- WatermarkHoldState value =
- underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR)));
-
- assertThat(value.read(), Matchers.nullValue());
- value.add(new Instant(2000));
- assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
- value.add(new Instant(3000));
- assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
- value.add(new Instant(1000));
- assertThat(value.read(), Matchers.equalTo(new Instant(1000)));
-
- value.clear();
- assertThat(value.read(), Matchers.equalTo(null));
- assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value);
- }
-
- @Test
- public void testWatermarkLatestState() throws Exception {
- WatermarkHoldState value =
- underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));
-
- assertThat(value.read(), Matchers.nullValue());
- value.add(new Instant(2000));
- assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
- value.add(new Instant(3000));
- assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
-
- value.add(new Instant(1000));
- assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
-
- value.clear();
- assertThat(value.read(), Matchers.equalTo(null));
- assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value);
- }
-
- @Test
- public void testWatermarkEndOfWindowState() throws Exception {
- WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));
-
- assertThat(value.read(), Matchers.nullValue());
- value.add(new Instant(2000));
- assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
- value.clear();
- assertThat(value.read(), Matchers.equalTo(null));
- assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value);
- }
-
- @Test
- public void testWatermarkStateIsEmpty() throws Exception {
- WatermarkHoldState value =
- underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add(new Instant(1000));
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
- @Test
- public void testMergeEarliestWatermarkIntoSource() throws Exception {
- WatermarkHoldState value1 =
- underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
- WatermarkHoldState value2 =
- underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
-
- value1.add(new Instant(3000));
- value2.add(new Instant(5000));
- value1.add(new Instant(4000));
- value2.add(new Instant(2000));
-
- // Merging clears the old values and updates the merged value.
- StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1);
-
- assertThat(value1.read(), Matchers.equalTo(new Instant(2000)));
- assertThat(value2.read(), Matchers.equalTo(null));
- }
-
- @Test
- public void testMergeLatestWatermarkIntoSource() throws Exception {
- WatermarkHoldState value1 =
- underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
- WatermarkHoldState value2 =
- underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
- WatermarkHoldState value3 =
- underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
-
- value1.add(new Instant(3000));
- value2.add(new Instant(5000));
- value1.add(new Instant(4000));
- value2.add(new Instant(2000));
-
- // Merging clears the old values and updates the result value.
- StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1);
-
- // Merging clears the old values and updates the result value.
- assertThat(value3.read(), Matchers.equalTo(new Instant(5000)));
- assertThat(value1.read(), Matchers.equalTo(null));
- assertThat(value2.read(), Matchers.equalTo(null));
- }
-
- @Test
- public void testSerialization() throws Exception {
- ApexStateInternalsFactory<String> sif = new ApexStateBackend().
- newStateInternalsFactory(StringUtf8Coder.of());
- ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy");
-
- ValueState<String> value = keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
- assertEquals(keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
- value.write("hello");
-
- ApexStateInternalsFactory<String> cloned;
- assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif));
- ApexStateInternals<String> clonedKeyAndState = cloned.stateInternalsForKey("dummy");
-
- ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
- assertThat(clonedValue.read(), Matchers.equalTo("hello"));
- assertEquals(clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/eb860388/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
index 1d7e586..ee142e2 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
@@ -22,8 +22,12 @@ import static org.junit.Assert.assertNotNull;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.datatorrent.netlet.util.Slice;
-import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.runners.apex.translation.operators.ApexTimerInternals.TimerProcessor;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.TimerInternals.TimerDataCoder;
@@ -40,6 +44,15 @@ public class ApexTimerInternalsTest {
@Test
public void testEventTimeTimers() {
+
+ final Map<String, Collection<TimerData>> firedTimers = new HashMap<>();
+ TimerProcessor<String> timerProcessor = new TimerProcessor<String>() {
+ @Override
+ public void fireTimer(String key, Collection<TimerData> timerData) {
+ firedTimers.put(key, timerData);
+ }
+ };
+
TimerDataCoder timerDataCoder = TimerDataCoder.of(GlobalWindow.Coder.INSTANCE);
String key1 = "key1";
Instant instant0 = new Instant(0);
@@ -57,17 +70,60 @@ public class ApexTimerInternalsTest {
instant1, TimeDomain.EVENT_TIME);
timerInternals.setTimer(timerData1);
- Multimap<Slice, TimerData> timers = timerInternals.getTimersReadyToProcess(
- instant0.getMillis());
- assertEquals(0, timers.size());
+ timerInternals.fireReadyTimers(instant0.getMillis(), timerProcessor, TimeDomain.EVENT_TIME);
+ assertEquals(0, firedTimers.size());
+ firedTimers.clear();
- timers = timerInternals.getTimersReadyToProcess(instant1.getMillis());
- assertEquals(1, timers.size());
- assertEquals(Sets.newHashSet(timerData0), Sets.newHashSet(timers.values()));
+ timerInternals.fireReadyTimers(instant1.getMillis(), timerProcessor,
+ TimeDomain.PROCESSING_TIME);
+ assertEquals(0, firedTimers.size());
+ timerInternals.fireReadyTimers(instant1.getMillis(), timerProcessor, TimeDomain.EVENT_TIME);
+ assertEquals(1, firedTimers.size());
+ assertEquals(Sets.newHashSet(timerData0),
+ Sets.newHashSet(firedTimers.values().iterator().next()));
+ firedTimers.clear();
- timers = timerInternals.getTimersReadyToProcess(instant2.getMillis());
- assertEquals(1, timers.size());
- assertEquals(Sets.newHashSet(timerData1), Sets.newHashSet(timers.values()));
+ timerInternals.fireReadyTimers(instant2.getMillis(), timerProcessor, TimeDomain.EVENT_TIME);
+ assertEquals(1, firedTimers.size());
+ assertEquals(Sets.newHashSet(timerData1),
+ Sets.newHashSet(firedTimers.values().iterator().next()));
+ firedTimers.clear();
+ }
+
+ @Test
+ public void testDeleteTimer() {
+ TimerDataCoder timerDataCoder = TimerDataCoder.of(GlobalWindow.Coder.INSTANCE);
+ String key1 = "key1";
+ Instant instant0 = new Instant(0);
+ Instant instant1 = new Instant(1);
+
+ ApexTimerInternals<String> timerInternals = new ApexTimerInternals<>(timerDataCoder);
+ timerInternals.setContext(key1, StringUtf8Coder.of(), Instant.now());
+
+ TimerData timerData0 = TimerData.of("timerData0", StateNamespaces.global(),
+ instant0, TimeDomain.EVENT_TIME);
+ timerInternals.setTimer(timerData0);
+
+ TimerData timerData1 = TimerData.of("timerData1", StateNamespaces.global(),
+ instant1, TimeDomain.EVENT_TIME);
+ timerInternals.setTimer(timerData1);
+
+ Map<?, Set<Slice>> timerMap = timerInternals.getTimerSet(TimeDomain.EVENT_TIME).getMap();
+ assertEquals(1, timerMap.size());
+ assertEquals(2, timerMap.values().iterator().next().size());
+
+ timerInternals.deleteTimer(timerData0.getNamespace(), timerData0.getTimerId());
+ assertEquals(1, timerMap.size());
+ assertEquals(1, timerMap.values().iterator().next().size());
+
+ timerInternals.deleteTimer(timerData1.getNamespace(), timerData1.getTimerId(),
+ TimeDomain.PROCESSING_TIME);
+ assertEquals(1, timerMap.size());
+ assertEquals(1, timerMap.values().iterator().next().size());
+
+ timerInternals.deleteTimer(timerData1.getNamespace(), timerData1.getTimerId(),
+ TimeDomain.EVENT_TIME);
+ assertEquals(0, timerMap.size());
}
@Test
@@ -82,7 +138,7 @@ public class ApexTimerInternalsTest {
ApexTimerInternals<String> cloned;
assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(timerInternals));
cloned.setContext(key, StringUtf8Coder.of(), Instant.now());
- Multimap<Slice, TimerData> timers = cloned.getTimersReadyToProcess(new Instant(1).getMillis());
+ Map<?, Set<Slice>> timers = cloned.getTimerSet(TimeDomain.EVENT_TIME).getMap();
assertEquals(1, timers.size());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/eb860388/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
new file mode 100644
index 0000000..225b654
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+import com.datatorrent.lib.util.KryoCloneUtils;
+import java.util.Arrays;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateInternalsFactory;
+import org.apache.beam.runners.core.StateMerging;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaceForTest;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ApexStateInternals}. This is based on the tests for
+ * {@code InMemoryStateInternals}.
+ */
+public class ApexStateInternalsTest {
+ private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));
+ private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
+ private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
+ private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
+
+ private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+ StateTags.value("stringValue", StringUtf8Coder.of());
+ private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
+ SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
+ "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
+ private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+ StateTags.bag("stringBag", StringUtf8Coder.of());
+ private static final StateTag<Object, WatermarkHoldState>
+ WATERMARK_EARLIEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
+ private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
+ private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
+ StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
+
+ private ApexStateInternals<String> underTest;
+
+ @Before
+ public void initStateInternals() {
+ underTest = new ApexStateInternals.ApexStateBackend()
+ .newStateInternalsFactory(StringUtf8Coder.of())
+ .stateInternalsForKey((String) null);
+ }
+
+ @Test
+ public void testBag() throws Exception {
+ BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+ assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
+ assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
+
+ assertThat(value.read(), Matchers.emptyIterable());
+ value.add("hello");
+ assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
+
+ value.add("world");
+ assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
+
+ value.clear();
+ assertThat(value.read(), Matchers.emptyIterable());
+ assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
+
+ }
+
+ @Test
+ public void testBagIsEmpty() throws Exception {
+ BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+ assertThat(value.isEmpty().read(), Matchers.is(true));
+ ReadableState<Boolean> readFuture = value.isEmpty();
+ value.add("hello");
+ assertThat(readFuture.read(), Matchers.is(false));
+
+ value.clear();
+ assertThat(readFuture.read(), Matchers.is(true));
+ }
+
+ @Test
+ public void testMergeBagIntoSource() throws Exception {
+ BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+ BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+
+ bag1.add("Hello");
+ bag2.add("World");
+ bag1.add("!");
+
+ StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
+
+ // Reading the merged bag gets both the contents
+ assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+ assertThat(bag2.read(), Matchers.emptyIterable());
+ }
+
+ @Test
+ public void testMergeBagIntoNewNamespace() throws Exception {
+ BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+ BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+ BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
+
+ bag1.add("Hello");
+ bag2.add("World");
+ bag1.add("!");
+
+ StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
+
+ // Reading the merged bag gets both the contents
+ assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+ assertThat(bag1.read(), Matchers.emptyIterable());
+ assertThat(bag2.read(), Matchers.emptyIterable());
+ }
+
+ @Test
+ public void testCombiningValue() throws Exception {
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+
+ // State instances are cached, but depend on the namespace.
+ assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
+ assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
+
+ assertThat(value.read(), Matchers.equalTo(0));
+ value.add(2);
+ assertThat(value.read(), Matchers.equalTo(2));
+
+ value.add(3);
+ assertThat(value.read(), Matchers.equalTo(5));
+
+ value.clear();
+ assertThat(value.read(), Matchers.equalTo(0));
+ assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value);
+ }
+
+ @Test
+ public void testCombiningIsEmpty() throws Exception {
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+
+ assertThat(value.isEmpty().read(), Matchers.is(true));
+ ReadableState<Boolean> readFuture = value.isEmpty();
+ value.add(5);
+ assertThat(readFuture.read(), Matchers.is(false));
+
+ value.clear();
+ assertThat(readFuture.read(), Matchers.is(true));
+ }
+
+ @Test
+ public void testMergeCombiningValueIntoSource() throws Exception {
+ CombiningState<Integer, int[], Integer> value1 =
+ underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ CombiningState<Integer, int[], Integer> value2 =
+ underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+
+ value1.add(5);
+ value2.add(10);
+ value1.add(6);
+
+ assertThat(value1.read(), Matchers.equalTo(11));
+ assertThat(value2.read(), Matchers.equalTo(10));
+
+ // Merging clears the old values and updates the result value.
+ StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
+
+ assertThat(value1.read(), Matchers.equalTo(21));
+ assertThat(value2.read(), Matchers.equalTo(0));
+ }
+
+ @Test
+ public void testMergeCombiningValueIntoNewNamespace() throws Exception {
+ CombiningState<Integer, int[], Integer> value1 =
+ underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ CombiningState<Integer, int[], Integer> value2 =
+ underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+ CombiningState<Integer, int[], Integer> value3 =
+ underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
+
+ value1.add(5);
+ value2.add(10);
+ value1.add(6);
+
+ StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
+
+ // Merging clears the old values and updates the result value.
+ assertThat(value1.read(), Matchers.equalTo(0));
+ assertThat(value2.read(), Matchers.equalTo(0));
+ assertThat(value3.read(), Matchers.equalTo(21));
+ }
+
+ @Test
+ public void testWatermarkEarliestState() throws Exception {
+ WatermarkHoldState value =
+ underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+ // State instances are cached, but depend on the namespace.
+ assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
+ assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR)));
+
+ assertThat(value.read(), Matchers.nullValue());
+ value.add(new Instant(2000));
+ assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+ value.add(new Instant(3000));
+ assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+ value.add(new Instant(1000));
+ assertThat(value.read(), Matchers.equalTo(new Instant(1000)));
+
+ value.clear();
+ assertThat(value.read(), Matchers.equalTo(null));
+ assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value);
+ }
+
+ @Test
+ public void testWatermarkLatestState() throws Exception {
+ WatermarkHoldState value =
+ underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+
+ // State instances are cached, but depend on the namespace.
+ assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
+ assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));
+
+ assertThat(value.read(), Matchers.nullValue());
+ value.add(new Instant(2000));
+ assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+ value.add(new Instant(3000));
+ assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
+
+ value.add(new Instant(1000));
+ assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
+
+ value.clear();
+ assertThat(value.read(), Matchers.equalTo(null));
+ assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value);
+ }
+
+ @Test
+ public void testWatermarkEndOfWindowState() throws Exception {
+ WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+
+ // State instances are cached, but depend on the namespace.
+ assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
+ assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));
+
+ assertThat(value.read(), Matchers.nullValue());
+ value.add(new Instant(2000));
+ assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+ value.clear();
+ assertThat(value.read(), Matchers.equalTo(null));
+ assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value);
+ }
+
+ @Test
+ public void testWatermarkStateIsEmpty() throws Exception {
+ WatermarkHoldState value =
+ underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+ assertThat(value.isEmpty().read(), Matchers.is(true));
+ ReadableState<Boolean> readFuture = value.isEmpty();
+ value.add(new Instant(1000));
+ assertThat(readFuture.read(), Matchers.is(false));
+
+ value.clear();
+ assertThat(readFuture.read(), Matchers.is(true));
+ }
+
+ @Test
+ public void testMergeEarliestWatermarkIntoSource() throws Exception {
+ WatermarkHoldState value1 =
+ underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+ WatermarkHoldState value2 =
+ underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
+
+ value1.add(new Instant(3000));
+ value2.add(new Instant(5000));
+ value1.add(new Instant(4000));
+ value2.add(new Instant(2000));
+
+ // Merging clears the old values and updates the merged value.
+ StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1);
+
+ assertThat(value1.read(), Matchers.equalTo(new Instant(2000)));
+ assertThat(value2.read(), Matchers.equalTo(null));
+ }
+
+ @Test
+ public void testMergeLatestWatermarkIntoSource() throws Exception {
+ WatermarkHoldState value1 =
+ underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+ WatermarkHoldState value2 =
+ underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
+ WatermarkHoldState value3 =
+ underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
+
+ value1.add(new Instant(3000));
+ value2.add(new Instant(5000));
+ value1.add(new Instant(4000));
+ value2.add(new Instant(2000));
+
+ // Merging clears the old values and updates the result value.
+ StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1);
+
+ // Merging clears the old values and updates the result value.
+ assertThat(value3.read(), Matchers.equalTo(new Instant(5000)));
+ assertThat(value1.read(), Matchers.equalTo(null));
+ assertThat(value2.read(), Matchers.equalTo(null));
+ }
+
+ @Test
+ public void testSerialization() throws Exception {
+ ApexStateInternalsFactory<String> sif = new ApexStateBackend().
+ newStateInternalsFactory(StringUtf8Coder.of());
+ ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy");
+
+ ValueState<String> value = keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+ assertEquals(keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
+ value.write("hello");
+
+ ApexStateInternalsFactory<String> cloned;
+ assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif));
+ ApexStateInternals<String> clonedKeyAndState = cloned.stateInternalsForKey("dummy");
+
+ ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+ assertThat(clonedValue.read(), Matchers.equalTo("hello"));
+ assertEquals(clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
+ }
+
+}
[2/2] beam git commit: This closes #2782: fix triggering for
processing time timers
Posted by ke...@apache.org.
This closes #2782: fix triggering for processing time timers
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/202aae9d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/202aae9d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/202aae9d
Branch: refs/heads/master
Commit: 202aae9d3a85a4fff41943e4c9a13618bcf8acc4
Parents: fc55d2f eb86038
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Apr 30 08:43:35 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Apr 30 08:43:35 2017 -0700
----------------------------------------------------------------------
.../operators/ApexGroupByKeyOperator.java | 41 ++-
.../operators/ApexTimerInternals.java | 155 +++++---
.../translation/ApexStateInternalsTest.java | 368 -------------------
.../operators/ApexTimerInternalsTest.java | 78 +++-
.../utils/ApexStateInternalsTest.java | 367 ++++++++++++++++++
5 files changed, 567 insertions(+), 442 deletions(-)
----------------------------------------------------------------------