You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/12/06 18:47:05 UTC
[2/4] incubator-beam git commit: Add DirectGraphs to DirectRunner
Tests
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 076e0fb..eb4d0cd 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -31,7 +31,6 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.CommittedResult.OutputType;
@@ -63,7 +62,6 @@ import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
@@ -116,33 +114,8 @@ public class WatermarkManagerTest implements Serializable {
PCollectionList<Integer> preFlatten = PCollectionList.of(createdInts).and(intsToFlatten);
flattened = preFlatten.apply("flattened", Flatten.<Integer>pCollections());
- Collection<AppliedPTransform<?, ?, ?>> rootTransforms =
- ImmutableList.<AppliedPTransform<?, ?, ?>>of(
- createdInts.getProducingTransformInternal(),
- intsToFlatten.getProducingTransformInternal());
-
- Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers = new HashMap<>();
- consumers.put(
- createdInts,
- ImmutableList.<AppliedPTransform<?, ?, ?>>of(filtered.getProducingTransformInternal(),
- keyed.getProducingTransformInternal(), flattened.getProducingTransformInternal()));
- consumers.put(
- filtered,
- Collections.<AppliedPTransform<?, ?, ?>>singleton(
- filteredTimesTwo.getProducingTransformInternal()));
- consumers.put(filteredTimesTwo, Collections.<AppliedPTransform<?, ?, ?>>emptyList());
- consumers.put(keyed, Collections.<AppliedPTransform<?, ?, ?>>emptyList());
-
- consumers.put(
- intsToFlatten,
- Collections.<AppliedPTransform<?, ?, ?>>singleton(
- flattened.getProducingTransformInternal()));
- consumers.put(flattened, Collections.<AppliedPTransform<?, ?, ?>>emptyList());
-
clock = MockClock.fromInstant(new Instant(1000));
- DirectGraphVisitor visitor = new DirectGraphVisitor();
- p.traverseTopologically(visitor);
- graph = visitor.getGraph();
+ graph = DirectGraphs.getGraph(p);
manager = WatermarkManager.create(clock, graph);
bundleFactory = ImmutableListBundleFactory.create();
@@ -155,7 +128,7 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void getWatermarkForUntouchedTransform() {
TransformWatermarks watermarks =
- manager.getWatermarks(createdInts.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(createdInts));
assertThat(watermarks.getInputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
assertThat(watermarks.getOutputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
@@ -170,13 +143,13 @@ public class WatermarkManagerTest implements Serializable {
CommittedBundle<Integer> output = multiWindowedBundle(createdInts, 1);
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>singleton(output)),
new Instant(8000L));
manager.refreshAll();
TransformWatermarks updatedSourceWatermark =
- manager.getWatermarks(createdInts.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(createdInts));
assertThat(updatedSourceWatermark.getOutputWatermark(), equalTo(new Instant(8000L)));
}
@@ -191,7 +164,7 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(intsToFlatten.getProducingTransformInternal(),
+ result(graph.getProducer(intsToFlatten),
null,
Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -199,7 +172,7 @@ public class WatermarkManagerTest implements Serializable {
// We didn't do anything for the first source, so we shouldn't have progressed the watermark
TransformWatermarks firstSourceWatermark =
- manager.getWatermarks(createdInts.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(createdInts));
assertThat(
firstSourceWatermark.getOutputWatermark(),
not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
@@ -207,14 +180,14 @@ public class WatermarkManagerTest implements Serializable {
// the Second Source output all of the elements so it should be done (with a watermark at the
// end of time).
TransformWatermarks secondSourceWatermark =
- manager.getWatermarks(intsToFlatten.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(intsToFlatten));
assertThat(
secondSourceWatermark.getOutputWatermark(),
not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
// We haven't consumed anything yet, so our watermark should be at the beginning of time
TransformWatermarks transformWatermark =
- manager.getWatermarks(flattened.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(flattened));
assertThat(
transformWatermark.getInputWatermark(), not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
assertThat(
@@ -225,15 +198,15 @@ public class WatermarkManagerTest implements Serializable {
// anything from the first PCollection yet; so our watermark shouldn't advance
manager.updateWatermarks(secondPcollectionBundle,
TimerUpdate.empty(),
- result(flattened.getProducingTransformInternal(),
+ result(graph.getProducer(flattened),
secondPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
TransformWatermarks transformAfterProcessing =
- manager.getWatermarks(flattened.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(flattened));
manager.updateWatermarks(secondPcollectionBundle,
TimerUpdate.empty(),
- result(flattened.getProducingTransformInternal(),
+ result(graph.getProducer(flattened),
secondPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -252,13 +225,13 @@ public class WatermarkManagerTest implements Serializable {
// past the end of the global window
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks firstSourceWatermarks =
- manager.getWatermarks(createdInts.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(createdInts));
assertThat(
firstSourceWatermarks.getOutputWatermark(),
not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
@@ -266,7 +239,7 @@ public class WatermarkManagerTest implements Serializable {
// We still haven't consumed any of the first source's input, so the watermark should still not
// progress
TransformWatermarks flattenAfterSourcesProduced =
- manager.getWatermarks(flattened.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(flattened));
assertThat(
flattenAfterSourcesProduced.getInputWatermark(), not(laterThan(firstCollectionTimestamp)));
assertThat(
@@ -276,7 +249,7 @@ public class WatermarkManagerTest implements Serializable {
// end of the global window), we should have a watermark equal to the min among buffered
// elements
TransformWatermarks withBufferedElements =
- manager.getWatermarks(flattened.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(flattened));
assertThat(withBufferedElements.getInputWatermark(), equalTo(firstCollectionTimestamp));
assertThat(withBufferedElements.getOutputWatermark(), equalTo(firstCollectionTimestamp));
@@ -284,13 +257,13 @@ public class WatermarkManagerTest implements Serializable {
bundleFactory.createBundle(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.updateWatermarks(firstPcollectionBundle,
TimerUpdate.empty(),
- result(flattened.getProducingTransformInternal(),
+ result(graph.getProducer(flattened),
firstPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(completedFlattenBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks afterConsumingAllInput =
- manager.getWatermarks(flattened.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(flattened));
assertThat(
afterConsumingAllInput.getInputWatermark(),
not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
@@ -329,8 +302,8 @@ public class WatermarkManagerTest implements Serializable {
Map<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>> initialInputs =
ImmutableMap.<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>>builder()
- .put(
- created.getProducingTransformInternal(),
+ .put(graph.getProducer(
+ created),
Collections.<CommittedBundle<?>>singleton(root))
.build();
tstMgr.initialize(initialInputs);
@@ -338,7 +311,7 @@ public class WatermarkManagerTest implements Serializable {
root,
TimerUpdate.empty(),
CommittedResult.create(
- StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(),
+ StepTransformResult.withoutHold(graph.getProducer(created)).build(),
root.withElements(Collections.<WindowedValue<Void>>emptyList()),
Collections.singleton(createBundle),
EnumSet.allOf(OutputType.class)),
@@ -385,13 +358,13 @@ public class WatermarkManagerTest implements Serializable {
TimestampedValue.of(3, new Instant(-1000L)));
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>singleton(createdBundle)),
new Instant(Long.MAX_VALUE));
manager.refreshAll();
TransformWatermarks createdAfterProducing =
- manager.getWatermarks(createdInts.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(createdInts));
assertThat(
createdAfterProducing.getOutputWatermark(),
not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
@@ -402,20 +375,20 @@ public class WatermarkManagerTest implements Serializable {
TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
manager.updateWatermarks(createdBundle,
TimerUpdate.empty(),
- result(keyed.getProducingTransformInternal(),
+ result(graph.getProducer(keyed),
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(keyBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks keyedWatermarks =
- manager.getWatermarks(keyed.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(keyed));
assertThat(
keyedWatermarks.getInputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
assertThat(
keyedWatermarks.getOutputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
TransformWatermarks filteredWatermarks =
- manager.getWatermarks(filtered.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(filtered));
assertThat(filteredWatermarks.getInputWatermark(), not(laterThan(new Instant(-1000L))));
assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new Instant(-1000L))));
@@ -423,13 +396,13 @@ public class WatermarkManagerTest implements Serializable {
timestampedBundle(filtered, TimestampedValue.of(2, new Instant(1234L)));
manager.updateWatermarks(createdBundle,
TimerUpdate.empty(),
- result(filtered.getProducingTransformInternal(),
+ result(graph.getProducer(filtered),
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(filteredBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks filteredProcessedWatermarks =
- manager.getWatermarks(filtered.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(filtered));
assertThat(
filteredProcessedWatermarks.getInputWatermark(),
not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
@@ -450,7 +423,7 @@ public class WatermarkManagerTest implements Serializable {
TimestampedValue.of(3, new Instant(-1000L)));
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>singleton(createdBundle)),
new Instant(Long.MAX_VALUE));
@@ -461,13 +434,13 @@ public class WatermarkManagerTest implements Serializable {
TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
manager.updateWatermarks(createdBundle,
TimerUpdate.empty(),
- result(keyed.getProducingTransformInternal(),
+ result(graph.getProducer(keyed),
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(keyBundle)),
new Instant(500L));
manager.refreshAll();
TransformWatermarks keyedWatermarks =
- manager.getWatermarks(keyed.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(keyed));
assertThat(
keyedWatermarks.getInputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
assertThat(keyedWatermarks.getOutputWatermark(), not(laterThan(new Instant(500L))));
@@ -494,27 +467,27 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
ImmutableList.of(firstKeyBundle, secondKeyBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.updateWatermarks(firstKeyBundle,
TimerUpdate.empty(),
- result(filtered.getProducingTransformInternal(),
+ result(graph.getProducer(filtered),
firstKeyBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>emptyList()),
new Instant(-1000L));
manager.updateWatermarks(secondKeyBundle,
TimerUpdate.empty(),
- result(filtered.getProducingTransformInternal(),
+ result(graph.getProducer(filtered),
secondKeyBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>emptyList()),
new Instant(1234L));
manager.refreshAll();
TransformWatermarks filteredWatermarks =
- manager.getWatermarks(filtered.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(filtered));
assertThat(filteredWatermarks.getInputWatermark(),
not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new Instant(-1000L))));
@@ -524,7 +497,7 @@ public class WatermarkManagerTest implements Serializable {
createdInts).commit(clock.now());
manager.updateWatermarks(fauxFirstKeyTimerBundle,
TimerUpdate.empty(),
- result(filtered.getProducingTransformInternal(),
+ result(graph.getProducer(filtered),
fauxFirstKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>emptyList()),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -537,7 +510,7 @@ public class WatermarkManagerTest implements Serializable {
createdInts).commit(clock.now());
manager.updateWatermarks(fauxSecondKeyTimerBundle,
TimerUpdate.empty(),
- result(filtered.getProducingTransformInternal(),
+ result(graph.getProducer(filtered),
fauxSecondKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>emptyList()),
new Instant(5678L));
@@ -546,7 +519,7 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(fauxSecondKeyTimerBundle,
TimerUpdate.empty(),
- result(filtered.getProducingTransformInternal(),
+ result(graph.getProducer(filtered),
fauxSecondKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>emptyList()),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -564,26 +537,26 @@ public class WatermarkManagerTest implements Serializable {
CommittedBundle<?> firstInput =
bundleFactory.createBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.updateWatermarks(null, TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>singleton(firstInput)),
new Instant(0L));
manager.refreshAll();
TransformWatermarks firstWatermarks =
- manager.getWatermarks(createdInts.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(createdInts));
assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L)));
CommittedBundle<?> secondInput =
bundleFactory.createBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>singleton(secondInput)),
new Instant(-250L));
manager.refreshAll();
TransformWatermarks secondWatermarks =
- manager.getWatermarks(createdInts.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(createdInts));
assertThat(secondWatermarks.getOutputWatermark(), not(earlierThan(new Instant(0L))));
}
@@ -599,7 +572,7 @@ public class WatermarkManagerTest implements Serializable {
TimestampedValue.of(3, new Instant(-1000L)));
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>singleton(createdBundle)),
new Instant(Long.MAX_VALUE));
@@ -610,20 +583,20 @@ public class WatermarkManagerTest implements Serializable {
TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
manager.updateWatermarks(createdBundle,
TimerUpdate.empty(),
- result(keyed.getProducingTransformInternal(),
+ result(graph.getProducer(keyed),
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(keyBundle)),
new Instant(500L));
manager.refreshAll();
TransformWatermarks keyedWatermarks =
- manager.getWatermarks(keyed.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(keyed));
assertThat(
keyedWatermarks.getInputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
assertThat(keyedWatermarks.getOutputWatermark(), not(laterThan(new Instant(500L))));
Instant oldOutputWatermark = keyedWatermarks.getOutputWatermark();
TransformWatermarks updatedWatermarks =
- manager.getWatermarks(keyed.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(keyed));
assertThat(
updatedWatermarks.getInputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
// We added a hold prior to the old watermark; we shouldn't progress (due to the earlier hold)
@@ -646,7 +619,7 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>singleton(createdBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -655,12 +628,12 @@ public class WatermarkManagerTest implements Serializable {
TimestampedValue.of(KV.of("MyKey", 1), BoundedWindow.TIMESTAMP_MIN_VALUE));
manager.updateWatermarks(createdBundle,
TimerUpdate.empty(),
- result(keyed.getProducingTransformInternal(),
+ result(graph.getProducer(keyed),
createdBundle.withElements(ImmutableList.of(second, third)),
Collections.<CommittedBundle<?>>singleton(keyBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
TransformWatermarks keyedWatermarks =
- manager.getWatermarks(keyed.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(keyed));
// the unprocessed second and third are readded to pending
assertThat(
keyedWatermarks.getInputWatermark(), not(laterThan(new Instant(-1000L))));
@@ -681,7 +654,7 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>singleton(createdBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -689,15 +662,15 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(
neverCreatedBundle,
TimerUpdate.empty(),
- result(
- filtered.getProducingTransformInternal(),
+ result(graph.getProducer(
+ filtered),
neverCreatedBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>emptyList()),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks filteredWms =
- manager.getWatermarks(filtered.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(filtered));
assertThat(filteredWms.getInputWatermark(), equalTo(new Instant(22L)));
}
@@ -712,7 +685,7 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>singleton(createdBundle)),
sourceWatermark);
@@ -724,13 +697,13 @@ public class WatermarkManagerTest implements Serializable {
// Finish processing the on-time data. The watermarks should progress to be equal to the source
manager.updateWatermarks(createdBundle,
TimerUpdate.empty(),
- result(keyed.getProducingTransformInternal(),
+ result(graph.getProducer(keyed),
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(keyBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks onTimeWatermarks =
- manager.getWatermarks(keyed.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(keyed));
assertThat(onTimeWatermarks.getInputWatermark(), equalTo(sourceWatermark));
assertThat(onTimeWatermarks.getOutputWatermark(), equalTo(sourceWatermark));
@@ -740,19 +713,19 @@ public class WatermarkManagerTest implements Serializable {
// we don't advance the watermark past the current watermark until we've consumed the late data
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(lateDataBundle)),
new Instant(2_000_000L));
manager.refreshAll();
TransformWatermarks bufferedLateWm =
- manager.getWatermarks(createdInts.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(createdInts));
assertThat(bufferedLateWm.getOutputWatermark(), equalTo(new Instant(2_000_000L)));
// The input watermark should be held to its previous value (not advanced due to late data; not
// moved backwards in the presence of watermarks due to monotonicity).
TransformWatermarks lateDataBufferedWatermark =
- manager.getWatermarks(keyed.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(keyed));
assertThat(lateDataBufferedWatermark.getInputWatermark(), not(earlierThan(sourceWatermark)));
assertThat(lateDataBufferedWatermark.getOutputWatermark(), not(earlierThan(sourceWatermark)));
@@ -760,7 +733,7 @@ public class WatermarkManagerTest implements Serializable {
timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
manager.updateWatermarks(lateDataBundle,
TimerUpdate.empty(),
- result(keyed.getProducingTransformInternal(),
+ result(graph.getProducer(keyed),
lateDataBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(lateKeyedBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -770,7 +743,7 @@ public class WatermarkManagerTest implements Serializable {
public void updateWatermarkWithDifferentWindowedValueInstances() {
manager.updateWatermarks(
null,
- TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), null,
+ TimerUpdate.empty(), result(graph.getProducer(createdInts), null,
Collections.<CommittedBundle<?>>singleton(
bundleFactory
.createBundle(createdInts)
@@ -783,13 +756,13 @@ public class WatermarkManagerTest implements Serializable {
.commit(Instant.now());
manager.updateWatermarks(createdBundle,
TimerUpdate.empty(),
- result(keyed.getProducingTransformInternal(),
+ result(graph.getProducer(keyed),
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>emptyList()),
null);
manager.refreshAll();
TransformWatermarks onTimeWatermarks =
- manager.getWatermarks(keyed.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(keyed));
assertThat(onTimeWatermarks.getInputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
}
@@ -802,20 +775,20 @@ public class WatermarkManagerTest implements Serializable {
CommittedBundle<Integer> emptyCreateOutput = multiWindowedBundle(createdInts);
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks updatedSourceWatermarks =
- manager.getWatermarks(createdInts.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(createdInts));
assertThat(
updatedSourceWatermarks.getOutputWatermark(),
not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
TransformWatermarks finishedFilterWatermarks =
- manager.getWatermarks(filtered.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(filtered));
assertThat(
finishedFilterWatermarks.getInputWatermark(),
not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
@@ -833,7 +806,7 @@ public class WatermarkManagerTest implements Serializable {
CommittedBundle<Integer> firstCreateOutput = multiWindowedBundle(createdInts, 1, 2);
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>singleton(firstCreateOutput)),
new Instant(12_000L));
@@ -841,33 +814,33 @@ public class WatermarkManagerTest implements Serializable {
CommittedBundle<Integer> firstFilterOutput = multiWindowedBundle(filtered);
manager.updateWatermarks(firstCreateOutput,
TimerUpdate.empty(),
- result(filtered.getProducingTransformInternal(),
+ result(graph.getProducer(filtered),
firstCreateOutput.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(firstFilterOutput)),
new Instant(10_000L));
manager.refreshAll();
TransformWatermarks firstFilterWatermarks =
- manager.getWatermarks(filtered.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(filtered));
assertThat(firstFilterWatermarks.getInputWatermark(), not(earlierThan(new Instant(12_000L))));
assertThat(firstFilterWatermarks.getOutputWatermark(), not(laterThan(new Instant(10_000L))));
CommittedBundle<Integer> emptyCreateOutput = multiWindowedBundle(createdInts);
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks updatedSourceWatermarks =
- manager.getWatermarks(createdInts.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(createdInts));
assertThat(
updatedSourceWatermarks.getOutputWatermark(),
not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
TransformWatermarks finishedFilterWatermarks =
- manager.getWatermarks(filtered.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(filtered));
assertThat(
finishedFilterWatermarks.getInputWatermark(),
not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
@@ -877,14 +850,14 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() {
TransformWatermarks watermarks =
- manager.getWatermarks(createdInts.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(createdInts));
assertThat(watermarks.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
assertThat(
watermarks.getSynchronizedProcessingOutputTime(),
equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
TransformWatermarks filteredWatermarks =
- manager.getWatermarks(filtered.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(filtered));
// Non-root processing watermarks don't progress until data has been processed
assertThat(
filteredWatermarks.getSynchronizedProcessingInputTime(),
@@ -898,18 +871,18 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>singleton(createOutput)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks createAfterUpdate =
- manager.getWatermarks(createdInts.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(createdInts));
assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
TransformWatermarks filterAfterProduced =
- manager.getWatermarks(filtered.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(filtered));
assertThat(
filterAfterProduced.getSynchronizedProcessingInputTime(), not(laterThan(clock.now())));
assertThat(
@@ -929,13 +902,13 @@ public class WatermarkManagerTest implements Serializable {
bundleFactory.createBundle(intsToFlatten).commit(new Instant(1250L));
manager.updateWatermarks(createOutput,
TimerUpdate.empty(),
- result(filtered.getProducingTransformInternal(),
+ result(graph.getProducer(filtered),
createOutput.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(filterOutputBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks filterAfterConsumed =
- manager.getWatermarks(filtered.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(filtered));
assertThat(
filterAfterConsumed.getSynchronizedProcessingInputTime(),
not(laterThan(createAfterUpdate.getSynchronizedProcessingOutputTime())));
@@ -955,16 +928,16 @@ public class WatermarkManagerTest implements Serializable {
CommittedBundle<Integer> createdBundle = multiWindowedBundle(createdInts, 1, 2, 4, 8);
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>singleton(createdBundle)),
new Instant(1248L));
manager.refreshAll();
TransformWatermarks filteredWms =
- manager.getWatermarks(filtered.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(filtered));
TransformWatermarks filteredDoubledWms =
- manager.getWatermarks(filteredTimesTwo.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(filteredTimesTwo));
Instant initialFilteredWm = filteredWms.getSynchronizedProcessingOutputTime();
Instant initialFilteredDoubledWm = filteredDoubledWms.getSynchronizedProcessingOutputTime();
@@ -978,7 +951,7 @@ public class WatermarkManagerTest implements Serializable {
TimerUpdate.builder(key).setTimer(pastTimer).setTimer(futureTimer).build();
manager.updateWatermarks(createdBundle,
timers,
- result(filtered.getProducingTransformInternal(),
+ result(graph.getProducer(filtered),
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(filteredBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1014,7 +987,7 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(filteredTimerBundle,
TimerUpdate.builder(key)
.withCompletedTimers(Collections.<TimerData>singleton(pastTimer)).build(),
- result(filtered.getProducingTransformInternal(),
+ result(graph.getProducer(filtered),
filteredTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(filteredTimerResult)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1029,7 +1002,7 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(filteredTimerResult,
TimerUpdate.empty(),
- result(filteredTimesTwo.getProducingTransformInternal(),
+ result(graph.getProducer(filteredTimesTwo),
filteredTimerResult.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>emptyList()),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1050,11 +1023,11 @@ public class WatermarkManagerTest implements Serializable {
public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() {
Instant startTime = clock.now();
TransformWatermarks watermarks =
- manager.getWatermarks(createdInts.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(createdInts));
assertThat(watermarks.getSynchronizedProcessingInputTime(), equalTo(startTime));
TransformWatermarks filteredWatermarks =
- manager.getWatermarks(filtered.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(filtered));
// Non-root processing watermarks don't progress until data has been processed
assertThat(
filteredWatermarks.getSynchronizedProcessingInputTime(),
@@ -1068,13 +1041,13 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>singleton(createOutput)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks createAfterUpdate =
- manager.getWatermarks(createdInts.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(createdInts));
assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), not(laterThan(clock.now())));
assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(),
not(laterThan(clock.now())));
@@ -1083,7 +1056,7 @@ public class WatermarkManagerTest implements Serializable {
bundleFactory.createBundle(createdInts).commit(new Instant(750L));
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>singleton(createSecondOutput)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1097,7 +1070,7 @@ public class WatermarkManagerTest implements Serializable {
CommittedBundle<Integer> created = multiWindowedBundle(createdInts, 1, 2, 3);
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>singleton(created)),
new Instant(40_900L));
@@ -1111,14 +1084,14 @@ public class WatermarkManagerTest implements Serializable {
TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
.setTimer(upstreamProcessingTimer)
.build(),
- result(filtered.getProducingTransformInternal(),
+ result(graph.getProducer(filtered),
created.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(filteredBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks downstreamWms =
- manager.getWatermarks(filteredTimesTwo.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(filteredTimesTwo));
assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1133,7 +1106,7 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(otherCreated,
TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
.withCompletedTimers(Collections.singleton(upstreamProcessingTimer)).build(),
- result(filtered.getProducingTransformInternal(),
+ result(graph.getProducer(filtered),
otherCreated.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>emptyList()),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1148,7 +1121,7 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(
null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>singleton(created)),
new Instant(29_919_235L));
@@ -1160,14 +1133,14 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(
created,
TimerUpdate.empty(),
- result(filtered.getProducingTransformInternal(),
+ result(graph.getProducer(filtered),
created.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(filteredBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.refreshAll();
TransformWatermarks downstreamWms =
- manager.getWatermarks(filteredTimesTwo.getProducingTransformInternal());
+ manager.getWatermarks(graph.getProducer(filteredTimesTwo));
assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1184,7 +1157,7 @@ public class WatermarkManagerTest implements Serializable {
CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.singleton(createdBundle)),
new Instant(1500L));
@@ -1206,7 +1179,7 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(createdBundle,
update,
- result(filtered.getProducingTransformInternal(),
+ result(graph.getProducer(filtered),
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
new Instant(1000L));
@@ -1220,7 +1193,7 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>emptyList()),
new Instant(50_000L));
@@ -1242,7 +1215,7 @@ public class WatermarkManagerTest implements Serializable {
CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.singleton(createdBundle)),
new Instant(1500L));
@@ -1264,7 +1237,7 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(
createdBundle,
update,
- result(filtered.getProducingTransformInternal(),
+ result(graph.getProducer(filtered),
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
new Instant(1000L));
@@ -1278,7 +1251,7 @@ public class WatermarkManagerTest implements Serializable {
clock.set(new Instant(50_000L));
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>emptyList()),
new Instant(50_000L));
@@ -1301,7 +1274,7 @@ public class WatermarkManagerTest implements Serializable {
CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.singleton(createdBundle)),
new Instant(1500L));
@@ -1323,7 +1296,7 @@ public class WatermarkManagerTest implements Serializable {
manager.updateWatermarks(
createdBundle,
update,
- result(filtered.getProducingTransformInternal(),
+ result(graph.getProducer(filtered),
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
new Instant(1000L));
@@ -1338,7 +1311,7 @@ public class WatermarkManagerTest implements Serializable {
clock.set(new Instant(50_000L));
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ result(graph.getProducer(createdInts),
null,
Collections.<CommittedBundle<?>>emptyList()),
new Instant(50_000L));