You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/11/15 17:00:35 UTC

[1/7] beam git commit: Fix BatchViewOverrides ViewAsSingleton to apply the combine fn that was being replaced.

Repository: beam
Updated Branches:
  refs/heads/master 7ce0a82b0 -> 30886acee


Fix BatchViewOverrides ViewAsSingleton to apply the combine fn that was being replaced.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/accb2087
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/accb2087
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/accb2087

Branch: refs/heads/master
Commit: accb2087f88c641be9db038bbb5be715aacffb8d
Parents: c2f815c
Author: Luke Cwik <lc...@google.com>
Authored: Tue Nov 14 19:19:48 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 15 08:59:51 2017 -0800

----------------------------------------------------------------------
 .../runners/dataflow/BatchViewOverrides.java    | 16 +++-
 .../runners/dataflow/CreateDataflowView.java    |  6 +-
 .../beam/runners/dataflow/DataflowRunner.java   | 80 +++++++++++++++-----
 .../DataflowPipelineTranslatorTest.java         |  6 +-
 4 files changed, 80 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index 8ed41cb..2953a42 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -55,6 +55,8 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -893,15 +895,23 @@ class BatchViewOverrides {
 
     private final DataflowRunner runner;
     private final PCollectionView<T> view;
-    /** Builds an instance of this class from the overridden transform. */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public BatchViewAsSingleton(DataflowRunner runner, CreatePCollectionView<T, T> transform) {
+    private final CombineFn<T, ?, T> combineFn;
+    private final int fanout;
+
+    public BatchViewAsSingleton(
+        DataflowRunner runner,
+        CreatePCollectionView<T, T> transform,
+        CombineFn<T, ?, T> combineFn,
+        int fanout) {
       this.runner = runner;
       this.view = transform.getView();
+      this.combineFn = combineFn;
+      this.fanout = fanout;
     }
 
     @Override
     public PCollection<?> expand(PCollection<T> input) {
+      input = input.apply(Combine.globally(combineFn).withoutDefaults().withFanout(fanout));
       @SuppressWarnings("unchecked")
       Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>)
           input.getWindowingStrategy().getWindowFn().windowCoder();

http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
index 10888c2..f64f3fb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
@@ -25,11 +25,13 @@ import org.apache.beam.sdk.values.PCollectionView;
 /** A {@link DataflowRunner} marker class for creating a {@link PCollectionView}. */
 public class CreateDataflowView<ElemT, ViewT>
     extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
-  public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forBatch(PCollectionView<ViewT> view) {
+  public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forBatch(
+      PCollectionView<ViewT> view) {
     return new CreateDataflowView<>(view, false);
   }
 
-  public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forStreaming(PCollectionView<ViewT> view) {
+  public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forStreaming(
+      PCollectionView<ViewT> view) {
     return new CreateDataflowView<>(view, true);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 72e4f83..a650092 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -116,6 +116,7 @@ import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.state.MapState;
 import org.apache.beam.sdk.state.SetState;
 import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.GroupedValues;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -406,9 +407,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
                       BatchViewOverrides.BatchViewAsMultimap.class, this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(View.AsSingleton.class),
-                  new ReflectiveViewOverrideFactory(
-                      BatchViewOverrides.BatchViewAsSingleton.class, this)))
+                  PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+                  new CombineGloballyAsSingletonViewOverrideFactory(this)))
           .add(
               PTransformOverride.of(
                   PTransformMatchers.classEqualTo(View.AsList.class),
@@ -437,29 +437,58 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }
 
   /**
-   * Replace the View.AsYYY transform with specialized view overrides for Dataflow. It is required that the
-   * new replacement transform uses the supplied PCollectionView and does not create another instance.
+   * Replace the {@link Combine.GloballyAsSingletonView} transform with a specialization which
+   * re-applies the {@link CombineFn} and adds a specialization specific to the Dataflow runner.
+   */
+  private static class CombineGloballyAsSingletonViewOverrideFactory<InputT, ViewT>
+      extends ReflectiveViewOverrideFactory<InputT, ViewT> {
+
+    private CombineGloballyAsSingletonViewOverrideFactory(DataflowRunner runner) {
+      super((Class) BatchViewOverrides.BatchViewAsSingleton.class, runner);
+    }
+
+    @Override
+    public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform(
+        AppliedPTransform<
+            PCollection<InputT>,
+            PValue,
+            PTransform<PCollection<InputT>, PValue>> transform) {
+      Combine.GloballyAsSingletonView<?, ?> combineTransform =
+          (Combine.GloballyAsSingletonView) transform.getTransform();
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform),
+          new BatchViewOverrides.BatchViewAsSingleton(
+              runner,
+              findCreatePCollectionView(transform),
+              (CombineFn) combineTransform.getCombineFn(),
+              combineTransform.getFanout()));
+    }
+  }
+
+  /**
+   * Replace the View.AsYYY transform with specialized view overrides for Dataflow. It is required
+   * that the new replacement transform uses the supplied PCollectionView and does not create
+   * another instance.
    */
   private static class ReflectiveViewOverrideFactory<InputT, ViewT>
       implements PTransformOverrideFactory<PCollection<InputT>,
       PValue, PTransform<PCollection<InputT>, PValue>> {
 
-    private final Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement;
-    private final DataflowRunner runner;
+    final Class<PTransform<PCollection<InputT>, PValue>> replacement;
+    final DataflowRunner runner;
 
     private ReflectiveViewOverrideFactory(
-        Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement,
+        Class<PTransform<PCollection<InputT>, PValue>> replacement,
         DataflowRunner runner) {
       this.replacement = replacement;
       this.runner = runner;
     }
 
-    @Override
-    public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform(
-         final AppliedPTransform<PCollection<InputT>,
-             PValue,
-             PTransform<PCollection<InputT>, PValue>> transform) {
-
+    CreatePCollectionView<ViewT, ViewT> findCreatePCollectionView(
+        final AppliedPTransform<
+            PCollection<InputT>,
+            PValue,
+            PTransform<PCollection<InputT>, PValue>> transform) {
       final AtomicReference<CreatePCollectionView> viewTransformRef = new AtomicReference<>();
       transform.getPipeline().traverseTopologically(new PipelineVisitor.Defaults() {
         // Stores whether we have entered the expected composite view transform.
@@ -495,18 +524,29 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       checkState(viewTransformRef.get() != null,
           "Expected to find CreatePCollectionView contained within %s",
           transform.getTransform());
-      PTransform<PCollection<InputT>, PCollectionView<ViewT>> rep =
+      return viewTransformRef.get();
+    }
+
+    @Override
+    public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform(
+         final AppliedPTransform<PCollection<InputT>,
+             PValue,
+             PTransform<PCollection<InputT>, PValue>> transform) {
+
+      PTransform<PCollection<InputT>, PValue> rep =
           InstanceBuilder.ofType(replacement)
               .withArg(DataflowRunner.class, runner)
-              .withArg(CreatePCollectionView.class, viewTransformRef.get())
+              .withArg(CreatePCollectionView.class, findCreatePCollectionView(transform))
               .build();
-      return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), (PTransform) rep);
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform), (PTransform) rep);
     }
 
     @Override
-    public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PValue newOutput) {
-      // We do not replace any of the outputs because we expect that the new PTransform will re-use the original
-      // PCollectionView that was returned.
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PValue> outputs, PValue newOutput) {
+      // We do not replace any of the outputs because we expect that the new PTransform will
+      // re-use the original PCollectionView that was returned.
       return ImmutableMap.of();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index e03abb9..81e7a97 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -991,15 +991,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertAllStepOutputsHaveUniqueIds(job);
 
     List<Step> steps = job.getSteps();
-    assertEquals(5, steps.size());
+    assertEquals(9, steps.size());
 
     @SuppressWarnings("unchecked")
     List<Map<String, Object>> toIsmRecordOutputs =
-        (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO);
+        (List<Map<String, Object>>) steps.get(7).getProperties().get(PropertyNames.OUTPUT_INFO);
     assertTrue(
         Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
 
-    Step collectionToSingletonStep = steps.get(4);
+    Step collectionToSingletonStep = steps.get(8);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
   }
 


[4/7] beam git commit: [BEAM-2926] Migrate to using a trivial multimap materialization within the Java SDK.

Posted by lc...@apache.org.
[BEAM-2926] Migrate to using a trivial multimap materialization within the Java SDK.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5e2593da
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5e2593da
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5e2593da

Branch: refs/heads/master
Commit: 5e2593daacec83e876b747d56d8c335531a54d1d
Parents: 7ce0a82
Author: Luke Cwik <lc...@google.com>
Authored: Fri Nov 10 11:08:24 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 15 08:59:51 2017 -0800

----------------------------------------------------------------------
 .../apex/translation/ParDoTranslatorTest.java   |   3 +-
 .../core/construction/PTransformMatchers.java   |   3 +-
 .../core/construction/ParDoTranslation.java     |  17 +-
 .../CreatePCollectionViewTranslationTest.java   |  10 +-
 .../construction/PTransformMatchersTest.java    |  33 +-
 .../core/construction/ParDoTranslationTest.java |   7 +-
 .../core/InMemoryMultimapSideInputView.java     |  62 +++
 .../beam/runners/core/SideInputHandler.java     |  63 ++--
 .../core/InMemoryMultimapSideInputViewTest.java |  53 +++
 .../beam/runners/core/SideInputHandlerTest.java |  89 +++--
 .../beam/runners/direct/SideInputContainer.java |  38 +-
 .../runners/direct/EvaluationContextTest.java   |  44 ++-
 .../runners/direct/SideInputContainerTest.java  | 226 +++++------
 .../direct/ViewEvaluatorFactoryTest.java        |  13 +-
 .../runners/direct/ViewOverrideFactoryTest.java |   9 +-
 .../direct/WriteWithShardingFactoryTest.java    |   9 +-
 .../FlinkStreamingTransformTranslators.java     |   1 -
 .../functions/FlinkSideInputReader.java         |  27 +-
 .../functions/SideInputInitializer.java         |  50 ++-
 .../flink/streaming/DoFnOperatorTest.java       |  40 +-
 .../DataflowPipelineTranslatorTest.java         |  12 +-
 .../spark/translation/TransformTranslator.java  |   7 +-
 .../spark/util/SparkSideInputReader.java        |  50 ++-
 .../org/apache/beam/sdk/transforms/Combine.java |  13 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |  20 +-
 .../beam/sdk/transforms/Materializations.java   |  29 +-
 .../org/apache/beam/sdk/transforms/View.java    |  67 +++-
 .../org/apache/beam/sdk/transforms/ViewFn.java  |   6 +-
 .../apache/beam/sdk/values/PCollectionView.java |   7 +-
 .../beam/sdk/values/PCollectionViews.java       | 256 ++++++-------
 .../sdk/testing/PCollectionViewTesting.java     | 375 +++----------------
 .../beam/sdk/transforms/DoFnTesterTest.java     |  12 +-
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    |  14 +-
 33 files changed, 809 insertions(+), 856 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
index 73382e3..4a4ca1d 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.apex.translation;
 
+import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -219,7 +220,7 @@ public class ParDoTranslatorTest {
     operator.beginWindow(0);
     WindowedValue<Integer> wv1 = WindowedValue.valueInGlobalWindow(1);
     WindowedValue<Iterable<?>> sideInput = WindowedValue.<Iterable<?>>valueInGlobalWindow(
-        Lists.<Integer>newArrayList(22));
+        materializeValuesFor(View.asSingleton(), 22));
     operator.input.process(ApexStreamTuple.DataTuple.of(wv1)); // pushed back input
 
     final List<Object> results = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 0d27241..42ac73f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.ProcessElementMethod;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 
@@ -304,7 +303,7 @@ public class PTransformMatchers {
         }
         CreatePCollectionView<?, ?> createView =
             (CreatePCollectionView<?, ?>) application.getTransform();
-        ViewFn<Iterable<WindowedValue<?>>, ?> viewFn = createView.getView().getViewFn();
+        ViewFn<?, ?> viewFn = createView.getView().getViewFn();
         return viewFn.getClass().equals(viewFnType);
       }
     };

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index f88cbe5..e00b912 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -50,7 +50,6 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput.Builder;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
@@ -73,8 +72,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
@@ -561,25 +558,19 @@ public class ParDoTranslation {
     ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn());
 
     WindowingStrategy<?, ?> windowingStrategy = pCollection.getWindowingStrategy().fixDefaults();
-    Coder<Iterable<WindowedValue<?>>> coder =
-        (Coder)
-            IterableCoder.of(
-                FullWindowedValueCoder.of(
-                    pCollection.getCoder(),
-                    pCollection.getWindowingStrategy().getWindowFn().windowCoder()));
     checkArgument(
-        sideInput.getAccessPattern().getUrn().equals(Materializations.ITERABLE_MATERIALIZATION_URN),
+        sideInput.getAccessPattern().getUrn().equals(Materializations.MULTIMAP_MATERIALIZATION_URN),
         "Unknown View Materialization URN %s",
         sideInput.getAccessPattern().getUrn());
 
     PCollectionView<?> view =
         new RunnerPCollectionView<>(
             pCollection,
-            (TupleTag<Iterable<WindowedValue<?>>>) tag,
-            (ViewFn<Iterable<WindowedValue<?>>, ?>) viewFn,
+            (TupleTag) tag,
+            (ViewFn) viewFn,
             windowMappingFn,
             windowingStrategy,
-            coder);
+            (Coder) pCollection.getCoder());
     return view;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
index df659a8..690e3ca 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
@@ -23,12 +23,14 @@ import static org.junit.Assert.assertThat;
 import com.google.common.collect.ImmutableList;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PCollectionViews;
@@ -63,12 +65,11 @@ public class CreatePCollectionViewTranslationTest {
                   testPCollection.getWindowingStrategy(),
                   false,
                   null,
-                  testPCollection.getCoder())),
+                  StringUtf8Coder.of())),
           CreatePCollectionView.of(
               PCollectionViews.listView(
                   testPCollection,
-                  testPCollection.getWindowingStrategy(),
-                  testPCollection.getCoder())));
+                  testPCollection.getWindowingStrategy())));
     }
 
     @Parameter(0)
@@ -76,7 +77,8 @@ public class CreatePCollectionViewTranslationTest {
 
     public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
 
-    private static final PCollection<String> testPCollection = p.apply(Create.of("one"));
+    private static final PCollection<KV<Void, String>> testPCollection =
+        p.apply(Create.of(KV.of((Void) null, "one")));
 
     @Test
     public void testEncodedProto() throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 324e38d..c2dab4c 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -54,8 +54,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Materialization;
-import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -67,7 +65,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -376,9 +373,8 @@ public class PTransformMatchersTest implements Serializable {
   @Test
   public void createViewWithViewFn() {
     PCollection<Integer> input = p.apply(Create.of(1));
-    PCollectionView<Iterable<Integer>> view =
-        PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
-    ViewFn<Iterable<WindowedValue<?>>, Iterable<Integer>> viewFn = view.getViewFn();
+    PCollectionView<Iterable<Integer>> view = input.apply(View.<Integer>asIterable());
+    ViewFn<?, ?> viewFn = view.getViewFn();
     CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view);
 
     PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass());
@@ -388,23 +384,10 @@ public class PTransformMatchersTest implements Serializable {
   @Test
   public void createViewWithViewFnDifferentViewFn() {
     PCollection<Integer> input = p.apply(Create.of(1));
-    PCollectionView<Iterable<Integer>> view =
-        PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
-    ViewFn<Iterable<WindowedValue<?>>, Iterable<Integer>> viewFn =
-        new ViewFn<Iterable<WindowedValue<?>>, Iterable<Integer>>() {
-          @Override
-          public Materialization<Iterable<WindowedValue<?>>> getMaterialization() {
-            @SuppressWarnings({"rawtypes", "unchecked"})
-            Materialization<Iterable<WindowedValue<?>>> materialization =
-                (Materialization) Materializations.iterable();
-            return materialization;
-          }
-
-          @Override
-          public Iterable<Integer> apply(Iterable<WindowedValue<?>> contents) {
-            return Collections.emptyList();
-          }
-        };
+    PCollectionView<Iterable<Integer>> view = input.apply(View.<Integer>asIterable());
+
+    // Purposely create a subclass to get a different class than what was expected.
+    ViewFn<?, ?> viewFn = new PCollectionViews.IterableViewFn() {};
     CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view);
 
     PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass());
@@ -414,9 +397,7 @@ public class PTransformMatchersTest implements Serializable {
   @Test
   public void createViewWithViewFnNotCreatePCollectionView() {
     PCollection<Integer> input = p.apply(Create.of(1));
-    PCollectionView<Iterable<Integer>> view =
-        PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
-
+    PCollectionView<Iterable<Integer>> view = input.apply(View.<Integer>asIterable());
     PTransformMatcher matcher =
         PTransformMatchers.createViewWithViewFn(view.getViewFn().getClass());
     assertThat(matcher.matches(getAppliedTransform(View.asIterable())), is(false));

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
index b79947e..83594f1 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -29,6 +29,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -166,7 +167,8 @@ public class ParDoTranslationTest {
                 view.getPCollection(),
                 protoTransform,
                 rehydratedComponents);
-        assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal()));
+        assertThat(restoredView.getTagInternal(),
+            Matchers.<TupleTag<?>>equalTo(view.getTagInternal()));
         assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass()));
         assertThat(
             restoredView.getWindowMappingFn(), instanceOf(view.getWindowMappingFn().getClass()));
@@ -174,7 +176,8 @@ public class ParDoTranslationTest {
             restoredView.getWindowingStrategyInternal(),
             Matchers.<WindowingStrategy<?, ?>>equalTo(
                 view.getWindowingStrategyInternal().fixDefaults()));
-        assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal()));
+        assertThat(restoredView.getCoderInternal(),
+            Matchers.<Coder<?>>equalTo(view.getCoderInternal()));
       }
       String mainInputId = sdkComponents.registerPCollection(mainInput);
       assertThat(

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java
new file mode 100644
index 0000000..b451547
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * An in-memory representation of {@link MultimapView}.
+ */
+public class InMemoryMultimapSideInputView<K, V> implements Materializations.MultimapView<K, V> {
+
+  /**
+   * Creates a {@link MultimapView} from the provided values. The provided {@link Coder} is used
+   * to guarantee structural equality for keys instead of assuming Java object equality.
+   */
+  public static <K, V> MultimapView<K, V> fromIterable(
+      Coder<K> keyCoder, Iterable<KV<K, V>> values) {
+    // An array list multimap is used on purpose: it allows null keys, null values
+    // and duplicate values.
+    Multimap<Object, V> multimap = ArrayListMultimap.create();
+    for (KV<K, V> value : values) {
+      multimap.put(keyCoder.structuralValue(value.getKey()), value.getValue());
+    }
+    return new InMemoryMultimapSideInputView<>(keyCoder, Multimaps.unmodifiableMultimap(multimap));
+  }
+
+  private final Coder<K> keyCoder;
+  // Keyed by keyCoder.structuralValue(key) rather than by the key object itself.
+  private final Multimap<Object, V> structuralKeyToValuesMap;
+
+  private InMemoryMultimapSideInputView(Coder<K> keyCoder, Multimap<Object, V> data) {
+    this.keyCoder = keyCoder;
+    this.structuralKeyToValuesMap = data;
+  }
+
+  /** Returns the values stored under the structural value of {@code k}. */
+  @Override
+  public Iterable<V> get(K k) {
+    return structuralKeyToValuesMap.get(keyCoder.structuralValue(k));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index 3b37702..3ff4c94 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -17,22 +17,28 @@
  */
 package org.apache.beam.runners.core;
 
-import java.util.ArrayList;
+import static com.google.common.base.Preconditions.checkArgument;
+
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.state.CombiningState;
 import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 
 /**
@@ -58,7 +64,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
 
   /**
    * State internals that are scoped not to the key of a value but are global. The state can still
-   * be keep locally but if side inputs are broadcast to all parallel operators then all will
+   * be kept locally but if side inputs are broadcast to all parallel operators then all will
    * have the same view of the state.
    */
   private final StateInternals stateInternals;
@@ -80,7 +86,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
    */
   private final Map<
       PCollectionView<?>,
-      StateTag<ValueState<Iterable<WindowedValue<?>>>>> sideInputContentsTags;
+      StateTag<ValueState<Iterable<?>>>> sideInputContentsTags;
 
   /**
    * Creates a new {@code SideInputHandler} for the given side inputs that uses
@@ -94,7 +100,15 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
     this.availableWindowsTags = new HashMap<>();
     this.sideInputContentsTags = new HashMap<>();
 
-    for (PCollectionView<?> sideInput: sideInputs) {
+    for (PCollectionView<?> sideInput : sideInputs) {
+      checkArgument(
+          Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+              sideInput.getViewFn().getMaterialization().getUrn()),
+          "This handler is only capable of dealing with %s materializations "
+              + "but was asked to handle %s for PCollectionView with tag %s.",
+          Materializations.MULTIMAP_MATERIALIZATION_URN,
+          sideInput.getViewFn().getMaterialization().getUrn(),
+          sideInput.getTagInternal().getId());
 
       @SuppressWarnings("unchecked")
       Coder<BoundedWindow> windowCoder =
@@ -114,9 +128,9 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
 
       availableWindowsTags.put(sideInput, availableTag);
 
-      Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal();
-      StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
-          StateTags.value("side-input-data-" + sideInput.getTagInternal().getId(), coder);
+      StateTag<ValueState<Iterable<?>>> stateTag =
+          StateTags.value("side-input-data-" + sideInput.getTagInternal().getId(),
+              (Coder) IterableCoder.of(sideInput.getCoderInternal()));
       sideInputContentsTags.put(sideInput, stateTag);
     }
   }
@@ -129,7 +143,6 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
   public void addSideInputValue(
       PCollectionView<?> sideInput,
       WindowedValue<Iterable<?>> value) {
-
     @SuppressWarnings("unchecked")
     Coder<BoundedWindow> windowCoder =
         (Coder<BoundedWindow>) sideInput
@@ -137,19 +150,13 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
             .getWindowFn()
             .windowCoder();
 
-    // reify the WindowedValue
-    List<WindowedValue<?>> inputWithReifiedWindows = new ArrayList<>();
-    for (Object e: value.getValue()) {
-      inputWithReifiedWindows.add(value.withValue(e));
-    }
-
-    StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
+    StateTag<ValueState<Iterable<?>>> stateTag =
         sideInputContentsTags.get(sideInput);
 
-    for (BoundedWindow window: value.getWindows()) {
+    for (BoundedWindow window : value.getWindows()) {
       stateInternals
           .state(StateNamespaces.window(windowCoder, window), stateTag)
-          .write(inputWithReifiedWindows);
+          .write(value.getValue());
 
       stateInternals
           .state(StateNamespaces.global(), availableWindowsTags.get(sideInput))
@@ -159,28 +166,32 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
 
   @Nullable
   @Override
-  public <T> T get(PCollectionView<T> sideInput, BoundedWindow window) {
-
+  public <T> T get(PCollectionView<T> view, BoundedWindow window) {
     @SuppressWarnings("unchecked")
     Coder<BoundedWindow> windowCoder =
-        (Coder<BoundedWindow>) sideInput
+        (Coder<BoundedWindow>) view
             .getWindowingStrategyInternal()
             .getWindowFn()
             .windowCoder();
 
-    StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
-        sideInputContentsTags.get(sideInput);
+    StateTag<ValueState<Iterable<?>>> stateTag =
+        sideInputContentsTags.get(view);
 
-    ValueState<Iterable<WindowedValue<?>>> state =
+    ValueState<Iterable<?>> state =
         stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag);
 
-    @Nullable Iterable<WindowedValue<?>> elements = state.read();
+    // TODO: Add support for choosing which representation is contained based upon the
+    // side input materialization. We currently can assume that we always have a multimap
+    // materialization as that is the only supported type within the Java SDK.
+    @Nullable Iterable<KV<?, ?>> elements = (Iterable<KV<?, ?>>) state.read();
 
     if (elements == null) {
       elements = Collections.emptyList();
     }
 
-    return sideInput.getViewFn().apply(elements);
+    ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+    Coder<?> keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+    return viewFn.apply(InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) elements));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java
new file mode 100644
index 0000000..6840355
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link InMemoryMultimapSideInputView}: structural key lookup and value grouping. */
+@RunWith(JUnit4.class)
+public class InMemoryMultimapSideInputViewTest {
+  @Test
+  public void testStructuralKeyEquality() {
+    MultimapView<byte[], Integer> view = InMemoryMultimapSideInputView.fromIterable(
+        ByteArrayCoder.of(),
+        ImmutableList.of(KV.of(new byte[]{ 0x00 }, 0), KV.of(new byte[]{ 0x01 }, 1)));
+    assertEquals(ImmutableList.of(0), view.get(new byte[]{ 0x00 }));
+    assertEquals(ImmutableList.of(1), view.get(new byte[]{ 0x01 }));
+    assertEquals(ImmutableList.of(), view.get(new byte[]{ 0x02 }));
+  }
+
+  @Test
+  public void testValueGrouping() {
+    MultimapView<String, String> view = InMemoryMultimapSideInputView.fromIterable(
+        StringUtf8Coder.of(),
+        ImmutableList.of(KV.of("A", "a1"), KV.of("A", "a2"), KV.of("B", "b1")));
+    assertEquals(ImmutableList.of("a1", "a2"), view.get("A"));
+    assertEquals(ImmutableList.of("b1"), view.get("B"));
+    assertEquals(ImmutableList.of(), view.get("C"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
index f9e0aaf..7cbd1b0 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
@@ -17,24 +17,28 @@
  */
 package org.apache.beam.runners.core;
 
+import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor;
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.PCollectionViewTesting;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
-import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -47,26 +51,19 @@ public class SideInputHandlerTest {
 
   private static final long WINDOW_MSECS_1 = 100;
   private static final long WINDOW_MSECS_2 = 500;
-
-  private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 =
-      WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1)));
-
-  private PCollectionView<Iterable<String>> view1 =
-      PCollectionViewTesting.testingView(
-          new TupleTag<Iterable<WindowedValue<String>>>() {},
-          new PCollectionViewTesting.IdentityViewFn<String>(),
-          StringUtf8Coder.of(),
-          windowingStrategy1);
-
-  private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 =
-      WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2)));
-
-  private PCollectionView<Iterable<String>> view2 =
-      PCollectionViewTesting.testingView(
-          new TupleTag<Iterable<WindowedValue<String>>>() {},
-          new PCollectionViewTesting.IdentityViewFn<String>(),
-          StringUtf8Coder.of(),
-          windowingStrategy2);
+  private PCollectionView<Iterable<String>> view1;
+  private PCollectionView<Iterable<String>> view2;
+
+  @Before
+  public void setUp() {
+    PCollection<String> pc = Pipeline.create().apply(Create.of("1"));
+    view1 = pc
+        .apply(Window.<String>into(FixedWindows.of(new Duration(WINDOW_MSECS_1))))
+        .apply(View.<String>asIterable());
+    view2 = pc
+        .apply(Window.<String>into(FixedWindows.of(new Duration(WINDOW_MSECS_2))))
+        .apply(View.<String>asIterable());
+  }
 
   @Test
   public void testIsEmpty() {
@@ -113,7 +110,9 @@ public class SideInputHandlerTest {
     // add a value for view1
     sideInputHandler.addSideInputValue(
         view1,
-        valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow));
+        valuesInWindow(
+            materializeValuesFor(View.asIterable(), "Hello"),
+            new Instant(0), firstWindow));
 
     // now side input should be ready
     assertTrue(sideInputHandler.isReady(view1, firstWindow));
@@ -139,16 +138,20 @@ public class SideInputHandlerTest {
     // add a first value for view1
     sideInputHandler.addSideInputValue(
         view1,
-        valuesInWindow(ImmutableList.of("Hello"), new Instant(0), window));
+        valuesInWindow(
+            materializeValuesFor(View.asIterable(), "Hello"),
+            new Instant(0), window));
 
-    Assert.assertThat(sideInputHandler.get(view1, window), contains("Hello"));
+    assertThat(sideInputHandler.get(view1, window), contains("Hello"));
 
     // subsequent values should replace existing values
     sideInputHandler.addSideInputValue(
         view1,
-        valuesInWindow(ImmutableList.of("Ciao", "Buongiorno"), new Instant(0), window));
+        valuesInWindow(
+            materializeValuesFor(View.asIterable(), "Ciao", "Buongiorno"),
+            new Instant(0), window));
 
-    Assert.assertThat(sideInputHandler.get(view1, window), contains("Ciao", "Buongiorno"));
+    assertThat(sideInputHandler.get(view1, window), contains("Ciao", "Buongiorno"));
   }
 
   @Test
@@ -166,19 +169,21 @@ public class SideInputHandlerTest {
     // add a first value for view1 in the first window
     sideInputHandler.addSideInputValue(
         view1,
-        valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow));
+        valuesInWindow(materializeValuesFor(View.asIterable(), "Hello"),
+            new Instant(0), firstWindow));
 
-    Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+    assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
 
     // add something for second window of view1
     sideInputHandler.addSideInputValue(
         view1,
-        valuesInWindow(ImmutableList.of("Arrivederci"), new Instant(0), secondWindow));
+        valuesInWindow(materializeValuesFor(View.asIterable(), "Arrivederci"),
+    new Instant(0), secondWindow));
 
-    Assert.assertThat(sideInputHandler.get(view1, secondWindow), contains("Arrivederci"));
+    assertThat(sideInputHandler.get(view1, secondWindow), contains("Arrivederci"));
 
     // contents for first window should be unaffected
-    Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+    assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
   }
 
   @Test
@@ -194,9 +199,10 @@ public class SideInputHandlerTest {
     // add value for view1 in the first window
     sideInputHandler.addSideInputValue(
         view1,
-        valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow));
+        valuesInWindow(materializeValuesFor(View.asIterable(), "Hello"),
+            new Instant(0), firstWindow));
 
-    Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+    assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
 
     // view2 should not have any data
     assertFalse(sideInputHandler.isReady(view2, firstWindow));
@@ -204,18 +210,19 @@ public class SideInputHandlerTest {
     // also add some data for view2
     sideInputHandler.addSideInputValue(
         view2,
-        valuesInWindow(ImmutableList.of("Salut"), new Instant(0), firstWindow));
+        valuesInWindow(materializeValuesFor(View.asIterable(), "Salut"),
+            new Instant(0), firstWindow));
 
     assertTrue(sideInputHandler.isReady(view2, firstWindow));
-    Assert.assertThat(sideInputHandler.get(view2, firstWindow), contains("Salut"));
+    assertThat(sideInputHandler.get(view2, firstWindow), contains("Salut"));
 
     // view1 should not be affected by that
-    Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+    assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
   }
 
   @SuppressWarnings({"unchecked", "rawtypes"})
   private WindowedValue<Iterable<?>> valuesInWindow(
-      Iterable<?> values, Instant timestamp, BoundedWindow window) {
+      List<Object> values, Instant timestamp, BoundedWindow window) {
     return (WindowedValue) WindowedValue.of(values, timestamp, window, PaneInfo.NO_FIRING);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
index 43da92f..ea8f168 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
 import com.google.common.cache.CacheBuilder;
@@ -35,11 +36,18 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
 import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
 import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.WindowingStrategy;
 
@@ -60,6 +68,16 @@ class SideInputContainer {
    */
   public static SideInputContainer create(
       final EvaluationContext context, Collection<PCollectionView<?>> containedViews) {
+    for (PCollectionView<?> pCollectionView : containedViews) {
+      checkArgument(
+          Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+              pCollectionView.getViewFn().getMaterialization().getUrn()),
+          "This handler is only capable of dealing with %s materializations "
+              + "but was asked to handle %s for PCollectionView with tag %s.",
+          Materializations.MULTIMAP_MATERIALIZATION_URN,
+          pCollectionView.getViewFn().getMaterialization().getUrn(),
+          pCollectionView.getTagInternal().getId());
+    }
     LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
         viewByWindows = CacheBuilder.newBuilder().build(new CallbackSchedulingLoader(context));
     return new SideInputContainer(containedViews, viewByWindows);
@@ -239,11 +257,21 @@ class SideInputContainer {
           "calling get() on PCollectionView %s that is not ready in window %s",
           view,
           window);
-      // Safe covariant cast
-      @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> values =
-          (Iterable<WindowedValue<?>>) viewContents.getUnchecked(PCollectionViewWindow.of(view,
-              window)).get();
-      return view.getViewFn().apply(values);
+      // Safe covariant cast since we know that the view only contains KVs.
+      @SuppressWarnings("unchecked") Iterable<KV<?, ?>> elements = Iterables.transform(
+          (Iterable<WindowedValue<KV<?, ?>>>) viewContents.getUnchecked(
+              PCollectionViewWindow.of(view, window)).get(),
+          new Function<WindowedValue<KV<?, ?>>, KV<?, ?>>() {
+            @Override
+            public KV<?, ?> apply(WindowedValue<KV<?, ?>> windowedValue) {
+              return windowedValue.getValue();
+            }
+          });
+
+      ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+      Coder<?> keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+      return viewFn.apply(
+          InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) elements));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index cc9ce60..0a1ffe7 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
@@ -28,7 +29,6 @@ import static org.junit.Assert.assertThat;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.core.SideInputReader;
@@ -41,8 +41,10 @@ import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -126,33 +128,47 @@ public class EvaluationContextTest {
 
   @Test
   public void writeToViewWriterThenReadReads() {
-    PCollectionViewWriter<Integer, Iterable<Integer>> viewWriter =
+    PCollectionViewWriter<?, Iterable<Integer>> viewWriter =
         context.createPCollectionViewWriter(
             PCollection.createPrimitiveOutputInternal(
                 p,
                 WindowingStrategy.globalDefault(),
                 IsBounded.BOUNDED,
-                IterableCoder.of(VarIntCoder.of())),
+                IterableCoder.of(KvCoder.of(VoidCoder.of(), VarIntCoder.of()))),
             view);
     BoundedWindow window = new TestBoundedWindow(new Instant(1024L));
     BoundedWindow second = new TestBoundedWindow(new Instant(899999L));
-    WindowedValue<Integer> firstValue =
-        WindowedValue.of(1, new Instant(1222), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    WindowedValue<Integer> secondValue =
-        WindowedValue.of(
-            2, new Instant(8766L), second, PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0));
-    Iterable<WindowedValue<Integer>> values = ImmutableList.of(firstValue, secondValue);
-    viewWriter.add(values);
+    ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asIterable(), 1)) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(1222),
+          window,
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    for (Object materializedValue : materializeValuesFor(View.asIterable(), 2)) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(8766L),
+          second,
+          PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)));
+    }
+    viewWriter.add((Iterable) valuesBuilder.build());
 
     SideInputReader reader =
         context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view));
     assertThat(reader.get(view, window), containsInAnyOrder(1));
     assertThat(reader.get(view, second), containsInAnyOrder(2));
 
-    WindowedValue<Integer> overrittenSecondValue =
-        WindowedValue.of(
-            4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1));
-    viewWriter.add(Collections.singleton(overrittenSecondValue));
+    ImmutableList.Builder<WindowedValue<?>> overwrittenValuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asIterable(), 4444)) {
+      overwrittenValuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(8677L),
+          second,
+          PaneInfo.createPane(false, true, Timing.LATE, 1, 1)));
+    }
+    viewWriter.add((Iterable) overwrittenValuesBuilder.build());
     assertThat(reader.get(view, second), containsInAnyOrder(2));
     // The cached value is served in the earlier reader
     reader = context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view));

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
index 5e7c799..91255e0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
@@ -34,8 +35,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
 import org.apache.beam.runners.core.SideInputReader;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Mean;
@@ -49,9 +48,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
@@ -134,13 +131,22 @@ public class SideInputContainerTest {
 
   @Test
   public void getAfterWriteReturnsPaneInWindow() throws Exception {
-    WindowedValue<KV<String, Integer>> one =
-        WindowedValue.of(
-            KV.of("one", 1), new Instant(1L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    WindowedValue<KV<String, Integer>> two =
-        WindowedValue.of(
-            KV.of("two", 2), new Instant(20L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+    ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(1L),
+          FIRST_WINDOW,
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("two", 2))) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(20L),
+          FIRST_WINDOW,
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    container.write(mapView, valuesBuilder.build());
 
     Map<String, Integer> viewContents =
         container
@@ -153,19 +159,22 @@ public class SideInputContainerTest {
 
   @Test
   public void getReturnsLatestPaneInWindow() throws Exception {
-    WindowedValue<KV<String, Integer>> one =
-        WindowedValue.of(
-            KV.of("one", 1),
-            new Instant(1L),
-            SECOND_WINDOW,
-            PaneInfo.createPane(true, false, Timing.EARLY));
-    WindowedValue<KV<String, Integer>> two =
-        WindowedValue.of(
-            KV.of("two", 2),
-            new Instant(20L),
-            SECOND_WINDOW,
-            PaneInfo.createPane(true, false, Timing.EARLY));
-    container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+    ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(1L),
+          SECOND_WINDOW,
+          PaneInfo.createPane(true, false, Timing.EARLY)));
+    }
+    for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("two", 2))) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(20L),
+          SECOND_WINDOW,
+          PaneInfo.createPane(true, false, Timing.EARLY)));
+    }
+    container.write(mapView, valuesBuilder.build());
 
     Map<String, Integer> viewContents =
         container
@@ -175,13 +184,15 @@ public class SideInputContainerTest {
     assertThat(viewContents, hasEntry("two", 2));
     assertThat(viewContents.size(), is(2));
 
-    WindowedValue<KV<String, Integer>> three =
-        WindowedValue.of(
-            KV.of("three", 3),
-            new Instant(300L),
-            SECOND_WINDOW,
-            PaneInfo.createPane(false, false, Timing.EARLY, 1, -1));
-    container.write(mapView, ImmutableList.<WindowedValue<?>>of(three));
+    ImmutableList.Builder<WindowedValue<?>> overwriteValuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("three", 3))) {
+      overwriteValuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(300L),
+          SECOND_WINDOW,
+          PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)));
+    }
+    container.write(mapView, overwriteValuesBuilder.build());
 
     Map<String, Integer> overwrittenViewContents =
         container
@@ -209,10 +220,7 @@ public class SideInputContainerTest {
     PCollection<KV<String, String>> input =
         pipeline.apply(Create.empty(new TypeDescriptor<KV<String, String>>() {}));
     PCollectionView<Map<String, Iterable<String>>> newView =
-        PCollectionViews.multimapView(
-            input,
-            WindowingStrategy.globalDefault(),
-            KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+        input.apply(View.<String, String>asMultimap());
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("unknown views");
@@ -232,19 +240,22 @@ public class SideInputContainerTest {
 
   @Test
   public void writeForMultipleElementsInDifferentWindowsSucceeds() throws Exception {
-    WindowedValue<Double> firstWindowedValue =
-        WindowedValue.of(
-            2.875,
-            FIRST_WINDOW.maxTimestamp().minus(200L),
-            FIRST_WINDOW,
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    WindowedValue<Double> secondWindowedValue =
-        WindowedValue.of(
-            4.125,
-            SECOND_WINDOW.maxTimestamp().minus(2_000_000L),
-            SECOND_WINDOW,
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    container.write(singletonView, ImmutableList.of(firstWindowedValue, secondWindowedValue));
+    ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asSingleton(), 2.875)) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          FIRST_WINDOW.maxTimestamp().minus(200L),
+          FIRST_WINDOW,
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    for (Object materializedValue : materializeValuesFor(View.asSingleton(), 4.125)) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          SECOND_WINDOW.maxTimestamp().minus(2_000_000L),
+          SECOND_WINDOW,
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    container.write(singletonView, valuesBuilder.build());
     assertThat(
         container
             .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
@@ -259,20 +270,15 @@ public class SideInputContainerTest {
 
   @Test
   public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Exception {
-    WindowedValue<Integer> firstValue =
-        WindowedValue.of(
-            44,
-            FIRST_WINDOW.maxTimestamp().minus(200L),
-            FIRST_WINDOW,
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    WindowedValue<Integer> secondValue =
-        WindowedValue.of(
-            44,
-            FIRST_WINDOW.maxTimestamp().minus(200L),
-            FIRST_WINDOW,
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-
-    container.write(iterableView, ImmutableList.of(firstValue, secondValue));
+    ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asIterable(), 44, 44)) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          FIRST_WINDOW.maxTimestamp().minus(200L),
+          FIRST_WINDOW,
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    container.write(iterableView, valuesBuilder.build());
 
     assertThat(
         container
@@ -283,13 +289,15 @@ public class SideInputContainerTest {
 
   @Test
   public void writeForElementInMultipleWindowsSucceeds() throws Exception {
-    WindowedValue<Double> multiWindowedValue =
-        WindowedValue.of(
-            2.875,
-            FIRST_WINDOW.maxTimestamp().minus(200L),
-            ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW),
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    container.write(singletonView, ImmutableList.of(multiWindowedValue));
+    ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asSingleton(), 2.875)) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          FIRST_WINDOW.maxTimestamp().minus(200L),
+          ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW),
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    container.write(singletonView, valuesBuilder.build());
     assertThat(
         container
             .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
@@ -304,19 +312,22 @@ public class SideInputContainerTest {
 
   @Test
   public void finishDoesNotOverwriteWrittenElements() throws Exception {
-    WindowedValue<KV<String, Integer>> one =
-        WindowedValue.of(
-            KV.of("one", 1),
-            new Instant(1L),
-            SECOND_WINDOW,
-            PaneInfo.createPane(true, false, Timing.EARLY));
-    WindowedValue<KV<String, Integer>> two =
-        WindowedValue.of(
-            KV.of("two", 2),
-            new Instant(20L),
-            SECOND_WINDOW,
-            PaneInfo.createPane(true, false, Timing.EARLY));
-    container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+    ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(1L),
+          SECOND_WINDOW,
+          PaneInfo.createPane(true, false, Timing.EARLY)));
+    }
+    for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("two", 2))) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(20L),
+          SECOND_WINDOW,
+          PaneInfo.createPane(true, false, Timing.EARLY)));
+    }
+    container.write(mapView, valuesBuilder.build());
 
     immediatelyInvokeCallback(mapView, SECOND_WINDOW);
 
@@ -362,14 +373,15 @@ public class SideInputContainerTest {
    */
   @Test
   public void isReadyForSomeNotReadyViewsFalseUntilElements() {
-    container.write(
-        mapView,
-        ImmutableList.of(
-            WindowedValue.of(
-                KV.of("one", 1),
-                SECOND_WINDOW.maxTimestamp().minus(100L),
-                SECOND_WINDOW,
-                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+    ImmutableList.Builder<WindowedValue<?>> mapValuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) {
+      mapValuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          SECOND_WINDOW.maxTimestamp().minus(100L),
+          SECOND_WINDOW,
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    container.write(mapView, mapValuesBuilder.build());
 
     ReadyCheckingSideInputReader reader =
         container.createReaderForViews(ImmutableList.of(mapView, singletonView));
@@ -378,25 +390,27 @@ public class SideInputContainerTest {
 
     assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false));
 
-    container.write(
-        mapView,
-        ImmutableList.of(
-            WindowedValue.of(
-                KV.of("too", 2),
-                FIRST_WINDOW.maxTimestamp().minus(100L),
-                FIRST_WINDOW,
-                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+    ImmutableList.Builder<WindowedValue<?>> newMapValuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("too", 2))) {
+      newMapValuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          FIRST_WINDOW.maxTimestamp().minus(100L),
+          FIRST_WINDOW,
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    container.write(mapView, newMapValuesBuilder.build());
     // Cached value is false
     assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false));
 
-    container.write(
-        singletonView,
-        ImmutableList.of(
-            WindowedValue.of(
-                1.25,
-                SECOND_WINDOW.maxTimestamp().minus(100L),
-                SECOND_WINDOW,
-                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+    ImmutableList.Builder<WindowedValue<?>> singletonValuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asSingleton(), 1.25)) {
+      singletonValuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          SECOND_WINDOW.maxTimestamp().minus(100L),
+          SECOND_WINDOW,
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    container.write(singletonView, singletonValuesBuilder.build());
     assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
     assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index 5bc48b7..3716ec8 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -32,11 +32,11 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionViews;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
@@ -56,10 +56,7 @@ public class ViewEvaluatorFactoryTest {
   @Test
   public void testInMemoryEvaluator() throws Exception {
     PCollection<String> input = p.apply(Create.of("foo", "bar"));
-    CreatePCollectionView<String, Iterable<String>> createView =
-        CreatePCollectionView.of(
-            PCollectionViews.iterableView(
-                input, input.getWindowingStrategy(), StringUtf8Coder.of()));
+    PCollectionView<Iterable<String>> pCollectionView = input.apply(View.<String>asIterable());
     PCollection<Iterable<String>> concat =
         input.apply(WithKeys.<Void, String>of((Void) null))
             .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))
@@ -67,11 +64,11 @@ public class ViewEvaluatorFactoryTest {
             .apply(Values.<Iterable<String>>create());
     PCollection<Iterable<String>> view =
         concat.apply(
-            new ViewOverrideFactory.WriteView<String, Iterable<String>>(createView.getView()));
+            new ViewOverrideFactory.WriteView<String, Iterable<String>>(pCollectionView));
 
     EvaluationContext context = mock(EvaluationContext.class);
     TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>();
-    when(context.createPCollectionViewWriter(concat, createView.getView())).thenReturn(viewWriter);
+    when(context.createPCollectionViewWriter(concat, pCollectionView)).thenReturn(viewWriter);
 
     CommittedBundle<String> inputBundle = bundleFactory.createBundle(input).commit(Instant.now());
     AppliedPTransform<?, ?, ?> producer = DirectGraphs.getProducer(view);

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
index 94d8d70..556cac5 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -34,13 +34,13 @@ import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
-import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.sdk.values.TupleTag;
 import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
@@ -59,8 +59,7 @@ public class ViewOverrideFactoryTest implements Serializable {
   @Test
   public void replacementGetViewReturnsOriginal() {
     final PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
-    final PCollectionView<List<Integer>> view =
-        PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder());
+    final PCollectionView<List<Integer>> view = ints.apply(View.<Integer>asList());
     PTransformReplacement<PCollection<Integer>, PCollection<Integer>> replacement =
         factory.getReplacementTransform(
             AppliedPTransform
@@ -89,7 +88,7 @@ public class ViewOverrideFactoryTest implements Serializable {
               // so not asserted one way or the other
               assertThat(
                   replacementView.getTagInternal(),
-                  equalTo(view.getTagInternal()));
+                  equalTo((TupleTag) view.getTagInternal()));
               assertThat(
                   replacementView.getViewFn(),
                   Matchers.<ViewFn<?, ?>>equalTo(view.getViewFn()));

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 79a23cc..cffcc5a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -38,7 +38,6 @@ import java.util.List;
 import java.util.UUID;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn;
-import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.DynamicFileDestinations;
 import org.apache.beam.sdk.io.FileBasedSink;
@@ -56,15 +55,14 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -217,9 +215,8 @@ public class WriteWithShardingFactoryTest implements Serializable {
   public void keyBasedOnCountFnFewElementsExtraShards() throws Exception {
     long countValue = (long) WriteWithShardingFactory.MIN_SHARDS_FOR_LOG + 3;
     PCollection<Long> inputCount = p.apply(Create.of(countValue));
-    PCollectionView<Long> elementCountView =
-        PCollectionViews.singletonView(
-            inputCount, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
+    PCollectionView<Long> elementCountView = inputCount.apply(
+        View.<Long>asSingleton().withDefaultValue(countValue));
     CalculateShardsFn fn = new CalculateShardsFn(3);
     DoFnTester<Long, Integer> fnTester = DoFnTester.of(fn);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index cec01f8..aa5cc39 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -307,7 +307,6 @@ class FlinkStreamingTransformTranslators {
       intToViewMapping.put(count, sideInput);
       tagToIntMapping.put(tag, count);
       count++;
-      Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal();
     }
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
index f275290..fb3f375 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.Collections;
@@ -24,8 +25,10 @@ import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -35,6 +38,13 @@ import org.apache.flink.api.common.functions.RuntimeContext;
  * A {@link SideInputReader} for the Flink Batch Runner.
  */
 public class FlinkSideInputReader implements SideInputReader {
+  /** A {@link MultimapView} which always returns an empty iterable. */
+  private static final MultimapView EMPTY_MULTMAP_VIEW = new MultimapView() {
+    @Override
+    public Iterable get(Object o) {
+      return Collections.EMPTY_LIST;
+    }
+  };
 
   private final Map<TupleTag<?>, WindowingStrategy<?, ?>> sideInputs;
 
@@ -42,6 +52,16 @@ public class FlinkSideInputReader implements SideInputReader {
 
   public FlinkSideInputReader(Map<PCollectionView<?>, WindowingStrategy<?, ?>> indexByView,
                               RuntimeContext runtimeContext) {
+    for (PCollectionView<?> view : indexByView.keySet()) {
+      checkArgument(
+          Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+              view.getViewFn().getMaterialization().getUrn()),
+          "This handler is only capable of dealing with %s materializations "
+              + "but was asked to handle %s for PCollectionView with tag %s.",
+          Materializations.MULTIMAP_MATERIALIZATION_URN,
+          view.getViewFn().getMaterialization().getUrn(),
+          view.getTagInternal().getId());
+    }
     sideInputs = new HashMap<>();
     for (Map.Entry<PCollectionView<?>, WindowingStrategy<?, ?>> entry : indexByView.entrySet()) {
       sideInputs.put(entry.getKey().getTagInternal(), entry.getValue());
@@ -53,7 +73,7 @@ public class FlinkSideInputReader implements SideInputReader {
   @Override
   public <T> T get(PCollectionView<T> view, BoundedWindow window) {
     checkNotNull(view, "View passed to sideInput cannot be null");
-    TupleTag<Iterable<WindowedValue<?>>> tag = view.getTagInternal();
+    TupleTag<?> tag = view.getTagInternal();
     checkNotNull(
         sideInputs.get(tag),
         "Side input for " + view + " not available.");
@@ -63,7 +83,8 @@ public class FlinkSideInputReader implements SideInputReader {
             tag.getId(), new SideInputInitializer<>(view));
     T result = sideInputs.get(window);
     if (result == null) {
-      result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList());
+      ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+      result = viewFn.apply(EMPTY_MULTMAP_VIEW);
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
index 12222b4..782f72a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
@@ -17,12 +17,23 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 
@@ -30,24 +41,33 @@ import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
  * {@link BroadcastVariableInitializer} that initializes the broadcast input as a {@code Map}
  * from window to side input.
  */
-public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow>
-    implements BroadcastVariableInitializer<WindowedValue<ElemT>, Map<BoundedWindow, ViewT>> {
+public class SideInputInitializer<ViewT>
+    implements BroadcastVariableInitializer<WindowedValue<?>, Map<BoundedWindow, ViewT>> {
 
   PCollectionView<ViewT> view;
 
   public SideInputInitializer(PCollectionView<ViewT> view) {
+    checkArgument(
+        Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+            view.getViewFn().getMaterialization().getUrn()),
+        "This handler is only capable of dealing with %s materializations "
+            + "but was asked to handle %s for PCollectionView with tag %s.",
+        Materializations.MULTIMAP_MATERIALIZATION_URN,
+        view.getViewFn().getMaterialization().getUrn(),
+        view.getTagInternal().getId());
     this.view = view;
   }
 
   @Override
   public Map<BoundedWindow, ViewT> initializeBroadcastVariable(
-      Iterable<WindowedValue<ElemT>> inputValues) {
+      Iterable<WindowedValue<?>> inputValues) {
 
     // first partition into windows
-    Map<BoundedWindow, List<WindowedValue<ElemT>>> partitionedElements = new HashMap<>();
-    for (WindowedValue<ElemT> value: inputValues) {
+    Map<BoundedWindow, List<WindowedValue<KV<?, ?>>>> partitionedElements = new HashMap<>();
+    for (WindowedValue<KV<?, ?>> value
+        : (Iterable<WindowedValue<KV<?, ?>>>) (Iterable) inputValues) {
       for (BoundedWindow window: value.getWindows()) {
-        List<WindowedValue<ElemT>> windowedValues = partitionedElements.get(window);
+        List<WindowedValue<KV<?, ?>>> windowedValues = partitionedElements.get(window);
         if (windowedValues == null) {
           windowedValues = new ArrayList<>();
           partitionedElements.put(window, windowedValues);
@@ -58,14 +78,20 @@ public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow>
 
     Map<BoundedWindow, ViewT> resultMap = new HashMap<>();
 
-    for (Map.Entry<BoundedWindow, List<WindowedValue<ElemT>>> elements:
+    for (Map.Entry<BoundedWindow, List<WindowedValue<KV<?, ?>>>> elements:
         partitionedElements.entrySet()) {
 
-      @SuppressWarnings("unchecked")
-      Iterable<WindowedValue<?>> elementsIterable =
-          (List<WindowedValue<?>>) (List<?>) elements.getValue();
-
-      resultMap.put(elements.getKey(), view.getViewFn().apply(elementsIterable));
+      ViewFn<MultimapView, ViewT> viewFn = (ViewFn<MultimapView, ViewT>) view.getViewFn();
+      Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+      resultMap.put(elements.getKey(), viewFn.apply(InMemoryMultimapSideInputView.fromIterable(
+          keyCoder,
+          (Iterable) Iterables.transform(elements.getValue(),
+              new Function<WindowedValue<KV<?, ?>>, KV<?, ?>>() {
+                @Override
+                public KV<?, ?> apply(WindowedValue<KV<?, ?>> windowedValue) {
+                  return windowedValue.getValue();
+                }
+              }))));
     }
 
     return resultMap;

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index ad17de8..33ac024f 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -47,16 +48,19 @@ import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.state.TimerSpec;
 import org.apache.beam.sdk.state.TimerSpecs;
 import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.testing.PCollectionViewTesting;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -71,6 +75,7 @@ import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
 import org.apache.flink.util.OutputTag;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -84,26 +89,19 @@ public class DoFnOperatorTest {
   // views and windows for testing side inputs
   private static final long WINDOW_MSECS_1 = 100;
   private static final long WINDOW_MSECS_2 = 500;
-
-  private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 =
-      WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1)));
-
-  private PCollectionView<Iterable<String>> view1 =
-      PCollectionViewTesting.testingView(
-          new TupleTag<Iterable<WindowedValue<String>>>() {},
-          new PCollectionViewTesting.IdentityViewFn<String>(),
-          StringUtf8Coder.of(),
-          windowingStrategy1);
-
-  private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 =
-      WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2)));
-
-  private PCollectionView<Iterable<String>> view2 =
-      PCollectionViewTesting.testingView(
-          new TupleTag<Iterable<WindowedValue<String>>>() {},
-          new PCollectionViewTesting.IdentityViewFn<String>(),
-          StringUtf8Coder.of(),
-          windowingStrategy2);
+  private PCollectionView<Iterable<String>> view1;
+  private PCollectionView<Iterable<String>> view2;
+
+  @Before
+  public void setUp() {
+    PCollection<String> pc = Pipeline.create().apply(Create.of("1"));
+    view1 = pc
+        .apply(Window.<String>into(FixedWindows.of(new Duration(WINDOW_MSECS_1))))
+        .apply(View.<String>asIterable());
+    view2 = pc
+        .apply(Window.<String>into(FixedWindows.of(new Duration(WINDOW_MSECS_2))))
+        .apply(View.<String>asIterable());
+  }
 
   @Test
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 81e7a97..cc43e27 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -991,15 +991,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertAllStepOutputsHaveUniqueIds(job);
 
     List<Step> steps = job.getSteps();
-    assertEquals(9, steps.size());
+    assertEquals(10, steps.size());
 
     @SuppressWarnings("unchecked")
     List<Map<String, Object>> toIsmRecordOutputs =
-        (List<Map<String, Object>>) steps.get(7).getProperties().get(PropertyNames.OUTPUT_INFO);
+        (List<Map<String, Object>>) steps.get(8).getProperties().get(PropertyNames.OUTPUT_INFO);
     assertTrue(
         Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
 
-    Step collectionToSingletonStep = steps.get(8);
+    Step collectionToSingletonStep = steps.get(9);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
   }
 
@@ -1023,16 +1023,16 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertAllStepOutputsHaveUniqueIds(job);
 
     List<Step> steps = job.getSteps();
-    assertEquals(3, steps.size());
+    assertEquals(4, steps.size());
 
     @SuppressWarnings("unchecked")
     List<Map<String, Object>> toIsmRecordOutputs =
-        (List<Map<String, Object>>) steps.get(1).getProperties().get(PropertyNames.OUTPUT_INFO);
+        (List<Map<String, Object>>) steps.get(2).getProperties().get(PropertyNames.OUTPUT_INFO);
     assertTrue(
         Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
 
 
-    Step collectionToSingletonStep = steps.get(2);
+    Step collectionToSingletonStep = steps.get(3);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 7cb8628..68e3e3c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -40,6 +40,7 @@ import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.Combine;
@@ -527,7 +528,11 @@ public final class TransformTranslator {
         Iterable<? extends WindowedValue<?>> iter =
             context.getWindowedValues(context.getInput(transform));
         PCollectionView<WriteT> output = transform.getView();
-        Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
+        Coder<Iterable<WindowedValue<?>>> coderInternal =
+            (Coder) IterableCoder.of(
+                WindowedValue.getFullCoder(
+                    output.getCoderInternal(),
+                    output.getWindowingStrategyInternal().getWindowFn().windowCoder()));
 
         @SuppressWarnings("unchecked")
         Iterable<WindowedValue<?>> iterCast =  (Iterable<WindowedValue<?>>) iter;


[6/7] beam git commit: Update Dataflow worker image to be compatible with side input changes.

Posted by lc...@apache.org.
Update Dataflow worker image to be compatible with side input changes.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d5a8aea0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d5a8aea0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d5a8aea0

Branch: refs/heads/master
Commit: d5a8aea016286106950135743c1c6de667b10e17
Parents: 12246ad
Author: Luke Cwik <lc...@google.com>
Authored: Mon Nov 13 13:09:13 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 15 08:59:51 2017 -0800

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d5a8aea0/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 2e08181..9312a8e 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -33,7 +33,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <dataflow.container_version>beam-master-20170926</dataflow.container_version>
+    <dataflow.container_version>beam-master-20171113</dataflow.container_version>
     <dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
     <dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
   </properties>


[2/7] beam git commit: Replace the View.As transforms for Dataflow batch because the entire implementation is specialized.

Posted by lc...@apache.org.
Replace the View.As transforms for Dataflow batch because the entire implementation is specialized.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c2f815c5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c2f815c5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c2f815c5

Branch: refs/heads/master
Commit: c2f815c581cf5d6cd72b600cef42f436c9702d85
Parents: d5a8aea
Author: Luke Cwik <lc...@google.com>
Authored: Tue Nov 14 07:47:53 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 15 08:59:51 2017 -0800

----------------------------------------------------------------------
 .../runners/dataflow/BatchViewOverrides.java    |  53 ++++-----
 .../runners/dataflow/CreateDataflowView.java    |  19 +++-
 .../beam/runners/dataflow/DataflowRunner.java   | 110 ++++++++++++++++---
 .../dataflow/StreamingViewOverrides.java        |   2 +-
 .../DataflowPipelineTranslatorTest.java         |  12 +-
 5 files changed, 142 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index 9a77b4b..8ed41cb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -116,7 +116,7 @@ class BatchViewOverrides {
    * a warning to users to specify a deterministic key coder.
    */
   static class BatchViewAsMap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+      extends PTransform<PCollection<KV<K, V>>, PCollection<?>> {
 
     /**
      * A {@link DoFn} which groups elements by window boundaries. For each group,
@@ -193,11 +193,11 @@ class BatchViewOverrides {
     }
 
     @Override
-    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+    public PCollection<?> expand(PCollection<KV<K, V>> input) {
       return this.<BoundedWindow>applyInternal(input);
     }
 
-    private <W extends BoundedWindow> PCollectionView<Map<K, V>>
+    private <W extends BoundedWindow> PCollection<?>
     applyInternal(PCollection<KV<K, V>> input) {
       try {
         return BatchViewAsMultimap.applyForMapLike(runner, input, view, true /* unique keys */);
@@ -216,7 +216,7 @@ class BatchViewOverrides {
     }
 
     /** Transforms the input {@link PCollection} into a singleton {@link Map} per window. */
-    private <W extends BoundedWindow> PCollectionView<Map<K, V>>
+    private <W extends BoundedWindow> PCollection<?>
     applyForSingletonFallback(PCollection<KV<K, V>> input) {
       @SuppressWarnings("unchecked")
       Coder<W> windowCoder = (Coder<W>)
@@ -280,7 +280,7 @@ class BatchViewOverrides {
    * a warning to users to specify a deterministic key coder.
    */
   static class BatchViewAsMultimap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+      extends PTransform<PCollection<KV<K, V>>, PCollection<?>> {
     /**
      * A {@link PTransform} that groups elements by the hash of window's byte representation
      * if the input {@link PCollection} is not within the global window. Otherwise by the hash
@@ -672,11 +672,11 @@ class BatchViewOverrides {
     }
 
     @Override
-    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+    public PCollection<?> expand(PCollection<KV<K, V>> input) {
       return this.<BoundedWindow>applyInternal(input);
     }
 
-    private <W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>>
+    private <W extends BoundedWindow> PCollection<?>
     applyInternal(PCollection<KV<K, V>> input) {
       try {
         return applyForMapLike(runner, input, view, false /* unique keys not expected */);
@@ -690,7 +690,7 @@ class BatchViewOverrides {
     }
 
     /** Transforms the input {@link PCollection} into a singleton {@link Map} per window. */
-    private <W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>>
+    private <W extends BoundedWindow> PCollection<?>
     applyForSingletonFallback(PCollection<KV<K, V>> input) {
       @SuppressWarnings("unchecked")
       Coder<W> windowCoder = (Coder<W>)
@@ -727,7 +727,7 @@ class BatchViewOverrides {
                   view);
     }
 
-    private static <K, V, W extends BoundedWindow, ViewT> PCollectionView<ViewT> applyForMapLike(
+    private static <K, V, W extends BoundedWindow, ViewT> PCollection<?> applyForMapLike(
         DataflowRunner runner,
         PCollection<KV<K, V>> input,
         PCollectionView<ViewT> view,
@@ -804,9 +804,10 @@ class BatchViewOverrides {
           PCollectionList.of(ImmutableList.of(
               perHashWithReifiedWindows, windowMapSizeMetadata, windowMapKeysMetadata));
 
-      Pipeline.applyTransform(outputs, Flatten.<IsmRecord<WindowedValue<V>>>pCollections())
-          .apply(CreateDataflowView.<IsmRecord<WindowedValue<V>>, ViewT>of(view));
-      return view;
+      PCollection<IsmRecord<WindowedValue<V>>> flattenedOutputs =
+          Pipeline.applyTransform(outputs, Flatten.<IsmRecord<WindowedValue<V>>>pCollections());
+      flattenedOutputs.apply(CreateDataflowView.<IsmRecord<WindowedValue<V>>, ViewT>forBatch(view));
+      return flattenedOutputs;
     }
 
     @Override
@@ -843,7 +844,7 @@ class BatchViewOverrides {
    * </ul>
    */
   static class BatchViewAsSingleton<T>
-      extends PTransform<PCollection<T>, PCollectionView<T>> {
+      extends PTransform<PCollection<T>, PCollection<?>> {
 
     /**
      * A {@link DoFn} that outputs {@link IsmRecord}s. These records are structured as follows:
@@ -900,7 +901,7 @@ class BatchViewOverrides {
     }
 
     @Override
-    public PCollectionView<T> expand(PCollection<T> input) {
+    public PCollection<?> expand(PCollection<T> input) {
       @SuppressWarnings("unchecked")
       Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>)
           input.getWindowingStrategy().getWindowFn().windowCoder();
@@ -913,7 +914,7 @@ class BatchViewOverrides {
           view);
     }
 
-    static <T, FinalT, ViewT, W extends BoundedWindow> PCollectionView<ViewT>
+    static <T, FinalT, ViewT, W extends BoundedWindow> PCollection<?>
     applyForSingleton(
         DataflowRunner runner,
         PCollection<T> input,
@@ -936,8 +937,8 @@ class BatchViewOverrides {
 
       runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
       reifiedPerWindowAndSorted.apply(
-          CreateDataflowView.<IsmRecord<WindowedValue<FinalT>>, ViewT>of(view));
-      return view;
+          CreateDataflowView.<IsmRecord<WindowedValue<FinalT>>, ViewT>forBatch(view));
+      return reifiedPerWindowAndSorted;
     }
 
     @Override
@@ -969,7 +970,7 @@ class BatchViewOverrides {
    * </ul>
    */
   static class BatchViewAsList<T>
-      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+      extends PTransform<PCollection<T>, PCollection<?>> {
     /**
      * A {@link DoFn} which creates {@link IsmRecord}s assuming that each element is within the
      * global window. Each {@link IsmRecord} has
@@ -1054,11 +1055,11 @@ class BatchViewOverrides {
     }
 
     @Override
-    public PCollectionView<List<T>> expand(PCollection<T> input) {
+    public PCollection<?> expand(PCollection<T> input) {
       return applyForIterableLike(runner, input, view);
     }
 
-    static <T, W extends BoundedWindow, ViewT> PCollectionView<ViewT> applyForIterableLike(
+    static <T, W extends BoundedWindow, ViewT> PCollection<?> applyForIterableLike(
         DataflowRunner runner,
         PCollection<T> input,
         PCollectionView<ViewT> view) {
@@ -1081,8 +1082,8 @@ class BatchViewOverrides {
 
         runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
         reifiedPerWindowAndSorted.apply(
-            CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
-        return view;
+            CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>forBatch(view));
+        return reifiedPerWindowAndSorted;
       }
 
       PCollection<IsmRecord<WindowedValue<T>>> reifiedPerWindowAndSorted = input
@@ -1092,8 +1093,8 @@ class BatchViewOverrides {
 
       runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
       reifiedPerWindowAndSorted.apply(
-          CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
-      return view;
+          CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>forBatch(view));
+      return reifiedPerWindowAndSorted;
     }
 
     @Override
@@ -1127,7 +1128,7 @@ class BatchViewOverrides {
    * </ul>
    */
   static class BatchViewAsIterable<T>
-      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+      extends PTransform<PCollection<T>, PCollection<?>> {
 
     private final DataflowRunner runner;
     private final PCollectionView<Iterable<T>> view;
@@ -1140,7 +1141,7 @@ class BatchViewOverrides {
     }
 
     @Override
-    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+    public PCollection<?> expand(PCollection<T> input) {
       return BatchViewAsList.applyForIterableLike(runner, input, view);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
index 3b01d69..10888c2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
@@ -25,20 +25,29 @@ import org.apache.beam.sdk.values.PCollectionView;
 /** A {@link DataflowRunner} marker class for creating a {@link PCollectionView}. */
 public class CreateDataflowView<ElemT, ViewT>
     extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
-  public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> of(PCollectionView<ViewT> view) {
-    return new CreateDataflowView<>(view);
+  public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forBatch(PCollectionView<ViewT> view) {
+    return new CreateDataflowView<>(view, false);
+  }
+
+  public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forStreaming(PCollectionView<ViewT> view) {
+    return new CreateDataflowView<>(view, true);
   }
 
   private final PCollectionView<ViewT> view;
+  private final boolean streaming;
 
-  private CreateDataflowView(PCollectionView<ViewT> view) {
+  private CreateDataflowView(PCollectionView<ViewT> view, boolean streaming) {
     this.view = view;
+    this.streaming = streaming;
   }
 
   @Override
   public PCollection<ElemT> expand(PCollection<ElemT> input) {
-    return PCollection.createPrimitiveOutputInternal(
-        input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
+    if (streaming) {
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
+    }
+    return (PCollection) view.getPCollection();
   }
 
   public PCollectionView<ViewT> getView() {

http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 0a20a0f..72e4f83 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -58,6 +58,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory;
@@ -124,6 +125,7 @@ import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
@@ -139,7 +141,6 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.PValue;
@@ -395,28 +396,28 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
                   BatchStatefulParDoOverrides.singleOutputOverrideFactory(options)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.createViewWithViewFn(PCollectionViews.MapViewFn.class),
-                  new ReflectiveOneToOneOverrideFactory(
+                  PTransformMatchers.classEqualTo(View.AsMap.class),
+                  new ReflectiveViewOverrideFactory(
                       BatchViewOverrides.BatchViewAsMap.class, this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.createViewWithViewFn(PCollectionViews.MultimapViewFn.class),
-                  new ReflectiveOneToOneOverrideFactory(
+                  PTransformMatchers.classEqualTo(View.AsMultimap.class),
+                  new ReflectiveViewOverrideFactory(
                       BatchViewOverrides.BatchViewAsMultimap.class, this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.createViewWithViewFn(PCollectionViews.SingletonViewFn.class),
-                  new ReflectiveOneToOneOverrideFactory(
+                  PTransformMatchers.classEqualTo(View.AsSingleton.class),
+                  new ReflectiveViewOverrideFactory(
                       BatchViewOverrides.BatchViewAsSingleton.class, this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.createViewWithViewFn(PCollectionViews.ListViewFn.class),
-                  new ReflectiveOneToOneOverrideFactory(
+                  PTransformMatchers.classEqualTo(View.AsList.class),
+                  new ReflectiveViewOverrideFactory(
                       BatchViewOverrides.BatchViewAsList.class, this)))
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.createViewWithViewFn(PCollectionViews.IterableViewFn.class),
-                  new ReflectiveOneToOneOverrideFactory(
+                  PTransformMatchers.classEqualTo(View.AsIterable.class),
+                  new ReflectiveViewOverrideFactory(
                       BatchViewOverrides.BatchViewAsIterable.class, this)));
     }
     overridesBuilder
@@ -435,6 +436,81 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     return overridesBuilder.build();
   }
 
+  /**
+   * Replace the View.AsYYY transform with specialized view overrides for Dataflow. It is required that the
+   * new replacement transform uses the supplied PCollectionView and does not create another instance.
+   */
+  private static class ReflectiveViewOverrideFactory<InputT, ViewT>
+      implements PTransformOverrideFactory<PCollection<InputT>,
+      PValue, PTransform<PCollection<InputT>, PValue>> {
+
+    private final Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement;
+    private final DataflowRunner runner;
+
+    private ReflectiveViewOverrideFactory(
+        Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement,
+        DataflowRunner runner) {
+      this.replacement = replacement;
+      this.runner = runner;
+    }
+
+    @Override
+    public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform(
+         final AppliedPTransform<PCollection<InputT>,
+             PValue,
+             PTransform<PCollection<InputT>, PValue>> transform) {
+
+      final AtomicReference<CreatePCollectionView> viewTransformRef = new AtomicReference<>();
+      transform.getPipeline().traverseTopologically(new PipelineVisitor.Defaults() {
+        // Stores whether we have entered the expected composite view transform.
+        private boolean tracking = false;
+
+        @Override
+        public CompositeBehavior enterCompositeTransform(Node node) {
+          if (transform.getTransform() == node.getTransform()) {
+            tracking = true;
+          }
+          return super.enterCompositeTransform(node);
+        }
+
+        @Override
+        public void visitPrimitiveTransform(Node node) {
+          if (tracking && node.getTransform() instanceof CreatePCollectionView) {
+            checkState(
+                viewTransformRef.compareAndSet(null, (CreatePCollectionView) node.getTransform()),
+                "Found more then one instance of a CreatePCollectionView when "
+                    + "attempting to replace %s, found [%s, %s]",
+                replacement, viewTransformRef.get(), node.getTransform());
+          }
+        }
+
+        @Override
+        public void leaveCompositeTransform(Node node) {
+          if (transform.getTransform() == node.getTransform()) {
+            tracking = false;
+          }
+        }
+      });
+
+      checkState(viewTransformRef.get() != null,
+          "Expected to find CreatePCollectionView contained within %s",
+          transform.getTransform());
+      PTransform<PCollection<InputT>, PCollectionView<ViewT>> rep =
+          InstanceBuilder.ofType(replacement)
+              .withArg(DataflowRunner.class, runner)
+              .withArg(CreatePCollectionView.class, viewTransformRef.get())
+              .build();
+      return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), (PTransform) rep);
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PValue newOutput) {
+      // We do not replace any of the outputs because we expect that the new PTransform will re-use the original
+      // PCollectionView that was returned.
+      return ImmutableMap.of();
+    }
+  }
+
   private static class ReflectiveOneToOneOverrideFactory<
           InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>>
       extends SingleInputOutputOverrideFactory<
@@ -1258,19 +1334,21 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
    */
   private static class Deduplicate<T>
       extends PTransform<PCollection<ValueWithRecordId<T>>, PCollection<T>> {
+
     // Use a finite set of keys to improve bundling.  Without this, the key space
     // will be the space of ids which is potentially very large, which results in much
     // more per-key overhead.
     private static final int NUM_RESHARD_KEYS = 10000;
+
     @Override
     public PCollection<T> expand(PCollection<ValueWithRecordId<T>> input) {
       return input
           .apply(WithKeys.of(new SerializableFunction<ValueWithRecordId<T>, Integer>() {
-                    @Override
-                    public Integer apply(ValueWithRecordId<T> value) {
-                      return Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS;
-                    }
-                  }))
+            @Override
+            public Integer apply(ValueWithRecordId<T> value) {
+              return Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS;
+            }
+          }))
           // Reshuffle will dedup based on ids in ValueWithRecordId by passing the data through
           // WindmillSink.
           .apply(Reshuffle.<Integer, ValueWithRecordId<T>>of())

http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
index 1853248..69099c6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
@@ -68,7 +68,7 @@ class StreamingViewOverrides {
         return input
             .apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults())
             .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
-            .apply(CreateDataflowView.<ElemT, ViewT>of(view));
+            .apply(CreateDataflowView.<ElemT, ViewT>forStreaming(view));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index cc43e27..e03abb9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -991,15 +991,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertAllStepOutputsHaveUniqueIds(job);
 
     List<Step> steps = job.getSteps();
-    assertEquals(10, steps.size());
+    assertEquals(5, steps.size());
 
     @SuppressWarnings("unchecked")
     List<Map<String, Object>> toIsmRecordOutputs =
-        (List<Map<String, Object>>) steps.get(8).getProperties().get(PropertyNames.OUTPUT_INFO);
+        (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO);
     assertTrue(
         Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
 
-    Step collectionToSingletonStep = steps.get(9);
+    Step collectionToSingletonStep = steps.get(4);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
   }
 
@@ -1023,16 +1023,16 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertAllStepOutputsHaveUniqueIds(job);
 
     List<Step> steps = job.getSteps();
-    assertEquals(4, steps.size());
+    assertEquals(3, steps.size());
 
     @SuppressWarnings("unchecked")
     List<Map<String, Object>> toIsmRecordOutputs =
-        (List<Map<String, Object>>) steps.get(2).getProperties().get(PropertyNames.OUTPUT_INFO);
+        (List<Map<String, Object>>) steps.get(1).getProperties().get(PropertyNames.OUTPUT_INFO);
     assertTrue(
         Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
 
 
-    Step collectionToSingletonStep = steps.get(3);
+    Step collectionToSingletonStep = steps.get(2);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
   }
 


[5/7] beam git commit: Rebase and update code to honor findbugs @Nullable/@Nonnull conversion

Posted by lc...@apache.org.
Rebase and update code to honor findbugs @Nullable/@Nonnull conversion


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/12246ad0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/12246ad0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/12246ad0

Branch: refs/heads/master
Commit: 12246ad0c6172fa235d96c69466b4876cd649523
Parents: 5e2593d
Author: Luke Cwik <lc...@google.com>
Authored: Fri Nov 10 11:18:13 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 15 08:59:51 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/transforms/Materializations.java     | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/12246ad0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
index e606919..0e66c5c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.sdk.transforms;
 
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.annotations.Internal;
@@ -43,7 +44,7 @@ public class Materializations {
    * use the {@link Materializations#MULTIMAP_MATERIALIZATION_URN multimap materialization}.
    */
   public interface MultimapView<K, V> {
-    Iterable<V> get(K k);
+    Iterable<V> get(@Nullable K k);
   }
 
   /**


[7/7] beam git commit: [BEAM-2926] Migrate to using a trivial multimap materialization within the Java SDK.

Posted by lc...@apache.org.
[BEAM-2926] Migrate to using a trivial multimap materialization within the Java SDK.

This closes #4011


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/30886ace
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/30886ace
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/30886ace

Branch: refs/heads/master
Commit: 30886acee9432d3bdfeb679c8dd717244bb86fb9
Parents: 7ce0a82 accb208
Author: Luke Cwik <lc...@google.com>
Authored: Wed Nov 15 09:00:23 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 15 09:00:23 2017 -0800

----------------------------------------------------------------------
 .../apex/translation/ParDoTranslatorTest.java   |   3 +-
 .../core/construction/PTransformMatchers.java   |   3 +-
 .../core/construction/ParDoTranslation.java     |  17 +-
 .../CreatePCollectionViewTranslationTest.java   |  10 +-
 .../construction/PTransformMatchersTest.java    |  33 +-
 .../core/construction/ParDoTranslationTest.java |   7 +-
 .../core/InMemoryMultimapSideInputView.java     |  62 +++
 .../beam/runners/core/SideInputHandler.java     |  63 ++--
 .../core/InMemoryMultimapSideInputViewTest.java |  53 +++
 .../beam/runners/core/SideInputHandlerTest.java |  89 +++--
 .../beam/runners/direct/SideInputContainer.java |  38 +-
 .../runners/direct/EvaluationContextTest.java   |  44 ++-
 .../runners/direct/SideInputContainerTest.java  | 226 +++++------
 .../direct/ViewEvaluatorFactoryTest.java        |  13 +-
 .../runners/direct/ViewOverrideFactoryTest.java |   9 +-
 .../direct/WriteWithShardingFactoryTest.java    |   9 +-
 .../FlinkStreamingTransformTranslators.java     |   1 -
 .../functions/FlinkSideInputReader.java         |  27 +-
 .../functions/SideInputInitializer.java         |  50 ++-
 .../flink/streaming/DoFnOperatorTest.java       |  40 +-
 runners/google-cloud-dataflow-java/pom.xml      |   2 +-
 .../runners/dataflow/BatchViewOverrides.java    |  69 ++--
 .../runners/dataflow/CreateDataflowView.java    |  21 +-
 .../beam/runners/dataflow/DataflowRunner.java   | 152 +++++++-
 .../dataflow/StreamingViewOverrides.java        |   2 +-
 .../spark/translation/TransformTranslator.java  |   7 +-
 .../spark/util/SparkSideInputReader.java        |  50 ++-
 .../org/apache/beam/sdk/transforms/Combine.java |  13 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |  20 +-
 .../beam/sdk/transforms/Materializations.java   |  30 +-
 .../org/apache/beam/sdk/transforms/View.java    |  67 +++-
 .../org/apache/beam/sdk/transforms/ViewFn.java  |   6 +-
 .../apache/beam/sdk/values/PCollectionView.java |   7 +-
 .../beam/sdk/values/PCollectionViews.java       | 256 ++++++-------
 .../sdk/testing/PCollectionViewTesting.java     | 375 +++----------------
 .../beam/sdk/transforms/DoFnTesterTest.java     |  12 +-
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    |  14 +-
 37 files changed, 997 insertions(+), 903 deletions(-)
----------------------------------------------------------------------



[3/7] beam git commit: [BEAM-2926] Migrate to using a trivial multimap materialization within the Java SDK.

Posted by lc...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
index 6c91088..932ccd6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
@@ -20,11 +20,17 @@ package org.apache.beam.runners.spark.util;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
 import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -34,7 +40,7 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 
 
 /**
- * A {@link SideInputReader} for thw SparkRunner.
+ * A {@link SideInputReader} for the SparkRunner.
  */
 public class SparkSideInputReader implements SideInputReader {
   private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
@@ -60,26 +66,30 @@ public class SparkSideInputReader implements SideInputReader {
     //--- match the appropriate sideInput window.
     // a tag will point to all matching sideInputs, that is all windows.
     // now that we've obtained the appropriate sideInputWindow, all that's left is to filter by it.
-    Iterable<WindowedValue<?>> availableSideInputs =
-        (Iterable<WindowedValue<?>>) windowedBroadcastHelper.getValue().getValue();
-    Iterable<WindowedValue<?>> sideInputForWindow =
-        Iterables.filter(availableSideInputs, new Predicate<WindowedValue<?>>() {
-          @Override
-          public boolean apply(@Nullable WindowedValue<?> sideInputCandidate) {
-            if (sideInputCandidate == null) {
-              return false;
-            }
-            // first match of a sideInputWindow to the elementWindow is good enough.
-            for (BoundedWindow sideInputCandidateWindow: sideInputCandidate.getWindows()) {
-              if (sideInputCandidateWindow.equals(sideInputWindow)) {
-                return true;
+    Iterable<WindowedValue<KV<?, ?>>> availableSideInputs =
+        (Iterable<WindowedValue<KV<?, ?>>>) windowedBroadcastHelper.getValue().getValue();
+    Iterable<KV<?, ?>> sideInputForWindow =
+        Iterables.transform(
+            Iterables.filter(availableSideInputs, new Predicate<WindowedValue<?>>() {
+              @Override
+              public boolean apply(@Nullable WindowedValue<?> sideInputCandidate) {
+                if (sideInputCandidate == null) {
+                  return false;
+                }
+                return Iterables.contains(sideInputCandidate.getWindows(), sideInputWindow);
               }
-            }
-            // no match found.
-            return false;
-          }
-        });
-    return view.getViewFn().apply(sideInputForWindow);
+            }),
+            new Function<WindowedValue<KV<?, ?>>, KV<?, ?>>() {
+              @Override
+              public KV<?, ?> apply(WindowedValue<KV<?, ?>> windowedValue) {
+                return windowedValue.getValue();
+              }
+            });
+
+    ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+    Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+    return viewFn.apply(
+        InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) sideInputForWindow));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 3c5b55b..f86e9cc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
 import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.View.VoidKeyToMultimapMaterialization;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -1274,14 +1275,16 @@ public class Combine {
     public PCollectionView<OutputT> expand(PCollection<InputT> input) {
       PCollection<OutputT> combined =
           input.apply(Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
-      PCollectionView<OutputT> view =
-          PCollectionViews.singletonView(
-              combined,
+      PCollection<KV<Void, OutputT>> materializationInput =
+          combined.apply(new VoidKeyToMultimapMaterialization<OutputT>());
+      PCollectionView<OutputT> view = PCollectionViews.singletonView(
+          materializationInput,
               input.getWindowingStrategy(),
               insertDefault,
               insertDefault ? fn.defaultValue() : null,
-              combined.getCoder());
-      combined.apply(CreatePCollectionView.<OutputT, OutputT>of(view));
+          combined.getCoder());
+      materializationInput.apply(
+          CreatePCollectionView.<KV<Void, OutputT>, OutputT>of(view));
       return view;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 6168710..d71f0fd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -602,7 +601,24 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
           return windowValue;
         }
       }
-      return view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList());
+      // Fall back to returning the default materialization if no data was supplied.
+      // This is really to support singleton views with default values.
+
+      // TODO: Update this to supply a materialization dependent on actual URN of materialization.
+      // Currently the SDK only supports the multimap materialization and it expects a
+      // mapping function.
+      checkState(Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+          view.getViewFn().getMaterialization().getUrn()),
+          "Only materializations of type %s supported, received %s",
+          Materializations.MULTIMAP_MATERIALIZATION_URN,
+          view.getViewFn().getMaterialization().getUrn());
+      return ((ViewFn<Materializations.MultimapView, T>) view.getViewFn()).apply(
+          new Materializations.MultimapView<Object, Object>() {
+            @Override
+            public Iterable<Object> get(Object o) {
+              return Collections.emptyList();
+            }
+          });
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
index 6e4f83d..e606919 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
@@ -21,7 +21,6 @@ package org.apache.beam.sdk.transforms;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.util.WindowedValue;
 
 /**
  * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
@@ -32,29 +31,37 @@ import org.apache.beam.sdk.util.WindowedValue;
 @Internal
 public class Materializations {
   /**
-   * The URN for a {@link Materialization} where the primitive view type is an iterable of fully
+   * The URN for a {@link Materialization} where the primitive view type is a multimap of fully
    * specified windowed values.
    */
   @Experimental(Kind.CORE_RUNNERS_ONLY)
-  public static final String ITERABLE_MATERIALIZATION_URN =
-      "urn:beam:sideinput:materialization:iterable:0.1";
+  public static final String MULTIMAP_MATERIALIZATION_URN =
+      "urn:beam:sideinput:materialization:multimap:0.1";
+
+  /**
+   * Represents the {@code PrimitiveViewT} supplied to the {@link ViewFn} when it declares to
+   * use the {@link Materializations#MULTIMAP_MATERIALIZATION_URN multimap materialization}.
+   */
+  public interface MultimapView<K, V> {
+    Iterable<V> get(K k);
+  }
 
   /**
    * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
    *
-   * <p>A {@link Materialization} where the primitive view type is an iterable of fully specified
-   * windowed values.
+   * <p>A {@link Materialization} where the primitive view type is a multimap with fully
+   * specified windowed keys.
    */
   @Internal
-  public static <T> Materialization<Iterable<WindowedValue<T>>> iterable() {
-    return new IterableMaterialization<>();
+  public static <K, V> Materialization<MultimapView<K, V>> multimap() {
+    return new MultimapMaterialization<>();
   }
 
-  private static class IterableMaterialization<T>
-      implements Materialization<Iterable<WindowedValue<T>>> {
+  private static class MultimapMaterialization<K, V>
+      implements Materialization<MultimapView<K, V>> {
     @Override
     public String getUrn() {
-      return ITERABLE_MATERIALIZATION_URN;
+      return MULTIMAP_MATERIALIZATION_URN;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index eaa7925..ec8233e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -24,6 +24,8 @@ import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
@@ -258,9 +260,13 @@ public class View {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
-      PCollectionView<List<T>> view =
-          PCollectionViews.listView(input, input.getWindowingStrategy(), input.getCoder());
-      input.apply(CreatePCollectionView.<T, List<T>>of(view));
+      PCollection<KV<Void, T>> materializationInput =
+          input.apply(new VoidKeyToMultimapMaterialization<T>());
+      PCollectionView<List<T>> view = PCollectionViews.listView(
+          materializationInput,
+          materializationInput.getWindowingStrategy());
+      materializationInput.apply(
+          CreatePCollectionView.<KV<Void, T>, List<T>>of(view));
       return view;
     }
   }
@@ -285,9 +291,13 @@ public class View {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
-      PCollectionView<Iterable<T>> view =
-          PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
-      input.apply(CreatePCollectionView.<T, Iterable<T>>of(view));
+      PCollection<KV<Void, T>> materializationInput =
+          input.apply(new VoidKeyToMultimapMaterialization<T>());
+      PCollectionView<Iterable<T>> view = PCollectionViews.iterableView(
+          materializationInput,
+          materializationInput.getWindowingStrategy());
+      materializationInput.apply(
+          CreatePCollectionView.<KV<Void, T>, Iterable<T>>of(view));
       return view;
     }
   }
@@ -428,9 +438,13 @@ public class View {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
-      PCollectionView<Map<K, Iterable<V>>> view =
-          PCollectionViews.multimapView(input, input.getWindowingStrategy(), input.getCoder());
-      input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+      PCollection<KV<Void, KV<K, V>>> materializationInput =
+          input.apply(new VoidKeyToMultimapMaterialization<KV<K, V>>());
+      PCollectionView<Map<K, Iterable<V>>> view = PCollectionViews.multimapView(
+          materializationInput,
+          materializationInput.getWindowingStrategy());
+      materializationInput.apply(
+          CreatePCollectionView.<KV<Void, KV<K, V>>, Map<K, Iterable<V>>>of(view));
       return view;
     }
   }
@@ -463,9 +477,13 @@ public class View {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
-      PCollectionView<Map<K, V>> view =
-          PCollectionViews.mapView(input, input.getWindowingStrategy(), input.getCoder());
-      input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of(view));
+      PCollection<KV<Void, KV<K, V>>> materializationInput =
+          input.apply(new VoidKeyToMultimapMaterialization<KV<K, V>>());
+      PCollectionView<Map<K, V>> view = PCollectionViews.mapView(
+          materializationInput,
+          materializationInput.getWindowingStrategy());
+      materializationInput.apply(
+          CreatePCollectionView.<KV<Void, KV<K, V>>, Map<K, V>>of(view));
       return view;
     }
   }
@@ -474,6 +492,31 @@ public class View {
   // Internal details below
 
   /**
+   * A {@link PTransform} which converts all values into {@link KV}s with {@link Void} keys.
+   *
+   * <p>TODO: Replace this materialization with specializations that optimize the various SDK
+   * requested views.
+   */
+  @Internal
+  static class VoidKeyToMultimapMaterialization<T>
+      extends PTransform<PCollection<T>, PCollection<KV<Void, T>>> {
+
+    private static class VoidKeyToMultimapMaterializationDoFn<T> extends DoFn<T, KV<Void, T>> {
+      @ProcessElement
+      public void processElement(ProcessContext ctxt) {
+        ctxt.output(KV.of((Void) null, ctxt.element()));
+      }
+    }
+
+    @Override
+    public PCollection<KV<Void, T>> expand(PCollection<T> input) {
+      PCollection output = input.apply(ParDo.of(new VoidKeyToMultimapMaterializationDoFn<>()));
+      output.setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()));
+      return output;
+    }
+  }
+
+  /**
    * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
    *
    * <p>Creates a primitive {@link PCollectionView}.

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java
index d51a917..9291bc6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.values.PCollectionView;
  * {@link View#asIterable()}, and {@link View#asMap()} for more detail on specific views
  * available in the SDK.
  *
- * @param <PrimitiveViewT> the type of the underlying primitive view, provided by the runner
- *        {@code <ViewT>} the type of the value(s) accessible via this {@link PCollectionView}
+ * @param <PrimitiveViewT> the type of the underlying primitive view required
+ * @param <ViewT> the type of the value(s) accessible via this {@link PCollectionView}
  */
 @Internal
 public abstract class ViewFn<PrimitiveViewT, ViewT> implements Serializable {
@@ -49,5 +49,5 @@ public abstract class ViewFn<PrimitiveViewT, ViewT> implements Serializable {
   /**
    * A function to adapt a primitive view type to a desired view type.
    */
-  public abstract ViewT apply(PrimitiveViewT contents);
+  public abstract ViewT apply(PrimitiveViewT primitiveViewT);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
index 7d87412..c212c34 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.apache.beam.sdk.util.WindowedValue;
 
 /**
  * A {@link PCollectionView PCollectionView&lt;T&gt;} is an immutable view of a {@link PCollection}
@@ -72,7 +71,7 @@ public interface PCollectionView<T> extends PValue, Serializable {
    */
   @Deprecated
   @Internal
-  TupleTag<Iterable<WindowedValue<?>>> getTagInternal();
+  TupleTag<?> getTagInternal();
 
   /**
    * <b>For internal use only.</b>
@@ -83,7 +82,7 @@ public interface PCollectionView<T> extends PValue, Serializable {
    */
   @Deprecated
   @Internal
-  ViewFn<Iterable<WindowedValue<?>>, T> getViewFn();
+  ViewFn<?, T> getViewFn();
 
   /**
    * <b>For internal use only.</b>
@@ -116,5 +115,5 @@ public interface PCollectionView<T> extends PValue, Serializable {
    */
   @Deprecated
   @Internal
-  Coder<Iterable<WindowedValue<?>>> getCoderInternal();
+  Coder<?> getCoderInternal();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index ed8fb76..30277f0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -17,14 +17,13 @@
  */
 package org.apache.beam.sdk.values;
 
-import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -36,16 +35,15 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.transforms.Materialization;
 import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.WindowedValue;
 
 /**
  * <b>For internal use only; no backwards compatibility guarantees.</b>
@@ -56,88 +54,79 @@ import org.apache.beam.sdk.util.WindowedValue;
 public class PCollectionViews {
 
   /**
-   * Returns a {@code PCollectionView<T>} capable of processing elements encoded using the provided
-   * {@link Coder} and windowed using the provided * {@link WindowingStrategy}.
+   * Returns a {@code PCollectionView<T>} capable of processing elements windowed
+   * using the provided {@link WindowingStrategy}.
    *
    * <p>If {@code hasDefault} is {@code true}, then the view will take on the value
    * {@code defaultValue} for any empty windows.
    */
   public static <T, W extends BoundedWindow> PCollectionView<T> singletonView(
-      PCollection<T> pCollection,
+      PCollection<KV<Void, T>> pCollection,
       WindowingStrategy<?, W> windowingStrategy,
       boolean hasDefault,
       @Nullable T defaultValue,
-      Coder<T> valueCoder) {
+      Coder<T> defaultValueCoder) {
     return new SimplePCollectionView<>(
         pCollection,
-        new SingletonViewFn<>(hasDefault, defaultValue, valueCoder),
+        new SingletonViewFn<>(hasDefault, defaultValue, defaultValueCoder),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
-        windowingStrategy,
-        valueCoder);
+        windowingStrategy);
   }
 
   /**
-   * Returns a {@code PCollectionView<Iterable<T>>} capable of processing elements encoded using the
-   * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+   * Returns a {@code PCollectionView<Iterable<T>>} capable of processing elements windowed
+   * using the provided {@link WindowingStrategy}.
    */
   public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterableView(
-      PCollection<T> pCollection,
-      WindowingStrategy<?, W> windowingStrategy,
-      Coder<T> valueCoder) {
+      PCollection<KV<Void, T>> pCollection,
+      WindowingStrategy<?, W> windowingStrategy) {
     return new SimplePCollectionView<>(
         pCollection,
         new IterableViewFn<T>(),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
-        windowingStrategy,
-        valueCoder);
+        windowingStrategy);
   }
 
   /**
-   * Returns a {@code PCollectionView<List<T>>} capable of processing elements encoded using the
-   * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+   * Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed
+   * using the provided {@link WindowingStrategy}.
    */
   public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
-      PCollection<T> pCollection,
-      WindowingStrategy<?, W> windowingStrategy,
-      Coder<T> valueCoder) {
+      PCollection<KV<Void, T>> pCollection,
+      WindowingStrategy<?, W> windowingStrategy) {
     return new SimplePCollectionView<>(
         pCollection,
         new ListViewFn<T>(),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
-        windowingStrategy,
-        valueCoder);
+        windowingStrategy);
   }
 
   /**
-   * Returns a {@code PCollectionView<Map<K, V>>} capable of processing elements encoded using the
-   * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+   * Returns a {@code PCollectionView<Map<K, V>>} capable of processing elements windowed
+   * using the provided {@link WindowingStrategy}.
    */
   public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> mapView(
-      PCollection<KV<K, V>> pCollection,
-      WindowingStrategy<?, W> windowingStrategy,
-      Coder<KV<K, V>> valueCoder) {
+      PCollection<KV<Void, KV<K, V>>> pCollection,
+      WindowingStrategy<?, W> windowingStrategy) {
     return new SimplePCollectionView<>(
         pCollection,
         new MapViewFn<K, V>(),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
-        windowingStrategy,
-        valueCoder);
+        windowingStrategy);
   }
 
   /**
-   * Returns a {@code PCollectionView<Map<K, Iterable<V>>>} capable of processing elements encoded
-   * using the provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+   * Returns a {@code PCollectionView<Map<K, Iterable<V>>>} capable of processing elements windowed
+   * using the provided {@link WindowingStrategy}.
    */
   public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> multimapView(
-      PCollection<KV<K, V>> pCollection,
-      WindowingStrategy<?, W> windowingStrategy,
-      Coder<KV<K, V>> valueCoder) {
+      PCollection<KV<Void, KV<K, V>>> pCollection,
+      WindowingStrategy<?, W> windowingStrategy) {
     return new SimplePCollectionView<>(
         pCollection,
         new MultimapViewFn<K, V>(),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
-        windowingStrategy,
-        valueCoder);
+        windowingStrategy);
   }
 
   /**
@@ -153,18 +142,15 @@ public class PCollectionViews {
   }
 
   /**
-   * Implementation of conversion of singleton {@code Iterable<WindowedValue<T>>} to {@code T}.
+   * Implementation which is able to adapt a multimap materialization to a {@code T}.
    *
    * <p>For internal use only.
    *
    * <p>Instantiate via {@link PCollectionViews#singletonView}.
-   *
-   * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
-   *     view type.
    */
-  @Deprecated
   @Experimental(Kind.CORE_RUNNERS_ONLY)
-  public static class SingletonViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, T> {
+  public static class SingletonViewFn<T>
+      extends ViewFn<MultimapView<Void, T>, T> {
     @Nullable private byte[] encodedDefaultValue;
     @Nullable private transient T defaultValue;
     @Nullable private Coder<T> valueCoder;
@@ -204,9 +190,12 @@ public class PCollectionViews {
       }
       // Lazily decode the default value once
       synchronized (this) {
-        if (encodedDefaultValue != null && defaultValue == null) {
+        if (encodedDefaultValue != null) {
           try {
             defaultValue = CoderUtils.decodeFromByteArray(valueCoder, encodedDefaultValue);
+            // Clear the encoded default value to free the reference once we have the object
+            // version. Also, this will guarantee that the value will only be decoded once.
+            encodedDefaultValue = null;
           } catch (IOException e) {
             throw new RuntimeException("Unexpected IOException: ", e);
           }
@@ -216,84 +205,67 @@ public class PCollectionViews {
     }
 
     @Override
-    public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
-      return Materializations.iterable();
+    public Materialization<MultimapView<Void, T>> getMaterialization() {
+      return Materializations.multimap();
     }
 
     @Override
-    public T apply(Iterable<WindowedValue<T>> contents) {
+    public T apply(MultimapView<Void, T> primitiveViewT) {
       try {
-        return Iterables.getOnlyElement(contents).getValue();
+        return Iterables.getOnlyElement(primitiveViewT.get(null));
       } catch (NoSuchElementException exc) {
         return getDefaultValue();
       } catch (IllegalArgumentException exc) {
         throw new IllegalArgumentException(
-            "PCollection with more than one element "
-                + "accessed as a singleton view.");
+            "PCollection with more than one element accessed as a singleton view.");
       }
     }
   }
 
   /**
-   * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code Iterable<T>}.
+   * Implementation which is able to adapt a multimap materialization to an {@code Iterable<T>}.
    *
    * <p>For internal use only.
    *
    * <p>Instantiate via {@link PCollectionViews#iterableView}.
-   *
-   * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
-   *     view type.
    */
-  @Deprecated
   @Experimental(Kind.CORE_RUNNERS_ONLY)
   public static class IterableViewFn<T>
-      extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> {
+      extends ViewFn<MultimapView<Void, T>, Iterable<T>> {
+
     @Override
-    public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
-      return Materializations.iterable();
+    public Materialization<MultimapView<Void, T>> getMaterialization() {
+      return Materializations.multimap();
     }
 
     @Override
-    public Iterable<T> apply(Iterable<WindowedValue<T>> contents) {
-      return Iterables.unmodifiableIterable(
-          Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
-        @SuppressWarnings("unchecked")
-        @Override
-        public T apply(WindowedValue<T> input) {
-          return input.getValue();
-        }
-      }));
+    public Iterable<T> apply(MultimapView<Void, T> primitiveViewT) {
+      return Iterables.unmodifiableIterable(primitiveViewT.get(null));
     }
   }
 
   /**
-   * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code List<T>}.
+   * Implementation which is able to adapt a multimap materialization to a {@code List<T>}.
    *
    * <p>For internal use only.
    *
    * <p>Instantiate via {@link PCollectionViews#listView}.
-   *
-   * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
-   *     view type.
    */
-  @Deprecated
   @Experimental(Kind.CORE_RUNNERS_ONLY)
-  public static class ListViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, List<T>> {
+  public static class ListViewFn<T>
+      extends ViewFn<MultimapView<Void, T>, List<T>> {
     @Override
-    public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
-      return Materializations.iterable();
+    public Materialization<MultimapView<Void, T>> getMaterialization() {
+      return Materializations.multimap();
     }
 
     @Override
-    public List<T> apply(Iterable<WindowedValue<T>> contents) {
-      return ImmutableList.copyOf(
-          Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
-            @SuppressWarnings("unchecked")
-            @Override
-            public T apply(WindowedValue<T> input) {
-              return input.getValue();
+    public List<T> apply(MultimapView<Void, T> primitiveViewT) {
+      List<T> list = new ArrayList<>();
+      for (T t : primitiveViewT.get(null)) {
+        list.add(t);
             }
-          }));
+      return Collections.unmodifiableList(list);
     }
 
     @Override
@@ -308,27 +280,29 @@ public class PCollectionViews {
   }
 
   /**
-   * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>>}
-   * to {@code Map<K, Iterable<V>>}.
+   * Implementation which is able to adapt a multimap materialization to a
+   * {@code Map<K, Iterable<V>>}.
+   *
+   * <p>For internal use only.
    *
-   * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
-   *     view type.
+   * <p>Instantiate via {@link PCollectionViews#multimapView}.
    */
-  @Deprecated
   @Experimental(Kind.CORE_RUNNERS_ONLY)
   public static class MultimapViewFn<K, V>
-      extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, Iterable<V>>> {
+      extends ViewFn<MultimapView<Void, KV<K, V>>, Map<K, Iterable<V>>> {
     @Override
-    public Materialization<Iterable<WindowedValue<KV<K, V>>>> getMaterialization() {
-      return Materializations.iterable();
+    public Materialization<MultimapView<Void, KV<K, V>>> getMaterialization() {
+      return Materializations.multimap();
     }
 
     @Override
-    public Map<K, Iterable<V>> apply(Iterable<WindowedValue<KV<K, V>>> elements) {
+    public Map<K, Iterable<V>> apply(
+        MultimapView<Void, KV<K, V>> primitiveViewT) {
+      // TODO: BEAM-3071 - fix this so that we aren't relying on Java equality and are
+      // using structural value equality.
       Multimap<K, V> multimap = HashMultimap.create();
-      for (WindowedValue<KV<K, V>> elem : elements) {
-        KV<K, V> kv = elem.getValue();
-        multimap.put(kv.getKey(), kv.getValue());
+      for (KV<K, V> elem : primitiveViewT.get(null)) {
+        multimap.put(elem.getKey(), elem.getValue());
       }
       // Safe covariant cast that Java cannot express without rawtypes, even with unchecked casts
       @SuppressWarnings({"unchecked", "rawtypes"})
@@ -338,32 +312,31 @@ public class PCollectionViews {
   }
 
   /**
-   * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>} with one value per key to
-   * {@code Map<K, V>}.
+   * Implementation which is able to adapt a multimap materialization to a {@code Map<K, V>}.
+   *
+   * <p>For internal use only.
    *
-   * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
-   *     view type.
+   * <p>Instantiate via {@link PCollectionViews#mapView}.
    */
-  @Deprecated
   @Experimental(Kind.CORE_RUNNERS_ONLY)
-  public static class MapViewFn<K, V> extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, V>> {
+  public static class MapViewFn<K, V>
+      extends ViewFn<MultimapView<Void, KV<K, V>>, Map<K, V>> {
+
     @Override
-    public Materialization<Iterable<WindowedValue<KV<K, V>>>> getMaterialization() {
-      return Materializations.iterable();
+    public Materialization<MultimapView<Void, KV<K, V>>> getMaterialization() {
+      return Materializations.multimap();
     }
 
-    /**
-     * Input iterable must actually be {@code Iterable<WindowedValue<KV<K, V>>>}.
-     */
     @Override
-    public Map<K, V> apply(Iterable<WindowedValue<KV<K, V>>> elements) {
+    public Map<K, V> apply(MultimapView<Void, KV<K, V>> primitiveViewT) {
+      // TODO: BEAM-3071 - fix this so that we aren't relying on Java equality and are
+      // using structural value equality.
       Map<K, V> map = new HashMap<>();
-      for (WindowedValue<KV<K, V>> elem : elements) {
-        KV<K, V> kv = elem.getValue();
-        if (map.containsKey(kv.getKey())) {
-          throw new IllegalArgumentException("Duplicate values for " + kv.getKey());
+      for (KV<K, V> elem : primitiveViewT.get(null)) {
+        if (map.containsKey(elem.getKey())) {
+          throw new IllegalArgumentException("Duplicate values for " + elem.getKey());
         }
-        map.put(kv.getKey(), kv.getValue());
+        map.put(elem.getKey(), elem.getValue());
       }
       return Collections.unmodifiableMap(map);
     }
@@ -375,14 +348,14 @@ public class PCollectionViews {
    *
    * <p>For internal use only.
    */
-  public static class SimplePCollectionView<ElemT, ViewT, W extends BoundedWindow>
+  public static class SimplePCollectionView<ElemT, PrimitiveViewT, ViewT, W extends BoundedWindow>
       extends PValueBase
       implements PCollectionView<ViewT> {
     /** The {@link PCollection} this view was originally created from. */
     private transient PCollection<ElemT> pCollection;
 
     /** A unique tag for the view, typed according to the elements underlying the view. */
-    private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
+    private TupleTag<PrimitiveViewT> tag;
 
     private WindowMappingFn<W> windowMappingFn;
 
@@ -390,12 +363,12 @@ public class PCollectionViews {
     private WindowingStrategy<?, W> windowingStrategy;
 
     /** The coder for the elements underlying the view. */
-    private @Nullable Coder<Iterable<WindowedValue<ElemT>>> coder;
+    private @Nullable Coder<ElemT> coder;
 
     /**
      * The typed {@link ViewFn} for this view.
      */
-    private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn;
+    private ViewFn<PrimitiveViewT, ViewT> viewFn;
 
     /**
      * Call this constructor to initialize the fields for which this base class provides
@@ -403,11 +376,10 @@ public class PCollectionViews {
      */
     private SimplePCollectionView(
         PCollection<ElemT> pCollection,
-        TupleTag<Iterable<WindowedValue<ElemT>>> tag,
-        ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+        TupleTag<PrimitiveViewT> tag,
+        ViewFn<PrimitiveViewT, ViewT> viewFn,
         WindowMappingFn<W> windowMappingFn,
-        WindowingStrategy<?, W> windowingStrategy,
-        Coder<ElemT> valueCoder) {
+        WindowingStrategy<?, W> windowingStrategy) {
       super(pCollection.getPipeline());
       this.pCollection = pCollection;
       if (windowingStrategy.getWindowFn() instanceof InvalidWindows) {
@@ -417,9 +389,7 @@ public class PCollectionViews {
       this.tag = tag;
       this.windowingStrategy = windowingStrategy;
       this.viewFn = viewFn;
-      this.coder =
-          IterableCoder.of(WindowedValue.getFullCoder(
-              valueCoder, windowingStrategy.getWindowFn().windowCoder()));
+      this.coder = pCollection.getCoder();
     }
 
     /**
@@ -428,27 +398,20 @@ public class PCollectionViews {
      */
     private SimplePCollectionView(
         PCollection<ElemT> pCollection,
-        ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+        ViewFn<PrimitiveViewT, ViewT> viewFn,
         WindowMappingFn<W> windowMappingFn,
-        WindowingStrategy<?, W> windowingStrategy,
-        Coder<ElemT> valueCoder) {
+        WindowingStrategy<?, W> windowingStrategy) {
       this(
           pCollection,
-          new TupleTag<Iterable<WindowedValue<ElemT>>>(),
+          new TupleTag<PrimitiveViewT>(),
           viewFn,
           windowMappingFn,
-          windowingStrategy,
-          valueCoder);
+          windowingStrategy);
     }
 
     @Override
-    public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() {
-      // Safe cast: it is required that the rest of the SDK maintain the invariant
-      // that a PCollectionView is only provided an iterable for the elements of an
-      // appropriately typed PCollection.
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn) viewFn;
-      return untypedViewFn;
+    public ViewFn<PrimitiveViewT, ViewT> getViewFn() {
+      return viewFn;
     }
 
     @Override
@@ -467,13 +430,8 @@ public class PCollectionViews {
      * <p>For internal use only by runner implementors.
      */
     @Override
-    public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() {
-      // Safe cast: It is required that the rest of the SDK maintain the invariant that
-      // this tag is only used to access the contents of an appropriately typed underlying
-      // PCollection
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      TupleTag<Iterable<WindowedValue<?>>> untypedTag = (TupleTag) tag;
-      return untypedTag;
+    public TupleTag<?> getTagInternal() {
+      return tag;
     }
 
     /**
@@ -488,12 +446,8 @@ public class PCollectionViews {
     }
 
     @Override
-    public Coder<Iterable<WindowedValue<?>>> getCoderInternal() {
-      // Safe cast: It is required that the rest of the SDK only use this untyped coder
-      // for the elements of an appropriately typed underlying PCollection.
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      Coder<Iterable<WindowedValue<?>>> untypedCoder = (Coder) coder;
-      return untypedCoder;
+    public Coder<?> getCoderInternal() {
+      return coder;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
index aaf8b91..e7fd9b8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
@@ -18,344 +18,57 @@
 
 package org.apache.beam.sdk.testing;
 
-import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.transforms.Materialization;
-import org.apache.beam.sdk.transforms.Materializations;
-import org.apache.beam.sdk.transforms.ViewFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.PValueBase;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
 
 /**
- * Methods for creating and using {@link PCollectionView} instances.
+ * Methods for testing {@link PCollectionView}s.
  */
 public final class PCollectionViewTesting {
-
-  // Do not instantiate; static methods only
-  private PCollectionViewTesting() { }
-
-  /**
-   * The length of the default window, which is an {@link IntervalWindow}, but kept encapsulated
-   * as it is not for the user to know what sort of window it is.
-   */
-  private static final long DEFAULT_WINDOW_MSECS = 1000 * 60 * 60;
-
-  /**
-   * A default windowing strategy. Tests that are not concerned with the windowing
-   * strategy should not specify it, and all views will use this.
-   */
-  public static final WindowingStrategy<?, ?> DEFAULT_WINDOWING_STRATEGY =
-      WindowingStrategy.of(FixedWindows.of(new Duration(DEFAULT_WINDOW_MSECS)));
-
-  /**
-   * A default window into which test elements will be placed, if the window is
-   * not explicitly overridden.
-   */
-  public static final BoundedWindow DEFAULT_NONEMPTY_WINDOW =
-      new IntervalWindow(new Instant(0), new Instant(DEFAULT_WINDOW_MSECS));
-
-  /**
-   * A timestamp in the {@link #DEFAULT_NONEMPTY_WINDOW}.
-   */
-  public static final Instant DEFAULT_TIMESTAMP = DEFAULT_NONEMPTY_WINDOW.maxTimestamp().minus(1);
-
-  /**
-   * A window into which no element will be placed by methods in this class, unless explicitly
-   * requested.
-   */
-  public static final BoundedWindow DEFAULT_EMPTY_WINDOW = new IntervalWindow(
-      DEFAULT_NONEMPTY_WINDOW.maxTimestamp(),
-      DEFAULT_NONEMPTY_WINDOW.maxTimestamp().plus(DEFAULT_WINDOW_MSECS));
-
-  /**
-   * A {@link ViewFn} that returns the provided contents as a fully lazy iterable.
-   */
-  public static class IdentityViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> {
-    @Override
-    public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
-      return Materializations.iterable();
-    }
-
-    @Override
-    public Iterable<T> apply(Iterable<WindowedValue<T>> contents) {
-      return Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
-        @Override
-        public T apply(WindowedValue<T> windowedValue) {
-          return windowedValue.getValue();
-        }
-      });
-    }
-  }
-
-  /**
-   * A {@link ViewFn} that traverses the whole iterable eagerly and returns the number of elements.
-   *
-   * <p>Only for use in testing scenarios with small collections. If there are more elements
-   * provided than {@code Integer.MAX_VALUE} then behavior is unpredictable.
-   */
-  public static class LengthViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, Long> {
-    @Override
-    public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
-      return Materializations.iterable();
-    }
-
-    @Override
-    public Long apply(Iterable<WindowedValue<T>> contents) {
-      return (long) Iterables.size(contents);
-    }
-  }
-
-  /**
-   * A {@link ViewFn} that always returns the value with which it is instantiated.
-   */
-  public static class ConstantViewFn<ElemT, ViewT>
-      extends ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> {
-    private ViewT value;
-
-    public ConstantViewFn(ViewT value) {
-      this.value = value;
-    }
-
-    @Override
-    public Materialization<Iterable<WindowedValue<ElemT>>> getMaterialization() {
-      return Materializations.iterable();
-    }
-
-    @Override
-    public ViewT apply(Iterable<WindowedValue<ElemT>> contents) {
-      return value;
-    }
-  }
-
-  /**
-   * A {@link PCollectionView} explicitly built from a {@link TupleTag}
-   * and conversion {@link ViewFn}, and an element coder, using the
-   * {@link #DEFAULT_WINDOWING_STRATEGY}.
-   *
-   * <p>This method is only recommended for use by runner implementors to test their
-   * implementations. It is very easy to construct a {@link PCollectionView} that does
-   * not respect the invariants required for proper functioning.
-   *
-   * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed
-   * values provided to the view during execution, results are unpredictable. It is recommended
-   * that the values be prepared via {@link #contentsInDefaultWindow}.
-   */
-  public static <ElemT, ViewT> PCollectionView<ViewT> testingView(
-      TupleTag<Iterable<WindowedValue<ElemT>>> tag,
-      ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
-      Coder<ElemT> elemCoder,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    return testingView(null, tag, viewFn, elemCoder, windowingStrategy);
-  }
-
-  /**
-   * The default {@link Coder} used for windowed values, given an element {@link Coder}.
-   */
-  public static <T> Coder<WindowedValue<T>> defaultWindowedValueCoder(Coder<T> elemCoder) {
-    return WindowedValue.getFullCoder(
-        elemCoder, DEFAULT_WINDOWING_STRATEGY.getWindowFn().windowCoder());
-  }
-
-  /**
-   * A {@link PCollectionView} explicitly built from its {@link TupleTag}, {@link
-   * WindowingStrategy}, {@link Coder}, and conversion function.
-   *
-   * <p>This method is only recommended for use by runner implementors to test their
-   * implementations. It is very easy to construct a {@link PCollectionView} that does not respect
-   * the invariants required for proper functioning.
-   *
-   * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed
-   * values provided to the view during execution, results are unpredictable.
-   */
-  public static <ElemT, ViewT> PCollectionView<ViewT> testingView(
-      PCollection<ElemT> pCollection,
-      TupleTag<Iterable<WindowedValue<ElemT>>> tag,
-      ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
-      Coder<ElemT> elemCoder,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    return testingView(
-        pCollection,
-        tag,
-        viewFn,
-        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
-        elemCoder,
-        windowingStrategy);
-  }
-
-  /**
-   * A {@link PCollectionView} explicitly built from its {@link TupleTag}, {@link
-   * WindowingStrategy}, {@link Coder}, {@link ViewFn} and {@link WindowMappingFn}.
-   *
-   * <p>This method is only recommended for use by runner implementors to test their
-   * implementations. It is very easy to construct a {@link PCollectionView} that does not respect
-   * the invariants required for proper functioning.
-   *
-   * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed
-   * values provided to the view during execution, results are unpredictable.
-   */
-  public static <ElemT, ViewT> PCollectionView<ViewT> testingView(
-      PCollection<ElemT> pCollection,
-      TupleTag<Iterable<WindowedValue<ElemT>>> tag,
-      ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
-      WindowMappingFn<?> windowMappingFn,
-      Coder<ElemT> elemCoder,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    return new PCollectionViewFromParts<>(
-        pCollection,
-        tag,
-        viewFn,
-        windowMappingFn,
-        windowingStrategy,
-        IterableCoder.of(
-            WindowedValue.getFullCoder(elemCoder, windowingStrategy.getWindowFn().windowCoder())));
-  }
-
-  /**
-   * Places the given {@code value} in the {@link #DEFAULT_NONEMPTY_WINDOW}.
-   */
-  public static <T> WindowedValue<T> valueInDefaultWindow(T value) {
-    return WindowedValue.of(value, DEFAULT_TIMESTAMP, DEFAULT_NONEMPTY_WINDOW, PaneInfo.NO_FIRING);
-  }
-
-  /**
-   * Prepares {@code values} for reading as the contents of a {@link PCollectionView} side input.
-   */
-  @SafeVarargs
-  public static <T> Iterable<WindowedValue<T>> contentsInDefaultWindow(T... values)
-      throws Exception {
-    List<WindowedValue<T>> windowedValues = Lists.newArrayList();
-    for (T value : values) {
-      windowedValues.add(valueInDefaultWindow(value));
-    }
-    return windowedValues;
-  }
-
-  /**
-   * Prepares {@code values} for reading as the contents of a {@link PCollectionView} side input.
-   */
-  public static <T> Iterable<WindowedValue<T>> contentsInDefaultWindow(Iterable<T> values)
-      throws Exception {
-    List<WindowedValue<T>> windowedValues = Lists.newArrayList();
-    for (T value : values) {
-      windowedValues.add(valueInDefaultWindow(value));
-    }
-    return windowedValues;
-  }
-
-  // Internal details below here
-
-  /**
-   * A {@link PCollectionView} explicitly built from its {@link TupleTag},
-   * {@link WindowingStrategy}, and conversion function.
-   *
-   * <p>Instantiate via {@link #testingView}.
-   */
-  private static class PCollectionViewFromParts<ElemT, ViewT>
-      extends PValueBase
-      implements PCollectionView<ViewT> {
-    private PCollection<ElemT> pCollection;
-    private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
-    private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn;
-    private WindowMappingFn<?> windowMappingFn;
-    private WindowingStrategy<?, ?> windowingStrategy;
-    private Coder<Iterable<WindowedValue<ElemT>>> coder;
-
-    public PCollectionViewFromParts(
-        PCollection<ElemT> pCollection,
-        TupleTag<Iterable<WindowedValue<ElemT>>> tag,
-        ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
-        WindowMappingFn<?> windowMappingFn,
-        WindowingStrategy<?, ?> windowingStrategy,
-        Coder<Iterable<WindowedValue<ElemT>>> coder) {
-      this.pCollection = pCollection;
-      this.tag = tag;
-      this.viewFn = viewFn;
-      this.windowMappingFn = windowMappingFn;
-      this.windowingStrategy = windowingStrategy;
-      this.coder = coder;
-    }
-
-    @Override
-    public PCollection<?> getPCollection() {
-      return pCollection;
-    }
-
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    @Override
-    public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() {
-      return (TupleTag) tag;
-    }
-
-    @Override
-    public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() {
-      // Safe cast; runners must maintain type safety
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn) viewFn;
-      return untypedViewFn;
-    }
-
-    @Override
-    public WindowMappingFn<?> getWindowMappingFn() {
-      return windowMappingFn;
-    }
-
-    @Override
-    public WindowingStrategy<?, ?> getWindowingStrategyInternal() {
-      return windowingStrategy;
-    }
-
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    @Override
-    public Coder<Iterable<WindowedValue<?>>> getCoderInternal() {
-      return (Coder) coder;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(tag);
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (!(other instanceof PCollectionView)) {
-        return false;
+  public static List<Object> materializeValuesFor(
+      PTransform<?, ? extends PCollectionView<?>> viewTransformClass, Object ... values) {
+    List<Object> rval = new ArrayList<>();
+    // Currently all view materializations are the same where the data is shared underneath
+    // the void/null key. Once this changes, these materializations will differ but test code
+    // should not worry about what these look like if they are relying on the ViewFn to "undo"
+    // the conversion.
+    if (View.AsSingleton.class.equals(viewTransformClass.getClass())) {
+      for (Object value : values) {
+        rval.add(KV.of(null, value));
       }
-      @SuppressWarnings("unchecked")
-      PCollectionView<?> otherView = (PCollectionView<?>) other;
-      return tag.equals(otherView.getTagInternal());
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("tag", tag)
-          .add("viewFn", viewFn)
-          .toString();
-    }
-
-    @Override
-    public Map<TupleTag<?>, PValue> expand() {
-      return Collections.<TupleTag<?>, PValue>singletonMap(tag, pCollection);
-    }
+    } else if (View.AsIterable.class.equals(viewTransformClass.getClass())) {
+      for (Object value : values) {
+        rval.add(KV.of(null, value));
+      }
+    } else if (View.AsList.class.equals(viewTransformClass.getClass())) {
+      for (Object value : values) {
+        rval.add(KV.of(null, value));
+      }
+    } else if (View.AsMap.class.equals(viewTransformClass.getClass())) {
+      for (Object value : values) {
+        rval.add(KV.of(null, value));
+      }
+    } else if (View.AsMultimap.class.equals(viewTransformClass.getClass())) {
+      for (Object value : values) {
+        rval.add(KV.of(null, value));
+      }
+    } else {
+      throw new IllegalArgumentException(String.format(
+          "Unknown type of view %s. Supported views are %s.",
+          viewTransformClass.getClass(),
+          ImmutableSet.of(
+              View.AsSingleton.class,
+              View.AsIterable.class,
+              View.AsList.class,
+              View.AsMap.class,
+              View.AsMultimap.class)));
+    }
+    return Collections.unmodifiableList(rval);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 5cb9e18..cff6b2d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -37,9 +37,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -297,9 +295,8 @@ public class DoFnTesterTest {
   @Test
   public void fnWithSideInputDefault() throws Exception {
     PCollection<Integer> pCollection = p.apply(Create.empty(VarIntCoder.of()));
-    final PCollectionView<Integer> value =
-        PCollectionViews.singletonView(
-            pCollection, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
+    final PCollectionView<Integer> value = pCollection.apply(
+        View.<Integer>asSingleton().withDefaultValue(0));
 
     try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) {
       tester.processElement(1);
@@ -313,9 +310,8 @@ public class DoFnTesterTest {
   @Test
   public void fnWithSideInputExplicit() throws Exception {
     PCollection<Integer> pCollection = p.apply(Create.of(-2));
-    final PCollectionView<Integer> value =
-        PCollectionViews.singletonView(
-            pCollection, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
+    final PCollectionView<Integer> value = pCollection.apply(
+        View.<Integer>asSingleton().withDefaultValue(0));
 
     try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) {
       tester.setSideInput(value, GlobalWindow.INSTANCE, -2);

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 1ccd5d6..7d20532 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -209,7 +209,7 @@ class BatchLoads<DestinationT>
     checkArgument(numFileShards > 0);
     Pipeline p = input.getPipeline();
     final PCollectionView<String> jobIdTokenView = createJobIdView(p);
-    final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(jobIdTokenView);
+    final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(p, jobIdTokenView);
     // The user-supplied triggeringDuration is often chosen to control how many BigQuery load
     // jobs are generated, to prevent going over BigQuery's daily quota for load jobs. If this
     // is set to a large value, currently we have to buffer all the data until the trigger fires.
@@ -295,7 +295,7 @@ class BatchLoads<DestinationT>
   public WriteResult expandUntriggered(PCollection<KV<DestinationT, TableRow>> input) {
     Pipeline p = input.getPipeline();
     final PCollectionView<String> jobIdTokenView = createJobIdView(p);
-    final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(jobIdTokenView);
+    final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(p, jobIdTokenView);
     PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow =
         input.apply(
             "rewindowIntoGlobal",
@@ -364,8 +364,10 @@ class BatchLoads<DestinationT>
   }
 
   // Generate the temporary-file prefix.
-  private PCollectionView<String> createTempFilePrefixView(PCollectionView<String> jobIdView) {
-    return ((PCollection<String>) jobIdView.getPCollection())
+  private PCollectionView<String> createTempFilePrefixView(
+      Pipeline p, final PCollectionView<String> jobIdView) {
+    return p
+        .apply(Create.of(""))
         .apply(
             "GetTempFilePrefix",
             ParDo.of(
@@ -382,13 +384,13 @@ class BatchLoads<DestinationT>
                         resolveTempLocation(
                             tempLocationRoot,
                             "BigQueryWriteTemp",
-                            c.element());
+                            c.sideInput(jobIdView));
                     LOG.info(
                         "Writing BigQuery temporary files to {} before loading them.",
                         tempLocation);
                     c.output(tempLocation);
                   }
-                }))
+                }).withSideInputs(jobIdView))
         .apply("TempFilePrefixView", View.<String>asSingleton());
   }