You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/12/02 22:02:43 UTC

[1/3] incubator-beam git commit: Stop using Maps of Transforms in the DirectRunner

Repository: incubator-beam
Updated Branches:
  refs/heads/master 8cb2689f8 -> 1abbb9007


Stop using Maps of Transforms in the DirectRunner

Instead, add a "DirectGraph" class, which adds a layer of indirection to
all lookup methods.

Remove all remaining uses of getProducingTransformInternal, and instead
use DirectGraph methods to obtain the producing transform.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8162cd29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8162cd29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8162cd29

Branch: refs/heads/master
Commit: 8162cd29d97ef307b6fac588f453e4e39d70fca7
Parents: 8cb2689
Author: Thomas Groh <tg...@google.com>
Authored: Thu Dec 1 15:39:30 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Dec 2 14:02:24 2016 -0800

----------------------------------------------------------------------
 .../direct/ConsumerTrackingPipelineVisitor.java | 108 +++++++------------
 .../apache/beam/runners/direct/DirectGraph.java |  89 +++++++++++++++
 .../beam/runners/direct/DirectRunner.java       |  31 +++---
 .../beam/runners/direct/EvaluationContext.java  |  76 ++++---------
 .../direct/ExecutorServiceParallelExecutor.java |  15 +--
 .../ImmutabilityCheckingBundleFactory.java      |  21 ++--
 .../beam/runners/direct/WatermarkManager.java   |  50 ++++-----
 .../ConsumerTrackingPipelineVisitorTest.java    |  98 +++++------------
 .../runners/direct/EvaluationContextTest.java   |  25 ++---
 .../ImmutabilityCheckingBundleFactoryTest.java  |   6 +-
 .../runners/direct/WatermarkManagerTest.java    |  23 ++--
 11 files changed, 252 insertions(+), 290 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8162cd29/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
index acfad16..b9e77c5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkState;
 
-import java.util.ArrayList;
-import java.util.Collection;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 
 /**
@@ -41,9 +42,13 @@ import org.apache.beam.sdk.values.PValue;
  * input after the upstream transform has produced and committed output.
  */
 public class ConsumerTrackingPipelineVisitor extends PipelineVisitor.Defaults {
-  private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers = new HashMap<>();
-  private Collection<AppliedPTransform<?, ?, ?>> rootTransforms = new ArrayList<>();
-  private Collection<PCollectionView<?>> views = new ArrayList<>();
+  private Map<POutput, AppliedPTransform<?, ?, ?>> producers = new HashMap<>();
+
+  private ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers =
+      ArrayListMultimap.create();
+
+  private Set<PCollectionView<?>> views = new HashSet<>();
+  private Set<AppliedPTransform<?, ?, ?>> rootTransforms = new HashSet<>();
   private Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
   private Set<PValue> toFinalize = new HashSet<>();
   private int numTransforms = 0;
@@ -81,81 +86,38 @@ public class ConsumerTrackingPipelineVisitor extends PipelineVisitor.Defaults {
       rootTransforms.add(appliedTransform);
     } else {
       for (PValue value : node.getInput().expand()) {
-        valueToConsumers.get(value).add(appliedTransform);
+        primitiveConsumers.put(value, appliedTransform);
       }
     }
   }
 
-  private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformHierarchy.Node node) {
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    AppliedPTransform<?, ?, ?> application = AppliedPTransform.of(
-        node.getFullName(), node.getInput(), node.getOutput(), (PTransform) node.getTransform());
-    return application;
-  }
-
-  @Override
+ @Override
   public void visitValue(PValue value, TransformHierarchy.Node producer) {
     toFinalize.add(value);
+
+    AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(producer);
+    if (!producers.containsKey(value)) {
+      producers.put(value, appliedTransform);
+    }
     for (PValue expandedValue : value.expand()) {
-      valueToConsumers.put(expandedValue, new ArrayList<AppliedPTransform<?, ?, ?>>());
       if (expandedValue instanceof PCollectionView) {
         views.add((PCollectionView<?>) expandedValue);
       }
-      expandedValue.recordAsOutput(getAppliedTransform(producer));
+      if (!producers.containsKey(expandedValue)) {
+        producers.put(value, appliedTransform);
+      }
     }
-    value.recordAsOutput(getAppliedTransform(producer));
-  }
-
-  private String genStepName() {
-    return String.format("s%s", numTransforms++);
-  }
-
-
-  /**
-   * Returns a mapping of each fully-expanded {@link PValue} to each
-   * {@link AppliedPTransform} that consumes it. For each AppliedPTransform in the collection
-   * returned from {@code getValueToCustomers().get(PValue)},
-   * {@code AppliedPTransform#getInput().expand()} will contain the argument {@link PValue}.
-   */
-  public Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> getValueToConsumers() {
-    checkState(
-        finalized,
-        "Can't call getValueToConsumers before the Pipeline has been completely traversed");
-
-    return valueToConsumers;
   }
 
-  /**
-   * Returns the mapping for each {@link AppliedPTransform} in the {@link Pipeline} to a unique step
-   * name.
-   */
-  public Map<AppliedPTransform<?, ?, ?>, String> getStepNames() {
-    checkState(
-        finalized, "Can't call getStepNames before the Pipeline has been completely traversed");
-
-    return stepNames;
-  }
-
-  /**
-   * Returns the root transforms of the {@link Pipeline}. A root {@link AppliedPTransform} consumes
-   * a {@link PInput} where the {@link PInput#expand()} returns an empty collection.
-   */
-  public Collection<AppliedPTransform<?, ?, ?>> getRootTransforms() {
-    checkState(
-        finalized,
-        "Can't call getRootTransforms before the Pipeline has been completely traversed");
-
-    return rootTransforms;
+  private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformHierarchy.Node node) {
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    AppliedPTransform<?, ?, ?> application = AppliedPTransform.of(
+        node.getFullName(), node.getInput(), node.getOutput(), (PTransform) node.getTransform());
+    return application;
   }
 
-  /**
-   * Returns all of the {@link PCollectionView PCollectionViews} contained in the visited
-   * {@link Pipeline}.
-   */
-  public Collection<PCollectionView<?>> getViews() {
-    checkState(finalized, "Can't call getViews before the Pipeline has been completely traversed");
-
-    return views;
+  private String genStepName() {
+    return String.format("s%s", numTransforms++);
   }
 
   /**
@@ -163,11 +125,21 @@ public class ConsumerTrackingPipelineVisitor extends PipelineVisitor.Defaults {
    * {@link PValue PValues} should be finalized by the {@link PipelineRunner} before the
    * {@link Pipeline} is executed.
    */
-  public Set<PValue> getUnfinalizedPValues() {
+  public void finishSpecifyingRemainder() {
     checkState(
         finalized,
-        "Can't call getUnfinalizedPValues before the Pipeline has been completely traversed");
+        "Can't call finishSpecifyingRemainder before the Pipeline has been completely traversed");
+    for (PValue unfinalized : toFinalize) {
+      unfinalized.finishSpecifying();
+    }
+  }
 
-    return toFinalize;
+  /**
+   * Get the graph constructed by this {@link ConsumerTrackingPipelineVisitor}, which provides
+   * lookups for producers and consumers of {@link PValue PValues}.
+   */
+  public DirectGraph getGraph() {
+    checkState(finalized, "Can't get a graph before the Pipeline has been completely traversed");
+    return DirectGraph.create(producers, primitiveConsumers, views, rootTransforms, stepNames);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8162cd29/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
new file mode 100644
index 0000000..f208f6e
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import com.google.common.collect.ListMultimap;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+
+/**
+ * Methods for interacting with the underlying structure of a {@link Pipeline} that is being
+ * executed with the {@link DirectRunner}.
+ */
+class DirectGraph {
+  private final Map<POutput, AppliedPTransform<?, ?, ?>> producers;
+  private final ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers;
+  private final Set<PCollectionView<?>> views;
+
+  private final Set<AppliedPTransform<?, ?, ?>> rootTransforms;
+  private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
+
+  public static DirectGraph create(
+      Map<POutput, AppliedPTransform<?, ?, ?>> producers,
+      ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers,
+      Set<PCollectionView<?>> views,
+      Set<AppliedPTransform<?, ?, ?>> rootTransforms,
+      Map<AppliedPTransform<?, ?, ?>, String> stepNames) {
+    return new DirectGraph(producers, primitiveConsumers, views, rootTransforms, stepNames);
+  }
+
+  private DirectGraph(
+      Map<POutput, AppliedPTransform<?, ?, ?>> producers,
+      ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers,
+      Set<PCollectionView<?>> views,
+      Set<AppliedPTransform<?, ?, ?>> rootTransforms,
+      Map<AppliedPTransform<?, ?, ?>, String> stepNames) {
+    this.producers = producers;
+    this.primitiveConsumers = primitiveConsumers;
+    this.views = views;
+    this.rootTransforms = rootTransforms;
+    this.stepNames = stepNames;
+  }
+
+  public AppliedPTransform<?, ?, ?> getProducer(PValue produced) {
+    return producers.get(produced);
+  }
+
+  public List<AppliedPTransform<?, ?, ?>> getPrimitiveConsumers(PValue consumed) {
+    return primitiveConsumers.get(consumed);
+  }
+
+  public Set<AppliedPTransform<?, ?, ?>> getRootTransforms() {
+    return rootTransforms;
+  }
+
+  public Set<PCollectionView<?>> getViews() {
+    return views;
+  }
+
+  public String getStepName(AppliedPTransform<?, ?, ?> step) {
+    return stepNames.get(step);
+  }
+
+  public Collection<AppliedPTransform<?, ?, ?>> getPrimitiveTransforms() {
+    return stepNames.keySet();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8162cd29/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 82de9ab..0ad5836 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -62,7 +62,6 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -198,18 +197,18 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
   enum Enforcement {
     ENCODABILITY {
       @Override
-      public boolean appliesTo(PTransform<?, ?> transform) {
+      public boolean appliesTo(PCollection<?> collection, DirectGraph graph) {
         return true;
       }
     },
     IMMUTABILITY {
       @Override
-      public boolean appliesTo(PTransform<?, ?> transform) {
-        return CONTAINS_UDF.contains(transform.getClass());
+      public boolean appliesTo(PCollection<?> collection, DirectGraph graph) {
+        return CONTAINS_UDF.contains(graph.getProducer(collection).getTransform().getClass());
       }
     };
 
-    public abstract boolean appliesTo(PTransform<?, ?> transform);
+    public abstract boolean appliesTo(PCollection<?> collection, DirectGraph graph);
 
     ////////////////////////////////////////////////////////////////////////////////////////////////
     // Utilities for creating enforcements
@@ -224,13 +223,13 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
       return Collections.unmodifiableSet(enabled);
     }
 
-    public static BundleFactory bundleFactoryFor(Set<Enforcement> enforcements) {
+    public static BundleFactory bundleFactoryFor(Set<Enforcement> enforcements, DirectGraph graph) {
       BundleFactory bundleFactory =
           enforcements.contains(Enforcement.ENCODABILITY)
               ? CloningBundleFactory.create()
               : ImmutableListBundleFactory.create();
       if (enforcements.contains(Enforcement.IMMUTABILITY)) {
-        bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory);
+        bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory, graph);
       }
       return bundleFactory;
     }
@@ -301,9 +300,8 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
     MetricsEnvironment.setMetricsSupported(true);
     ConsumerTrackingPipelineVisitor consumerTrackingVisitor = new ConsumerTrackingPipelineVisitor();
     pipeline.traverseTopologically(consumerTrackingVisitor);
-    for (PValue unfinalized : consumerTrackingVisitor.getUnfinalizedPValues()) {
-      unfinalized.finishSpecifying();
-    }
+    consumerTrackingVisitor.finishSpecifyingRemainder();
+
     @SuppressWarnings("rawtypes")
     KeyedPValueTrackingVisitor keyedPValueVisitor =
         KeyedPValueTrackingVisitor.create(
@@ -315,28 +313,25 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
 
     DisplayDataValidator.validatePipeline(pipeline);
 
+    DirectGraph graph = consumerTrackingVisitor.getGraph();
     EvaluationContext context =
         EvaluationContext.create(
             getPipelineOptions(),
             clockSupplier.get(),
-            Enforcement.bundleFactoryFor(enabledEnforcements),
-            consumerTrackingVisitor.getRootTransforms(),
-            consumerTrackingVisitor.getValueToConsumers(),
-            consumerTrackingVisitor.getStepNames(),
-            consumerTrackingVisitor.getViews());
+            Enforcement.bundleFactoryFor(enabledEnforcements, graph),
+            graph);
 
     RootProviderRegistry rootInputProvider = RootProviderRegistry.defaultRegistry(context);
     TransformEvaluatorRegistry registry = TransformEvaluatorRegistry.defaultRegistry(context);
     PipelineExecutor executor =
         ExecutorServiceParallelExecutor.create(
-            options.getTargetParallelism(),
-            consumerTrackingVisitor.getValueToConsumers(),
+            options.getTargetParallelism(), graph,
             keyedPValueVisitor.getKeyedPValues(),
             rootInputProvider,
             registry,
             Enforcement.defaultModelEnforcements(enabledEnforcements),
             context);
-    executor.start(consumerTrackingVisitor.getRootTransforms());
+    executor.start(graph.getRootTransforms());
 
     Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
         pipeline.getAggregatorSteps();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8162cd29/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 201aaed..b5a23d7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -74,8 +74,10 @@ import org.joda.time.Instant;
  * can be executed.
  */
 class EvaluationContext {
-  /** The step name for each {@link AppliedPTransform} in the {@link Pipeline}. */
-  private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
+  /**
+   * The graph representing this {@link Pipeline}.
+   */
+  private final DirectGraph graph;
 
   /** The options that were used to create this {@link Pipeline}. */
   private final DirectOptions options;
@@ -99,36 +101,19 @@ class EvaluationContext {
   private final DirectMetrics metrics;
 
   public static EvaluationContext create(
-      DirectOptions options,
-      Clock clock,
-      BundleFactory bundleFactory,
-      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
-      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
-      Map<AppliedPTransform<?, ?, ?>, String> stepNames,
-      Collection<PCollectionView<?>> views) {
-    return new EvaluationContext(
-        options, clock, bundleFactory, rootTransforms, valueToConsumers, stepNames, views);
+      DirectOptions options, Clock clock, BundleFactory bundleFactory, DirectGraph graph) {
+    return new EvaluationContext(options, clock, bundleFactory, graph);
   }
 
   private EvaluationContext(
-      DirectOptions options,
-      Clock clock,
-      BundleFactory bundleFactory,
-      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
-      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
-      Map<AppliedPTransform<?, ?, ?>, String> stepNames,
-      Collection<PCollectionView<?>> views) {
+      DirectOptions options, Clock clock, BundleFactory bundleFactory, DirectGraph graph) {
     this.options = checkNotNull(options);
     this.clock = clock;
     this.bundleFactory = checkNotNull(bundleFactory);
-    checkNotNull(rootTransforms);
-    checkNotNull(valueToConsumers);
-    checkNotNull(stepNames);
-    checkNotNull(views);
-    this.stepNames = stepNames;
+    this.graph = checkNotNull(graph);
 
-    this.watermarkManager = WatermarkManager.create(clock, rootTransforms, valueToConsumers);
-    this.sideInputContainer = SideInputContainer.create(this, views);
+    this.watermarkManager = WatermarkManager.create(clock, graph);
+    this.sideInputContainer = SideInputContainer.create(this, graph.getViews());
 
     this.applicationStateInternals = new ConcurrentHashMap<>();
     this.mergedAggregators = AggregatorContainer.create();
@@ -211,7 +196,7 @@ class EvaluationContext {
     ImmutableList.Builder<CommittedBundle<?>> completed = ImmutableList.builder();
     for (UncommittedBundle<?> inProgress : bundles) {
       AppliedPTransform<?, ?, ?> producing =
-          inProgress.getPCollection().getProducingTransformInternal();
+          graph.getProducer(inProgress.getPCollection());
       TransformWatermarks watermarks = watermarkManager.getWatermarks(producing);
       CommittedBundle<?> committed =
           inProgress.commit(watermarks.getSynchronizedProcessingOutputTime());
@@ -225,7 +210,7 @@ class EvaluationContext {
   }
 
   private void fireAllAvailableCallbacks() {
-    for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
+    for (AppliedPTransform<?, ?, ?> transform : graph.getPrimitiveTransforms()) {
       fireAvailableCallbacks(transform);
     }
   }
@@ -290,10 +275,10 @@ class EvaluationContext {
       BoundedWindow window,
       WindowingStrategy<?, ?> windowingStrategy,
       Runnable runnable) {
-    AppliedPTransform<?, ?, ?> producing = getProducing(value);
+    AppliedPTransform<?, ?, ?> producing = graph.getProducer(value);
     callbackExecutor.callOnGuaranteedFiring(producing, window, windowingStrategy, runnable);
 
-    fireAvailableCallbacks(lookupProducing(value));
+    fireAvailableCallbacks(producing);
   }
 
   /**
@@ -311,22 +296,6 @@ class EvaluationContext {
     fireAvailableCallbacks(producing);
   }
 
-  private AppliedPTransform<?, ?, ?> getProducing(PValue value) {
-    if (value.getProducingTransformInternal() != null) {
-      return value.getProducingTransformInternal();
-    }
-    return lookupProducing(value);
-  }
-
-  private AppliedPTransform<?, ?, ?> lookupProducing(PValue value) {
-    for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
-      if (transform.getOutput().equals(value) || transform.getOutput().expand().contains(value)) {
-        return transform;
-      }
-    }
-    return null;
-  }
-
   /**
    * Get the options used by this {@link Pipeline}.
    */
@@ -347,18 +316,17 @@ class EvaluationContext {
         watermarkManager.getWatermarks(application));
   }
 
-  /**
-   * Get all of the steps used in this {@link Pipeline}.
-   */
-  public Collection<AppliedPTransform<?, ?, ?>> getSteps() {
-    return stepNames.keySet();
-  }
 
   /**
    * Get the Step Name for the provided application.
    */
-  public String getStepName(AppliedPTransform<?, ?, ?> application) {
-    return stepNames.get(application);
+  String getStepName(AppliedPTransform<?, ?, ?> application) {
+    return graph.getStepName(application);
+  }
+
+  /** Returns all of the steps in this {@link Pipeline}. */
+  Collection<AppliedPTransform<?, ?, ?>> getSteps() {
+    return graph.getPrimitiveTransforms();
   }
 
   /**
@@ -450,7 +418,7 @@ class EvaluationContext {
    * Returns true if all steps are done.
    */
   public boolean isDone() {
-    for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
+    for (AppliedPTransform<?, ?, ?> transform : graph.getPrimitiveTransforms()) {
       if (!isDone(transform)) {
         return false;
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8162cd29/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index b7908c5..929d09d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -69,7 +69,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
   private final int targetParallelism;
   private final ExecutorService executorService;
 
-  private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;
+  private final DirectGraph graph;
   private final Set<PValue> keyedPValues;
   private final RootProviderRegistry rootProviderRegistry;
   private final TransformEvaluatorRegistry registry;
@@ -104,7 +104,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
 
   public static ExecutorServiceParallelExecutor create(
       int targetParallelism,
-      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
+      DirectGraph graph,
       Set<PValue> keyedPValues,
       RootProviderRegistry rootProviderRegistry,
       TransformEvaluatorRegistry registry,
@@ -114,7 +114,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       EvaluationContext context) {
     return new ExecutorServiceParallelExecutor(
         targetParallelism,
-        valueToConsumers,
+        graph,
         keyedPValues,
         rootProviderRegistry,
         registry,
@@ -124,7 +124,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
 
   private ExecutorServiceParallelExecutor(
       int targetParallelism,
-      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
+      DirectGraph graph,
       Set<PValue> keyedPValues,
       RootProviderRegistry rootProviderRegistry,
       TransformEvaluatorRegistry registry,
@@ -133,7 +133,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       EvaluationContext context) {
     this.targetParallelism = targetParallelism;
     this.executorService = Executors.newFixedThreadPool(targetParallelism);
-    this.valueToConsumers = valueToConsumers;
+    this.graph = graph;
     this.keyedPValues = keyedPValues;
     this.rootProviderRegistry = rootProviderRegistry;
     this.registry = registry;
@@ -273,8 +273,9 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
         CommittedBundle<?> inputBundle, TransformResult<?> result) {
       CommittedResult committedResult = evaluationContext.handleResult(inputBundle, timers, result);
       for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
-        allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle,
-            valueToConsumers.get(outputBundle.getPCollection())));
+        allUpdates.offer(
+            ExecutorUpdate.fromBundle(
+                outputBundle, graph.getPrimitiveConsumers(outputBundle.getPCollection())));
       }
       CommittedBundle<?> unprocessedInputs = committedResult.getUnprocessedInputs();
       if (unprocessedInputs != null && !Iterables.isEmpty(unprocessedInputs.getElements())) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8162cd29/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
index 4f72f68..8d77e25 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
@@ -46,17 +46,20 @@ import org.joda.time.Instant;
  */
 class ImmutabilityCheckingBundleFactory implements BundleFactory {
   /**
-   * Create a new {@link ImmutabilityCheckingBundleFactory} that uses the underlying
-   * {@link BundleFactory} to create the output bundle.
+   * Create a new {@link ImmutabilityCheckingBundleFactory} that uses the underlying {@link
+   * BundleFactory} to create the output bundle.
    */
-  public static ImmutabilityCheckingBundleFactory create(BundleFactory underlying) {
-    return new ImmutabilityCheckingBundleFactory(underlying);
+  public static ImmutabilityCheckingBundleFactory create(
+      BundleFactory underlying, DirectGraph graph) {
+    return new ImmutabilityCheckingBundleFactory(underlying, graph);
   }
 
   private final BundleFactory underlying;
+  private final DirectGraph graph;
 
-  private ImmutabilityCheckingBundleFactory(BundleFactory underlying) {
+  private ImmutabilityCheckingBundleFactory(BundleFactory underlying, DirectGraph graph) {
     this.underlying = checkNotNull(underlying);
+    this.graph = graph;
   }
 
   /**
@@ -72,7 +75,7 @@ class ImmutabilityCheckingBundleFactory implements BundleFactory {
 
   @Override
   public <T> UncommittedBundle<T> createBundle(PCollection<T> output) {
-    if (Enforcement.IMMUTABILITY.appliesTo(output.getProducingTransformInternal().getTransform())) {
+    if (Enforcement.IMMUTABILITY.appliesTo(output, graph)) {
       return new ImmutabilityEnforcingBundle<>(underlying.createBundle(output));
     }
     return underlying.createBundle(output);
@@ -81,13 +84,13 @@ class ImmutabilityCheckingBundleFactory implements BundleFactory {
   @Override
   public <K, T> UncommittedBundle<T> createKeyedBundle(
       StructuralKey<K> key, PCollection<T> output) {
-    if (Enforcement.IMMUTABILITY.appliesTo(output.getProducingTransformInternal().getTransform())) {
+    if (Enforcement.IMMUTABILITY.appliesTo(output, graph)) {
       return new ImmutabilityEnforcingBundle<>(underlying.createKeyedBundle(key, output));
     }
     return underlying.createKeyedBundle(key, output);
   }
 
-  private static class ImmutabilityEnforcingBundle<T> implements UncommittedBundle<T> {
+  private class ImmutabilityEnforcingBundle<T> implements UncommittedBundle<T> {
     private final UncommittedBundle<T> underlying;
     private final SetMultimap<WindowedValue<T>, MutationDetector> mutationDetectors;
     private Coder<T> coder;
@@ -125,7 +128,7 @@ class ImmutabilityCheckingBundleFactory implements BundleFactory {
                 String.format(
                     "PTransform %s mutated value %s after it was output (new value was %s)."
                         + " Values must not be mutated in any way after being output.",
-                    underlying.getPCollection().getProducingTransformInternal().getFullName(),
+                    graph.getProducer(underlying.getPCollection()).getFullName(),
                     exn.getSavedValue(),
                     exn.getNewValue()),
                 exn.getSavedValue(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8162cd29/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index a53c11c..247b1cc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -669,10 +669,10 @@ public class WatermarkManager {
   private final Clock clock;
 
   /**
-   * A map from each {@link PCollection} to all {@link AppliedPTransform PTransform applications}
-   * that consume that {@link PCollection}.
+   * The {@link DirectGraph} representing the {@link Pipeline} this {@link WatermarkManager} tracks
+   * watermarks for.
    */
-  private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers;
+  private final DirectGraph graph;
 
   /**
    * The input and output watermark of each {@link AppliedPTransform}.
@@ -697,27 +697,21 @@ public class WatermarkManager {
   private final Set<AppliedPTransform<?, ?, ?>> pendingRefreshes;
 
   /**
-   * Creates a new {@link WatermarkManager}. All watermarks within the newly created
-   * {@link WatermarkManager} start at {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, the
-   * minimum watermark, with no watermark holds or pending elements.
+   * Creates a new {@link WatermarkManager}. All watermarks within the newly created {@link
+   * WatermarkManager} start at {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, the minimum watermark,
+   * with no watermark holds or pending elements.
    *
-   * @param rootTransforms the root-level transforms of the {@link Pipeline}
-   * @param consumers a mapping between each {@link PCollection} in the {@link Pipeline} to the
-   *                  transforms that consume it as a part of their input
+   * @param clock the clock to use to determine processing time
+   * @param graph the graph representing this pipeline
    */
-  public static WatermarkManager create(
-      Clock clock,
-      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
-      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) {
-    return new WatermarkManager(clock, rootTransforms, consumers);
+  public static WatermarkManager create(Clock clock, DirectGraph graph) {
+    return new WatermarkManager(clock, graph);
   }
 
-  private WatermarkManager(
-      Clock clock,
-      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
-      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) {
+  private WatermarkManager(Clock clock, DirectGraph graph) {
     this.clock = clock;
-    this.consumers = consumers;
+    this.graph = graph;
+
     this.pendingUpdates = new ConcurrentLinkedQueue<>();
 
     this.refreshLock = new ReentrantLock();
@@ -725,13 +719,11 @@ public class WatermarkManager {
 
     transformToWatermarks = new HashMap<>();
 
-    for (AppliedPTransform<?, ?, ?> rootTransform : rootTransforms) {
+    for (AppliedPTransform<?, ?, ?> rootTransform : graph.getRootTransforms()) {
       getTransformWatermark(rootTransform);
     }
-    for (Collection<AppliedPTransform<?, ?, ?>> intermediateTransforms : consumers.values()) {
-      for (AppliedPTransform<?, ?, ?> transform : intermediateTransforms) {
-        getTransformWatermark(transform);
-      }
+    for (AppliedPTransform<?, ?, ?> primitiveTransform : graph.getPrimitiveTransforms()) {
+      getTransformWatermark(primitiveTransform);
     }
   }
 
@@ -769,8 +761,7 @@ public class WatermarkManager {
     }
     for (PValue pvalue : inputs) {
       Watermark producerOutputWatermark =
-          getTransformWatermark(pvalue.getProducingTransformInternal())
-              .synchronizedProcessingOutputWatermark;
+          getTransformWatermark(graph.getProducer(pvalue)).synchronizedProcessingOutputWatermark;
       inputWmsBuilder.add(producerOutputWatermark);
     }
     return inputWmsBuilder.build();
@@ -784,7 +775,7 @@ public class WatermarkManager {
     }
     for (PValue pvalue : inputs) {
       Watermark producerOutputWatermark =
-          getTransformWatermark(pvalue.getProducingTransformInternal()).outputWatermark;
+          getTransformWatermark(graph.getProducer(pvalue)).outputWatermark;
       inputWatermarksBuilder.add(producerOutputWatermark);
     }
     List<Watermark> inputCollectionWatermarks = inputWatermarksBuilder.build();
@@ -920,7 +911,8 @@ public class WatermarkManager {
     // do not share a Mutex within this call and thus can be interleaved with external calls to
     // refresh.
     for (CommittedBundle<?> bundle : result.getOutputs()) {
-      for (AppliedPTransform<?, ?, ?> consumer : consumers.get(bundle.getPCollection())) {
+      for (AppliedPTransform<?, ?, ?> consumer :
+          graph.getPrimitiveConsumers(bundle.getPCollection())) {
         TransformWatermarks watermarks = transformToWatermarks.get(consumer);
         watermarks.addPending(bundle);
       }
@@ -968,7 +960,7 @@ public class WatermarkManager {
     if (updateResult.isAdvanced()) {
       Set<AppliedPTransform<?, ?, ?>> additionalRefreshes = new HashSet<>();
       for (PValue outputPValue : toRefresh.getOutput().expand()) {
-        additionalRefreshes.addAll(consumers.get(outputPValue));
+        additionalRefreshes.addAll(graph.getPrimitiveConsumers(outputPValue));
       }
       return additionalRefreshes;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8162cd29/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
index f7f4b71..02fe007 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
@@ -18,6 +18,8 @@
 package org.apache.beam.runners.direct;
 
 import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
@@ -36,7 +38,6 @@ import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.PValue;
 import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
@@ -72,7 +73,7 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
         p.apply("singletonCreate", Create.<Object>of(1, 2, 3)).apply(View.<Object>asSingleton());
     p.traverseTopologically(visitor);
     assertThat(
-        visitor.getViews(),
+        visitor.getGraph().getViews(),
         Matchers.<PCollectionView<?>>containsInAnyOrder(listView, singletonView));
   }
 
@@ -83,7 +84,7 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
     PCollection<Long> unCounted = p.apply(CountingInput.unbounded());
     p.traverseTopologically(visitor);
     assertThat(
-        visitor.getRootTransforms(),
+        visitor.getGraph().getRootTransforms(),
         Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
             created.getProducingTransformInternal(),
             counted.getProducingTransformInternal(),
@@ -96,7 +97,7 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
         PCollectionList.<String>empty(p).apply(Flatten.<String>pCollections());
     p.traverseTopologically(visitor);
     assertThat(
-        visitor.getRootTransforms(),
+        visitor.getGraph().getRootTransforms(),
         Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
             empty.getProducingTransformInternal()));
   }
@@ -121,15 +122,15 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
     p.traverseTopologically(visitor);
 
     assertThat(
-        visitor.getValueToConsumers().get(created),
+        visitor.getGraph().getPrimitiveConsumers(created),
         Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
             transformed.getProducingTransformInternal(),
             flattened.getProducingTransformInternal()));
     assertThat(
-        visitor.getValueToConsumers().get(transformed),
+        visitor.getGraph().getPrimitiveConsumers(transformed),
         Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
             flattened.getProducingTransformInternal()));
-    assertThat(visitor.getValueToConsumers().get(flattened), emptyIterable());
+    assertThat(visitor.getGraph().getPrimitiveConsumers(flattened), emptyIterable());
   }
 
   @Test
@@ -142,11 +143,11 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
     p.traverseTopologically(visitor);
 
     assertThat(
-        visitor.getValueToConsumers().get(created),
+        visitor.getGraph().getPrimitiveConsumers(created),
         Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
             flattened.getProducingTransformInternal(),
             flattened.getProducingTransformInternal()));
-    assertThat(visitor.getValueToConsumers().get(flattened), emptyIterable());
+    assertThat(visitor.getGraph().getPrimitiveConsumers(flattened), emptyIterable());
   }
 
   @Test
@@ -163,32 +164,11 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
                   }
                 }));
 
-    p.traverseTopologically(visitor);
-    assertThat(visitor.getUnfinalizedPValues(), Matchers.<PValue>contains(transformed));
-  }
-
-  @Test
-  public void getUnfinalizedPValuesEmpty() {
-    p.apply(Create.of("1", "2", "3"))
-        .apply(
-            ParDo.of(
-                new DoFn<String, String>() {
-                  @ProcessElement
-                  public void processElement(DoFn<String, String>.ProcessContext c)
-                      throws Exception {
-                    c.output(Integer.toString(c.element().length()));
-                  }
-                }))
-        .apply(
-            new PTransform<PInput, PDone>() {
-              @Override
-              public PDone apply(PInput input) {
-                return PDone.in(input.getPipeline());
-              }
-            });
+    assertThat(transformed.isFinishedSpecifyingInternal(), is(false));
 
     p.traverseTopologically(visitor);
-    assertThat(visitor.getUnfinalizedPValues(), emptyIterable());
+    visitor.finishSpecifyingRemainder();
+    assertThat(transformed.isFinishedSpecifyingInternal(), is(true));
   }
 
   @Test
@@ -214,18 +194,12 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
             });
 
     p.traverseTopologically(visitor);
-    assertThat(
-        visitor.getStepNames(),
-        Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry(
-            created.getProducingTransformInternal(), "s0"));
-    assertThat(
-        visitor.getStepNames(),
-        Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry(
-            transformed.getProducingTransformInternal(), "s1"));
-    assertThat(
-        visitor.getStepNames(),
-        Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry(
-            finished.getProducingTransformInternal(), "s2"));
+    DirectGraph graph = visitor.getGraph();
+    assertThat(graph.getStepName(graph.getProducer(created)), equalTo("s0"));
+    assertThat(graph.getStepName(graph.getProducer(transformed)), equalTo("s1"));
+    // finished doesn't have a producer, because it's not a PValue.
+    // TODO: Demonstrate that PCollectionList/Tuple and other composite PValues are either safe to
+    // use, or make them so.
   }
 
   @Test
@@ -248,40 +222,18 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
   }
 
   @Test
-  public void getRootTransformsWithoutVisitingThrows() {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("completely traversed");
-    thrown.expectMessage("getRootTransforms");
-    visitor.getRootTransforms();
-  }
-  @Test
-  public void getStepNamesWithoutVisitingThrows() {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("completely traversed");
-    thrown.expectMessage("getStepNames");
-    visitor.getStepNames();
-  }
-  @Test
-  public void getUnfinalizedPValuesWithoutVisitingThrows() {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("completely traversed");
-    thrown.expectMessage("getUnfinalizedPValues");
-    visitor.getUnfinalizedPValues();
-  }
-
-  @Test
-  public void getValueToConsumersWithoutVisitingThrows() {
+  public void getGraphWithoutVisitingThrows() {
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("completely traversed");
-    thrown.expectMessage("getValueToConsumers");
-    visitor.getValueToConsumers();
+    thrown.expectMessage("get a graph");
+    visitor.getGraph();
   }
 
   @Test
-  public void getViewsWithoutVisitingThrows() {
+  public void finishSpecifyingRemainderWithoutVisitingThrows() {
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("completely traversed");
-    thrown.expectMessage("getViews");
-    visitor.getViews();
+    thrown.expectMessage("finishSpecifyingRemainder");
+    visitor.finishSpecifyingRemainder();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8162cd29/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 9a3959d..1c2bf14 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -29,7 +29,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
@@ -67,7 +66,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Before;
@@ -87,10 +85,9 @@ public class EvaluationContextTest {
   private PCollection<KV<String, Integer>> downstream;
   private PCollectionView<Iterable<Integer>> view;
   private PCollection<Long> unbounded;
-  private Collection<AppliedPTransform<?, ?, ?>> rootTransforms;
-  private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;
 
   private BundleFactory bundleFactory;
+  private DirectGraph graph;
 
   @Before
   public void setup() {
@@ -106,20 +103,12 @@ public class EvaluationContextTest {
 
     ConsumerTrackingPipelineVisitor cVis = new ConsumerTrackingPipelineVisitor();
     p.traverseTopologically(cVis);
-    rootTransforms = cVis.getRootTransforms();
-    valueToConsumers = cVis.getValueToConsumers();
 
     bundleFactory = ImmutableListBundleFactory.create();
-
+    graph = cVis.getGraph();
     context =
         EvaluationContext.create(
-            runner.getPipelineOptions(),
-            NanosOffsetClock.create(),
-            ImmutableListBundleFactory.create(),
-            rootTransforms,
-            valueToConsumers,
-            cVis.getStepNames(),
-            cVis.getViews());
+            runner.getPipelineOptions(), NanosOffsetClock.create(), bundleFactory, graph);
   }
 
   @Test
@@ -427,13 +416,13 @@ public class EvaluationContextTest {
   @Test
   public void isDoneWithUnboundedPCollectionAndNotShutdown() {
     context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
-    assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false));
+    assertThat(context.isDone(graph.getProducer(unbounded)), is(false));
 
     context.handleResult(
         null,
         ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
-    assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false));
+        StepTransformResult.withoutHold(graph.getProducer(unbounded)).build());
+    assertThat(context.isDone(graph.getProducer(unbounded)), is(false));
   }
 
   @Test
@@ -472,7 +461,7 @@ public class EvaluationContextTest {
         StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
     assertThat(context.isDone(), is(false));
 
-    for (AppliedPTransform<?, ?, ?> consumers : valueToConsumers.get(created)) {
+    for (AppliedPTransform<?, ?, ?> consumers : graph.getPrimitiveConsumers(created)) {
       context.handleResult(
           committedBundle,
           ImmutableList.<TimerData>of(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8162cd29/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
index ea44125..e7e1e62 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
@@ -56,7 +56,11 @@ public class ImmutabilityCheckingBundleFactoryTest {
     TestPipeline p = TestPipeline.create();
     created = p.apply(Create.<byte[]>of().withCoder(ByteArrayCoder.of()));
     transformed = created.apply(ParDo.of(new IdentityDoFn<byte[]>()));
-    factory = ImmutabilityCheckingBundleFactory.create(ImmutableListBundleFactory.create());
+    ConsumerTrackingPipelineVisitor visitor = new ConsumerTrackingPipelineVisitor();
+    p.traverseTopologically(visitor);
+    factory =
+        ImmutabilityCheckingBundleFactory.create(
+            ImmutableListBundleFactory.create(), visitor.getGraph());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8162cd29/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 2e8ab84..5cde4d6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -94,6 +94,7 @@ public class WatermarkManagerTest implements Serializable {
 
   private transient WatermarkManager manager;
   private transient BundleFactory bundleFactory;
+  private DirectGraph graph;
 
   @Before
   public void setup() {
@@ -139,8 +140,11 @@ public class WatermarkManagerTest implements Serializable {
     consumers.put(flattened, Collections.<AppliedPTransform<?, ?, ?>>emptyList());
 
     clock = MockClock.fromInstant(new Instant(1000));
+    ConsumerTrackingPipelineVisitor visitor = new ConsumerTrackingPipelineVisitor();
+    p.traverseTopologically(visitor);
+    graph = visitor.getGraph();
 
-    manager = WatermarkManager.create(clock, rootTransforms, consumers);
+    manager = WatermarkManager.create(clock, graph);
     bundleFactory = ImmutableListBundleFactory.create();
   }
 
@@ -305,20 +309,13 @@ public class WatermarkManagerTest implements Serializable {
     PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
     PCollection<Integer> multiConsumer =
         PCollectionList.of(created).and(created).apply(Flatten.<Integer>pCollections());
-    AppliedPTransform<?, ?, ?> theFlatten = multiConsumer.getProducingTransformInternal();
+    ConsumerTrackingPipelineVisitor trackingVisitor = new ConsumerTrackingPipelineVisitor();
+    p.traverseTopologically(trackingVisitor);
+    DirectGraph graph = trackingVisitor.getGraph();
 
-    Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers =
-        ImmutableMap.<PValue, Collection<AppliedPTransform<?, ?, ?>>>builder()
-            .put(created, ImmutableList.<AppliedPTransform<?, ?, ?>>of(theFlatten, theFlatten))
-            .put(multiConsumer, Collections.<AppliedPTransform<?, ?, ?>>emptyList())
-            .build();
+    AppliedPTransform<?, ?, ?> theFlatten = graph.getProducer(multiConsumer);
 
-    WatermarkManager tstMgr =
-        WatermarkManager.create(
-            clock,
-            Collections.<AppliedPTransform<?, ?, ?>>singleton(
-                created.getProducingTransformInternal()),
-            valueToConsumers);
+    WatermarkManager tstMgr = WatermarkManager.create(clock, graph);
     CommittedBundle<Void> root =
         bundleFactory
             .<Void>createRootBundle()


[3/3] incubator-beam git commit: This closes #1487

Posted by tg...@apache.org.
This closes #1487


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1abbb900
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1abbb900
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1abbb900

Branch: refs/heads/master
Commit: 1abbb9007e83fc64f1bb61ff4593f37c6c386545
Parents: 8cb2689 662416a
Author: Thomas Groh <tg...@google.com>
Authored: Fri Dec 2 14:02:25 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Dec 2 14:02:25 2016 -0800

----------------------------------------------------------------------
 .../direct/ConsumerTrackingPipelineVisitor.java | 173 -----------
 .../apache/beam/runners/direct/DirectGraph.java |  89 ++++++
 .../beam/runners/direct/DirectGraphVisitor.java | 145 ++++++++++
 .../beam/runners/direct/DirectRunner.java       |  35 +--
 .../beam/runners/direct/EvaluationContext.java  |  76 ++---
 .../direct/ExecutorServiceParallelExecutor.java |  15 +-
 .../ImmutabilityCheckingBundleFactory.java      |  21 +-
 .../beam/runners/direct/WatermarkManager.java   |  50 ++--
 .../ConsumerTrackingPipelineVisitorTest.java    | 287 -------------------
 .../runners/direct/DirectGraphVisitorTest.java  | 239 +++++++++++++++
 .../runners/direct/EvaluationContextTest.java   |  29 +-
 .../ImmutabilityCheckingBundleFactoryTest.java  |   6 +-
 .../runners/direct/WatermarkManagerTest.java    |  23 +-
 13 files changed, 575 insertions(+), 613 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam git commit: Rename ConsumerTrackingPipelineVisitor to DirectGraphVisitor

Posted by tg...@apache.org.
Rename ConsumerTrackingPipelineVisitor to DirectGraphVisitor

Reduce visibility of Visitor.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/662416a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/662416a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/662416a4

Branch: refs/heads/master
Commit: 662416a4e176cca252c0d6fde1bf4252aeaa56c0
Parents: 8162cd2
Author: Thomas Groh <tg...@google.com>
Authored: Fri Dec 2 10:07:05 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Dec 2 14:02:25 2016 -0800

----------------------------------------------------------------------
 .../direct/ConsumerTrackingPipelineVisitor.java | 145 -----------
 .../beam/runners/direct/DirectGraphVisitor.java | 145 +++++++++++
 .../beam/runners/direct/DirectRunner.java       |   8 +-
 .../ConsumerTrackingPipelineVisitorTest.java    | 239 -------------------
 .../runners/direct/DirectGraphVisitorTest.java  | 239 +++++++++++++++++++
 .../runners/direct/EvaluationContextTest.java   |   6 +-
 .../ImmutabilityCheckingBundleFactoryTest.java  |   2 +-
 .../runners/direct/WatermarkManagerTest.java    |   8 +-
 8 files changed, 396 insertions(+), 396 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/662416a4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
deleted file mode 100644
index b9e77c5..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-
-/**
- * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the
- * {@link Pipeline}. This is used to schedule consuming {@link PTransform PTransforms} to consume
- * input after the upstream transform has produced and committed output.
- */
-public class ConsumerTrackingPipelineVisitor extends PipelineVisitor.Defaults {
-  private Map<POutput, AppliedPTransform<?, ?, ?>> producers = new HashMap<>();
-
-  private ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers =
-      ArrayListMultimap.create();
-
-  private Set<PCollectionView<?>> views = new HashSet<>();
-  private Set<AppliedPTransform<?, ?, ?>> rootTransforms = new HashSet<>();
-  private Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
-  private Set<PValue> toFinalize = new HashSet<>();
-  private int numTransforms = 0;
-  private boolean finalized = false;
-
-  @Override
-  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
-    checkState(
-        !finalized,
-        "Attempting to traverse a pipeline (node %s) with a %s "
-            + "which has already visited a Pipeline and is finalized",
-        node.getFullName(),
-        ConsumerTrackingPipelineVisitor.class.getSimpleName());
-    return CompositeBehavior.ENTER_TRANSFORM;
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformHierarchy.Node node) {
-    checkState(
-        !finalized,
-        "Attempting to traverse a pipeline (node %s) with a %s which is already finalized",
-        node.getFullName(),
-        ConsumerTrackingPipelineVisitor.class.getSimpleName());
-    if (node.isRootNode()) {
-      finalized = true;
-    }
-  }
-
-  @Override
-  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    toFinalize.removeAll(node.getInput().expand());
-    AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node);
-    stepNames.put(appliedTransform, genStepName());
-    if (node.getInput().expand().isEmpty()) {
-      rootTransforms.add(appliedTransform);
-    } else {
-      for (PValue value : node.getInput().expand()) {
-        primitiveConsumers.put(value, appliedTransform);
-      }
-    }
-  }
-
- @Override
-  public void visitValue(PValue value, TransformHierarchy.Node producer) {
-    toFinalize.add(value);
-
-    AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(producer);
-    if (!producers.containsKey(value)) {
-      producers.put(value, appliedTransform);
-    }
-    for (PValue expandedValue : value.expand()) {
-      if (expandedValue instanceof PCollectionView) {
-        views.add((PCollectionView<?>) expandedValue);
-      }
-      if (!producers.containsKey(expandedValue)) {
-        producers.put(value, appliedTransform);
-      }
-    }
-  }
-
-  private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformHierarchy.Node node) {
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    AppliedPTransform<?, ?, ?> application = AppliedPTransform.of(
-        node.getFullName(), node.getInput(), node.getOutput(), (PTransform) node.getTransform());
-    return application;
-  }
-
-  private String genStepName() {
-    return String.format("s%s", numTransforms++);
-  }
-
-  /**
-   * Returns all of the {@link PValue PValues} that have been produced but not consumed. These
-   * {@link PValue PValues} should be finalized by the {@link PipelineRunner} before the
-   * {@link Pipeline} is executed.
-   */
-  public void finishSpecifyingRemainder() {
-    checkState(
-        finalized,
-        "Can't call finishSpecifyingRemainder before the Pipeline has been completely traversed");
-    for (PValue unfinalized : toFinalize) {
-      unfinalized.finishSpecifying();
-    }
-  }
-
-  /**
-   * Get the graph constructed by this {@link ConsumerTrackingPipelineVisitor}, which provides
-   * lookups for producers and consumers of {@link PValue PValues}.
-   */
-  public DirectGraph getGraph() {
-    checkState(finalized, "Can't get a graph before the Pipeline has been completely traversed");
-    return DirectGraph.create(producers, primitiveConsumers, views, rootTransforms, stepNames);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/662416a4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
new file mode 100644
index 0000000..cd9d120
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+
+/**
+ * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the
+ * {@link Pipeline}. This is used to schedule consuming {@link PTransform PTransforms} to consume
+ * input after the upstream transform has produced and committed output.
+ */
+class DirectGraphVisitor extends PipelineVisitor.Defaults {
+  private Map<POutput, AppliedPTransform<?, ?, ?>> producers = new HashMap<>();
+
+  private ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers =
+      ArrayListMultimap.create();
+
+  private Set<PCollectionView<?>> views = new HashSet<>();
+  private Set<AppliedPTransform<?, ?, ?>> rootTransforms = new HashSet<>();
+  private Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
+  private Set<PValue> toFinalize = new HashSet<>();
+  private int numTransforms = 0;
+  private boolean finalized = false;
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+    checkState(
+        !finalized,
+        "Attempting to traverse a pipeline (node %s) with a %s "
+            + "which has already visited a Pipeline and is finalized",
+        node.getFullName(),
+        getClass().getSimpleName());
+    return CompositeBehavior.ENTER_TRANSFORM;
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
+    checkState(
+        !finalized,
+        "Attempting to traverse a pipeline (node %s) with a %s which is already finalized",
+        node.getFullName(),
+        getClass().getSimpleName());
+    if (node.isRootNode()) {
+      finalized = true;
+    }
+  }
+
+  @Override
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    toFinalize.removeAll(node.getInput().expand());
+    AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node);
+    stepNames.put(appliedTransform, genStepName());
+    if (node.getInput().expand().isEmpty()) {
+      rootTransforms.add(appliedTransform);
+    } else {
+      for (PValue value : node.getInput().expand()) {
+        primitiveConsumers.put(value, appliedTransform);
+      }
+    }
+  }
+
+ @Override
+  public void visitValue(PValue value, TransformHierarchy.Node producer) {
+    toFinalize.add(value);
+
+    AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(producer);
+    if (!producers.containsKey(value)) {
+      producers.put(value, appliedTransform);
+    }
+    for (PValue expandedValue : value.expand()) {
+      if (expandedValue instanceof PCollectionView) {
+        views.add((PCollectionView<?>) expandedValue);
+      }
+      if (!producers.containsKey(expandedValue)) {
+        producers.put(value, appliedTransform);
+      }
+    }
+  }
+
+  private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformHierarchy.Node node) {
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    AppliedPTransform<?, ?, ?> application = AppliedPTransform.of(
+        node.getFullName(), node.getInput(), node.getOutput(), (PTransform) node.getTransform());
+    return application;
+  }
+
+  private String genStepName() {
+    return String.format("s%s", numTransforms++);
+  }
+
+  /**
+   * Returns all of the {@link PValue PValues} that have been produced but not consumed. These
+   * {@link PValue PValues} should be finalized by the {@link PipelineRunner} before the
+   * {@link Pipeline} is executed.
+   */
+  public void finishSpecifyingRemainder() {
+    checkState(
+        finalized,
+        "Can't call finishSpecifyingRemainder before the Pipeline has been completely traversed");
+    for (PValue unfinalized : toFinalize) {
+      unfinalized.finishSpecifying();
+    }
+  }
+
+  /**
+   * Get the graph constructed by this {@link DirectGraphVisitor}, which provides
+   * lookups for producers and consumers of {@link PValue PValues}.
+   */
+  public DirectGraph getGraph() {
+    checkState(finalized, "Can't get a graph before the Pipeline has been completely traversed");
+    return DirectGraph.create(producers, primitiveConsumers, views, rootTransforms, stepNames);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/662416a4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 0ad5836..2f84356 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -298,9 +298,9 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
   @Override
   public DirectPipelineResult run(Pipeline pipeline) {
     MetricsEnvironment.setMetricsSupported(true);
-    ConsumerTrackingPipelineVisitor consumerTrackingVisitor = new ConsumerTrackingPipelineVisitor();
-    pipeline.traverseTopologically(consumerTrackingVisitor);
-    consumerTrackingVisitor.finishSpecifyingRemainder();
+    DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
+    pipeline.traverseTopologically(graphVisitor);
+    graphVisitor.finishSpecifyingRemainder();
 
     @SuppressWarnings("rawtypes")
     KeyedPValueTrackingVisitor keyedPValueVisitor =
@@ -313,7 +313,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
 
     DisplayDataValidator.validatePipeline(pipeline);
 
-    DirectGraph graph = consumerTrackingVisitor.getGraph();
+    DirectGraph graph = graphVisitor.getGraph();
     EvaluationContext context =
         EvaluationContext.create(
             getPipelineOptions(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/662416a4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
deleted file mode 100644
index 02fe007..0000000
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.beam.sdk.io.CountingInput;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
-import org.hamcrest.Matchers;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ConsumerTrackingPipelineVisitor}.
- */
-@RunWith(JUnit4.class)
-public class ConsumerTrackingPipelineVisitorTest implements Serializable {
-  @Rule public transient ExpectedException thrown = ExpectedException.none();
-
-  private transient TestPipeline p = TestPipeline.create();
-  private transient ConsumerTrackingPipelineVisitor visitor = new ConsumerTrackingPipelineVisitor();
-
-  @Test
-  public void getViewsReturnsViews() {
-    PCollectionView<List<String>> listView =
-        p.apply("listCreate", Create.of("foo", "bar"))
-            .apply(
-                ParDo.of(
-                    new DoFn<String, String>() {
-                      @ProcessElement
-                      public void processElement(DoFn<String, String>.ProcessContext c)
-                          throws Exception {
-                        c.output(Integer.toString(c.element().length()));
-                      }
-                    }))
-            .apply(View.<String>asList());
-    PCollectionView<Object> singletonView =
-        p.apply("singletonCreate", Create.<Object>of(1, 2, 3)).apply(View.<Object>asSingleton());
-    p.traverseTopologically(visitor);
-    assertThat(
-        visitor.getGraph().getViews(),
-        Matchers.<PCollectionView<?>>containsInAnyOrder(listView, singletonView));
-  }
-
-  @Test
-  public void getRootTransformsContainsPBegins() {
-    PCollection<String> created = p.apply(Create.of("foo", "bar"));
-    PCollection<Long> counted = p.apply(CountingInput.upTo(1234L));
-    PCollection<Long> unCounted = p.apply(CountingInput.unbounded());
-    p.traverseTopologically(visitor);
-    assertThat(
-        visitor.getGraph().getRootTransforms(),
-        Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
-            created.getProducingTransformInternal(),
-            counted.getProducingTransformInternal(),
-            unCounted.getProducingTransformInternal()));
-  }
-
-  @Test
-  public void getRootTransformsContainsEmptyFlatten() {
-    PCollection<String> empty =
-        PCollectionList.<String>empty(p).apply(Flatten.<String>pCollections());
-    p.traverseTopologically(visitor);
-    assertThat(
-        visitor.getGraph().getRootTransforms(),
-        Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
-            empty.getProducingTransformInternal()));
-  }
-
-  @Test
-  public void getValueToConsumersSucceeds() {
-    PCollection<String> created = p.apply(Create.of("1", "2", "3"));
-    PCollection<String> transformed =
-        created.apply(
-            ParDo.of(
-                new DoFn<String, String>() {
-                  @ProcessElement
-                  public void processElement(DoFn<String, String>.ProcessContext c)
-                      throws Exception {
-                    c.output(Integer.toString(c.element().length()));
-                  }
-                }));
-
-    PCollection<String> flattened =
-        PCollectionList.of(created).and(transformed).apply(Flatten.<String>pCollections());
-
-    p.traverseTopologically(visitor);
-
-    assertThat(
-        visitor.getGraph().getPrimitiveConsumers(created),
-        Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
-            transformed.getProducingTransformInternal(),
-            flattened.getProducingTransformInternal()));
-    assertThat(
-        visitor.getGraph().getPrimitiveConsumers(transformed),
-        Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
-            flattened.getProducingTransformInternal()));
-    assertThat(visitor.getGraph().getPrimitiveConsumers(flattened), emptyIterable());
-  }
-
-  @Test
-  public void getValueToConsumersWithDuplicateInputSucceeds() {
-    PCollection<String> created = p.apply(Create.of("1", "2", "3"));
-
-    PCollection<String> flattened =
-        PCollectionList.of(created).and(created).apply(Flatten.<String>pCollections());
-
-    p.traverseTopologically(visitor);
-
-    assertThat(
-        visitor.getGraph().getPrimitiveConsumers(created),
-        Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
-            flattened.getProducingTransformInternal(),
-            flattened.getProducingTransformInternal()));
-    assertThat(visitor.getGraph().getPrimitiveConsumers(flattened), emptyIterable());
-  }
-
-  @Test
-  public void getUnfinalizedPValuesContainsDanglingOutputs() {
-    PCollection<String> created = p.apply(Create.of("1", "2", "3"));
-    PCollection<String> transformed =
-        created.apply(
-            ParDo.of(
-                new DoFn<String, String>() {
-                  @ProcessElement
-                  public void processElement(DoFn<String, String>.ProcessContext c)
-                      throws Exception {
-                    c.output(Integer.toString(c.element().length()));
-                  }
-                }));
-
-    assertThat(transformed.isFinishedSpecifyingInternal(), is(false));
-
-    p.traverseTopologically(visitor);
-    visitor.finishSpecifyingRemainder();
-    assertThat(transformed.isFinishedSpecifyingInternal(), is(true));
-  }
-
-  @Test
-  public void getStepNamesContainsAllTransforms() {
-    PCollection<String> created = p.apply(Create.of("1", "2", "3"));
-    PCollection<String> transformed =
-        created.apply(
-            ParDo.of(
-                new DoFn<String, String>() {
-                  @ProcessElement
-                  public void processElement(DoFn<String, String>.ProcessContext c)
-                      throws Exception {
-                    c.output(Integer.toString(c.element().length()));
-                  }
-                }));
-    PDone finished =
-        transformed.apply(
-            new PTransform<PInput, PDone>() {
-              @Override
-              public PDone apply(PInput input) {
-                return PDone.in(input.getPipeline());
-              }
-            });
-
-    p.traverseTopologically(visitor);
-    DirectGraph graph = visitor.getGraph();
-    assertThat(graph.getStepName(graph.getProducer(created)), equalTo("s0"));
-    assertThat(graph.getStepName(graph.getProducer(transformed)), equalTo("s1"));
-    // finished doesn't have a producer, because it's not a PValue.
-    // TODO: Demonstrate that PCollectionList/Tuple and other composite PValues are either safe to
-    // use, or make them so.
-  }
-
-  @Test
-  public void traverseMultipleTimesThrows() {
-    p.apply(Create.of(1, 2, 3));
-
-    p.traverseTopologically(visitor);
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(ConsumerTrackingPipelineVisitor.class.getSimpleName());
-    thrown.expectMessage("is finalized");
-    p.traverseTopologically(visitor);
-  }
-
-  @Test
-  public void traverseIndependentPathsSucceeds() {
-    p.apply("left", Create.of(1, 2, 3));
-    p.apply("right", Create.of("foo", "bar", "baz"));
-
-    p.traverseTopologically(visitor);
-  }
-
-  @Test
-  public void getGraphWithoutVisitingThrows() {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("completely traversed");
-    thrown.expectMessage("get a graph");
-    visitor.getGraph();
-  }
-
-  @Test
-  public void finishSpecifyingRemainderWithoutVisitingThrows() {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("completely traversed");
-    thrown.expectMessage("finishSpecifyingRemainder");
-    visitor.finishSpecifyingRemainder();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/662416a4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
new file mode 100644
index 0000000..d218a81
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DirectGraphVisitor}.
+ */
+@RunWith(JUnit4.class)
+public class DirectGraphVisitorTest implements Serializable {
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  private transient TestPipeline p = TestPipeline.create();
+  private transient DirectGraphVisitor visitor = new DirectGraphVisitor();
+
+  @Test
+  public void getViewsReturnsViews() {
+    PCollectionView<List<String>> listView =
+        p.apply("listCreate", Create.of("foo", "bar"))
+            .apply(
+                ParDo.of(
+                    new DoFn<String, String>() {
+                      @ProcessElement
+                      public void processElement(DoFn<String, String>.ProcessContext c)
+                          throws Exception {
+                        c.output(Integer.toString(c.element().length()));
+                      }
+                    }))
+            .apply(View.<String>asList());
+    PCollectionView<Object> singletonView =
+        p.apply("singletonCreate", Create.<Object>of(1, 2, 3)).apply(View.<Object>asSingleton());
+    p.traverseTopologically(visitor);
+    assertThat(
+        visitor.getGraph().getViews(),
+        Matchers.<PCollectionView<?>>containsInAnyOrder(listView, singletonView));
+  }
+
+  @Test
+  public void getRootTransformsContainsPBegins() {
+    PCollection<String> created = p.apply(Create.of("foo", "bar"));
+    PCollection<Long> counted = p.apply(CountingInput.upTo(1234L));
+    PCollection<Long> unCounted = p.apply(CountingInput.unbounded());
+    p.traverseTopologically(visitor);
+    assertThat(
+        visitor.getGraph().getRootTransforms(),
+        Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
+            created.getProducingTransformInternal(),
+            counted.getProducingTransformInternal(),
+            unCounted.getProducingTransformInternal()));
+  }
+
+  @Test
+  public void getRootTransformsContainsEmptyFlatten() {
+    PCollection<String> empty =
+        PCollectionList.<String>empty(p).apply(Flatten.<String>pCollections());
+    p.traverseTopologically(visitor);
+    assertThat(
+        visitor.getGraph().getRootTransforms(),
+        Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
+            empty.getProducingTransformInternal()));
+  }
+
+  @Test
+  public void getValueToConsumersSucceeds() {
+    PCollection<String> created = p.apply(Create.of("1", "2", "3"));
+    PCollection<String> transformed =
+        created.apply(
+            ParDo.of(
+                new DoFn<String, String>() {
+                  @ProcessElement
+                  public void processElement(DoFn<String, String>.ProcessContext c)
+                      throws Exception {
+                    c.output(Integer.toString(c.element().length()));
+                  }
+                }));
+
+    PCollection<String> flattened =
+        PCollectionList.of(created).and(transformed).apply(Flatten.<String>pCollections());
+
+    p.traverseTopologically(visitor);
+
+    assertThat(
+        visitor.getGraph().getPrimitiveConsumers(created),
+        Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
+            transformed.getProducingTransformInternal(),
+            flattened.getProducingTransformInternal()));
+    assertThat(
+        visitor.getGraph().getPrimitiveConsumers(transformed),
+        Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
+            flattened.getProducingTransformInternal()));
+    assertThat(visitor.getGraph().getPrimitiveConsumers(flattened), emptyIterable());
+  }
+
+  @Test
+  public void getValueToConsumersWithDuplicateInputSucceeds() {
+    PCollection<String> created = p.apply(Create.of("1", "2", "3"));
+
+    PCollection<String> flattened =
+        PCollectionList.of(created).and(created).apply(Flatten.<String>pCollections());
+
+    p.traverseTopologically(visitor);
+
+    assertThat(
+        visitor.getGraph().getPrimitiveConsumers(created),
+        Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
+            flattened.getProducingTransformInternal(),
+            flattened.getProducingTransformInternal()));
+    assertThat(visitor.getGraph().getPrimitiveConsumers(flattened), emptyIterable());
+  }
+
+  @Test
+  public void getUnfinalizedPValuesContainsDanglingOutputs() {
+    PCollection<String> created = p.apply(Create.of("1", "2", "3"));
+    PCollection<String> transformed =
+        created.apply(
+            ParDo.of(
+                new DoFn<String, String>() {
+                  @ProcessElement
+                  public void processElement(DoFn<String, String>.ProcessContext c)
+                      throws Exception {
+                    c.output(Integer.toString(c.element().length()));
+                  }
+                }));
+
+    assertThat(transformed.isFinishedSpecifyingInternal(), is(false));
+
+    p.traverseTopologically(visitor);
+    visitor.finishSpecifyingRemainder();
+    assertThat(transformed.isFinishedSpecifyingInternal(), is(true));
+  }
+
+  @Test
+  public void getStepNamesContainsAllTransforms() {
+    PCollection<String> created = p.apply(Create.of("1", "2", "3"));
+    PCollection<String> transformed =
+        created.apply(
+            ParDo.of(
+                new DoFn<String, String>() {
+                  @ProcessElement
+                  public void processElement(DoFn<String, String>.ProcessContext c)
+                      throws Exception {
+                    c.output(Integer.toString(c.element().length()));
+                  }
+                }));
+    PDone finished =
+        transformed.apply(
+            new PTransform<PInput, PDone>() {
+              @Override
+              public PDone apply(PInput input) {
+                return PDone.in(input.getPipeline());
+              }
+            });
+
+    p.traverseTopologically(visitor);
+    DirectGraph graph = visitor.getGraph();
+    assertThat(graph.getStepName(graph.getProducer(created)), equalTo("s0"));
+    assertThat(graph.getStepName(graph.getProducer(transformed)), equalTo("s1"));
+    // finished doesn't have a producer, because it's not a PValue.
+    // TODO: Demonstrate that PCollectionList/Tuple and other composite PValues are either safe to
+    // use, or make them so.
+  }
+
+  @Test
+  public void traverseMultipleTimesThrows() {
+    p.apply(Create.of(1, 2, 3));
+
+    p.traverseTopologically(visitor);
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(DirectGraphVisitor.class.getSimpleName());
+    thrown.expectMessage("is finalized");
+    p.traverseTopologically(visitor);
+  }
+
+  @Test
+  public void traverseIndependentPathsSucceeds() {
+    p.apply("left", Create.of(1, 2, 3));
+    p.apply("right", Create.of("foo", "bar", "baz"));
+
+    p.traverseTopologically(visitor);
+  }
+
+  @Test
+  public void getGraphWithoutVisitingThrows() {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("completely traversed");
+    thrown.expectMessage("get a graph");
+    visitor.getGraph();
+  }
+
+  @Test
+  public void finishSpecifyingRemainderWithoutVisitingThrows() {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("completely traversed");
+    thrown.expectMessage("finishSpecifyingRemainder");
+    visitor.finishSpecifyingRemainder();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/662416a4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 1c2bf14..17cdea1 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -101,11 +101,11 @@ public class EvaluationContextTest {
     view = created.apply(View.<Integer>asIterable());
     unbounded = p.apply(CountingInput.unbounded());
 
-    ConsumerTrackingPipelineVisitor cVis = new ConsumerTrackingPipelineVisitor();
-    p.traverseTopologically(cVis);
+    DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
+    p.traverseTopologically(graphVisitor);
 
     bundleFactory = ImmutableListBundleFactory.create();
-    graph = cVis.getGraph();
+    graph = graphVisitor.getGraph();
     context =
         EvaluationContext.create(
             runner.getPipelineOptions(), NanosOffsetClock.create(), bundleFactory, graph);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/662416a4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
index e7e1e62..6ab8aea 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
@@ -56,7 +56,7 @@ public class ImmutabilityCheckingBundleFactoryTest {
     TestPipeline p = TestPipeline.create();
     created = p.apply(Create.<byte[]>of().withCoder(ByteArrayCoder.of()));
     transformed = created.apply(ParDo.of(new IdentityDoFn<byte[]>()));
-    ConsumerTrackingPipelineVisitor visitor = new ConsumerTrackingPipelineVisitor();
+    DirectGraphVisitor visitor = new DirectGraphVisitor();
     p.traverseTopologically(visitor);
     factory =
         ImmutabilityCheckingBundleFactory.create(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/662416a4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 5cde4d6..076e0fb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -140,7 +140,7 @@ public class WatermarkManagerTest implements Serializable {
     consumers.put(flattened, Collections.<AppliedPTransform<?, ?, ?>>emptyList());
 
     clock = MockClock.fromInstant(new Instant(1000));
-    ConsumerTrackingPipelineVisitor visitor = new ConsumerTrackingPipelineVisitor();
+    DirectGraphVisitor visitor = new DirectGraphVisitor();
     p.traverseTopologically(visitor);
     graph = visitor.getGraph();
 
@@ -309,9 +309,9 @@ public class WatermarkManagerTest implements Serializable {
     PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
     PCollection<Integer> multiConsumer =
         PCollectionList.of(created).and(created).apply(Flatten.<Integer>pCollections());
-    ConsumerTrackingPipelineVisitor trackingVisitor = new ConsumerTrackingPipelineVisitor();
-    p.traverseTopologically(trackingVisitor);
-    DirectGraph graph = trackingVisitor.getGraph();
+    DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
+    p.traverseTopologically(graphVisitor);
+    DirectGraph graph = graphVisitor.getGraph();
 
     AppliedPTransform<?, ?, ?> theFlatten = graph.getProducer(multiConsumer);