You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/01 01:08:59 UTC

[2/2] beam git commit: Separate View Evaluation and Overrides

Separate View Evaluation and Overrides

This simplifies the View Evaluation by separating it from the logic of
overriding CreatePCollectionView.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/69d0b307
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/69d0b307
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/69d0b307

Branch: refs/heads/master
Commit: 69d0b3070eaee55f650b15ceda3608cc27807caf
Parents: 5f72b83
Author: Thomas Groh <tg...@google.com>
Authored: Mon Mar 27 18:03:30 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 31 18:08:46 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       |   1 -
 .../direct/TransformEvaluatorRegistry.java      |   3 +-
 .../runners/direct/ViewEvaluatorFactory.java    |  79 ++----------
 .../runners/direct/ViewOverrideFactory.java     | 115 +++++++++++++++++
 .../direct/ViewEvaluatorFactoryTest.java        |  11 +-
 .../runners/direct/ViewOverrideFactoryTest.java | 124 +++++++++++++++++++
 6 files changed, 249 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/69d0b307/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 11fe3f5..bd210c3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -35,7 +35,6 @@ import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
-import org.apache.beam.runners.direct.ViewEvaluatorFactory.ViewOverrideFactory;
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;

http://git-wip-us.apache.org/repos/asf/beam/blob/69d0b307/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 5ad8709..ae7ad93 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -30,6 +30,7 @@ import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
+import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Flatten.PCollections;
@@ -54,7 +55,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
             .put(ParDo.MultiOutput.class, new ParDoEvaluatorFactory<>(ctxt))
             .put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt))
             .put(PCollections.class, new FlattenEvaluatorFactory(ctxt))
-            .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory(ctxt))
+            .put(WriteView.class, new ViewEvaluatorFactory(ctxt))
             .put(Window.Assign.class, new WindowEvaluatorFactory(ctxt))
             // Runner-specific primitives used in expansion of GroupByKey
             .put(DirectGroupByKeyOnly.class, new GroupByKeyOnlyEvaluatorFactory(ctxt))

http://git-wip-us.apache.org/repos/asf/beam/blob/69d0b307/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 9dcbf9e..dc74d3e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -20,32 +20,25 @@ package org.apache.beam.runners.direct;
 import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.runners.core.construction.ForwardingPTransform;
-import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
 import org.apache.beam.runners.direct.StepTransformResult.Builder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 
 /**
- * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
- * {@link CreatePCollectionView} primitive {@link PTransform}.
+ * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the {@link CreatePCollectionView}
+ * primitive {@link PTransform}.
  *
  * <p>The {@link ViewEvaluatorFactory} produces {@link TransformEvaluator TransformEvaluators} for
- * the {@link WriteView} {@link PTransform}, which is part of the
- * {@link DirectCreatePCollectionView} composite transform. This transform is an override for the
- * {@link CreatePCollectionView} transform that applies windowing and triggers before the view is
- * written.
+ * the {@link WriteView} {@link PTransform}, which is part of the {@link DirectRunner} override.
+ * This transform is an override for the {@link CreatePCollectionView} transform that applies
+ * windowing and triggers before the view is written.
  */
 class ViewEvaluatorFactory implements TransformEvaluatorFactory {
   private final EvaluationContext context;
@@ -91,67 +84,9 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
         if (!elements.isEmpty()) {
           resultBuilder = resultBuilder.withAdditionalOutput(OutputType.PCOLLECTION_VIEW);
         }
-        return resultBuilder
-            .build();
+        return resultBuilder.build();
       }
     };
   }
 
-  public static class ViewOverrideFactory<ElemT, ViewT>
-      extends SingleInputOutputOverrideFactory<
-                PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> {
-    @Override
-    public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> getReplacementTransform(
-        CreatePCollectionView<ElemT, ViewT> transform) {
-      return new DirectCreatePCollectionView<>(transform);
-    }
-  }
-
-  /**
-   * An in-process override for {@link CreatePCollectionView}.
-   */
-  private static class DirectCreatePCollectionView<ElemT, ViewT>
-      extends ForwardingPTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
-    private final CreatePCollectionView<ElemT, ViewT> og;
-
-    private DirectCreatePCollectionView(CreatePCollectionView<ElemT, ViewT> og) {
-      this.og = og;
-    }
-
-    @Override
-    public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
-      return input.apply(WithKeys.<Void, ElemT>of((Void) null))
-          .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
-          .apply(GroupByKey.<Void, ElemT>create())
-          .apply(Values.<Iterable<ElemT>>create())
-          .apply(new WriteView<ElemT, ViewT>(og));
-    }
-
-    @Override
-    protected PTransform<PCollection<ElemT>, PCollectionView<ViewT>> delegate() {
-      return og;
-    }
-  }
-
-  /**
-   * An in-process implementation of the {@link CreatePCollectionView} primitive.
-   *
-   * <p>This implementation requires the input {@link PCollection} to be an iterable
-   * of {@code WindowedValue<ElemT>}, which is provided
-   * to {@link PCollectionView#getViewFn()} for conversion to {@link ViewT}.
-   */
-  public static final class WriteView<ElemT, ViewT>
-      extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> {
-    private final CreatePCollectionView<ElemT, ViewT> og;
-
-    WriteView(CreatePCollectionView<ElemT, ViewT> og) {
-      this.og = og;
-    }
-
-    @Override
-    @SuppressWarnings("deprecation")
-    public PCollectionView<ViewT> expand(PCollection<Iterable<ElemT>> input) {
-      return og.getView();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/69d0b307/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
new file mode 100644
index 0000000..64e1218
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import com.google.common.collect.Iterables;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.ForwardingPTransform;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+
+/**
+ * A {@link PTransformOverrideFactory} that replaces the {@link CreatePCollectionView} {@link
+ * PTransform} with a {@link GroupAndWriteView} composite wrapping the original transform.
+ */
+class ViewOverrideFactory<ElemT, ViewT>
+    implements PTransformOverrideFactory<
+        PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> {
+  @Override
+  public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> getReplacementTransform(
+      CreatePCollectionView<ElemT, ViewT> transform) {
+    return new GroupAndWriteView<>(transform);
+  }
+
+  @Override
+  public PCollection<ElemT> getInput(List<TaggedPValue> inputs, Pipeline p) {
+    return (PCollection<ElemT>) Iterables.getOnlyElement(inputs).getValue();
+  }
+
+  @Override
+  public Map<PValue, ReplacementOutput> mapOutputs(
+      List<TaggedPValue> outputs, PCollectionView<ViewT> newOutput) {
+    return Collections.emptyMap();
+  }
+
+  /** The {@link DirectRunner} composite override for {@link CreatePCollectionView}. */
+  static class GroupAndWriteView<ElemT, ViewT>
+      extends ForwardingPTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+    private final CreatePCollectionView<ElemT, ViewT> og;
+
+    private GroupAndWriteView(CreatePCollectionView<ElemT, ViewT> og) {
+      this.og = og;
+    }
+
+    @Override
+    public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
+      return input
+          .apply(WithKeys.<Void, ElemT>of((Void) null))
+          .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
+          .apply(GroupByKey.<Void, ElemT>create())
+          .apply(Values.<Iterable<ElemT>>create())
+          .apply(new WriteView<ElemT, ViewT>(og));
+    }
+
+    @Override
+    protected PTransform<PCollection<ElemT>, PCollectionView<ViewT>> delegate() {
+      return og;
+    }
+  }
+
+  /**
+   * The {@link DirectRunner} implementation of the {@link CreatePCollectionView} primitive.
+   *
+   * <p>This implementation requires the input {@link PCollection} to be an iterable of {@code
+   * WindowedValue<ElemT>}, which is provided to {@link PCollectionView#getViewFn()} for conversion
+   * to {@code ViewT}.
+   */
+  static final class WriteView<ElemT, ViewT>
+      extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> {
+    private final CreatePCollectionView<ElemT, ViewT> og;
+
+    WriteView(CreatePCollectionView<ElemT, ViewT> og) {
+      this.og = og;
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public PCollectionView<ViewT> expand(PCollection<Iterable<ElemT>> input) {
+      return og.getView();
+    }
+
+    @SuppressWarnings("deprecation")
+    public PCollectionView<ViewT> getView() {
+      return og.getView();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/69d0b307/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index b56bd74..fe55a5f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.direct;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
@@ -27,7 +26,6 @@ import static org.mockito.Mockito.when;
 import com.google.common.collect.ImmutableList;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
-import org.apache.beam.runners.direct.ViewEvaluatorFactory.ViewOverrideFactory;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
@@ -71,7 +69,7 @@ public class ViewEvaluatorFactoryTest {
             .apply(GroupByKey.<Void, String>create())
             .apply(Values.<Iterable<String>>create());
     PCollectionView<Iterable<String>> view =
-        concat.apply(new ViewEvaluatorFactory.WriteView<>(createView));
+        concat.apply(new ViewOverrideFactory.WriteView<>(createView));
 
     EvaluationContext context = mock(EvaluationContext.class);
     TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>();
@@ -94,13 +92,6 @@ public class ViewEvaluatorFactoryTest {
             WindowedValue.valueInGlobalWindow("foo"), WindowedValue.valueInGlobalWindow("bar")));
   }
 
-  @Test
-  public void overrideFactoryGetInputSucceeds() {
-    ViewOverrideFactory<String, String> factory = new ViewOverrideFactory<>();
-    PCollection<String> input = p.apply(Create.of("foo", "bar"));
-    assertThat(factory.getInput(input.expand(), p), equalTo(input));
-  }
-
   private static class TestViewWriter<ElemT, ViewT> implements PCollectionViewWriter<ElemT, ViewT> {
     private Iterable<WindowedValue<ElemT>> latest;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/69d0b307/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
new file mode 100644
index 0000000..6dcc13c
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableSet;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ViewOverrideFactory} and the {@link WriteView} primitive it produces. */
+@RunWith(JUnit4.class)
+public class ViewOverrideFactoryTest implements Serializable {
+  @Rule
+  public transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  private transient ViewOverrideFactory<Integer, List<Integer>> factory =
+      new ViewOverrideFactory<>();
+
+  @Test
+  public void replacementSucceeds() {
+    PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
+    final PCollectionView<List<Integer>> view =
+        PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder());
+    PTransform<PCollection<Integer>, PCollectionView<List<Integer>>> replacementTransform =
+        factory.getReplacementTransform(CreatePCollectionView.<Integer, List<Integer>>of(view));
+    PCollectionView<List<Integer>> afterReplacement = ints.apply(replacementTransform);
+    assertThat(
+        "The CreatePCollectionView replacement should return the same View",
+        afterReplacement,
+        equalTo(view));
+
+    PCollection<Set<Integer>> outputViewContents =
+        p.apply("CreateSingleton", Create.of(0))
+            .apply(
+                "OutputContents",
+                ParDo.of(
+                        new DoFn<Integer, Set<Integer>>() {
+                          @ProcessElement
+                          public void outputSideInput(ProcessContext context) {
+                            context.output(ImmutableSet.copyOf(context.sideInput(view)));
+                          }
+                        })
+                    .withSideInputs(view));
+    PAssert.thatSingleton(outputViewContents).isEqualTo(ImmutableSet.of(1, 2, 3));
+
+    p.run();
+  }
+
+  @Test
+  public void replacementGetViewReturnsOriginal() {
+    final PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
+    final PCollectionView<List<Integer>> view =
+        PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder());
+    PTransform<PCollection<Integer>, PCollectionView<List<Integer>>> replacement =
+        factory.getReplacementTransform(CreatePCollectionView.<Integer, List<Integer>>of(view));
+    ints.apply(replacement);
+    final AtomicBoolean writeViewVisited = new AtomicBoolean();
+    p.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+          @Override
+          public void visitPrimitiveTransform(Node node) {
+            if (node.getTransform() instanceof WriteView) {
+              assertThat(
+                  "There should only be one WriteView primitive in the graph",
+                  writeViewVisited.getAndSet(true),
+                  is(false));
+              PCollectionView replacementView = ((WriteView) node.getTransform()).getView();
+              assertThat(replacementView, Matchers.<PCollectionView>theInstance(view));
+              assertThat(node.getInputs(), hasSize(1));
+            }
+          }
+        });
+
+    assertThat(writeViewVisited.get(), is(true));
+  }
+
+  @Test
+  public void overrideFactoryGetInputSucceeds() {
+    ViewOverrideFactory<String, String> factory = new ViewOverrideFactory<>();
+    PCollection<String> input = p.apply(Create.of("foo", "bar"));
+    assertThat(factory.getInput(input.expand(), p), equalTo(input));
+  }
+}