You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/28 19:36:00 UTC
[1/2] beam git commit: BEAM-1981 Fix ApexTimerInternals serialization
error.
Repository: beam
Updated Branches:
refs/heads/master f9406c766 -> 2bd668fdb
BEAM-1981 Fix ApexTimerInternals serialization error.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f37d9548
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f37d9548
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f37d9548
Branch: refs/heads/master
Commit: f37d954873ae03cdae854837570b62e334be9022
Parents: 185dc47
Author: Thomas Weise <th...@apache.org>
Authored: Thu Apr 27 02:06:50 2017 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Thu Apr 27 23:51:16 2017 -0700
----------------------------------------------------------------------
.../operators/ApexGroupByKeyOperator.java | 165 +--------
.../operators/ApexParDoOperator.java | 104 ++----
.../operators/ApexTimerInternals.java | 201 ++++++++++
.../translation/ApexStateInternalsTest.java | 369 +++++++++++++++++++
.../operators/ApexTimerInternalsTest.java | 89 +++++
.../utils/ApexStateInternalsTest.java | 368 ------------------
6 files changed, 692 insertions(+), 604 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f37d9548/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 7d17ac6..f8b6653 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -29,38 +29,28 @@ import com.datatorrent.netlet.util.Slice;
import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.base.Throwables;
-import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
-import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
-import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.NullSideInputReader;
-import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
@@ -93,9 +83,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
private final SerializablePipelineOptions serializedOptions;
@Bind(JavaSerializer.class)
private final StateInternalsFactory<K> stateInternalsFactory;
- private Map<Slice, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
-
- private transient ApexTimerInternals timerInternals = new ApexTimerInternals();
+ private final ApexTimerInternals<K> timerInternals;
private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>> input =
@@ -137,12 +125,16 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
this.keyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
this.valueCoder = ((KvCoder<K, V>) input.getCoder()).getValueCoder();
this.stateInternalsFactory = stateBackend.newStateInternalsFactory(keyCoder);
+ TimerInternals.TimerDataCoder timerCoder =
+ TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
+ this.timerInternals = new ApexTimerInternals<>(timerCoder);
}
@SuppressWarnings("unused") // for Kryo
private ApexGroupByKeyOperator() {
this.serializedOptions = null;
this.stateInternalsFactory = null;
+ this.timerInternals = null;
}
@Override
@@ -203,40 +195,6 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
serializedOptions.get());
}
- /**
- * Returns the list of timers that are ready to fire. These are the timers
- * that are registered to be triggered at a time before the current watermark.
- * We keep these timers in a Set, so that they are deduplicated, as the same
- * timer can be registered multiple times.
- */
- private Multimap<Slice, TimerInternals.TimerData> getTimersReadyToProcess(
- long currentWatermark) {
-
- // we keep the timers to return in a different list and launch them later
- // because we cannot prevent a trigger from registering another trigger,
- // which would lead to concurrent modification exception.
- Multimap<Slice, TimerInternals.TimerData> toFire = HashMultimap.create();
-
- Iterator<Map.Entry<Slice, Set<TimerInternals.TimerData>>> it =
- activeTimers.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<Slice, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
-
- Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator();
- while (timerIt.hasNext()) {
- TimerInternals.TimerData timerData = timerIt.next();
- if (timerData.getTimestamp().isBefore(currentWatermark)) {
- toFire.put(keyWithTimers.getKey(), timerData);
- timerIt.remove();
- }
- }
-
- if (keyWithTimers.getValue().isEmpty()) {
- it.remove();
- }
- }
- return toFire;
- }
private void processElement(WindowedValue<KV<K, V>> windowedValue) throws Exception {
final KV<K, V> kv = windowedValue.getValue();
@@ -244,58 +202,21 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
windowedValue.getTimestamp(),
windowedValue.getWindows(),
windowedValue.getPane());
- timerInternals.setKey(kv.getKey());
+ timerInternals.setContext(kv.getKey(), this.keyCoder, this.inputWatermark);
ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner =
newReduceFnRunner(kv.getKey());
reduceFnRunner.processElements(Collections.singletonList(updatedWindowedValue));
reduceFnRunner.persist();
}
- private StateInternals<K> getStateInternalsForKey(K key) {
- return stateInternalsFactory.stateInternalsForKey(key);
- }
-
- private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
- final Slice keyBytes;
- try {
- keyBytes = new Slice(CoderUtils.encodeToByteArray(keyCoder, key));
- } catch (CoderException e) {
- throw new RuntimeException(e);
- }
- Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
- if (timersForKey == null) {
- timersForKey = new HashSet<>();
- }
- timersForKey.add(timer);
- activeTimers.put(keyBytes, timersForKey);
- }
-
- private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
- final Slice keyBytes;
- try {
- keyBytes = new Slice(CoderUtils.encodeToByteArray(keyCoder, key));
- } catch (CoderException e) {
- throw new RuntimeException(e);
- }
- Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
- if (timersForKey != null) {
- timersForKey.remove(timer);
- if (timersForKey.isEmpty()) {
- activeTimers.remove(keyBytes);
- } else {
- activeTimers.put(keyBytes, timersForKey);
- }
- }
- }
-
private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception {
this.inputWatermark = new Instant(mark.getTimestamp());
- Multimap<Slice, TimerInternals.TimerData> timers = getTimersReadyToProcess(
+ Multimap<Slice, TimerInternals.TimerData> timers = timerInternals.getTimersReadyToProcess(
mark.getTimestamp());
if (!timers.isEmpty()) {
for (Slice keyBytes : timers.keySet()) {
K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.buffer);
- timerInternals.setKey(key);
+ timerInternals.setContext(key, keyCoder, inputWatermark);
ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner = newReduceFnRunner(key);
reduceFnRunner.onTimers(timers.get(keyBytes));
reduceFnRunner.persist();
@@ -303,74 +224,4 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
}
}
- /**
- * An implementation of Beam's {@link TimerInternals}.
- *
- */
- private class ApexTimerInternals implements TimerInternals {
- private K key;
-
- public void setKey(K key) {
- this.key = key;
- }
-
- @Deprecated
- @Override
- public void setTimer(TimerData timerData) {
- registerActiveTimer(key, timerData);
- }
-
- @Override
- public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
- throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
- }
-
- @Deprecated
- @Override
- public void deleteTimer(TimerData timerKey) {
- unregisterActiveTimer(key, timerKey);
- }
-
- @Override
- public Instant currentProcessingTime() {
- return Instant.now();
- }
-
- @Override
- public Instant currentSynchronizedProcessingTime() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public Instant currentInputWatermarkTime() {
- return inputWatermark;
- }
-
- @Override
- public Instant currentOutputWatermarkTime() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void setTimer(StateNamespace namespace, String timerId, Instant target,
- TimeDomain timeDomain) {
- throw new UnsupportedOperationException("Setting timer by ID not yet supported.");
- }
-
- @Deprecated
- @Override
- public void deleteTimer(StateNamespace namespace, String timerId) {
- throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
- }
-
- }
-
- private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K> {
- @Override
- public StateInternals<K> stateInternalsForKey(K key) {
- return getStateInternalsForKey(key);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f37d9548/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index f4c617d..9b5a75c 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -28,7 +28,6 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.apex.ApexPipelineOptions;
@@ -42,17 +41,16 @@ import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializa
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.DoFn;
@@ -60,9 +58,9 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.NullSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
@@ -90,12 +88,11 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
private final WindowingStrategy<?, ?> windowingStrategy;
@Bind(JavaSerializer.class)
private final List<PCollectionView<?>> sideInputs;
+ @Bind(JavaSerializer.class)
+ private final Coder<WindowedValue<InputT>> inputCoder;
private StateInternalsProxy<?> currentKeyStateInternals;
- // TODO: if the operator gets restored to checkpointed state due to a failure,
- // the timer state is lost.
- private final transient CurrentKeyTimerInternals<Object> currentKeyTimerInternals =
- new CurrentKeyTimerInternals<>();
+ private final ApexTimerInternals<Object> currentKeyTimerInternals;
private final StateInternals<Void> sideInputStateInternals;
private final ValueAndCoderKryoSerializable<List<WindowedValue<InputT>>> pushedBack;
@@ -134,10 +131,14 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
throw new UnsupportedOperationException(msg);
}
- Coder<List<WindowedValue<InputT>>> coder = ListCoder.of(inputCoder);
+ Coder<List<WindowedValue<InputT>>> listCoder = ListCoder.of(inputCoder);
this.pushedBack = new ValueAndCoderKryoSerializable<>(new ArrayList<WindowedValue<InputT>>(),
- coder);
+ listCoder);
+ this.inputCoder = inputCoder;
+ TimerInternals.TimerDataCoder timerCoder =
+ TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
+ this.currentKeyTimerInternals = new ApexTimerInternals<>(timerCoder);
}
@SuppressWarnings("unused") // for Kryo
@@ -150,6 +151,8 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
this.sideInputs = null;
this.pushedBack = null;
this.sideInputStateInternals = null;
+ this.inputCoder = null;
+ this.currentKeyTimerInternals = null;
}
public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input =
@@ -253,14 +256,23 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
pushbackDoFnRunner.startBundle();
if (currentKeyStateInternals != null) {
InputT value = elem.getValue();
- Object key;
+ final Object key;
+ final Coder<Object> keyCoder;
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ WindowedValueCoder<InputT> wvCoder = (WindowedValueCoder) inputCoder;
if (value instanceof KeyedWorkItem) {
key = ((KeyedWorkItem) value).key();
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ KeyedWorkItemCoder<Object, ?> kwiCoder = (KeyedWorkItemCoder) wvCoder.getValueCoder();
+ keyCoder = kwiCoder.getKeyCoder();
} else {
key = ((KV) value).getKey();
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ KvCoder<Object, ?> kwiCoder = (KvCoder) wvCoder.getValueCoder();
+ keyCoder = kwiCoder.getKeyCoder();
}
((StateInternalsProxy) currentKeyStateInternals).setKey(key);
- currentKeyTimerInternals.currentKey = key;
+ currentKeyTimerInternals.setContext(key, keyCoder, new Instant(this.currentInputWatermark));
}
Iterable<WindowedValue<InputT>> pushedBack = pushbackDoFnRunner
.processElementInReadyWindows(elem);
@@ -400,70 +412,4 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
}
- private class CurrentKeyTimerInternals<K> implements TimerInternals {
-
- private TimerInternalsFactory<K> factory = new TimerInternalsFactory<K>() {
- @Override
- public TimerInternals timerInternalsForKey(K key) {
- InMemoryTimerInternals timerInternals = perKeyTimerInternals.get(key);
- if (timerInternals == null) {
- perKeyTimerInternals.put(key, timerInternals = new InMemoryTimerInternals());
- }
- return timerInternals;
- }
- };
-
- // TODO: durable state store
- final Map<K, InMemoryTimerInternals> perKeyTimerInternals = new HashMap<>();
- private K currentKey;
-
- @Override
- public void setTimer(StateNamespace namespace, String timerId, Instant target,
- TimeDomain timeDomain) {
- factory.timerInternalsForKey(currentKey).setTimer(
- namespace, timerId, target, timeDomain);
- }
-
- @Override
- public void setTimer(TimerData timerData) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void deleteTimer(StateNamespace namespace, String timerId) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void deleteTimer(TimerData timerKey) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Instant currentProcessingTime() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Instant currentSynchronizedProcessingTime() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Instant currentInputWatermarkTime() {
- return new Instant(currentInputWatermark);
- }
-
- @Override
- public Instant currentOutputWatermarkTime() {
- throw new UnsupportedOperationException();
- }
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f37d9548/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
new file mode 100644
index 0000000..b142095
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.operators;
+
+import com.datatorrent.netlet.util.Slice;
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of Beam's {@link TimerInternals} for the Apex runner.
+ *
+ * <p>Timers are tracked per key. Keys and timers are both held in encoded form, as
+ * {@link Slice}s produced by the key coder and the {@link TimerDataCoder} respectively.
+ *
+ * <p>Assumes that the current key is set (see {@link #setContext}) prior to setting or
+ * deleting timers. The current key, its coder and the input watermark are transient and are
+ * therefore not part of the serialized state.
+ *
+ * <p>This implementation stores timer data in heap memory and is serialized during
+ * checkpointing; it will only work with a small number of timers.
+ *
+ * @param <K> type of the key that timers are associated with
+ */
+@DefaultSerializer(JavaSerializer.class)
+class ApexTimerInternals<K> implements TimerInternals, Serializable {
+
+  /** Encoded timers that are currently set, grouped by encoded key. */
+  private final Map<Slice, Set<Slice>> activeTimers = new HashMap<>();
+  /** Coder used to turn {@link TimerData} into the bytes stored in {@link #activeTimers}. */
+  private final TimerDataCoder timerDataCoder;
+  private transient K currentKey;
+  private transient Instant currentInputWatermark;
+  private transient Coder<K> keyCoder;
+
+  /**
+   * Creates an instance with no active timers.
+   *
+   * @param timerDataCoder coder used to encode and decode the stored timers
+   */
+  public ApexTimerInternals(TimerDataCoder timerDataCoder) {
+    this.timerDataCoder = timerDataCoder;
+  }
+
+  /**
+   * Sets the key, key coder and input watermark that subsequent calls operate on.
+   *
+   * @param key the key that timers set or deleted from now on belong to
+   * @param keyCoder coder used to encode {@code key} for the internal timer map
+   * @param inputWatermark value reported by {@link #currentInputWatermarkTime()}
+   */
+  public void setContext(K key, Coder<K> keyCoder, Instant inputWatermark) {
+    this.currentKey = key;
+    this.keyCoder = keyCoder;
+    this.currentInputWatermark = inputWatermark;
+  }
+
+  @Override
+  public void setTimer(StateNamespace namespace, String timerId, Instant target,
+      TimeDomain timeDomain) {
+    TimerData timerData = TimerData.of(timerId, namespace, target, timeDomain);
+    registerActiveTimer(currentKey, timerData);
+  }
+
+  @Override
+  public void setTimer(TimerData timerData) {
+    registerActiveTimer(currentKey, timerData);
+  }
+
+  @Override
+  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+    throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
+  }
+
+  @Override
+  public void deleteTimer(StateNamespace namespace, String timerId) {
+    throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
+  }
+
+  @Override
+  public void deleteTimer(TimerData timerKey) {
+    unregisterActiveTimer(currentKey, timerKey);
+  }
+
+  @Override
+  public Instant currentProcessingTime() {
+    return Instant.now();
+  }
+
+  @Override
+  public Instant currentSynchronizedProcessingTime() {
+    throw new UnsupportedOperationException("Synchronized processing time is not supported.");
+  }
+
+  @Override
+  public Instant currentInputWatermarkTime() {
+    // NOTE(review): Joda's Instant(Object) constructor documents "null means now", so if
+    // setContext has not been called (e.g. right after deserialization, the field being
+    // transient) this appears to yield the wall-clock time instead of a watermark - confirm
+    // whether that is intended.
+    return new Instant(currentInputWatermark);
+  }
+
+  @Override
+  public Instant currentOutputWatermarkTime() {
+    // The output watermark is not tracked by this implementation.
+    return null;
+  }
+
+  /**
+   * Removes and returns the timers that are ready to fire. These are the timers whose
+   * timestamp is strictly before the given watermark. Timers are kept in a Set per key, so
+   * that they are deduplicated, as the same timer can be registered multiple times.
+   *
+   * @param currentWatermark the watermark that timer timestamps are compared against
+   * @return the timers to fire, grouped by encoded key; empty if none are ready
+   * @throws RuntimeException if a stored timer cannot be decoded
+   */
+  public Multimap<Slice, TimerInternals.TimerData> getTimersReadyToProcess(
+      long currentWatermark) {
+
+    // we collect the timers to return in a separate multimap and let the caller launch them
+    // later, because we cannot prevent a trigger from registering another timer, which would
+    // lead to a concurrent modification exception while iterating here.
+    Multimap<Slice, TimerInternals.TimerData> toFire = HashMultimap.create();
+
+    Iterator<Map.Entry<Slice, Set<Slice>>> it = activeTimers.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<Slice, Set<Slice>> keyWithTimers = it.next();
+
+      Iterator<Slice> timerIt = keyWithTimers.getValue().iterator();
+      while (timerIt.hasNext()) {
+        TimerData timerData = decodeTimer(timerIt.next());
+        if (timerData.getTimestamp().isBefore(currentWatermark)) {
+          toFire.put(keyWithTimers.getKey(), timerData);
+          timerIt.remove();
+        }
+      }
+
+      // drop keys that have no timers left so the map does not accumulate empty sets
+      if (keyWithTimers.getValue().isEmpty()) {
+        it.remove();
+      }
+    }
+    return toFire;
+  }
+
+  /** Adds the encoded timer to the set of active timers for the encoded key. */
+  private void registerActiveTimer(K key, TimerData timer) {
+    final Slice keyBytes = encodeKey(key);
+    final Slice timerBytes = encodeTimer(timer);
+
+    Set<Slice> timersForKey = activeTimers.get(keyBytes);
+    if (timersForKey == null) {
+      timersForKey = new HashSet<>();
+      activeTimers.put(keyBytes, timersForKey);
+    }
+    timersForKey.add(timerBytes);
+  }
+
+  /**
+   * Removes the encoded timer from the active timers of the encoded key, and removes the key
+   * itself once it has no timers left. Does nothing if the key has no active timers.
+   */
+  private void unregisterActiveTimer(K key, TimerData timer) {
+    final Slice keyBytes = encodeKey(key);
+
+    Set<Slice> timersForKey = activeTimers.get(keyBytes);
+    if (timersForKey == null) {
+      return;
+    }
+
+    timersForKey.remove(encodeTimer(timer));
+    if (timersForKey.isEmpty()) {
+      activeTimers.remove(keyBytes);
+    }
+  }
+
+  /** Encodes the key with the current key coder, rethrowing coder failures unchecked. */
+  private Slice encodeKey(K key) {
+    try {
+      return new Slice(CoderUtils.encodeToByteArray(keyCoder, key));
+    } catch (CoderException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Encodes the timer with the timer data coder, rethrowing coder failures unchecked. */
+  private Slice encodeTimer(TimerData timer) {
+    try {
+      return new Slice(CoderUtils.encodeToByteArray(timerDataCoder, timer));
+    } catch (CoderException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Decodes a stored timer, rethrowing coder failures unchecked. */
+  private TimerData decodeTimer(Slice timerBytes) {
+    try {
+      return CoderUtils.decodeFromByteArray(timerDataCoder, timerBytes.buffer);
+    } catch (CoderException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f37d9548/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java
new file mode 100644
index 0000000..4021c62
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+import com.datatorrent.lib.util.KryoCloneUtils;
+import java.util.Arrays;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateInternalsFactory;
+import org.apache.beam.runners.core.StateMerging;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaceForTest;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ApexStateInternals}. This is based on the tests for
+ * {@code InMemoryStateInternals}.
+ *
+ * <p>Covers value, bag, combining and watermark-hold state: basic add/read/clear behavior,
+ * {@code isEmpty()} views, merging across namespaces, and a Kryo clone of the factory.
+ */
+public class ApexStateInternalsTest {
+ private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));
+ private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
+ private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
+ private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
+
+ private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+ StateTags.value("stringValue", StringUtf8Coder.of());
+ private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
+ SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
+ "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
+ private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+ StateTags.bag("stringBag", StringUtf8Coder.of());
+ // The three watermark tags below share the id "watermark" and differ only in their OutputTimeFn.
+ private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+ WATERMARK_EARLIEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
+ private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+ WATERMARK_LATEST_ADDR =
+ StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
+ private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
+ StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
+
+ // Recreated before every test by initStateInternals().
+ private ApexStateInternals<String> underTest;
+
+ // Builds a fresh state internals instance, bound to a null key, from a new backend.
+ @Before
+ public void initStateInternals() {
+ underTest = new ApexStateInternals.ApexStateBackend()
+ .newStateInternalsFactory(StringUtf8Coder.of())
+ .stateInternalsForKey((String) null);
+ }
+
+ // Bag state: added elements are readable, clear() empties the bag, and state looked up again
+ // for the same namespace equals the first lookup while another namespace does not.
+ @Test
+ public void testBag() throws Exception {
+ BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+ assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
+ assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
+
+ assertThat(value.read(), Matchers.emptyIterable());
+ value.add("hello");
+ assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
+
+ value.add("world");
+ assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
+
+ value.clear();
+ assertThat(value.read(), Matchers.emptyIterable());
+ assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
+
+ }
+
+ // The ReadableState returned by isEmpty() reflects changes made to the bag after it was obtained.
+ @Test
+ public void testBagIsEmpty() throws Exception {
+ BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+ assertThat(value.isEmpty().read(), Matchers.is(true));
+ ReadableState<Boolean> readFuture = value.isEmpty();
+ value.add("hello");
+ assertThat(readFuture.read(), Matchers.is(false));
+
+ value.clear();
+ assertThat(readFuture.read(), Matchers.is(true));
+ }
+
+ // Merging two bags where the target is one of the sources.
+ @Test
+ public void testMergeBagIntoSource() throws Exception {
+ BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+ BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+
+ bag1.add("Hello");
+ bag2.add("World");
+ bag1.add("!");
+
+ StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
+
+ // Reading the merged bag returns the contents of both source bags; the other source is empty.
+ assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+ assertThat(bag2.read(), Matchers.emptyIterable());
+ }
+
+ // Merging bags into a target in a third namespace that starts out empty.
+ @Test
+ public void testMergeBagIntoNewNamespace() throws Exception {
+ BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+ BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+ BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
+
+ bag1.add("Hello");
+ bag2.add("World");
+ bag1.add("!");
+
+ StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
+
+ // Reading the merged bag returns the contents of all source bags; the other sources are empty.
+ assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+ assertThat(bag1.read(), Matchers.emptyIterable());
+ assertThat(bag2.read(), Matchers.emptyIterable());
+ }
+
+ // Combining state with an integer sum: reads 0 when empty, accumulates adds, resets on clear().
+ @Test
+ public void testCombiningValue() throws Exception {
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+
+ // State instances are cached, but depend on the namespace.
+ assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
+ assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
+
+ assertThat(value.read(), Matchers.equalTo(0));
+ value.add(2);
+ assertThat(value.read(), Matchers.equalTo(2));
+
+ value.add(3);
+ assertThat(value.read(), Matchers.equalTo(5));
+
+ value.clear();
+ assertThat(value.read(), Matchers.equalTo(0));
+ assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value);
+ }
+
+ // The ReadableState returned by isEmpty() reflects later add() and clear() calls.
+ @Test
+ public void testCombiningIsEmpty() throws Exception {
+ GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+
+ assertThat(value.isEmpty().read(), Matchers.is(true));
+ ReadableState<Boolean> readFuture = value.isEmpty();
+ value.add(5);
+ assertThat(readFuture.read(), Matchers.is(false));
+
+ value.clear();
+ assertThat(readFuture.read(), Matchers.is(true));
+ }
+
+ // Merging two combining states where the target is one of the sources.
+ @Test
+ public void testMergeCombiningValueIntoSource() throws Exception {
+ CombiningState<Integer, int[], Integer> value1 =
+ underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ CombiningState<Integer, int[], Integer> value2 =
+ underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+
+ value1.add(5);
+ value2.add(10);
+ value1.add(6);
+
+ assertThat(value1.read(), Matchers.equalTo(11));
+ assertThat(value2.read(), Matchers.equalTo(10));
+
+ // Merging clears the old values and updates the result value.
+ StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
+
+ assertThat(value1.read(), Matchers.equalTo(21));
+ assertThat(value2.read(), Matchers.equalTo(0));
+ }
+
+ // Merging two combining states into a target in a third namespace.
+ @Test
+ public void testMergeCombiningValueIntoNewNamespace() throws Exception {
+ CombiningState<Integer, int[], Integer> value1 =
+ underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ CombiningState<Integer, int[], Integer> value2 =
+ underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+ CombiningState<Integer, int[], Integer> value3 =
+ underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
+
+ value1.add(5);
+ value2.add(10);
+ value1.add(6);
+
+ StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
+
+ // Merging clears the old values and updates the result value.
+ assertThat(value1.read(), Matchers.equalTo(0));
+ assertThat(value2.read(), Matchers.equalTo(0));
+ assertThat(value3.read(), Matchers.equalTo(21));
+ }
+
+ // Watermark hold with the earliest-input-timestamp function: the hold reads as the minimum
+ // of the added instants.
+ @Test
+ public void testWatermarkEarliestState() throws Exception {
+ WatermarkHoldState<BoundedWindow> value =
+ underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+ // State instances are cached, but depend on the namespace.
+ assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
+ assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR)));
+
+ assertThat(value.read(), Matchers.nullValue());
+ value.add(new Instant(2000));
+ assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+ value.add(new Instant(3000));
+ assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+ value.add(new Instant(1000));
+ assertThat(value.read(), Matchers.equalTo(new Instant(1000)));
+
+ value.clear();
+ assertThat(value.read(), Matchers.equalTo(null));
+ assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value);
+ }
+
+ // Watermark hold with the latest-input-timestamp function: the hold reads as the maximum
+ // of the added instants.
+ @Test
+ public void testWatermarkLatestState() throws Exception {
+ WatermarkHoldState<BoundedWindow> value =
+ underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+
+ // State instances are cached, but depend on the namespace.
+ assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
+ assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));
+
+ assertThat(value.read(), Matchers.nullValue());
+ value.add(new Instant(2000));
+ assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+ value.add(new Instant(3000));
+ assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
+
+ value.add(new Instant(1000));
+ assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
+
+ value.clear();
+ assertThat(value.read(), Matchers.equalTo(null));
+ assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value);
+ }
+
+ // Watermark hold with the end-of-window function: a single add is read back, clear() resets it.
+ @Test
+ public void testWatermarkEndOfWindowState() throws Exception {
+ WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+
+ // State instances are cached, but depend on the namespace.
+ assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
+ assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));
+
+ assertThat(value.read(), Matchers.nullValue());
+ value.add(new Instant(2000));
+ assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+ value.clear();
+ assertThat(value.read(), Matchers.equalTo(null));
+ assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value);
+ }
+
+ // The ReadableState returned by isEmpty() reflects later add() and clear() calls on the hold.
+ @Test
+ public void testWatermarkStateIsEmpty() throws Exception {
+ WatermarkHoldState<BoundedWindow> value =
+ underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+ assertThat(value.isEmpty().read(), Matchers.is(true));
+ ReadableState<Boolean> readFuture = value.isEmpty();
+ value.add(new Instant(1000));
+ assertThat(readFuture.read(), Matchers.is(false));
+
+ value.clear();
+ assertThat(readFuture.read(), Matchers.is(true));
+ }
+
+ // Merging two earliest-timestamp holds where the target is one of the sources.
+ @Test
+ public void testMergeEarliestWatermarkIntoSource() throws Exception {
+ WatermarkHoldState<BoundedWindow> value1 =
+ underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+ WatermarkHoldState<BoundedWindow> value2 =
+ underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
+
+ value1.add(new Instant(3000));
+ value2.add(new Instant(5000));
+ value1.add(new Instant(4000));
+ value2.add(new Instant(2000));
+
+ // Merging clears the old values and updates the merged value.
+ StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1);
+
+ assertThat(value1.read(), Matchers.equalTo(new Instant(2000)));
+ assertThat(value2.read(), Matchers.equalTo(null));
+ }
+
+ // Merging two latest-timestamp holds. Despite the "IntoSource" name, the merge target here is
+ // value3 in a third namespace, not one of the sources.
+ @Test
+ public void testMergeLatestWatermarkIntoSource() throws Exception {
+ WatermarkHoldState<BoundedWindow> value1 =
+ underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+ WatermarkHoldState<BoundedWindow> value2 =
+ underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
+ WatermarkHoldState<BoundedWindow> value3 =
+ underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
+
+ value1.add(new Instant(3000));
+ value2.add(new Instant(5000));
+ value1.add(new Instant(4000));
+ value2.add(new Instant(2000));
+
+ // Merging clears the old values and updates the result value.
+ StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1);
+
+ // The result holds the latest of the added instants; both sources read as cleared.
+ assertThat(value3.read(), Matchers.equalTo(new Instant(5000)));
+ assertThat(value1.read(), Matchers.equalTo(null));
+ assertThat(value2.read(), Matchers.equalTo(null));
+ }
+
+ // A value written through the original factory is readable through a Kryo clone of that factory.
+ @Test
+ public void testSerialization() throws Exception {
+ ApexStateInternalsFactory<String> sif = new ApexStateBackend().
+ newStateInternalsFactory(StringUtf8Coder.of());
+ ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy");
+
+ ValueState<String> value = keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+ assertEquals(keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
+ value.write("hello");
+
+ ApexStateInternalsFactory<String> cloned;
+ assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif));
+ ApexStateInternals<String> clonedKeyAndState = cloned.stateInternalsForKey("dummy");
+
+ ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+ assertThat(clonedValue.read(), Matchers.equalTo("hello"));
+ // NOTE(review): this compares state from the clone with the pre-clone `value`, not with
+ // `clonedValue`; presumably state equality is not identity-based -- confirm this is intended.
+ assertEquals(clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f37d9548/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
new file mode 100644
index 0000000..1d7e586
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.operators;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.TimerInternals.TimerDataCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link ApexTimerInternals}: handing out event-time timers that are ready to
+ * process, and keeping registered timers across a Kryo clone.
+ */
+public class ApexTimerInternalsTest {
+
+  /**
+   * Event-time timers are returned by {@code getTimersReadyToProcess} only once the supplied
+   * time is past their timestamp, and a timer that was returned is not returned again.
+   */
+  @Test
+  public void testEventTimeTimers() {
+    TimerDataCoder coder = TimerDataCoder.of(GlobalWindow.Coder.INSTANCE);
+    Instant t0 = new Instant(0);
+    Instant t1 = new Instant(1);
+    Instant t2 = new Instant(2);
+
+    ApexTimerInternals<String> underTest = new ApexTimerInternals<>(coder);
+    underTest.setContext("key1", StringUtf8Coder.of(), Instant.now());
+
+    TimerData firstTimer =
+        TimerData.of("timerData0", StateNamespaces.global(), t0, TimeDomain.EVENT_TIME);
+    TimerData secondTimer =
+        TimerData.of("timerData1", StateNamespaces.global(), t1, TimeDomain.EVENT_TIME);
+    underTest.setTimer(firstTimer);
+    underTest.setTimer(secondTimer);
+
+    // Nothing is ready at the timestamp of the earliest timer itself.
+    Multimap<Slice, TimerData> ready = underTest.getTimersReadyToProcess(t0.getMillis());
+    assertEquals(0, ready.size());
+
+    // One millisecond later only the first timer is ready.
+    ready = underTest.getTimersReadyToProcess(t1.getMillis());
+    assertEquals(1, ready.size());
+    assertEquals(Sets.newHashSet(firstTimer), Sets.newHashSet(ready.values()));
+
+    // The first timer is not handed out again; only the second one is ready now.
+    ready = underTest.getTimersReadyToProcess(t2.getMillis());
+    assertEquals(1, ready.size());
+    assertEquals(Sets.newHashSet(secondTimer), Sets.newHashSet(ready.values()));
+  }
+
+  /** A timer set before a Kryo clone is still returned as ready by the cloned instance. */
+  @Test
+  public void testSerialization() {
+    String key = "key";
+    TimerDataCoder coder = TimerDataCoder.of(GlobalWindow.Coder.INSTANCE);
+    TimerData timer =
+        TimerData.of(
+            "arbitrary-id", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME);
+
+    ApexTimerInternals<String> original = new ApexTimerInternals<>(coder);
+    original.setContext(key, StringUtf8Coder.of(), Instant.now());
+    original.setTimer(timer);
+
+    ApexTimerInternals<String> copy = KryoCloneUtils.cloneObject(original);
+    assertNotNull("Serialization", copy);
+
+    // The context is set again on the clone before it is queried.
+    copy.setContext(key, StringUtf8Coder.of(), Instant.now());
+    Multimap<Slice, TimerData> ready = copy.getTimersReadyToProcess(new Instant(1).getMillis());
+    assertEquals(1, ready.size());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f37d9548/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
deleted file mode 100644
index 7160e45..0000000
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translation.utils;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-
-import com.datatorrent.lib.util.KryoCloneUtils;
-import java.util.Arrays;
-import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
-import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateInternalsFactory;
-import org.apache.beam.runners.core.StateMerging;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaceForTest;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.GroupingState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Tests for {@link ApexStateInternals}. This is based on the tests for
- * {@code InMemoryStateInternals}.
- */
-public class ApexStateInternalsTest {
- private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));
- private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
- private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
- private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
-
- private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
- StateTags.value("stringValue", StringUtf8Coder.of());
- private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
- SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
- "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
- private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
- StateTags.bag("stringBag", StringUtf8Coder.of());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
- WATERMARK_EARLIEST_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
- WATERMARK_LATEST_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
-
- private ApexStateInternals<String> underTest;
-
- @Before
- public void initStateInternals() {
- underTest = new ApexStateInternals.ApexStateBackend()
- .newStateInternalsFactory(StringUtf8Coder.of())
- .stateInternalsForKey((String) null);
- }
-
- @Test
- public void testBag() throws Exception {
- BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
- assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
-
- assertThat(value.read(), Matchers.emptyIterable());
- value.add("hello");
- assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
-
- value.add("world");
- assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
-
- value.clear();
- assertThat(value.read(), Matchers.emptyIterable());
- assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
-
- }
-
- @Test
- public void testBagIsEmpty() throws Exception {
- BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add("hello");
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
- @Test
- public void testMergeBagIntoSource() throws Exception {
- BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
- BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
-
- bag1.add("Hello");
- bag2.add("World");
- bag1.add("!");
-
- StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
-
- // Reading the merged bag gets both the contents
- assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
- assertThat(bag2.read(), Matchers.emptyIterable());
- }
-
- @Test
- public void testMergeBagIntoNewNamespace() throws Exception {
- BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
- BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
- BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
-
- bag1.add("Hello");
- bag2.add("World");
- bag1.add("!");
-
- StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
-
- // Reading the merged bag gets both the contents
- assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
- assertThat(bag1.read(), Matchers.emptyIterable());
- assertThat(bag2.read(), Matchers.emptyIterable());
- }
-
- @Test
- public void testCombiningValue() throws Exception {
- GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
-
- assertThat(value.read(), Matchers.equalTo(0));
- value.add(2);
- assertThat(value.read(), Matchers.equalTo(2));
-
- value.add(3);
- assertThat(value.read(), Matchers.equalTo(5));
-
- value.clear();
- assertThat(value.read(), Matchers.equalTo(0));
- assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value);
- }
-
- @Test
- public void testCombiningIsEmpty() throws Exception {
- GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add(5);
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
- @Test
- public void testMergeCombiningValueIntoSource() throws Exception {
- CombiningState<Integer, int[], Integer> value1 =
- underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- CombiningState<Integer, int[], Integer> value2 =
- underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
-
- value1.add(5);
- value2.add(10);
- value1.add(6);
-
- assertThat(value1.read(), Matchers.equalTo(11));
- assertThat(value2.read(), Matchers.equalTo(10));
-
- // Merging clears the old values and updates the result value.
- StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
-
- assertThat(value1.read(), Matchers.equalTo(21));
- assertThat(value2.read(), Matchers.equalTo(0));
- }
-
- @Test
- public void testMergeCombiningValueIntoNewNamespace() throws Exception {
- CombiningState<Integer, int[], Integer> value1 =
- underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- CombiningState<Integer, int[], Integer> value2 =
- underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
- CombiningState<Integer, int[], Integer> value3 =
- underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
-
- value1.add(5);
- value2.add(10);
- value1.add(6);
-
- StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
-
- // Merging clears the old values and updates the result value.
- assertThat(value1.read(), Matchers.equalTo(0));
- assertThat(value2.read(), Matchers.equalTo(0));
- assertThat(value3.read(), Matchers.equalTo(21));
- }
-
- @Test
- public void testWatermarkEarliestState() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
- underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR)));
-
- assertThat(value.read(), Matchers.nullValue());
- value.add(new Instant(2000));
- assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
- value.add(new Instant(3000));
- assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
- value.add(new Instant(1000));
- assertThat(value.read(), Matchers.equalTo(new Instant(1000)));
-
- value.clear();
- assertThat(value.read(), Matchers.equalTo(null));
- assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value);
- }
-
- @Test
- public void testWatermarkLatestState() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
- underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));
-
- assertThat(value.read(), Matchers.nullValue());
- value.add(new Instant(2000));
- assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
- value.add(new Instant(3000));
- assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
-
- value.add(new Instant(1000));
- assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
-
- value.clear();
- assertThat(value.read(), Matchers.equalTo(null));
- assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value);
- }
-
- @Test
- public void testWatermarkEndOfWindowState() throws Exception {
- WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
-
- // State instances are cached, but depend on the namespace.
- assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
- assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));
-
- assertThat(value.read(), Matchers.nullValue());
- value.add(new Instant(2000));
- assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
-
- value.clear();
- assertThat(value.read(), Matchers.equalTo(null));
- assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value);
- }
-
- @Test
- public void testWatermarkStateIsEmpty() throws Exception {
- WatermarkHoldState<BoundedWindow> value =
- underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
-
- assertThat(value.isEmpty().read(), Matchers.is(true));
- ReadableState<Boolean> readFuture = value.isEmpty();
- value.add(new Instant(1000));
- assertThat(readFuture.read(), Matchers.is(false));
-
- value.clear();
- assertThat(readFuture.read(), Matchers.is(true));
- }
-
- @Test
- public void testMergeEarliestWatermarkIntoSource() throws Exception {
- WatermarkHoldState<BoundedWindow> value1 =
- underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
- WatermarkHoldState<BoundedWindow> value2 =
- underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
-
- value1.add(new Instant(3000));
- value2.add(new Instant(5000));
- value1.add(new Instant(4000));
- value2.add(new Instant(2000));
-
- // Merging clears the old values and updates the merged value.
- StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1);
-
- assertThat(value1.read(), Matchers.equalTo(new Instant(2000)));
- assertThat(value2.read(), Matchers.equalTo(null));
- }
-
- @Test
- public void testMergeLatestWatermarkIntoSource() throws Exception {
- WatermarkHoldState<BoundedWindow> value1 =
- underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
- WatermarkHoldState<BoundedWindow> value2 =
- underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
- WatermarkHoldState<BoundedWindow> value3 =
- underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
-
- value1.add(new Instant(3000));
- value2.add(new Instant(5000));
- value1.add(new Instant(4000));
- value2.add(new Instant(2000));
-
- // Merging clears the old values and updates the result value.
- StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1);
-
- // Merging clears the old values and updates the result value.
- assertThat(value3.read(), Matchers.equalTo(new Instant(5000)));
- assertThat(value1.read(), Matchers.equalTo(null));
- assertThat(value2.read(), Matchers.equalTo(null));
- }
-
- @Test
- public void testSerialization() throws Exception {
- ApexStateInternalsFactory<String> sif = new ApexStateBackend().
- newStateInternalsFactory(StringUtf8Coder.of());
- ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy");
-
- ValueState<String> value = keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
- assertEquals(keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
- value.write("hello");
-
- ApexStateInternalsFactory<String> cloned;
- assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif));
- ApexStateInternals<String> clonedKeyAndState = cloned.stateInternalsForKey("dummy");
-
- ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
- assertThat(clonedValue.read(), Matchers.equalTo("hello"));
- assertEquals(clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
- }
-
-}
[2/2] beam git commit: This closes #2734: Fix ApexTimerInternals
serialization error.
Posted by ke...@apache.org.
This closes #2734: Fix ApexTimerInternals serialization error.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2bd668fd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2bd668fd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2bd668fd
Branch: refs/heads/master
Commit: 2bd668fdb3048043610cba44b880373666d8a222
Parents: f9406c7 f37d954
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Apr 28 12:35:24 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Apr 28 12:35:24 2017 -0700
----------------------------------------------------------------------
.../operators/ApexGroupByKeyOperator.java | 165 +--------
.../operators/ApexParDoOperator.java | 104 ++----
.../operators/ApexTimerInternals.java | 201 ++++++++++
.../translation/ApexStateInternalsTest.java | 369 +++++++++++++++++++
.../operators/ApexTimerInternalsTest.java | 89 +++++
.../utils/ApexStateInternalsTest.java | 368 ------------------
6 files changed, 692 insertions(+), 604 deletions(-)
----------------------------------------------------------------------