Posted to commits@beam.apache.org by tg...@apache.org on 2017/06/08 16:14:12 UTC

[3/3] beam git commit: Expand all PValues to component PCollections always

Expand all PValues to component PCollections always

Update the implementation of WriteView

The PCollectionView is constructed within the composite override, but
WriteView just produces a primitive PCollection that has no consumers.
Track the view writer within the Direct Runner, and use that
transform rather than the producer to update PCollection watermarks.
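
For illustration, a minimal sketch of the pattern described above (the
class name WriteViewSketch is hypothetical; the commit's actual primitive
is ViewOverrideFactory.WriteView in the diff below): the view-writing
primitive no longer returns the PCollectionView from expand(), but a
fresh primitive PCollection, and exposes the view through a getter so the
runner resolves it from the transform itself rather than from its output.

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

// Hypothetical illustration only; not part of this commit.
class WriteViewSketch<ElemT, ViewT>
    extends PTransform<PCollection<Iterable<ElemT>>, PCollection<Iterable<ElemT>>> {
  private final CreatePCollectionView<ElemT, ViewT> original;

  WriteViewSketch(CreatePCollectionView<ElemT, ViewT> original) {
    this.original = original;
  }

  @Override
  public PCollection<Iterable<ElemT>> expand(PCollection<Iterable<ElemT>> input) {
    // Produce a primitive output PCollection with no consumers; the runner tracks
    // this transform (not the view's producer) when updating watermarks.
    return PCollection.<Iterable<ElemT>>createPrimitiveOutputInternal(
            input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
        .setCoder(input.getCoder());
  }

  /** The view materialized by this transform, looked up by the runner at translation time. */
  @SuppressWarnings("deprecation")
  public PCollectionView<ViewT> getView() {
    return original.getView();
  }
}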

Remove most Flink view overrides. All of the overrides are materially
identical within the Flink runner, so use a single override to replace
all of them.
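
As a hedged sketch of that consolidation (FlinkViewOverrideSketch is a
hypothetical holder class, assumed to live in the same package as the new
CreateStreamingFlinkView so the package-private class is visible, and the
PTransformMatchers import path is assumed from the surrounding code):
every View.As* transform ultimately expands to CreatePCollectionView, so a
single matcher on that class replaces the per-view overrides.

package org.apache.beam.runners.flink;

import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;

// Hypothetical illustration; the commit wires this up in FlinkStreamingPipelineTranslator.
class FlinkViewOverrideSketch {
  static List<PTransformOverride> viewOverrides() {
    // One override keyed on the common CreatePCollectionView primitive stands in for
    // the previous AsIterable/AsList/AsMap/AsMultimap/AsSingleton-specific overrides.
    return ImmutableList.of(
        PTransformOverride.of(
            PTransformMatchers.classEqualTo(CreatePCollectionView.class),
            new CreateStreamingFlinkView.Factory()));
  }
}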


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ccf73448
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ccf73448
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ccf73448

Branch: refs/heads/master
Commit: ccf7344820d6c69ca922aa3176dc141718382629
Parents: 86e1fab
Author: Thomas Groh <tg...@google.com>
Authored: Thu Jun 1 18:39:58 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Jun 8 09:13:57 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    |  59 +--
 .../translation/ApexPipelineTranslator.java     |  16 +-
 .../construction/RunnerPCollectionView.java     |   8 +
 .../apache/beam/runners/direct/DirectGraph.java |   4 +
 .../beam/runners/direct/DirectGraphVisitor.java |  22 +-
 .../runners/direct/ViewEvaluatorFactory.java    |   8 +-
 .../runners/direct/ViewOverrideFactory.java     |  29 +-
 .../beam/runners/direct/DirectGraphs.java       |   7 +
 .../runners/direct/EvaluationContextTest.java   |   5 +-
 .../ImmutabilityEnforcementFactoryTest.java     |   4 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |   1 +
 .../runners/direct/TransformExecutorTest.java   |   1 +
 .../direct/ViewEvaluatorFactoryTest.java        |   5 +-
 .../runners/direct/ViewOverrideFactoryTest.java |  16 +-
 .../direct/WatermarkCallbackExecutorTest.java   |   1 +
 .../runners/direct/WatermarkManagerTest.java    |   1 +
 .../runners/flink/CreateStreamingFlinkView.java | 154 ++++++++
 .../flink/FlinkStreamingPipelineTranslator.java |  36 +-
 .../FlinkStreamingTransformTranslators.java     |   8 +-
 .../flink/FlinkStreamingViewOverrides.java      | 372 -------------------
 .../runners/dataflow/BatchViewOverrides.java    | 182 +++------
 .../runners/dataflow/CreateDataflowView.java    |   8 +-
 .../dataflow/DataflowPipelineTranslator.java    |  11 +-
 .../beam/runners/dataflow/DataflowRunner.java   |  17 +-
 .../dataflow/StreamingViewOverrides.java        |  10 +-
 .../DataflowPipelineTranslatorTest.java         |   6 +-
 .../spark/translation/TransformTranslator.java  |  50 +--
 .../beam/sdk/runners/TransformHierarchy.java    |  46 ++-
 .../org/apache/beam/sdk/transforms/Combine.java |  17 +-
 .../org/apache/beam/sdk/transforms/View.java    |  38 +-
 .../org/apache/beam/sdk/values/PCollection.java |  12 +
 .../beam/sdk/values/PCollectionViews.java       |  14 +
 .../org/apache/beam/sdk/values/PValueBase.java  |  12 -
 .../sdk/testing/PCollectionViewTesting.java     |   8 +
 34 files changed, 458 insertions(+), 730 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index c595b3f..95b354a 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -62,8 +62,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.View.AsIterable;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -214,7 +212,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
    * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
    */
   public static class CreateApexPCollectionView<ElemT, ViewT>
-      extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+      extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
     private static final long serialVersionUID = 1L;
     private PCollectionView<ViewT> view;
 
@@ -228,7 +226,13 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
 
     @Override
-    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
+    public PCollection<ElemT> expand(PCollection<ElemT> input) {
+      return PCollection.<ElemT>createPrimitiveOutputInternal(
+              input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
+          .setCoder(input.getCoder());
+    }
+
+    public PCollectionView<ViewT> getView() {
       return view;
     }
   }
@@ -241,7 +245,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
   }
 
   private static class StreamingWrapSingletonInList<T>
-      extends PTransform<PCollection<T>, PCollectionView<T>> {
+      extends PTransform<PCollection<T>, PCollection<T>> {
     private static final long serialVersionUID = 1L;
     CreatePCollectionView<T, T> transform;
 
@@ -254,10 +258,11 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
 
     @Override
-    public PCollectionView<T> expand(PCollection<T> input) {
-      return input
+    public PCollection<T> expand(PCollection<T> input) {
+      input
           .apply(ParDo.of(new WrapAsList<T>()))
-          .apply(CreateApexPCollectionView.<T, T>of(transform.getView()));
+          .apply(CreateApexPCollectionView.<List<T>, T>of(transform.getView()));
+      return input;
     }
 
     @Override
@@ -267,15 +272,12 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
 
     static class Factory<T>
         extends SingleInputOutputOverrideFactory<
-            PCollection<T>, PCollectionView<T>,
+            PCollection<T>, PCollection<T>,
             CreatePCollectionView<T, T>> {
       @Override
-      public PTransformReplacement<PCollection<T>, PCollectionView<T>>
-          getReplacementTransform(
-              AppliedPTransform<
-                      PCollection<T>, PCollectionView<T>,
-                      CreatePCollectionView<T, T>>
-                  transform) {
+      public PTransformReplacement<PCollection<T>, PCollection<T>> getReplacementTransform(
+          AppliedPTransform<PCollection<T>, PCollection<T>, CreatePCollectionView<T, T>>
+              transform) {
         return PTransformReplacement.of(
             PTransformReplacements.getSingletonMainInput(transform),
             new StreamingWrapSingletonInList<>(transform.getTransform()));
@@ -284,18 +286,19 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
   }
 
   private static class StreamingViewAsIterable<T>
-      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+      extends PTransform<PCollection<T>, PCollection<T>> {
     private static final long serialVersionUID = 1L;
+    private final PCollectionView<Iterable<T>> view;
 
-    private StreamingViewAsIterable() {}
+    private StreamingViewAsIterable(PCollectionView<Iterable<T>> view) {
+      this.view = view;
+    }
 
     @Override
-    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
-      PCollectionView<Iterable<T>> view =
-          PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
-
-      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
-          .apply(CreateApexPCollectionView.<T, Iterable<T>> of(view));
+    public PCollection<T> expand(PCollection<T> input) {
+      return ((PCollection<T>)
+              input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()))
+          .apply(CreateApexPCollectionView.<T, Iterable<T>>of(view));
     }
 
     @Override
@@ -305,15 +308,17 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
 
     static class Factory<T>
         extends SingleInputOutputOverrideFactory<
-            PCollection<T>, PCollectionView<Iterable<T>>, View.AsIterable<T>> {
+            PCollection<T>, PCollection<T>, CreatePCollectionView<T, Iterable<T>>> {
       @Override
-      public PTransformReplacement<PCollection<T>, PCollectionView<Iterable<T>>>
+      public PTransformReplacement<PCollection<T>, PCollection<T>>
           getReplacementTransform(
-              AppliedPTransform<PCollection<T>, PCollectionView<Iterable<T>>, AsIterable<T>>
+              AppliedPTransform<
+                      PCollection<T>, PCollection<T>,
+                      CreatePCollectionView<T, Iterable<T>>>
                   transform) {
         return PTransformReplacement.of(
             PTransformReplacements.getSingletonMainInput(transform),
-            new StreamingViewAsIterable<T>());
+            new StreamingViewAsIterable<T>(transform.getTransform().getView()));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index bda074b..02f53ec 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -39,7 +39,6 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -154,7 +153,6 @@ public class ApexPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
           unboundedSource, true, context.getPipelineOptions());
       context.addOperator(operator, operator.output);
     }
-
   }
 
   private static class CreateApexPCollectionViewTranslator<ElemT, ViewT>
@@ -162,11 +160,10 @@ public class ApexPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public void translate(CreateApexPCollectionView<ElemT, ViewT> transform,
-        TranslationContext context) {
-      PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput();
-      context.addView(view);
-      LOG.debug("view {}", view.getName());
+    public void translate(
+        CreateApexPCollectionView<ElemT, ViewT> transform, TranslationContext context) {
+      context.addView(transform.getView());
+      LOG.debug("view {}", transform.getView().getName());
     }
   }
 
@@ -177,9 +174,8 @@ public class ApexPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
     @Override
     public void translate(
         CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) {
-      PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput();
-      context.addView(view);
-      LOG.debug("view {}", view.getName());
+      context.addView(transform.getView());
+      LOG.debug("view {}", transform.getView().getName());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
index 89e8784..c359cec 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.core.construction;
 
+import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
@@ -26,6 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.PValueBase;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -85,4 +87,10 @@ class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T>
   public Coder<Iterable<WindowedValue<?>>> getCoderInternal() {
     return coder;
   }
+
+  @Override
+  public Map<TupleTag<?>, PValue> expand() {
+    throw new UnsupportedOperationException(String.format(
+        "A %s cannot be expanded", RunnerPCollectionView.class.getSimpleName()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
index c2c0afa..9ca745d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
@@ -79,6 +79,10 @@ class DirectGraph {
     return rootTransforms;
   }
 
+  Set<PCollection<?>> getPCollections() {
+    return producers.keySet();
+  }
+
   Set<PCollectionView<?>> getViews() {
     return viewWriters.keySet();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
index d54de5d..07bcf06 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -21,15 +21,18 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Sets;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
@@ -44,6 +47,7 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
 
   private Map<PCollection<?>, AppliedPTransform<?, ?, ?>> producers = new HashMap<>();
   private Map<PCollectionView<?>, AppliedPTransform<?, ?, ?>> viewWriters = new HashMap<>();
+  private Set<PCollectionView<?>> consumedViews = new HashSet<>();
 
   private ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers =
       ArrayListMultimap.create();
@@ -73,6 +77,13 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
         getClass().getSimpleName());
     if (node.isRootNode()) {
       finalized = true;
+      checkState(
+          viewWriters.keySet().containsAll(consumedViews),
+          "All %ss that are consumed must be written by some %s %s: Missing %s",
+          PCollectionView.class.getSimpleName(),
+          WriteView.class.getSimpleName(),
+          PTransform.class.getSimpleName(),
+          Sets.difference(consumedViews, viewWriters.keySet()));
     }
   }
 
@@ -86,11 +97,12 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
       for (PValue value : node.getInputs().values()) {
         primitiveConsumers.put(value, appliedTransform);
       }
-      if (node.getTransform() instanceof ViewOverrideFactory.WriteView) {
-        viewWriters.put(
-            ((ViewOverrideFactory.WriteView<?, ?>) node.getTransform()).getView(),
-            node.toAppliedPTransform(getPipeline()));
-      }
+    }
+    if (node.getTransform() instanceof ParDo.MultiOutput) {
+      consumedViews.addAll(((ParDo.MultiOutput<?, ?>) node.getTransform()).getSideInputs());
+    } else if (node.getTransform() instanceof ViewOverrideFactory.WriteView) {
+      viewWriters.put(
+          ((WriteView) node.getTransform()).getView(), node.toAppliedPTransform(getPipeline()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 057f4a1..8a281a7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
 
 /**
  * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the {@link CreatePCollectionView}
@@ -60,12 +59,13 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
   public void cleanup() throws Exception {}
 
   private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator(
-      final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>>
+      final AppliedPTransform<
+              PCollection<Iterable<InT>>, PCollection<Iterable<InT>>, WriteView<InT, OuT>>
           application) {
     PCollection<Iterable<InT>> input =
         (PCollection<Iterable<InT>>) Iterables.getOnlyElement(application.getInputs().values());
-    final PCollectionViewWriter<InT, OuT> writer = context.createPCollectionViewWriter(input,
-        (PCollectionView<OuT>) Iterables.getOnlyElement(application.getOutputs().values()));
+    final PCollectionViewWriter<InT, OuT> writer =
+        context.createPCollectionViewWriter(input, application.getTransform().getView());
     return new TransformEvaluator<Iterable<InT>>() {
       private final List<WindowedValue<InT>> elements = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index fdff63d..06a7388 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -18,11 +18,11 @@
 
 package org.apache.beam.runners.direct;
 
-import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.ForwardingPTransform;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -43,12 +43,12 @@ import org.apache.beam.sdk.values.TupleTag;
  */
 class ViewOverrideFactory<ElemT, ViewT>
     implements PTransformOverrideFactory<
-        PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> {
+        PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> {
 
   @Override
-  public PTransformReplacement<PCollection<ElemT>, PCollectionView<ViewT>> getReplacementTransform(
+  public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform(
       AppliedPTransform<
-              PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>>
+              PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>>
           transform) {
     return PTransformReplacement.of(
         PTransformReplacements.getSingletonMainInput(transform),
@@ -57,13 +57,13 @@ class ViewOverrideFactory<ElemT, ViewT>
 
   @Override
   public Map<PValue, ReplacementOutput> mapOutputs(
-      Map<TupleTag<?>, PValue> outputs, PCollectionView<ViewT> newOutput) {
-    return Collections.emptyMap();
+      Map<TupleTag<?>, PValue> outputs, PCollection<ElemT> newOutput) {
+    return ReplacementOutputs.singleton(outputs, newOutput);
   }
 
   /** The {@link DirectRunner} composite override for {@link CreatePCollectionView}. */
   static class GroupAndWriteView<ElemT, ViewT>
-      extends ForwardingPTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+      extends ForwardingPTransform<PCollection<ElemT>, PCollection<ElemT>> {
     private final CreatePCollectionView<ElemT, ViewT> og;
 
     private GroupAndWriteView(CreatePCollectionView<ElemT, ViewT> og) {
@@ -71,17 +71,18 @@ class ViewOverrideFactory<ElemT, ViewT>
     }
 
     @Override
-    public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
-      return input
+    public PCollection<ElemT> expand(final PCollection<ElemT> input) {
+      input
           .apply(WithKeys.<Void, ElemT>of((Void) null))
           .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
           .apply(GroupByKey.<Void, ElemT>create())
           .apply(Values.<Iterable<ElemT>>create())
           .apply(new WriteView<ElemT, ViewT>(og));
+      return input;
     }
 
     @Override
-    protected PTransform<PCollection<ElemT>, PCollectionView<ViewT>> delegate() {
+    protected PTransform<PCollection<ElemT>, PCollection<ElemT>> delegate() {
       return og;
     }
   }
@@ -94,7 +95,7 @@ class ViewOverrideFactory<ElemT, ViewT>
    * to {@link ViewT}.
    */
   static final class WriteView<ElemT, ViewT>
-      extends RawPTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> {
+      extends RawPTransform<PCollection<Iterable<ElemT>>, PCollection<Iterable<ElemT>>> {
     private final CreatePCollectionView<ElemT, ViewT> og;
 
     WriteView(CreatePCollectionView<ElemT, ViewT> og) {
@@ -103,8 +104,10 @@ class ViewOverrideFactory<ElemT, ViewT>
 
     @Override
     @SuppressWarnings("deprecation")
-    public PCollectionView<ViewT> expand(PCollection<Iterable<ElemT>> input) {
-      return og.getView();
+    public PCollection<Iterable<ElemT>> expand(PCollection<Iterable<ElemT>> input) {
+      return PCollection.<Iterable<ElemT>>createPrimitiveOutputInternal(
+              input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
+          .setCoder(input.getCoder());
     }
 
     @SuppressWarnings("deprecation")

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
index 43de091..7707f7f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.direct;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -25,6 +26,12 @@ import org.apache.beam.sdk.values.PValue;
 
 /** Test utilities for the {@link DirectRunner}. */
 final class DirectGraphs {
+  public static void performDirectOverrides(Pipeline p) {
+    p.replaceAll(
+        DirectRunner.fromOptions(PipelineOptionsFactory.create().as(DirectOptions.class))
+            .defaultTransformOverrides());
+  }
+
   public static DirectGraph getGraph(Pipeline p) {
     DirectGraphVisitor visitor = new DirectGraphVisitor();
     p.traverseTopologically(visitor);

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index c0e43d6..f3edf55 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -101,14 +101,13 @@ public class EvaluationContextTest {
     view = created.apply(View.<Integer>asIterable());
     unbounded = p.apply(GenerateSequence.from(0));
 
-    p.replaceAll(
-        DirectRunner.fromOptions(TestPipeline.testingPipelineOptions())
-            .defaultTransformOverrides());
+    p.replaceAll(runner.defaultTransformOverrides());
 
     KeyedPValueTrackingVisitor keyedPValueTrackingVisitor = KeyedPValueTrackingVisitor.create();
     p.traverseTopologically(keyedPValueTrackingVisitor);
 
     BundleFactory bundleFactory = ImmutableListBundleFactory.create();
+    DirectGraphs.performDirectOverrides(p);
     graph = DirectGraphs.getGraph(p);
     context =
         EvaluationContext.create(

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index c0919b9..365b6c4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -64,7 +64,9 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
                         c.element()[0] = 'b';
                       }
                     }));
-    consumer = DirectGraphs.getProducer(pcollection.apply(Count.<byte[]>globally()));
+    PCollection<Long> consumer = pcollection.apply(Count.<byte[]>globally());
+    DirectGraphs.performDirectOverrides(p);
+    this.consumer = DirectGraphs.getProducer(consumer);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 09a21ac..df84cbf 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -149,6 +149,7 @@ public class ParDoEvaluatorTest {
                 Mockito.any(AppliedPTransform.class), Mockito.any(StructuralKey.class)))
         .thenReturn(executionContext);
 
+    DirectGraphs.performDirectOverrides(p);
     @SuppressWarnings("unchecked")
     AppliedPTransform<PCollection<Integer>, ?, ?> transform =
         (AppliedPTransform<PCollection<Integer>, ?, ?>) DirectGraphs.getProducer(output);

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 86412a0..3dd4028 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -90,6 +90,7 @@ public class TransformExecutorTest {
     created = p.apply(Create.of("foo", "spam", "third"));
     PCollection<KV<Integer, String>> downstream = created.apply(WithKeys.<Integer, String>of(3));
 
+    DirectGraphs.performDirectOverrides(p);
     DirectGraph graph = DirectGraphs.getGraph(p);
     createdProducer = graph.getProducer(created);
     downstreamProducer = graph.getProducer(downstream);

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index 419698e..ad1aecc 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PCollectionViews;
 import org.joda.time.Instant;
 import org.junit.Rule;
@@ -66,12 +65,12 @@ public class ViewEvaluatorFactoryTest {
             .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))
             .apply(GroupByKey.<Void, String>create())
             .apply(Values.<Iterable<String>>create());
-    PCollectionView<Iterable<String>> view =
+    PCollection<Iterable<String>> view =
         concat.apply(new ViewOverrideFactory.WriteView<>(createView));
 
     EvaluationContext context = mock(EvaluationContext.class);
     TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>();
-    when(context.createPCollectionViewWriter(concat, view)).thenReturn(viewWriter);
+    when(context.createPCollectionViewWriter(concat, createView.getView())).thenReturn(viewWriter);
 
     CommittedBundle<String> inputBundle = bundleFactory.createBundle(input).commit(Instant.now());
     AppliedPTransform<?, ?, ?> producer = DirectGraphs.getProducer(view);

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
index 024e15c..94728c7 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.runners.direct;
 
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
@@ -63,11 +62,11 @@ public class ViewOverrideFactoryTest implements Serializable {
     PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
     final PCollectionView<List<Integer>> view =
         PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder());
-    PTransformReplacement<PCollection<Integer>, PCollectionView<List<Integer>>>
+    PTransformReplacement<PCollection<Integer>, PCollection<Integer>>
         replacementTransform =
             factory.getReplacementTransform(
                 AppliedPTransform
-                    .<PCollection<Integer>, PCollectionView<List<Integer>>,
+                    .<PCollection<Integer>, PCollection<Integer>,
                         CreatePCollectionView<Integer, List<Integer>>>
                         of(
                             "foo",
@@ -75,12 +74,7 @@ public class ViewOverrideFactoryTest implements Serializable {
                             view.expand(),
                             CreatePCollectionView.<Integer, List<Integer>>of(view),
                             p));
-    PCollectionView<List<Integer>> afterReplacement =
-        ints.apply(replacementTransform.getTransform());
-    assertThat(
-        "The CreatePCollectionView replacement should return the same View",
-        afterReplacement,
-        equalTo(view));
+    ints.apply(replacementTransform.getTransform());
 
     PCollection<Set<Integer>> outputViewContents =
         p.apply("CreateSingleton", Create.of(0))
@@ -104,10 +98,10 @@ public class ViewOverrideFactoryTest implements Serializable {
     final PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
     final PCollectionView<List<Integer>> view =
         PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder());
-    PTransformReplacement<PCollection<Integer>, PCollectionView<List<Integer>>> replacement =
+    PTransformReplacement<PCollection<Integer>, PCollection<Integer>> replacement =
         factory.getReplacementTransform(
             AppliedPTransform
-                .<PCollection<Integer>, PCollectionView<List<Integer>>,
+                .<PCollection<Integer>, PCollection<Integer>,
                     CreatePCollectionView<Integer, List<Integer>>>
                     of(
                         "foo",

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
index b667346..1d8aac1 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
@@ -59,6 +59,7 @@ public class WatermarkCallbackExecutorTest {
   public void setup() {
     PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
     PCollection<Integer> summed = created.apply(Sum.integersGlobally());
+    DirectGraphs.performDirectOverrides(p);
     DirectGraph graph = DirectGraphs.getGraph(p);
     create = graph.getProducer(created);
     sum = graph.getProducer(summed);

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 9528ac9..e0b5251 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -121,6 +121,7 @@ public class WatermarkManagerTest implements Serializable {
     flattened = preFlatten.apply("flattened", Flatten.<Integer>pCollections());
 
     clock = MockClock.fromInstant(new Instant(1000));
+    DirectGraphs.performDirectOverrides(p);
     graph = DirectGraphs.getGraph(p);
 
     manager = WatermarkManager.create(clock, graph);

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
new file mode 100644
index 0000000..0cc3aec
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.collect.Iterables;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/** Flink streaming overrides for various view (side input) transforms. */
+class CreateStreamingFlinkView<ElemT, ViewT>
+    extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
+  private final PCollectionView<ViewT> view;
+
+  public CreateStreamingFlinkView(PCollectionView<ViewT> view) {
+    this.view = view;
+  }
+
+  @Override
+  public PCollection<ElemT> expand(PCollection<ElemT> input) {
+    input
+        .apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults())
+        .apply(CreateFlinkPCollectionView.<ElemT, ViewT>of(view));
+    return input;
+  }
+
+  /**
+   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+   *
+   * <p>For internal use by {@link CreateStreamingFlinkView}. This combiner requires that the input
+   * {@link PCollection} fits in memory. For a large {@link PCollection} this is expected to crash!
+   *
+   * @param <T> the type of elements to concatenate.
+   */
+  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<T>();
+    }
+
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      accumulator.add(input);
+      return accumulator;
+    }
+
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      List<T> result = createAccumulator();
+      for (List<T> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<T> extractOutput(List<T> accumulator) {
+      return accumulator;
+    }
+
+    @Override
+    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+
+    @Override
+    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+  }
+
+  /**
+   * Creates a primitive {@link PCollectionView}.
+   *
+   * <p>For internal use only by runner implementors.
+   *
+   * @param <ElemT> The type of the elements of the input PCollection
+   * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+   */
+  public static class CreateFlinkPCollectionView<ElemT, ViewT>
+      extends PTransform<PCollection<List<ElemT>>, PCollection<List<ElemT>>> {
+    private PCollectionView<ViewT> view;
+
+    private CreateFlinkPCollectionView(PCollectionView<ViewT> view) {
+      this.view = view;
+    }
+
+    public static <ElemT, ViewT> CreateFlinkPCollectionView<ElemT, ViewT> of(
+        PCollectionView<ViewT> view) {
+      return new CreateFlinkPCollectionView<>(view);
+    }
+
+    @Override
+    public PCollection<List<ElemT>> expand(PCollection<List<ElemT>> input) {
+      return PCollection.<List<ElemT>>createPrimitiveOutputInternal(
+              input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
+          .setCoder(input.getCoder());
+    }
+
+    public PCollectionView<ViewT> getView() {
+      return view;
+    }
+  }
+
+  public static class Factory<ElemT, ViewT>
+      implements PTransformOverrideFactory<
+          PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> {
+    public Factory() {}
+
+    @Override
+    public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform(
+        AppliedPTransform<
+                PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>>
+            transform) {
+      return PTransformReplacement.of(
+          (PCollection<ElemT>) Iterables.getOnlyElement(transform.getInputs().values()),
+          new CreateStreamingFlinkView<ElemT, ViewT>(transform.getTransform().getView()));
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PValue> outputs, PCollection<ElemT> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index 8da68c5..a88ff07 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -33,10 +33,9 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
-import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -85,37 +84,8 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
                     new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
             .add(
                 PTransformOverride.of(
-                    PTransformMatchers.classEqualTo(View.AsIterable.class),
-                    new ReflectiveOneToOneOverrideFactory(
-                        FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner)))
-            .add(
-                PTransformOverride.of(
-                    PTransformMatchers.classEqualTo(View.AsList.class),
-                    new ReflectiveOneToOneOverrideFactory(
-                        FlinkStreamingViewOverrides.StreamingViewAsList.class, flinkRunner)))
-            .add(
-                PTransformOverride.of(
-                    PTransformMatchers.classEqualTo(View.AsMap.class),
-                    new ReflectiveOneToOneOverrideFactory(
-                        FlinkStreamingViewOverrides.StreamingViewAsMap.class, flinkRunner)))
-            .add(
-                PTransformOverride.of(
-                    PTransformMatchers.classEqualTo(View.AsMultimap.class),
-                    new ReflectiveOneToOneOverrideFactory(
-                        FlinkStreamingViewOverrides.StreamingViewAsMultimap.class, flinkRunner)))
-            .add(
-                PTransformOverride.of(
-                    PTransformMatchers.classEqualTo(View.AsSingleton.class),
-                    new ReflectiveOneToOneOverrideFactory(
-                        FlinkStreamingViewOverrides.StreamingViewAsSingleton.class, flinkRunner)))
-            // this has to be last since the ViewAsSingleton override
-            // can expand to a Combine.GloballyAsSingletonView
-            .add(
-                PTransformOverride.of(
-                    PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
-                    new ReflectiveOneToOneOverrideFactory(
-                        FlinkStreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class,
-                        flinkRunner)))
+                    PTransformMatchers.classEqualTo(CreatePCollectionView.class),
+                    new CreateStreamingFlinkView.Factory()))
             .build();
 
     // Ensure all outputs of all reads are consumed.

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 2a7c5d6..ef46b63 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -124,7 +124,7 @@ class FlinkStreamingTransformTranslators {
     TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator());
     TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator());
     TRANSLATORS.put(
-        FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class,
+        CreateStreamingFlinkView.CreateFlinkPCollectionView.class,
         new CreateViewStreamingTranslator());
 
     TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorStreaming());
@@ -584,17 +584,17 @@ class FlinkStreamingTransformTranslators {
 
   private static class CreateViewStreamingTranslator<ElemT, ViewT>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
-      FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT>> {
+      CreateStreamingFlinkView.CreateFlinkPCollectionView<ElemT, ViewT>> {
 
     @Override
     public void translateNode(
-        FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT> transform,
+        CreateStreamingFlinkView.CreateFlinkPCollectionView<ElemT, ViewT> transform,
         FlinkStreamingTranslationContext context) {
       // just forward
       DataStream<WindowedValue<List<ElemT>>> inputDataSet =
           context.getInputDataStream(context.getInput(transform));
 
-      PCollectionView<ViewT> view = context.getOutput(transform);
+      PCollectionView<ViewT> view = transform.getView();
 
       context.setOutputDataStream(view, inputDataSet);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
deleted file mode 100644
index ce1c895..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
-
-/**
- * Flink streaming overrides for various view (side input) transforms.
- */
-class FlinkStreamingViewOverrides {
-
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
-   * for the Flink runner in streaming mode.
-   */
-  static class StreamingViewAsMap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
-
-    private final transient FlinkRunner runner;
-
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsMap(FlinkRunner runner, View.AsMap<K, V> transform) {
-      this.runner = runner;
-    }
-
-    @Override
-    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
-      PCollectionView<Map<K, V>> view =
-          PCollectionViews.mapView(
-              input,
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-      try {
-        inputCoder.getKeyCoder().verifyDeterministic();
-      } catch (Coder.NonDeterministicException e) {
-        runner.recordViewUsesNonDeterministicKeyCoder(this);
-      }
-
-      return input
-          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
-          .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, V>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsMap";
-    }
-  }
-
-  /**
-   * Specialized expansion for {@link
-   * View.AsMultimap View.AsMultimap} for the
-   * Flink runner in streaming mode.
-   */
-  static class StreamingViewAsMultimap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
-
-    private final transient FlinkRunner runner;
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsMultimap(FlinkRunner runner, View.AsMultimap<K, V> transform) {
-      this.runner = runner;
-    }
-
-    @Override
-    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
-      PCollectionView<Map<K, Iterable<V>>> view =
-          PCollectionViews.multimapView(
-              input,
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-      try {
-        inputCoder.getKeyCoder().verifyDeterministic();
-      } catch (Coder.NonDeterministicException e) {
-        runner.recordViewUsesNonDeterministicKeyCoder(this);
-      }
-
-      return input
-          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
-          .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsMultimap";
-    }
-  }
-
-  /**
-   * Specialized implementation for
-   * {@link View.AsList View.AsList} for the
-   * Flink runner in streaming mode.
-   */
-  static class StreamingViewAsList<T>
-      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsList(FlinkRunner runner, View.AsList<T> transform) {}
-
-    @Override
-    public PCollectionView<List<T>> expand(PCollection<T> input) {
-      PCollectionView<List<T>> view =
-          PCollectionViews.listView(
-              input,
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
-          .apply(CreateFlinkPCollectionView.<T, List<T>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsList";
-    }
-  }
-
-  /**
-   * Specialized implementation for
-   * {@link View.AsIterable View.AsIterable} for the
-   * Flink runner in streaming mode.
-   */
-  static class StreamingViewAsIterable<T>
-      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsIterable(FlinkRunner runner, View.AsIterable<T> transform) { }
-
-    @Override
-    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
-      PCollectionView<Iterable<T>> view =
-          PCollectionViews.iterableView(
-              input,
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
-          .apply(CreateFlinkPCollectionView.<T, Iterable<T>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsIterable";
-    }
-  }
-
-  /**
-   * Specialized expansion for
-   * {@link View.AsSingleton View.AsSingleton} for the
-   * Flink runner in streaming mode.
-   */
-  static class StreamingViewAsSingleton<T>
-      extends PTransform<PCollection<T>, PCollectionView<T>> {
-    private View.AsSingleton<T> transform;
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsSingleton(FlinkRunner runner, View.AsSingleton<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PCollectionView<T> expand(PCollection<T> input) {
-      Combine.Globally<T, T> combine = Combine.globally(
-          new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
-      if (!transform.hasDefaultValue()) {
-        combine = combine.withoutDefaults();
-      }
-      return input.apply(combine.asSingletonView());
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsSingleton";
-    }
-
-    private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
-      private boolean hasDefaultValue;
-      private T defaultValue;
-
-      SingletonCombine(boolean hasDefaultValue, T defaultValue) {
-        this.hasDefaultValue = hasDefaultValue;
-        this.defaultValue = defaultValue;
-      }
-
-      @Override
-      public T apply(T left, T right) {
-        throw new IllegalArgumentException("PCollection with more than one element "
-            + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
-            + "combine the PCollection into a single value");
-      }
-
-      @Override
-      public T identity() {
-        if (hasDefaultValue) {
-          return defaultValue;
-        } else {
-          throw new IllegalArgumentException(
-              "Empty PCollection accessed as a singleton view. "
-                  + "Consider setting withDefault to provide a default value");
-        }
-      }
-    }
-  }
-
-  static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
-      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
-    Combine.GloballyAsSingletonView<InputT, OutputT> transform;
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingCombineGloballyAsSingletonView(
-        FlinkRunner runner,
-        Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
-      PCollection<OutputT> combined =
-          input.apply(Combine.globally(transform.getCombineFn())
-              .withoutDefaults()
-              .withFanout(transform.getFanout()));
-
-      PCollectionView<OutputT> view = PCollectionViews.singletonView(
-          combined,
-          combined.getWindowingStrategy(),
-          transform.getInsertDefault(),
-          transform.getInsertDefault()
-              ? transform.getCombineFn().defaultValue() : null,
-          combined.getCoder());
-      return combined
-          .apply(ParDo.of(new WrapAsList<OutputT>()))
-          .apply(CreateFlinkPCollectionView.<OutputT, OutputT>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingCombineGloballyAsSingletonView";
-    }
-  }
-
-  private static class WrapAsList<T> extends DoFn<T, List<T>> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(Collections.singletonList(c.element()));
-    }
-  }
-
-  /**
-   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
-   *
-   * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
-   * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
-   * They require the input {@link PCollection} fits in memory.
-   * For a large {@link PCollection} this is expected to crash!
-   *
-   * @param <T> the type of elements to concatenate.
-   */
-  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
-    @Override
-    public List<T> createAccumulator() {
-      return new ArrayList<T>();
-    }
-
-    @Override
-    public List<T> addInput(List<T> accumulator, T input) {
-      accumulator.add(input);
-      return accumulator;
-    }
-
-    @Override
-    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
-      List<T> result = createAccumulator();
-      for (List<T> accumulator : accumulators) {
-        result.addAll(accumulator);
-      }
-      return result;
-    }
-
-    @Override
-    public List<T> extractOutput(List<T> accumulator) {
-      return accumulator;
-    }
-
-    @Override
-    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-
-    @Override
-    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-  }
-
-  /**
-   * Creates a primitive {@link PCollectionView}.
-   *
-   * <p>For internal use only by runner implementors.
-   *
-   * @param <ElemT> The type of the elements of the input PCollection
-   * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
-   */
-  public static class CreateFlinkPCollectionView<ElemT, ViewT>
-      extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
-    private PCollectionView<ViewT> view;
-
-    private CreateFlinkPCollectionView(PCollectionView<ViewT> view) {
-      this.view = view;
-    }
-
-    public static <ElemT, ViewT> CreateFlinkPCollectionView<ElemT, ViewT> of(
-        PCollectionView<ViewT> view) {
-      return new CreateFlinkPCollectionView<>(view);
-    }
-
-    @Override
-    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
-      return view;
-    }
-  }
-}
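
All of the streaming view overrides deleted above share one expansion: concatenate the input into a single List per window, then apply the primitive view-creating transform. Only the PCollectionViews factory used to build the view differs between them. A minimal sketch of that shared shape, reusing the Concatenate and CreateFlinkPCollectionView helpers defined (and removed) in this file; it illustrates the pattern the single replacement override consolidates and is not the new CreateStreamingFlinkView code:

  // Sketch only. 'expandAsStreamingView' is an illustrative name; Concatenate and
  // CreateFlinkPCollectionView are the helpers removed above.
  static <T, ViewT> PCollectionView<ViewT> expandAsStreamingView(
      PCollection<T> input, PCollectionView<ViewT> view) {
    return input
        .apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
        .apply(CreateFlinkPCollectionView.<T, ViewT>of(view));
  }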

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index b4a6e64..ad3faed 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -39,8 +39,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.core.construction.PTransformReplacements;
-import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.dataflow.internal.IsmFormat;
 import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
 import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
@@ -57,17 +55,11 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView;
-import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.View.AsSingleton;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -83,7 +75,6 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -192,12 +183,13 @@ class BatchViewOverrides {
     }
 
     private final DataflowRunner runner;
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
+    private final PCollectionView<Map<K, V>> view;
+    /** Builds an instance of this class from the overridden transform. */
     @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public BatchViewAsMap(DataflowRunner runner, View.AsMap<K, V> transform) {
+    public BatchViewAsMap(
+        DataflowRunner runner, CreatePCollectionView<KV<K, V>, Map<K, V>> transform) {
       this.runner = runner;
+      this.view = transform.getView();
     }
 
     @Override
@@ -207,12 +199,7 @@ class BatchViewOverrides {
 
     private <W extends BoundedWindow> PCollectionView<Map<K, V>>
     applyInternal(PCollection<KV<K, V>> input) {
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
       try {
-        PCollectionView<Map<K, V>> view = PCollectionViews.mapView(
-            input, input.getWindowingStrategy(), inputCoder);
         return BatchViewAsMultimap.applyForMapLike(runner, input, view, true /* unique keys */);
       } catch (NonDeterministicException e) {
         runner.recordViewUsesNonDeterministicKeyCoder(this);
@@ -249,19 +236,14 @@ class BatchViewOverrides {
                   inputCoder.getKeyCoder(),
                   FullWindowedValueCoder.of(inputCoder.getValueCoder(), windowCoder)));
 
-      TransformedMap<K, WindowedValue<V>, V> defaultValue = new TransformedMap<>(
-          WindowedValueToValue.<V>of(),
-          ImmutableMap.<K, WindowedValue<V>>of());
-
       return BatchViewAsSingleton.<KV<K, V>, TransformedMap<K, WindowedValue<V>, V>,
           Map<K, V>,
           W> applyForSingleton(
           runner,
           input,
           new ToMapDoFn<K, V, W>(windowCoder),
-          true,
-          defaultValue,
-          finalValueCoder);
+          finalValueCoder,
+          view);
     }
   }
 
@@ -680,12 +662,13 @@ class BatchViewOverrides {
     }
 
     private final DataflowRunner runner;
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
+    private final PCollectionView<Map<K, Iterable<V>>> view;
+    /** Builds an instance of this class from the overridden transform. */
     @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public BatchViewAsMultimap(DataflowRunner runner, View.AsMultimap<K, V> transform) {
+    public BatchViewAsMultimap(
+        DataflowRunner runner, CreatePCollectionView<KV<K, V>, Map<K, Iterable<V>>> transform) {
       this.runner = runner;
+      this.view = transform.getView();
     }
 
     @Override
@@ -695,12 +678,7 @@ class BatchViewOverrides {
 
     private <W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>>
     applyInternal(PCollection<KV<K, V>> input) {
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
       try {
-        PCollectionView<Map<K, Iterable<V>>> view = PCollectionViews.multimapView(
-            input, input.getWindowingStrategy(), inputCoder);
-
         return applyForMapLike(runner, input, view, false /* unique keys not expected */);
       } catch (NonDeterministicException e) {
         runner.recordViewUsesNonDeterministicKeyCoder(this);
@@ -738,16 +716,15 @@ class BatchViewOverrides {
               IterableWithWindowedValuesToIterable.<V>of(),
               ImmutableMap.<K, Iterable<WindowedValue<V>>>of());
 
-      return BatchViewAsSingleton.<KV<K, V>,
-          TransformedMap<K, Iterable<WindowedValue<V>>, Iterable<V>>,
-          Map<K, Iterable<V>>,
-          W> applyForSingleton(
-          runner,
-          input,
-          new ToMultimapDoFn<K, V, W>(windowCoder),
-          true,
-          defaultValue,
-          finalValueCoder);
+      return BatchViewAsSingleton
+          .<KV<K, V>, TransformedMap<K, Iterable<WindowedValue<V>>, Iterable<V>>,
+              Map<K, Iterable<V>>, W>
+              applyForSingleton(
+                  runner,
+                  input,
+                  new ToMultimapDoFn<K, V, W>(windowCoder),
+                  finalValueCoder,
+                  view);
     }
 
     private static <K, V, W extends BoundedWindow, ViewT> PCollectionView<ViewT> applyForMapLike(
@@ -827,10 +804,9 @@ class BatchViewOverrides {
           PCollectionList.of(ImmutableList.of(
               perHashWithReifiedWindows, windowMapSizeMetadata, windowMapKeysMetadata));
 
-      return Pipeline.applyTransform(outputs,
-          Flatten.<IsmRecord<WindowedValue<V>>>pCollections())
-          .apply(CreateDataflowView.<IsmRecord<WindowedValue<V>>,
-              ViewT>of(view));
+      Pipeline.applyTransform(outputs, Flatten.<IsmRecord<WindowedValue<V>>>pCollections())
+          .apply(CreateDataflowView.<IsmRecord<WindowedValue<V>>, ViewT>of(view));
+      return view;
     }
 
     @Override
@@ -915,14 +891,12 @@ class BatchViewOverrides {
     }
 
     private final DataflowRunner runner;
-    private final View.AsSingleton<T> transform;
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
+    private final PCollectionView<T> view;
+    /** Builds an instance of this class from the overridden transform. */
     @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public BatchViewAsSingleton(DataflowRunner runner, View.AsSingleton<T> transform) {
+    public BatchViewAsSingleton(DataflowRunner runner, CreatePCollectionView<T, T> transform) {
       this.runner = runner;
-      this.transform = transform;
+      this.view = transform.getView();
     }
 
     @Override
@@ -935,9 +909,8 @@ class BatchViewOverrides {
           runner,
           input,
           new IsmRecordForSingularValuePerWindowDoFn<T, BoundedWindow>(windowCoder),
-          transform.hasDefaultValue(),
-          transform.defaultValue(),
-          input.getCoder());
+          input.getCoder(),
+          view);
     }
 
     static <T, FinalT, ViewT, W extends BoundedWindow> PCollectionView<ViewT>
@@ -946,23 +919,13 @@ class BatchViewOverrides {
         PCollection<T> input,
         DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
             IsmRecord<WindowedValue<FinalT>>> doFn,
-        boolean hasDefault,
-        FinalT defaultValue,
-        Coder<FinalT> defaultValueCoder) {
+        Coder<FinalT> defaultValueCoder,
+        PCollectionView<ViewT> view) {
 
       @SuppressWarnings("unchecked")
       Coder<W> windowCoder = (Coder<W>)
           input.getWindowingStrategy().getWindowFn().windowCoder();
 
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      PCollectionView<ViewT> view =
-          (PCollectionView<ViewT>) PCollectionViews.<FinalT, W>singletonView(
-              (PCollection) input,
-              (WindowingStrategy) input.getWindowingStrategy(),
-              hasDefault,
-              defaultValue,
-              defaultValueCoder);
-
       IsmRecordCoder<WindowedValue<FinalT>> ismCoder =
           coderForSingleton(windowCoder, defaultValueCoder);
 
@@ -972,8 +935,9 @@ class BatchViewOverrides {
       reifiedPerWindowAndSorted.setCoder(ismCoder);
 
       runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
-      return reifiedPerWindowAndSorted.apply(
+      reifiedPerWindowAndSorted.apply(
           CreateDataflowView.<IsmRecord<WindowedValue<FinalT>>, ViewT>of(view));
+      return view;
     }
 
     @Override
@@ -1079,18 +1043,18 @@ class BatchViewOverrides {
     }
 
     private final DataflowRunner runner;
+    private final PCollectionView<List<T>> view;
     /**
      * Builds an instance of this class from the overridden transform.
      */
     @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public BatchViewAsList(DataflowRunner runner, View.AsList<T> transform) {
+    public BatchViewAsList(DataflowRunner runner, CreatePCollectionView<T, List<T>> transform) {
       this.runner = runner;
+      this.view = transform.getView();
     }
 
     @Override
     public PCollectionView<List<T>> expand(PCollection<T> input) {
-      PCollectionView<List<T>> view = PCollectionViews.listView(
-          input, input.getWindowingStrategy(), input.getCoder());
       return applyForIterableLike(runner, input, view);
     }
 
@@ -1116,8 +1080,9 @@ class BatchViewOverrides {
         reifiedPerWindowAndSorted.setCoder(ismCoder);
 
         runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
-        return reifiedPerWindowAndSorted.apply(
+        reifiedPerWindowAndSorted.apply(
             CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
+        return view;
       }
 
       PCollection<IsmRecord<WindowedValue<T>>> reifiedPerWindowAndSorted = input
@@ -1126,8 +1091,9 @@ class BatchViewOverrides {
       reifiedPerWindowAndSorted.setCoder(ismCoder);
 
       runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
-      return reifiedPerWindowAndSorted.apply(
+      reifiedPerWindowAndSorted.apply(
           CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
+      return view;
     }
 
     @Override
@@ -1164,18 +1130,17 @@ class BatchViewOverrides {
       extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
 
     private final DataflowRunner runner;
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
+    private final PCollectionView<Iterable<T>> view;
+    /** Builds an instance of this class from the overridden transform. */
     @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public BatchViewAsIterable(DataflowRunner runner, View.AsIterable<T> transform) {
+    public BatchViewAsIterable(
+        DataflowRunner runner, CreatePCollectionView<T, Iterable<T>> transform) {
       this.runner = runner;
+      this.view = transform.getView();
     }
 
     @Override
     public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
-      PCollectionView<Iterable<T>> view = PCollectionViews.iterableView(
-          input, input.getWindowingStrategy(), input.getCoder());
       return BatchViewAsList.applyForIterableLike(runner, input, view);
     }
   }
@@ -1377,59 +1342,4 @@ class BatchViewOverrides {
       verifyDeterministic(this, "Expected map coder to be deterministic.", originalMapCoder);
     }
   }
-
-  static class BatchCombineGloballyAsSingletonViewFactory<ElemT, ViewT>
-      extends SingleInputOutputOverrideFactory<
-          PCollection<ElemT>, PCollectionView<ViewT>,
-          Combine.GloballyAsSingletonView<ElemT, ViewT>> {
-    private final DataflowRunner runner;
-
-    BatchCombineGloballyAsSingletonViewFactory(DataflowRunner runner) {
-      this.runner = runner;
-    }
-
-    @Override
-    public PTransformReplacement<PCollection<ElemT>, PCollectionView<ViewT>>
-        getReplacementTransform(
-            AppliedPTransform<
-                    PCollection<ElemT>, PCollectionView<ViewT>,
-                    GloballyAsSingletonView<ElemT, ViewT>>
-                transform) {
-      GloballyAsSingletonView<ElemT, ViewT> combine = transform.getTransform();
-      return PTransformReplacement.of(
-          PTransformReplacements.getSingletonMainInput(transform),
-          new BatchCombineGloballyAsSingletonView<>(
-              runner, combine.getCombineFn(), combine.getFanout(), combine.getInsertDefault()));
-    }
-
-    private static class BatchCombineGloballyAsSingletonView<ElemT, ViewT>
-        extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
-      private final DataflowRunner runner;
-      private final GlobalCombineFn<? super ElemT, ?, ViewT> combineFn;
-      private final int fanout;
-      private final boolean insertDefault;
-
-      BatchCombineGloballyAsSingletonView(
-          DataflowRunner runner,
-          GlobalCombineFn<? super ElemT, ?, ViewT> combineFn,
-          int fanout,
-          boolean insertDefault) {
-        this.runner = runner;
-        this.combineFn = combineFn;
-        this.fanout = fanout;
-        this.insertDefault = insertDefault;
-      }
-
-      @Override
-      public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
-        PCollection<ViewT> combined =
-            input.apply(Combine.globally(combineFn).withoutDefaults().withFanout(fanout));
-        AsSingleton<ViewT> viewAsSingleton = View.asSingleton();
-        if (insertDefault) {
-          viewAsSingleton.withDefaultValue(combineFn.defaultValue());
-        }
-        return combined.apply(new BatchViewAsSingleton<>(runner, viewAsSingleton));
-      }
-    }
-  }
 }
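
The BatchViewOverrides hunks above all make the same move: each BatchViewAs* constructor now takes the CreatePCollectionView transform it replaces and reads the already-constructed view via transform.getView(), instead of building a fresh view with PCollectionViews.*; the applyFor* helpers then apply CreateDataflowView for its effect on the graph and return that same view. A compact sketch of the tail of that flow (the method name is illustrative; CreateDataflowView's new output type is shown in the next file):

  // Sketch: thread the SDK-built view through, wire the materializing primitive, return the view.
  static <ElemT, ViewT> PCollectionView<ViewT> wireViewMaterialization(
      PCollection<ElemT> prepared, PCollectionView<ViewT> view) {
    prepared.apply(CreateDataflowView.<ElemT, ViewT>of(view)); // primitive output left unconsumed
    return view;
  }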

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
index e7542cb..caad7f8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 
 /** A {@link DataflowRunner} marker class for creating a {@link PCollectionView}. */
 public class CreateDataflowView<ElemT, ViewT>
-    extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+    extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
   public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> of(PCollectionView<ViewT> view) {
     return new CreateDataflowView<>(view);
   }
@@ -36,8 +36,10 @@ public class CreateDataflowView<ElemT, ViewT>
   }
 
   @Override
-  public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
-    return view;
+  public PCollection<ElemT> expand(PCollection<ElemT> input) {
+    return PCollection.<ElemT>createPrimitiveOutputInternal(
+            input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
+        .setCoder(input.getCoder());
   }
 
   public PCollectionView<ViewT> getView() {
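
With CreateDataflowView now expanding to a primitive PCollection that mirrors the input's coder, windowing strategy, and boundedness, the PCollectionView handle is only ever obtained from getView() and passed around directly. Consumers are untouched by this reshaping: they keep referencing the view as a side input, never the dummy output. A small usage sketch under that assumption (the method name, element types, and DoFn body are illustrative, not code from this commit):

  static PCollection<String> annotateWithCounts(
      PCollection<String> words, final PCollectionView<Map<String, Long>> countsView) {
    return words.apply(
        ParDo.of(
                new DoFn<String, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    // Side inputs are read through the view handle, regardless of how the
                    // runner materialized it.
                    Long count = c.sideInput(countsView).get(c.element());
                    c.output(c.element() + ":" + count);
                  }
                })
            .withSideInputs(countsView));
  }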

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 8eaf61b..a3a7ab6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -440,6 +440,14 @@ public class DataflowPipelineTranslator {
     public void visitValue(PValue value, TransformHierarchy.Node producer) {
       LOG.debug("Checking translation of {}", value);
       // Primitive transforms are the only ones assigned step names.
+      if (producer.getTransform() instanceof CreateDataflowView) {
+        // CreateDataflowView produces a dummy output (as it must be a primitive transform) but
+        // in the Dataflow Job graph produces only the view and not the output PCollection.
+        asOutputReference(
+            ((CreateDataflowView) producer.getTransform()).getView(),
+            producer.toAppliedPTransform(getPipeline()));
+        return;
+      }
       asOutputReference(value, producer.toAppliedPTransform(getPipeline()));
     }
 
@@ -465,6 +473,7 @@ public class DataflowPipelineTranslator {
       StepTranslator stepContext = new StepTranslator(this, step);
       stepContext.addInput(PropertyNames.USER_NAME, getFullName(transform));
       stepContext.addDisplayData(step, stepName, transform);
+      LOG.info("Adding {} as step {}", getCurrentTransform(transform).getFullName(), stepName);
       return stepContext;
     }
 
@@ -677,7 +686,7 @@ public class DataflowPipelineTranslator {
                 context.addStep(transform, "CollectionToSingleton");
             PCollection<ElemT> input = context.getInput(transform);
             stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
-            stepContext.addCollectionToSingletonOutput(input, context.getOutput(transform));
+            stepContext.addCollectionToSingletonOutput(input, transform.getView());
           }
         });
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 3e7c8ce..ea9db24 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -67,7 +67,6 @@ import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
 import org.apache.beam.runners.core.construction.UnconsumedReads;
-import org.apache.beam.runners.dataflow.BatchViewOverrides.BatchCombineGloballyAsSingletonViewFactory;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
 import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
@@ -129,6 +128,7 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.PValue;
@@ -350,34 +350,29 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
               PTransformOverride.of(
                   PTransformMatchers.stateOrTimerParDoSingle(),
                   BatchStatefulParDoOverrides.singleOutputOverrideFactory()))
-
-          .add(
-              PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
-                  new BatchCombineGloballyAsSingletonViewFactory(this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(View.AsMap.class),
+                  PTransformMatchers.createViewWithViewFn(PCollectionViews.MapViewFn.class),
                   new ReflectiveOneToOneOverrideFactory(
                       BatchViewOverrides.BatchViewAsMap.class, this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(View.AsMultimap.class),
+                  PTransformMatchers.createViewWithViewFn(PCollectionViews.MultimapViewFn.class),
                   new ReflectiveOneToOneOverrideFactory(
                       BatchViewOverrides.BatchViewAsMultimap.class, this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(View.AsSingleton.class),
+                  PTransformMatchers.createViewWithViewFn(PCollectionViews.SingletonViewFn.class),
                   new ReflectiveOneToOneOverrideFactory(
                       BatchViewOverrides.BatchViewAsSingleton.class, this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(View.AsList.class),
+                  PTransformMatchers.createViewWithViewFn(PCollectionViews.ListViewFn.class),
                   new ReflectiveOneToOneOverrideFactory(
                       BatchViewOverrides.BatchViewAsList.class, this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(View.AsIterable.class),
+                  PTransformMatchers.createViewWithViewFn(PCollectionViews.IterableViewFn.class),
                   new ReflectiveOneToOneOverrideFactory(
                       BatchViewOverrides.BatchViewAsIterable.class, this)));
     }
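
The override registrations above no longer key off the View.AsXyz composite classes; they key off the ViewFn carried by the view that the SDK's CreatePCollectionView produces, which is what remains identifiable once every PValue is expanded to its component PCollections. A hypothetical sketch of the check a matcher like PTransformMatchers.createViewWithViewFn(viewFnClass) performs (illustration only; the real matcher is the one provided by PTransformMatchers, not this code):

  static boolean matchesCreateViewWith(
      AppliedPTransform<?, ?, ?> application, Class<? extends ViewFn> viewFnClass) {
    PTransform<?, ?> transform = application.getTransform();
    return transform instanceof CreatePCollectionView
        && viewFnClass.equals(
            ((CreatePCollectionView<?, ?>) transform).getView().getViewFn().getClass());
  }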