You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/11/07 19:59:24 UTC
[34/50] incubator-beam git commit: Flatten FiredTimers and ExtractFiredTimers
Flatten FiredTimers and ExtractFiredTimers
Pass a single collection of fired timers, and have those objects contain
the associated transform and key that they fired for. Timers already
contain the domain they are in.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5dca2674
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5dca2674
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5dca2674
Branch: refs/heads/apex-runner
Commit: 5dca2674a8d145c6e619005c2282c6064cd7aab7
Parents: 6e1e57b
Author: Thomas Groh <tg...@google.com>
Authored: Thu Nov 3 14:10:37 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Nov 4 13:05:21 2016 -0700
----------------------------------------------------------------------
.../beam/runners/direct/EvaluationContext.java | 6 +-
.../direct/ExecutorServiceParallelExecutor.java | 41 ++++----
.../beam/runners/direct/WatermarkManager.java | 79 ++++++++------
.../runners/direct/EvaluationContextTest.java | 23 ++---
.../runners/direct/WatermarkManagerTest.java | 102 ++++++-------------
5 files changed, 109 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 965e77d..b814def 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -391,11 +391,9 @@ class EvaluationContext {
* <p>This is a destructive operation. Timers will only appear in the result of this method once
* for each time they are set.
*/
- public Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> extractFiredTimers() {
+ public Collection<FiredTimers> extractFiredTimers() {
forceRefresh();
- Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> fired =
- watermarkManager.extractFiredTimers();
- return fired;
+ return watermarkManager.extractFiredTimers();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index e32f671..d1ffea1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
@@ -440,29 +439,23 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
*/
private void fireTimers() throws Exception {
try {
- for (Map.Entry<
- AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> transformTimers :
- evaluationContext.extractFiredTimers().entrySet()) {
- AppliedPTransform<?, ?, ?> transform = transformTimers.getKey();
- for (Map.Entry<StructuralKey<?>, FiredTimers> keyTimers :
- transformTimers.getValue().entrySet()) {
- for (TimeDomain domain : TimeDomain.values()) {
- Collection<TimerData> delivery = keyTimers.getValue().getTimers(domain);
- if (delivery.isEmpty()) {
- continue;
- }
- KeyedWorkItem<?, Object> work =
- KeyedWorkItems.timersWorkItem(keyTimers.getKey().getKey(), delivery);
- @SuppressWarnings({"unchecked", "rawtypes"})
- CommittedBundle<?> bundle =
- evaluationContext
- .createKeyedBundle(keyTimers.getKey(), (PCollection) transform.getInput())
- .add(WindowedValue.valueInGlobalWindow(work))
- .commit(evaluationContext.now());
- scheduleConsumption(transform, bundle, new TimerIterableCompletionCallback(delivery));
- state.set(ExecutorState.ACTIVE);
- }
- }
+ for (FiredTimers transformTimers : evaluationContext.extractFiredTimers()) {
+ Collection<TimerData> delivery = transformTimers.getTimers();
+ KeyedWorkItem<?, Object> work =
+ KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), delivery);
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ CommittedBundle<?> bundle =
+ evaluationContext
+ .createKeyedBundle(
+ transformTimers.getKey(),
+ (PCollection) transformTimers.getTransform().getInput())
+ .add(WindowedValue.valueInGlobalWindow(work))
+ .commit(evaluationContext.now());
+ scheduleConsumption(
+ transformTimers.getTransform(),
+ bundle,
+ new TimerIterableCompletionCallback(delivery));
+ state.set(ExecutorState.ACTIVE);
}
} catch (Exception e) {
LOG.error("Internal Error while delivering timers", e);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 31b8091..f01c13c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -740,14 +740,17 @@ public class WatermarkManager {
wms =
new TransformWatermarks(
- inputWatermark, outputWatermark, inputProcessingWatermark, outputProcessingWatermark);
+ transform,
+ inputWatermark,
+ outputWatermark,
+ inputProcessingWatermark,
+ outputProcessingWatermark);
transformToWatermarks.put(transform, wms);
}
return wms;
}
- private Collection<Watermark> getInputProcessingWatermarks(
- AppliedPTransform<?, ?, ?> transform) {
+ private Collection<Watermark> getInputProcessingWatermarks(AppliedPTransform<?, ?, ?> transform) {
ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder();
Collection<? extends PValue> inputs = transform.getInput().expand();
if (inputs.isEmpty()) {
@@ -924,15 +927,12 @@ public class WatermarkManager {
* Returns a map of each {@link PTransform} that has pending timers to those timers. All of the
* pending timers will be removed from this {@link WatermarkManager}.
*/
- public Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> extractFiredTimers() {
- Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> allTimers = new HashMap<>();
+ public Collection<FiredTimers> extractFiredTimers() {
+ Collection<FiredTimers> allTimers = new ArrayList<>();
for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry :
transformToWatermarks.entrySet()) {
- Map<StructuralKey<?>, FiredTimers> keyFiredTimers =
- watermarksEntry.getValue().extractFiredTimers();
- if (!keyFiredTimers.isEmpty()) {
- allTimers.put(watermarksEntry.getKey(), keyFiredTimers);
- }
+ Collection<FiredTimers> firedTimers = watermarksEntry.getValue().extractFiredTimers();
+ allTimers.addAll(firedTimers);
}
return allTimers;
}
@@ -1043,6 +1043,8 @@ public class WatermarkManager {
* A reference to the input and output watermarks of an {@link AppliedPTransform}.
*/
public class TransformWatermarks {
+ private final AppliedPTransform<?, ?, ?> transform;
+
private final AppliedPTransformInputWatermark inputWatermark;
private final AppliedPTransformOutputWatermark outputWatermark;
@@ -1053,10 +1055,12 @@ public class WatermarkManager {
private Instant latestSynchronizedOutputWm;
private TransformWatermarks(
+ AppliedPTransform<?, ?, ?> transform,
AppliedPTransformInputWatermark inputWatermark,
AppliedPTransformOutputWatermark outputWatermark,
SynchronizedProcessingTimeInputWatermark inputSynchProcessingWatermark,
SynchronizedProcessingTimeOutputWatermark outputSynchProcessingWatermark) {
+ this.transform = transform;
this.inputWatermark = inputWatermark;
this.outputWatermark = outputWatermark;
@@ -1128,7 +1132,7 @@ public class WatermarkManager {
synchronizedProcessingInputWatermark.addPending(bundle);
}
- private Map<StructuralKey<?>, FiredTimers> extractFiredTimers() {
+ private Collection<FiredTimers> extractFiredTimers() {
Map<StructuralKey<?>, List<TimerData>> eventTimeTimers =
inputWatermark.extractFiredEventTimeTimers();
Map<StructuralKey<?>, List<TimerData>> processingTimers;
@@ -1137,31 +1141,33 @@ public class WatermarkManager {
TimeDomain.PROCESSING_TIME, clock.now());
synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
TimeDomain.SYNCHRONIZED_PROCESSING_TIME, getSynchronizedProcessingInputTime());
- Map<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> groupedTimers = new HashMap<>();
- groupFiredTimers(groupedTimers, eventTimeTimers, processingTimers, synchronizedTimers);
- Map<StructuralKey<?>, FiredTimers> keyFiredTimers = new HashMap<>();
- for (Map.Entry<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> firedTimers :
- groupedTimers.entrySet()) {
- keyFiredTimers.put(firedTimers.getKey(), new FiredTimers(firedTimers.getValue()));
+ Map<StructuralKey<?>, List<TimerData>> timersPerKey =
+ groupFiredTimers(eventTimeTimers, processingTimers, synchronizedTimers);
+ Collection<FiredTimers> keyFiredTimers = new ArrayList<>(timersPerKey.size());
+ for (Map.Entry<StructuralKey<?>, List<TimerData>> firedTimers :
+ timersPerKey.entrySet()) {
+ keyFiredTimers.add(
+ new FiredTimers(transform, firedTimers.getKey(), firedTimers.getValue()));
}
return keyFiredTimers;
}
@SafeVarargs
- private final void groupFiredTimers(
- Map<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> groupedToMutate,
+ private final Map<StructuralKey<?>, List<TimerData>> groupFiredTimers(
Map<StructuralKey<?>, List<TimerData>>... timersToGroup) {
+ Map<StructuralKey<?>, List<TimerData>> groupedTimers = new HashMap<>();
for (Map<StructuralKey<?>, List<TimerData>> subGroup : timersToGroup) {
for (Map.Entry<StructuralKey<?>, List<TimerData>> newTimers : subGroup.entrySet()) {
- Map<TimeDomain, List<TimerData>> grouped = groupedToMutate.get(newTimers.getKey());
+ List<TimerData> grouped = groupedTimers.get(newTimers.getKey());
if (grouped == null) {
- grouped = new HashMap<>();
- groupedToMutate.put(newTimers.getKey(), grouped);
+ grouped = new ArrayList<>();
+ groupedTimers.put(newTimers.getKey(), grouped);
}
- grouped.put(newTimers.getValue().get(0).getDomain(), newTimers.getValue());
+ grouped.addAll(newTimers.getValue());
}
}
+ return groupedTimers;
}
private void updateTimers(TimerUpdate update) {
@@ -1334,24 +1340,35 @@ public class WatermarkManager {
* {@link WatermarkManager}.
*/
public static class FiredTimers {
- private final Map<TimeDomain, ? extends Collection<TimerData>> timers;
+ /** The transform the timers were set at and will be delivered to. */
+ private final AppliedPTransform<?, ?, ?> transform;
+ /** The key the timers were set for and will be delivered to. */
+ private final StructuralKey<?> key;
+ private final Collection<TimerData> timers;
- private FiredTimers(Map<TimeDomain, ? extends Collection<TimerData>> timers) {
+ private FiredTimers(
+ AppliedPTransform<?, ?, ?> transform, StructuralKey<?> key, Collection<TimerData> timers) {
+ this.transform = transform;
+ this.key = key;
this.timers = timers;
}
+ public AppliedPTransform<?, ?, ?> getTransform() {
+ return transform;
+ }
+
+ public StructuralKey<?> getKey() {
+ return key;
+ }
+
/**
* Gets all of the timers that have fired within the provided {@link TimeDomain}. If no timers
* fired within the provided domain, return an empty collection.
*
* <p>Timers within a {@link TimeDomain} are guaranteed to be in order of increasing timestamp.
*/
- public Collection<TimerData> getTimers(TimeDomain domain) {
- Collection<TimerData> domainTimers = timers.get(domain);
- if (domainTimers == null) {
- return Collections.emptyList();
- }
- return domainTimers;
+ public Collection<TimerData> getTimers() {
+ return timers;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index bc53570..e1277ac 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -373,36 +373,31 @@ public class EvaluationContextTest {
.build();
// haven't added any timers, must be empty
- assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
+ assertThat(context.extractFiredTimers(), emptyIterable());
context.handleResult(
context.createKeyedBundle(key, created).commit(Instant.now()),
ImmutableList.<TimerData>of(),
timerResult);
// timer hasn't fired
- assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
+ assertThat(context.extractFiredTimers(), emptyIterable());
TransformResult advanceResult =
StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
// Should cause the downstream timer to fire
context.handleResult(null, ImmutableList.<TimerData>of(), advanceResult);
- Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> fired =
- context.extractFiredTimers();
+ Collection<FiredTimers> fired = context.extractFiredTimers();
assertThat(
- fired,
- Matchers.<AppliedPTransform<?, ?, ?>>hasKey(downstream.getProducingTransformInternal()));
- Map<StructuralKey<?>, FiredTimers> downstreamFired =
- fired.get(downstream.getProducingTransformInternal());
- assertThat(downstreamFired, Matchers.<Object>hasKey(key));
+ Iterables.getOnlyElement(fired).getKey(),
+ Matchers.<StructuralKey<?>>equalTo(key));
- FiredTimers firedForKey = downstreamFired.get(key);
- assertThat(firedForKey.getTimers(TimeDomain.PROCESSING_TIME), emptyIterable());
- assertThat(firedForKey.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), emptyIterable());
- assertThat(firedForKey.getTimers(TimeDomain.EVENT_TIME), contains(toFire));
+ FiredTimers firedForKey = Iterables.getOnlyElement(fired);
+ // Contains exclusively the fired timer
+ assertThat(firedForKey.getTimers(), contains(toFire));
// Don't reextract timers
- assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
+ assertThat(context.extractFiredTimers(), emptyIterable());
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 1954005..6bde462 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
@@ -68,6 +67,7 @@ import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.junit.Before;
@@ -915,12 +915,9 @@ public class WatermarkManagerTest implements Serializable {
filteredDoubledWms.getSynchronizedProcessingOutputTime(),
not(earlierThan(initialFilteredDoubledWm)));
- Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firedTimers =
- manager.extractFiredTimers();
+ Collection<FiredTimers> firedTimers = manager.extractFiredTimers();
assertThat(
- firedTimers.get(filtered.getProducingTransformInternal())
- .get(key)
- .getTimers(TimeDomain.PROCESSING_TIME),
+ Iterables.getOnlyElement(firedTimers).getTimers(),
contains(pastTimer));
// Our timer has fired, but has not been completed, so it holds our synchronized processing WM
assertThat(filteredWms.getSynchronizedProcessingOutputTime(), not(laterThan(startTime)));
@@ -1099,10 +1096,9 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void extractFiredTimersReturnsFiredEventTimeTimers() {
- Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> initialTimers =
- manager.extractFiredTimers();
+ Collection<FiredTimers> initialTimers = manager.extractFiredTimers();
// Watermarks haven't advanced
- assertThat(initialTimers.entrySet(), emptyIterable());
+ assertThat(initialTimers, emptyIterable());
// Advance WM of keyed past the first timer, but ahead of the second and third
CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
@@ -1136,15 +1132,11 @@ public class WatermarkManagerTest implements Serializable {
new Instant(1000L));
manager.refreshAll();
- Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firstTransformFiredTimers =
+ Collection<FiredTimers> firstFiredTimers =
manager.extractFiredTimers();
- assertThat(
- firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
- Map<StructuralKey<?>, FiredTimers> firstFilteredTimers =
- firstTransformFiredTimers.get(filtered.getProducingTransformInternal());
- assertThat(firstFilteredTimers.get(key), not(nullValue()));
- FiredTimers firstFired = firstFilteredTimers.get(key);
- assertThat(firstFired.getTimers(TimeDomain.EVENT_TIME), contains(earliestTimer));
+ assertThat(firstFiredTimers, not(Matchers.<FiredTimers>emptyIterable()));
+ FiredTimers firstFired = Iterables.getOnlyElement(firstFiredTimers);
+ assertThat(firstFired.getTimers(), contains(earliestTimer));
manager.updateWatermarks(null,
TimerUpdate.empty(),
@@ -1153,24 +1145,18 @@ public class WatermarkManagerTest implements Serializable {
Collections.<CommittedBundle<?>>emptyList()),
new Instant(50_000L));
manager.refreshAll();
- Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> secondTransformFiredTimers =
- manager.extractFiredTimers();
- assertThat(
- secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
- Map<StructuralKey<?>, FiredTimers> secondFilteredTimers =
- secondTransformFiredTimers.get(filtered.getProducingTransformInternal());
- assertThat(secondFilteredTimers.get(key), not(nullValue()));
- FiredTimers secondFired = secondFilteredTimers.get(key);
+ Collection<FiredTimers> secondFiredTimers = manager.extractFiredTimers();
+ assertThat(secondFiredTimers, not(Matchers.<FiredTimers>emptyIterable()));
+ FiredTimers secondFired = Iterables.getOnlyElement(secondFiredTimers);
// Contains, in order, middleTimer and then lastTimer
- assertThat(secondFired.getTimers(TimeDomain.EVENT_TIME), contains(middleTimer, lastTimer));
+ assertThat(secondFired.getTimers(), contains(middleTimer, lastTimer));
}
@Test
public void extractFiredTimersReturnsFiredProcessingTimeTimers() {
- Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> initialTimers =
- manager.extractFiredTimers();
+ Collection<FiredTimers> initialTimers = manager.extractFiredTimers();
// Watermarks haven't advanced
- assertThat(initialTimers.entrySet(), emptyIterable());
+ assertThat(initialTimers, emptyIterable());
// Advance WM of keyed past the first timer, but ahead of the second and third
CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
@@ -1204,15 +1190,10 @@ public class WatermarkManagerTest implements Serializable {
new Instant(1000L));
manager.refreshAll();
- Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firstTransformFiredTimers =
- manager.extractFiredTimers();
- assertThat(
- firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
- Map<StructuralKey<?>, FiredTimers> firstFilteredTimers =
- firstTransformFiredTimers.get(filtered.getProducingTransformInternal());
- assertThat(firstFilteredTimers.get(key), not(nullValue()));
- FiredTimers firstFired = firstFilteredTimers.get(key);
- assertThat(firstFired.getTimers(TimeDomain.PROCESSING_TIME), contains(earliestTimer));
+ Collection<FiredTimers> firstFiredTimers = manager.extractFiredTimers();
+ assertThat(firstFiredTimers, not(Matchers.<FiredTimers>emptyIterable()));
+ FiredTimers firstFired = Iterables.getOnlyElement(firstFiredTimers);
+ assertThat(firstFired.getTimers(), contains(earliestTimer));
clock.set(new Instant(50_000L));
manager.updateWatermarks(null,
@@ -1222,24 +1203,19 @@ public class WatermarkManagerTest implements Serializable {
Collections.<CommittedBundle<?>>emptyList()),
new Instant(50_000L));
manager.refreshAll();
- Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> secondTransformFiredTimers =
+ Collection<FiredTimers> secondFiredTimers =
manager.extractFiredTimers();
- assertThat(
- secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
- Map<StructuralKey<?>, FiredTimers> secondFilteredTimers =
- secondTransformFiredTimers.get(filtered.getProducingTransformInternal());
- assertThat(secondFilteredTimers.get(key), not(nullValue()));
- FiredTimers secondFired = secondFilteredTimers.get(key);
+ assertThat(secondFiredTimers, not(Matchers.<FiredTimers>emptyIterable()));
+ FiredTimers secondFired = Iterables.getOnlyElement(secondFiredTimers);
// Contains, in order, middleTimer and then lastTimer
- assertThat(secondFired.getTimers(TimeDomain.PROCESSING_TIME), contains(middleTimer, lastTimer));
+ assertThat(secondFired.getTimers(), contains(middleTimer, lastTimer));
}
@Test
public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() {
- Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> initialTimers =
- manager.extractFiredTimers();
+ Collection<FiredTimers> initialTimers = manager.extractFiredTimers();
// Watermarks haven't advanced
- assertThat(initialTimers.entrySet(), emptyIterable());
+ assertThat(initialTimers, emptyIterable());
// Advance WM of keyed past the first timer, but ahead of the second and third
CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
@@ -1273,16 +1249,11 @@ public class WatermarkManagerTest implements Serializable {
new Instant(1000L));
manager.refreshAll();
- Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firstTransformFiredTimers =
+ Collection<FiredTimers> firstFiredTimers =
manager.extractFiredTimers();
- assertThat(
- firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
- Map<StructuralKey<?>, FiredTimers> firstFilteredTimers =
- firstTransformFiredTimers.get(filtered.getProducingTransformInternal());
- assertThat(firstFilteredTimers.get(key), not(nullValue()));
- FiredTimers firstFired = firstFilteredTimers.get(key);
- assertThat(
- firstFired.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), contains(earliestTimer));
+ assertThat(firstFiredTimers, not(Matchers.<FiredTimers>emptyIterable()));
+ FiredTimers firstFired = Iterables.getOnlyElement(firstFiredTimers);
+ assertThat(firstFired.getTimers(), contains(earliestTimer));
clock.set(new Instant(50_000L));
manager.updateWatermarks(null,
@@ -1292,18 +1263,11 @@ public class WatermarkManagerTest implements Serializable {
Collections.<CommittedBundle<?>>emptyList()),
new Instant(50_000L));
manager.refreshAll();
- Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> secondTransformFiredTimers =
- manager.extractFiredTimers();
- assertThat(
- secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
- Map<StructuralKey<?>, FiredTimers> secondFilteredTimers =
- secondTransformFiredTimers.get(filtered.getProducingTransformInternal());
- assertThat(secondFilteredTimers.get(key), not(nullValue()));
- FiredTimers secondFired = secondFilteredTimers.get(key);
+ Collection<FiredTimers> secondFiredTimers = manager.extractFiredTimers();
+ assertThat(secondFiredTimers, not(Matchers.<FiredTimers>emptyIterable()));
+ FiredTimers secondFired = Iterables.getOnlyElement(secondFiredTimers);
// Contains, in order, middleTimer and then lastTimer
- assertThat(
- secondFired.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME),
- contains(middleTimer, lastTimer));
+ assertThat(secondFired.getTimers(), contains(middleTimer, lastTimer));
}
@Test