You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/12/06 18:47:05 UTC

[2/4] incubator-beam git commit: Add DirectGraphs to DirectRunner Tests

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 076e0fb..eb4d0cd 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -31,7 +31,6 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
@@ -63,7 +62,6 @@ import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
@@ -116,33 +114,8 @@ public class WatermarkManagerTest implements Serializable {
     PCollectionList<Integer> preFlatten = PCollectionList.of(createdInts).and(intsToFlatten);
     flattened = preFlatten.apply("flattened", Flatten.<Integer>pCollections());
 
-    Collection<AppliedPTransform<?, ?, ?>> rootTransforms =
-        ImmutableList.<AppliedPTransform<?, ?, ?>>of(
-            createdInts.getProducingTransformInternal(),
-            intsToFlatten.getProducingTransformInternal());
-
-    Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers = new HashMap<>();
-    consumers.put(
-        createdInts,
-        ImmutableList.<AppliedPTransform<?, ?, ?>>of(filtered.getProducingTransformInternal(),
-            keyed.getProducingTransformInternal(), flattened.getProducingTransformInternal()));
-    consumers.put(
-        filtered,
-        Collections.<AppliedPTransform<?, ?, ?>>singleton(
-            filteredTimesTwo.getProducingTransformInternal()));
-    consumers.put(filteredTimesTwo, Collections.<AppliedPTransform<?, ?, ?>>emptyList());
-    consumers.put(keyed, Collections.<AppliedPTransform<?, ?, ?>>emptyList());
-
-    consumers.put(
-        intsToFlatten,
-        Collections.<AppliedPTransform<?, ?, ?>>singleton(
-            flattened.getProducingTransformInternal()));
-    consumers.put(flattened, Collections.<AppliedPTransform<?, ?, ?>>emptyList());
-
     clock = MockClock.fromInstant(new Instant(1000));
-    DirectGraphVisitor visitor = new DirectGraphVisitor();
-    p.traverseTopologically(visitor);
-    graph = visitor.getGraph();
+    graph = DirectGraphs.getGraph(p);
 
     manager = WatermarkManager.create(clock, graph);
     bundleFactory = ImmutableListBundleFactory.create();
@@ -155,7 +128,7 @@ public class WatermarkManagerTest implements Serializable {
   @Test
   public void getWatermarkForUntouchedTransform() {
     TransformWatermarks watermarks =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(createdInts));
 
     assertThat(watermarks.getInputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
     assertThat(watermarks.getOutputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
@@ -170,13 +143,13 @@ public class WatermarkManagerTest implements Serializable {
     CommittedBundle<Integer> output = multiWindowedBundle(createdInts, 1);
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>singleton(output)),
         new Instant(8000L));
     manager.refreshAll();
     TransformWatermarks updatedSourceWatermark =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(createdInts));
 
     assertThat(updatedSourceWatermark.getOutputWatermark(), equalTo(new Instant(8000L)));
   }
@@ -191,7 +164,7 @@ public class WatermarkManagerTest implements Serializable {
 
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(intsToFlatten.getProducingTransformInternal(),
+        result(graph.getProducer(intsToFlatten),
             null,
             Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -199,7 +172,7 @@ public class WatermarkManagerTest implements Serializable {
 
     // We didn't do anything for the first source, so we shouldn't have progressed the watermark
     TransformWatermarks firstSourceWatermark =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(createdInts));
     assertThat(
         firstSourceWatermark.getOutputWatermark(),
         not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
@@ -207,14 +180,14 @@ public class WatermarkManagerTest implements Serializable {
     // the Second Source output all of the elements so it should be done (with a watermark at the
     // end of time).
     TransformWatermarks secondSourceWatermark =
-        manager.getWatermarks(intsToFlatten.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(intsToFlatten));
     assertThat(
         secondSourceWatermark.getOutputWatermark(),
         not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
 
     // We haven't consumed anything yet, so our watermark should be at the beginning of time
     TransformWatermarks transformWatermark =
-        manager.getWatermarks(flattened.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(flattened));
     assertThat(
         transformWatermark.getInputWatermark(), not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
     assertThat(
@@ -225,15 +198,15 @@ public class WatermarkManagerTest implements Serializable {
     // anything from the first PCollection yet; so our watermark shouldn't advance
     manager.updateWatermarks(secondPcollectionBundle,
         TimerUpdate.empty(),
-        result(flattened.getProducingTransformInternal(),
+        result(graph.getProducer(flattened),
             secondPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks transformAfterProcessing =
-        manager.getWatermarks(flattened.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(flattened));
     manager.updateWatermarks(secondPcollectionBundle,
         TimerUpdate.empty(),
-        result(flattened.getProducingTransformInternal(),
+        result(graph.getProducer(flattened),
             secondPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -252,13 +225,13 @@ public class WatermarkManagerTest implements Serializable {
     // past the end of the global window
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     TransformWatermarks firstSourceWatermarks =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(createdInts));
     assertThat(
         firstSourceWatermarks.getOutputWatermark(),
         not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
@@ -266,7 +239,7 @@ public class WatermarkManagerTest implements Serializable {
     // We still haven't consumed any of the first source's input, so the watermark should still not
     // progress
     TransformWatermarks flattenAfterSourcesProduced =
-        manager.getWatermarks(flattened.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(flattened));
     assertThat(
         flattenAfterSourcesProduced.getInputWatermark(), not(laterThan(firstCollectionTimestamp)));
     assertThat(
@@ -276,7 +249,7 @@ public class WatermarkManagerTest implements Serializable {
     // end of the global window), we should have a watermark equal to the min among buffered
     // elements
     TransformWatermarks withBufferedElements =
-        manager.getWatermarks(flattened.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(flattened));
     assertThat(withBufferedElements.getInputWatermark(), equalTo(firstCollectionTimestamp));
     assertThat(withBufferedElements.getOutputWatermark(), equalTo(firstCollectionTimestamp));
 
@@ -284,13 +257,13 @@ public class WatermarkManagerTest implements Serializable {
         bundleFactory.createBundle(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.updateWatermarks(firstPcollectionBundle,
         TimerUpdate.empty(),
-        result(flattened.getProducingTransformInternal(),
+        result(graph.getProducer(flattened),
             firstPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(completedFlattenBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     TransformWatermarks afterConsumingAllInput =
-        manager.getWatermarks(flattened.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(flattened));
     assertThat(
         afterConsumingAllInput.getInputWatermark(),
         not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
@@ -329,8 +302,8 @@ public class WatermarkManagerTest implements Serializable {
 
     Map<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>> initialInputs =
         ImmutableMap.<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>>builder()
-            .put(
-                created.getProducingTransformInternal(),
+            .put(graph.getProducer(
+                created),
                 Collections.<CommittedBundle<?>>singleton(root))
             .build();
     tstMgr.initialize(initialInputs);
@@ -338,7 +311,7 @@ public class WatermarkManagerTest implements Serializable {
         root,
         TimerUpdate.empty(),
         CommittedResult.create(
-            StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(),
+            StepTransformResult.withoutHold(graph.getProducer(created)).build(),
             root.withElements(Collections.<WindowedValue<Void>>emptyList()),
             Collections.singleton(createBundle),
             EnumSet.allOf(OutputType.class)),
@@ -385,13 +358,13 @@ public class WatermarkManagerTest implements Serializable {
         TimestampedValue.of(3, new Instant(-1000L)));
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>singleton(createdBundle)),
         new Instant(Long.MAX_VALUE));
     manager.refreshAll();
     TransformWatermarks createdAfterProducing =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(createdInts));
     assertThat(
         createdAfterProducing.getOutputWatermark(),
         not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
@@ -402,20 +375,20 @@ public class WatermarkManagerTest implements Serializable {
             TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
     manager.updateWatermarks(createdBundle,
         TimerUpdate.empty(),
-        result(keyed.getProducingTransformInternal(),
+        result(graph.getProducer(keyed),
             createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(keyBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     TransformWatermarks keyedWatermarks =
-        manager.getWatermarks(keyed.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(keyed));
     assertThat(
         keyedWatermarks.getInputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
     assertThat(
         keyedWatermarks.getOutputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
 
     TransformWatermarks filteredWatermarks =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(filtered));
     assertThat(filteredWatermarks.getInputWatermark(), not(laterThan(new Instant(-1000L))));
     assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new Instant(-1000L))));
 
@@ -423,13 +396,13 @@ public class WatermarkManagerTest implements Serializable {
         timestampedBundle(filtered, TimestampedValue.of(2, new Instant(1234L)));
     manager.updateWatermarks(createdBundle,
         TimerUpdate.empty(),
-        result(filtered.getProducingTransformInternal(),
+        result(graph.getProducer(filtered),
             createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     TransformWatermarks filteredProcessedWatermarks =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(filtered));
     assertThat(
         filteredProcessedWatermarks.getInputWatermark(),
         not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
@@ -450,7 +423,7 @@ public class WatermarkManagerTest implements Serializable {
         TimestampedValue.of(3, new Instant(-1000L)));
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>singleton(createdBundle)),
         new Instant(Long.MAX_VALUE));
@@ -461,13 +434,13 @@ public class WatermarkManagerTest implements Serializable {
         TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
     manager.updateWatermarks(createdBundle,
         TimerUpdate.empty(),
-        result(keyed.getProducingTransformInternal(),
+        result(graph.getProducer(keyed),
             createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(keyBundle)),
         new Instant(500L));
     manager.refreshAll();
     TransformWatermarks keyedWatermarks =
-        manager.getWatermarks(keyed.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(keyed));
     assertThat(
         keyedWatermarks.getInputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
     assertThat(keyedWatermarks.getOutputWatermark(), not(laterThan(new Instant(500L))));
@@ -494,27 +467,27 @@ public class WatermarkManagerTest implements Serializable {
 
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             ImmutableList.of(firstKeyBundle, secondKeyBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     manager.updateWatermarks(firstKeyBundle,
         TimerUpdate.empty(),
-        result(filtered.getProducingTransformInternal(),
+        result(graph.getProducer(filtered),
             firstKeyBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(-1000L));
     manager.updateWatermarks(secondKeyBundle,
         TimerUpdate.empty(),
-        result(filtered.getProducingTransformInternal(),
+        result(graph.getProducer(filtered),
             secondKeyBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(1234L));
     manager.refreshAll();
 
     TransformWatermarks filteredWatermarks =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(filtered));
     assertThat(filteredWatermarks.getInputWatermark(),
         not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
     assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new Instant(-1000L))));
@@ -524,7 +497,7 @@ public class WatermarkManagerTest implements Serializable {
         createdInts).commit(clock.now());
     manager.updateWatermarks(fauxFirstKeyTimerBundle,
         TimerUpdate.empty(),
-        result(filtered.getProducingTransformInternal(),
+        result(graph.getProducer(filtered),
             fauxFirstKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -537,7 +510,7 @@ public class WatermarkManagerTest implements Serializable {
         createdInts).commit(clock.now());
     manager.updateWatermarks(fauxSecondKeyTimerBundle,
         TimerUpdate.empty(),
-        result(filtered.getProducingTransformInternal(),
+        result(graph.getProducer(filtered),
             fauxSecondKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(5678L));
@@ -546,7 +519,7 @@ public class WatermarkManagerTest implements Serializable {
 
     manager.updateWatermarks(fauxSecondKeyTimerBundle,
         TimerUpdate.empty(),
-        result(filtered.getProducingTransformInternal(),
+        result(graph.getProducer(filtered),
             fauxSecondKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -564,26 +537,26 @@ public class WatermarkManagerTest implements Serializable {
     CommittedBundle<?> firstInput =
         bundleFactory.createBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.updateWatermarks(null,  TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>singleton(firstInput)),
         new Instant(0L));
     manager.refreshAll();
     TransformWatermarks firstWatermarks =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(createdInts));
     assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L)));
 
     CommittedBundle<?> secondInput =
         bundleFactory.createBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>singleton(secondInput)),
         new Instant(-250L));
     manager.refreshAll();
     TransformWatermarks secondWatermarks =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(createdInts));
     assertThat(secondWatermarks.getOutputWatermark(), not(earlierThan(new Instant(0L))));
   }
 
@@ -599,7 +572,7 @@ public class WatermarkManagerTest implements Serializable {
         TimestampedValue.of(3, new Instant(-1000L)));
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>singleton(createdBundle)),
         new Instant(Long.MAX_VALUE));
@@ -610,20 +583,20 @@ public class WatermarkManagerTest implements Serializable {
             TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
     manager.updateWatermarks(createdBundle,
         TimerUpdate.empty(),
-        result(keyed.getProducingTransformInternal(),
+        result(graph.getProducer(keyed),
             createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(keyBundle)),
         new Instant(500L));
     manager.refreshAll();
     TransformWatermarks keyedWatermarks =
-        manager.getWatermarks(keyed.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(keyed));
     assertThat(
         keyedWatermarks.getInputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
     assertThat(keyedWatermarks.getOutputWatermark(), not(laterThan(new Instant(500L))));
     Instant oldOutputWatermark = keyedWatermarks.getOutputWatermark();
 
     TransformWatermarks updatedWatermarks =
-        manager.getWatermarks(keyed.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(keyed));
     assertThat(
         updatedWatermarks.getInputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
     // We added a hold prior to the old watermark; we shouldn't progress (due to the earlier hold)
@@ -646,7 +619,7 @@ public class WatermarkManagerTest implements Serializable {
 
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>singleton(createdBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -655,12 +628,12 @@ public class WatermarkManagerTest implements Serializable {
         TimestampedValue.of(KV.of("MyKey", 1), BoundedWindow.TIMESTAMP_MIN_VALUE));
     manager.updateWatermarks(createdBundle,
         TimerUpdate.empty(),
-        result(keyed.getProducingTransformInternal(),
+        result(graph.getProducer(keyed),
             createdBundle.withElements(ImmutableList.of(second, third)),
             Collections.<CommittedBundle<?>>singleton(keyBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks keyedWatermarks =
-        manager.getWatermarks(keyed.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(keyed));
     // the unprocessed second and third are readded to pending
     assertThat(
         keyedWatermarks.getInputWatermark(), not(laterThan(new Instant(-1000L))));
@@ -681,7 +654,7 @@ public class WatermarkManagerTest implements Serializable {
 
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>singleton(createdBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -689,15 +662,15 @@ public class WatermarkManagerTest implements Serializable {
     manager.updateWatermarks(
         neverCreatedBundle,
         TimerUpdate.empty(),
-        result(
-            filtered.getProducingTransformInternal(),
+        result(graph.getProducer(
+            filtered),
             neverCreatedBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     manager.refreshAll();
     TransformWatermarks filteredWms =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(filtered));
     assertThat(filteredWms.getInputWatermark(), equalTo(new Instant(22L)));
   }
 
@@ -712,7 +685,7 @@ public class WatermarkManagerTest implements Serializable {
 
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>singleton(createdBundle)),
         sourceWatermark);
@@ -724,13 +697,13 @@ public class WatermarkManagerTest implements Serializable {
     // Finish processing the on-time data. The watermarks should progress to be equal to the source
     manager.updateWatermarks(createdBundle,
         TimerUpdate.empty(),
-        result(keyed.getProducingTransformInternal(),
+        result(graph.getProducer(keyed),
             createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(keyBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     TransformWatermarks onTimeWatermarks =
-        manager.getWatermarks(keyed.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(keyed));
     assertThat(onTimeWatermarks.getInputWatermark(), equalTo(sourceWatermark));
     assertThat(onTimeWatermarks.getOutputWatermark(), equalTo(sourceWatermark));
 
@@ -740,19 +713,19 @@ public class WatermarkManagerTest implements Serializable {
     // we don't advance the watermark past the current watermark until we've consumed the late data
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(lateDataBundle)),
         new Instant(2_000_000L));
     manager.refreshAll();
     TransformWatermarks bufferedLateWm =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(createdInts));
     assertThat(bufferedLateWm.getOutputWatermark(), equalTo(new Instant(2_000_000L)));
 
     // The input watermark should be held to its previous value (not advanced due to late data; not
     // moved backwards in the presence of watermarks due to monotonicity).
     TransformWatermarks lateDataBufferedWatermark =
-        manager.getWatermarks(keyed.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(keyed));
     assertThat(lateDataBufferedWatermark.getInputWatermark(), not(earlierThan(sourceWatermark)));
     assertThat(lateDataBufferedWatermark.getOutputWatermark(), not(earlierThan(sourceWatermark)));
 
@@ -760,7 +733,7 @@ public class WatermarkManagerTest implements Serializable {
         timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
     manager.updateWatermarks(lateDataBundle,
         TimerUpdate.empty(),
-        result(keyed.getProducingTransformInternal(),
+        result(graph.getProducer(keyed),
             lateDataBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(lateKeyedBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -770,7 +743,7 @@ public class WatermarkManagerTest implements Serializable {
   public void updateWatermarkWithDifferentWindowedValueInstances() {
     manager.updateWatermarks(
         null,
-        TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), null,
+        TimerUpdate.empty(), result(graph.getProducer(createdInts), null,
         Collections.<CommittedBundle<?>>singleton(
             bundleFactory
                 .createBundle(createdInts)
@@ -783,13 +756,13 @@ public class WatermarkManagerTest implements Serializable {
         .commit(Instant.now());
     manager.updateWatermarks(createdBundle,
         TimerUpdate.empty(),
-        result(keyed.getProducingTransformInternal(),
+        result(graph.getProducer(keyed),
             createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         null);
     manager.refreshAll();
     TransformWatermarks onTimeWatermarks =
-        manager.getWatermarks(keyed.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(keyed));
     assertThat(onTimeWatermarks.getInputWatermark(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
   }
 
@@ -802,20 +775,20 @@ public class WatermarkManagerTest implements Serializable {
     CommittedBundle<Integer> emptyCreateOutput = multiWindowedBundle(createdInts);
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     TransformWatermarks updatedSourceWatermarks =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(createdInts));
 
     assertThat(
         updatedSourceWatermarks.getOutputWatermark(),
         not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
 
     TransformWatermarks finishedFilterWatermarks =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(filtered));
     assertThat(
         finishedFilterWatermarks.getInputWatermark(),
         not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
@@ -833,7 +806,7 @@ public class WatermarkManagerTest implements Serializable {
     CommittedBundle<Integer> firstCreateOutput = multiWindowedBundle(createdInts, 1, 2);
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>singleton(firstCreateOutput)),
         new Instant(12_000L));
@@ -841,33 +814,33 @@ public class WatermarkManagerTest implements Serializable {
     CommittedBundle<Integer> firstFilterOutput = multiWindowedBundle(filtered);
     manager.updateWatermarks(firstCreateOutput,
         TimerUpdate.empty(),
-        result(filtered.getProducingTransformInternal(),
+        result(graph.getProducer(filtered),
             firstCreateOutput.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(firstFilterOutput)),
         new Instant(10_000L));
     manager.refreshAll();
     TransformWatermarks firstFilterWatermarks =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(filtered));
     assertThat(firstFilterWatermarks.getInputWatermark(), not(earlierThan(new Instant(12_000L))));
     assertThat(firstFilterWatermarks.getOutputWatermark(), not(laterThan(new Instant(10_000L))));
 
     CommittedBundle<Integer> emptyCreateOutput = multiWindowedBundle(createdInts);
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     TransformWatermarks updatedSourceWatermarks =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(createdInts));
 
     assertThat(
         updatedSourceWatermarks.getOutputWatermark(),
         not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
 
     TransformWatermarks finishedFilterWatermarks =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(filtered));
     assertThat(
         finishedFilterWatermarks.getInputWatermark(),
         not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
@@ -877,14 +850,14 @@ public class WatermarkManagerTest implements Serializable {
   @Test
   public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() {
     TransformWatermarks watermarks =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(createdInts));
     assertThat(watermarks.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
     assertThat(
         watermarks.getSynchronizedProcessingOutputTime(),
         equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
 
     TransformWatermarks filteredWatermarks =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(filtered));
     // Non-root processing watermarks don't progress until data has been processed
     assertThat(
         filteredWatermarks.getSynchronizedProcessingInputTime(),
@@ -898,18 +871,18 @@ public class WatermarkManagerTest implements Serializable {
 
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>singleton(createOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     TransformWatermarks createAfterUpdate =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(createdInts));
     assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
     assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
 
     TransformWatermarks filterAfterProduced =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(filtered));
     assertThat(
         filterAfterProduced.getSynchronizedProcessingInputTime(), not(laterThan(clock.now())));
     assertThat(
@@ -929,13 +902,13 @@ public class WatermarkManagerTest implements Serializable {
         bundleFactory.createBundle(intsToFlatten).commit(new Instant(1250L));
     manager.updateWatermarks(createOutput,
         TimerUpdate.empty(),
-        result(filtered.getProducingTransformInternal(),
+        result(graph.getProducer(filtered),
             createOutput.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filterOutputBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     TransformWatermarks filterAfterConsumed =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(filtered));
     assertThat(
         filterAfterConsumed.getSynchronizedProcessingInputTime(),
         not(laterThan(createAfterUpdate.getSynchronizedProcessingOutputTime())));
@@ -955,16 +928,16 @@ public class WatermarkManagerTest implements Serializable {
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(createdInts, 1, 2, 4, 8);
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>singleton(createdBundle)),
         new Instant(1248L));
     manager.refreshAll();
 
     TransformWatermarks filteredWms =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(filtered));
     TransformWatermarks filteredDoubledWms =
-        manager.getWatermarks(filteredTimesTwo.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(filteredTimesTwo));
     Instant initialFilteredWm = filteredWms.getSynchronizedProcessingOutputTime();
     Instant initialFilteredDoubledWm = filteredDoubledWms.getSynchronizedProcessingOutputTime();
 
@@ -978,7 +951,7 @@ public class WatermarkManagerTest implements Serializable {
         TimerUpdate.builder(key).setTimer(pastTimer).setTimer(futureTimer).build();
     manager.updateWatermarks(createdBundle,
         timers,
-        result(filtered.getProducingTransformInternal(),
+        result(graph.getProducer(filtered),
             createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1014,7 +987,7 @@ public class WatermarkManagerTest implements Serializable {
     manager.updateWatermarks(filteredTimerBundle,
         TimerUpdate.builder(key)
             .withCompletedTimers(Collections.<TimerData>singleton(pastTimer)).build(),
-        result(filtered.getProducingTransformInternal(),
+        result(graph.getProducer(filtered),
             filteredTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredTimerResult)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1029,7 +1002,7 @@ public class WatermarkManagerTest implements Serializable {
 
     manager.updateWatermarks(filteredTimerResult,
         TimerUpdate.empty(),
-        result(filteredTimesTwo.getProducingTransformInternal(),
+        result(graph.getProducer(filteredTimesTwo),
             filteredTimerResult.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1050,11 +1023,11 @@ public class WatermarkManagerTest implements Serializable {
   public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() {
     Instant startTime = clock.now();
     TransformWatermarks watermarks =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(createdInts));
     assertThat(watermarks.getSynchronizedProcessingInputTime(), equalTo(startTime));
 
     TransformWatermarks filteredWatermarks =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(filtered));
     // Non-root processing watermarks don't progress until data has been processed
     assertThat(
         filteredWatermarks.getSynchronizedProcessingInputTime(),
@@ -1068,13 +1041,13 @@ public class WatermarkManagerTest implements Serializable {
 
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>singleton(createOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
     TransformWatermarks createAfterUpdate =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(createdInts));
     assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), not(laterThan(clock.now())));
     assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(),
         not(laterThan(clock.now())));
@@ -1083,7 +1056,7 @@ public class WatermarkManagerTest implements Serializable {
         bundleFactory.createBundle(createdInts).commit(new Instant(750L));
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>singleton(createSecondOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1097,7 +1070,7 @@ public class WatermarkManagerTest implements Serializable {
     CommittedBundle<Integer> created = multiWindowedBundle(createdInts, 1, 2, 3);
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>singleton(created)),
         new Instant(40_900L));
@@ -1111,14 +1084,14 @@ public class WatermarkManagerTest implements Serializable {
         TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
             .setTimer(upstreamProcessingTimer)
             .build(),
-        result(filtered.getProducingTransformInternal(),
+        result(graph.getProducer(filtered),
             created.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
 
     TransformWatermarks downstreamWms =
-        manager.getWatermarks(filteredTimesTwo.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(filteredTimesTwo));
     assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
 
     clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1133,7 +1106,7 @@ public class WatermarkManagerTest implements Serializable {
     manager.updateWatermarks(otherCreated,
         TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
             .withCompletedTimers(Collections.singleton(upstreamProcessingTimer)).build(),
-        result(filtered.getProducingTransformInternal(),
+        result(graph.getProducer(filtered),
             otherCreated.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1148,7 +1121,7 @@ public class WatermarkManagerTest implements Serializable {
     manager.updateWatermarks(
         null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>singleton(created)),
         new Instant(29_919_235L));
@@ -1160,14 +1133,14 @@ public class WatermarkManagerTest implements Serializable {
     manager.updateWatermarks(
         created,
         TimerUpdate.empty(),
-        result(filtered.getProducingTransformInternal(),
+        result(graph.getProducer(filtered),
             created.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
     manager.refreshAll();
 
     TransformWatermarks downstreamWms =
-        manager.getWatermarks(filteredTimesTwo.getProducingTransformInternal());
+        manager.getWatermarks(graph.getProducer(filteredTimesTwo));
     assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(clock.now()));
 
     clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1184,7 +1157,7 @@ public class WatermarkManagerTest implements Serializable {
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.singleton(createdBundle)),
         new Instant(1500L));
@@ -1206,7 +1179,7 @@ public class WatermarkManagerTest implements Serializable {
 
     manager.updateWatermarks(createdBundle,
         update,
-        result(filtered.getProducingTransformInternal(),
+        result(graph.getProducer(filtered),
             createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
         new Instant(1000L));
@@ -1220,7 +1193,7 @@ public class WatermarkManagerTest implements Serializable {
 
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(50_000L));
@@ -1242,7 +1215,7 @@ public class WatermarkManagerTest implements Serializable {
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.singleton(createdBundle)),
         new Instant(1500L));
@@ -1264,7 +1237,7 @@ public class WatermarkManagerTest implements Serializable {
     manager.updateWatermarks(
         createdBundle,
         update,
-        result(filtered.getProducingTransformInternal(),
+        result(graph.getProducer(filtered),
             createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
         new Instant(1000L));
@@ -1278,7 +1251,7 @@ public class WatermarkManagerTest implements Serializable {
     clock.set(new Instant(50_000L));
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(50_000L));
@@ -1301,7 +1274,7 @@ public class WatermarkManagerTest implements Serializable {
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.singleton(createdBundle)),
         new Instant(1500L));
@@ -1323,7 +1296,7 @@ public class WatermarkManagerTest implements Serializable {
     manager.updateWatermarks(
         createdBundle,
         update,
-        result(filtered.getProducingTransformInternal(),
+        result(graph.getProducer(filtered),
             createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
         new Instant(1000L));
@@ -1338,7 +1311,7 @@ public class WatermarkManagerTest implements Serializable {
     clock.set(new Instant(50_000L));
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
-        result(createdInts.getProducingTransformInternal(),
+        result(graph.getProducer(createdInts),
             null,
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(50_000L));