You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/01 01:08:59 UTC
[2/2] beam git commit: Separate View Evaluation and Overrides
Separate View Evaluation and Overrides
This simplifies the View Evaluation by separating it from the logic of
overriding CreatePCollectionView.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/69d0b307
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/69d0b307
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/69d0b307
Branch: refs/heads/master
Commit: 69d0b3070eaee55f650b15ceda3608cc27807caf
Parents: 5f72b83
Author: Thomas Groh <tg...@google.com>
Authored: Mon Mar 27 18:03:30 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 31 18:08:46 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunner.java | 1 -
.../direct/TransformEvaluatorRegistry.java | 3 +-
.../runners/direct/ViewEvaluatorFactory.java | 79 ++----------
.../runners/direct/ViewOverrideFactory.java | 115 +++++++++++++++++
.../direct/ViewEvaluatorFactoryTest.java | 11 +-
.../runners/direct/ViewOverrideFactoryTest.java | 124 +++++++++++++++++++
6 files changed, 249 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/69d0b307/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 11fe3f5..bd210c3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -35,7 +35,6 @@ import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
-import org.apache.beam.runners.direct.ViewEvaluatorFactory.ViewOverrideFactory;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
http://git-wip-us.apache.org/repos/asf/beam/blob/69d0b307/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 5ad8709..ae7ad93 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -30,6 +30,7 @@ import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
+import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Flatten.PCollections;
@@ -54,7 +55,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
.put(ParDo.MultiOutput.class, new ParDoEvaluatorFactory<>(ctxt))
.put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt))
.put(PCollections.class, new FlattenEvaluatorFactory(ctxt))
- .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory(ctxt))
+ .put(WriteView.class, new ViewEvaluatorFactory(ctxt))
.put(Window.Assign.class, new WindowEvaluatorFactory(ctxt))
// Runner-specific primitives used in expansion of GroupByKey
.put(DirectGroupByKeyOnly.class, new GroupByKeyOnlyEvaluatorFactory(ctxt))
http://git-wip-us.apache.org/repos/asf/beam/blob/69d0b307/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 9dcbf9e..dc74d3e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -20,32 +20,25 @@ package org.apache.beam.runners.direct;
import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.List;
-import org.apache.beam.runners.core.construction.ForwardingPTransform;
-import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.runners.direct.CommittedResult.OutputType;
import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
import org.apache.beam.runners.direct.StepTransformResult.Builder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
/**
- * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
- * {@link CreatePCollectionView} primitive {@link PTransform}.
+ * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the {@link CreatePCollectionView}
+ * primitive {@link PTransform}.
*
* <p>The {@link ViewEvaluatorFactory} produces {@link TransformEvaluator TransformEvaluators} for
- * the {@link WriteView} {@link PTransform}, which is part of the
- * {@link DirectCreatePCollectionView} composite transform. This transform is an override for the
- * {@link CreatePCollectionView} transform that applies windowing and triggers before the view is
- * written.
+ * the {@link WriteView} {@link PTransform}, which is part of the {@link DirectRunner} override.
+ * This transform is an override for the {@link CreatePCollectionView} transform that applies
+ * windowing and triggers before the view is written.
*/
class ViewEvaluatorFactory implements TransformEvaluatorFactory {
private final EvaluationContext context;
@@ -91,67 +84,9 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
if (!elements.isEmpty()) {
resultBuilder = resultBuilder.withAdditionalOutput(OutputType.PCOLLECTION_VIEW);
}
- return resultBuilder
- .build();
+ return resultBuilder.build();
}
};
}
- public static class ViewOverrideFactory<ElemT, ViewT>
- extends SingleInputOutputOverrideFactory<
- PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> {
- @Override
- public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> getReplacementTransform(
- CreatePCollectionView<ElemT, ViewT> transform) {
- return new DirectCreatePCollectionView<>(transform);
- }
- }
-
- /**
- * An in-process override for {@link CreatePCollectionView}.
- */
- private static class DirectCreatePCollectionView<ElemT, ViewT>
- extends ForwardingPTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
- private final CreatePCollectionView<ElemT, ViewT> og;
-
- private DirectCreatePCollectionView(CreatePCollectionView<ElemT, ViewT> og) {
- this.og = og;
- }
-
- @Override
- public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
- return input.apply(WithKeys.<Void, ElemT>of((Void) null))
- .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
- .apply(GroupByKey.<Void, ElemT>create())
- .apply(Values.<Iterable<ElemT>>create())
- .apply(new WriteView<ElemT, ViewT>(og));
- }
-
- @Override
- protected PTransform<PCollection<ElemT>, PCollectionView<ViewT>> delegate() {
- return og;
- }
- }
-
- /**
- * An in-process implementation of the {@link CreatePCollectionView} primitive.
- *
- * <p>This implementation requires the input {@link PCollection} to be an iterable
- * of {@code WindowedValue<ElemT>}, which is provided
- * to {@link PCollectionView#getViewFn()} for conversion to {@link ViewT}.
- */
- public static final class WriteView<ElemT, ViewT>
- extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> {
- private final CreatePCollectionView<ElemT, ViewT> og;
-
- WriteView(CreatePCollectionView<ElemT, ViewT> og) {
- this.og = og;
- }
-
- @Override
- @SuppressWarnings("deprecation")
- public PCollectionView<ViewT> expand(PCollection<Iterable<ElemT>> input) {
- return og.getView();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/69d0b307/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
new file mode 100644
index 0000000..64e1218
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import com.google.common.collect.Iterables;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.ForwardingPTransform;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+
+/**
+ * A {@link PTransformOverrideFactory} that provides overrides for the {@link CreatePCollectionView}
+ * {@link PTransform}.
+ */
+class ViewOverrideFactory<ElemT, ViewT>
+ implements PTransformOverrideFactory<
+ PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> {
+ @Override
+ public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> getReplacementTransform(
+ CreatePCollectionView<ElemT, ViewT> transform) {
+ return new GroupAndWriteView<>(transform);
+ }
+
+ @Override
+ public PCollection<ElemT> getInput(List<TaggedPValue> inputs, Pipeline p) {
+ return (PCollection<ElemT>) Iterables.getOnlyElement(inputs).getValue();
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ List<TaggedPValue> outputs, PCollectionView<ViewT> newOutput) {
+ return Collections.emptyMap();
+ }
+
+ /** The {@link DirectRunner} composite override for {@link CreatePCollectionView}. */
+ static class GroupAndWriteView<ElemT, ViewT>
+ extends ForwardingPTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+ private final CreatePCollectionView<ElemT, ViewT> og;
+
+ private GroupAndWriteView(CreatePCollectionView<ElemT, ViewT> og) {
+ this.og = og;
+ }
+
+ @Override
+ public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
+ return input
+ .apply(WithKeys.<Void, ElemT>of((Void) null))
+ .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
+ .apply(GroupByKey.<Void, ElemT>create())
+ .apply(Values.<Iterable<ElemT>>create())
+ .apply(new WriteView<ElemT, ViewT>(og));
+ }
+
+ @Override
+ protected PTransform<PCollection<ElemT>, PCollectionView<ViewT>> delegate() {
+ return og;
+ }
+ }
+
+ /**
+ * The {@link DirectRunner} implementation of the {@link CreatePCollectionView} primitive.
+ *
+ * <p>This implementation requires the input {@link PCollection} to be an iterable of {@code
+ * WindowedValue<ElemT>}, which is provided to {@link PCollectionView#getViewFn()} for conversion
+ * to {@code ViewT}.
+ */
+ static final class WriteView<ElemT, ViewT>
+ extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> {
+ private final CreatePCollectionView<ElemT, ViewT> og;
+
+ WriteView(CreatePCollectionView<ElemT, ViewT> og) {
+ this.og = og;
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public PCollectionView<ViewT> expand(PCollection<Iterable<ElemT>> input) {
+ return og.getView();
+ }
+
+ @SuppressWarnings("deprecation")
+ public PCollectionView<ViewT> getView() {
+ return og.getView();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/69d0b307/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index b56bd74..fe55a5f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.direct;
import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
@@ -27,7 +26,6 @@ import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
-import org.apache.beam.runners.direct.ViewEvaluatorFactory.ViewOverrideFactory;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
@@ -71,7 +69,7 @@ public class ViewEvaluatorFactoryTest {
.apply(GroupByKey.<Void, String>create())
.apply(Values.<Iterable<String>>create());
PCollectionView<Iterable<String>> view =
- concat.apply(new ViewEvaluatorFactory.WriteView<>(createView));
+ concat.apply(new ViewOverrideFactory.WriteView<>(createView));
EvaluationContext context = mock(EvaluationContext.class);
TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>();
@@ -94,13 +92,6 @@ public class ViewEvaluatorFactoryTest {
WindowedValue.valueInGlobalWindow("foo"), WindowedValue.valueInGlobalWindow("bar")));
}
- @Test
- public void overrideFactoryGetInputSucceeds() {
- ViewOverrideFactory<String, String> factory = new ViewOverrideFactory<>();
- PCollection<String> input = p.apply(Create.of("foo", "bar"));
- assertThat(factory.getInput(input.expand(), p), equalTo(input));
- }
-
private static class TestViewWriter<ElemT, ViewT> implements PCollectionViewWriter<ElemT, ViewT> {
private Iterable<WindowedValue<ElemT>> latest;
http://git-wip-us.apache.org/repos/asf/beam/blob/69d0b307/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
new file mode 100644
index 0000000..6dcc13c
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableSet;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ViewOverrideFactory}. */
+@RunWith(JUnit4.class)
+public class ViewOverrideFactoryTest implements Serializable {
+ @Rule
+ public transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+ private transient ViewOverrideFactory<Integer, List<Integer>> factory =
+ new ViewOverrideFactory<>();
+
+ @Test
+ public void replacementSucceeds() {
+ PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
+ final PCollectionView<List<Integer>> view =
+ PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder());
+ PTransform<PCollection<Integer>, PCollectionView<List<Integer>>> replacementTransform =
+ factory.getReplacementTransform(CreatePCollectionView.<Integer, List<Integer>>of(view));
+ PCollectionView<List<Integer>> afterReplacement = ints.apply(replacementTransform);
+ assertThat(
+ "The CreatePCollectionView replacement should return the same View",
+ afterReplacement,
+ equalTo(view));
+
+ PCollection<Set<Integer>> outputViewContents =
+ p.apply("CreateSingleton", Create.of(0))
+ .apply(
+ "OutputContents",
+ ParDo.of(
+ new DoFn<Integer, Set<Integer>>() {
+ @ProcessElement
+ public void outputSideInput(ProcessContext context) {
+ context.output(ImmutableSet.copyOf(context.sideInput(view)));
+ }
+ })
+ .withSideInputs(view));
+ PAssert.thatSingleton(outputViewContents).isEqualTo(ImmutableSet.of(1, 2, 3));
+
+ p.run();
+ }
+
+ @Test
+ public void replacementGetViewReturnsOriginal() {
+ final PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
+ final PCollectionView<List<Integer>> view =
+ PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder());
+ PTransform<PCollection<Integer>, PCollectionView<List<Integer>>> replacement =
+ factory.getReplacementTransform(CreatePCollectionView.<Integer, List<Integer>>of(view));
+ ints.apply(replacement);
+ final AtomicBoolean writeViewVisited = new AtomicBoolean();
+ p.traverseTopologically(
+ new PipelineVisitor.Defaults() {
+ @Override
+ public void visitPrimitiveTransform(Node node) {
+ if (node.getTransform() instanceof WriteView) {
+ assertThat(
+ "There should only be one WriteView primitive in the graph",
+ writeViewVisited.getAndSet(true),
+ is(false));
+ PCollectionView replacementView = ((WriteView) node.getTransform()).getView();
+ assertThat(replacementView, Matchers.<PCollectionView>theInstance(view));
+ assertThat(node.getInputs(), hasSize(1));
+ }
+ }
+ });
+
+ assertThat(writeViewVisited.get(), is(true));
+ }
+
+ @Test
+ public void overrideFactoryGetInputSucceeds() {
+ ViewOverrideFactory<String, String> factory = new ViewOverrideFactory<>();
+ PCollection<String> input = p.apply(Create.of("foo", "bar"));
+ assertThat(factory.getInput(input.expand(), p), equalTo(input));
+ }
+}