You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/17 20:31:26 UTC

[35/50] [abbrv] beam git commit: [BEAM-2926] Migrate to using a trivial multimap materialization within the Java SDK.

[BEAM-2926] Migrate to using a trivial multimap materialization within the Java SDK.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5e2593da
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5e2593da
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5e2593da

Branch: refs/heads/tez-runner
Commit: 5e2593daacec83e876b747d56d8c335531a54d1d
Parents: 7ce0a82
Author: Luke Cwik <lc...@google.com>
Authored: Fri Nov 10 11:08:24 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 15 08:59:51 2017 -0800

----------------------------------------------------------------------
 .../apex/translation/ParDoTranslatorTest.java   |   3 +-
 .../core/construction/PTransformMatchers.java   |   3 +-
 .../core/construction/ParDoTranslation.java     |  17 +-
 .../CreatePCollectionViewTranslationTest.java   |  10 +-
 .../construction/PTransformMatchersTest.java    |  33 +-
 .../core/construction/ParDoTranslationTest.java |   7 +-
 .../core/InMemoryMultimapSideInputView.java     |  62 +++
 .../beam/runners/core/SideInputHandler.java     |  63 ++--
 .../core/InMemoryMultimapSideInputViewTest.java |  53 +++
 .../beam/runners/core/SideInputHandlerTest.java |  89 +++--
 .../beam/runners/direct/SideInputContainer.java |  38 +-
 .../runners/direct/EvaluationContextTest.java   |  44 ++-
 .../runners/direct/SideInputContainerTest.java  | 226 +++++------
 .../direct/ViewEvaluatorFactoryTest.java        |  13 +-
 .../runners/direct/ViewOverrideFactoryTest.java |   9 +-
 .../direct/WriteWithShardingFactoryTest.java    |   9 +-
 .../FlinkStreamingTransformTranslators.java     |   1 -
 .../functions/FlinkSideInputReader.java         |  27 +-
 .../functions/SideInputInitializer.java         |  50 ++-
 .../flink/streaming/DoFnOperatorTest.java       |  40 +-
 .../DataflowPipelineTranslatorTest.java         |  12 +-
 .../spark/translation/TransformTranslator.java  |   7 +-
 .../spark/util/SparkSideInputReader.java        |  50 ++-
 .../org/apache/beam/sdk/transforms/Combine.java |  13 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |  20 +-
 .../beam/sdk/transforms/Materializations.java   |  29 +-
 .../org/apache/beam/sdk/transforms/View.java    |  67 +++-
 .../org/apache/beam/sdk/transforms/ViewFn.java  |   6 +-
 .../apache/beam/sdk/values/PCollectionView.java |   7 +-
 .../beam/sdk/values/PCollectionViews.java       | 256 ++++++-------
 .../sdk/testing/PCollectionViewTesting.java     | 375 +++----------------
 .../beam/sdk/transforms/DoFnTesterTest.java     |  12 +-
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    |  14 +-
 33 files changed, 809 insertions(+), 856 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
index 73382e3..4a4ca1d 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.apex.translation;
 
+import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -219,7 +220,7 @@ public class ParDoTranslatorTest {
     operator.beginWindow(0);
     WindowedValue<Integer> wv1 = WindowedValue.valueInGlobalWindow(1);
     WindowedValue<Iterable<?>> sideInput = WindowedValue.<Iterable<?>>valueInGlobalWindow(
-        Lists.<Integer>newArrayList(22));
+        materializeValuesFor(View.asSingleton(), 22));
     operator.input.process(ApexStreamTuple.DataTuple.of(wv1)); // pushed back input
 
     final List<Object> results = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 0d27241..42ac73f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.ProcessElementMethod;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 
@@ -304,7 +303,7 @@ public class PTransformMatchers {
         }
         CreatePCollectionView<?, ?> createView =
             (CreatePCollectionView<?, ?>) application.getTransform();
-        ViewFn<Iterable<WindowedValue<?>>, ?> viewFn = createView.getView().getViewFn();
+        ViewFn<?, ?> viewFn = createView.getView().getViewFn();
         return viewFn.getClass().equals(viewFnType);
       }
     };

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index f88cbe5..e00b912 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -50,7 +50,6 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput.Builder;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
@@ -73,8 +72,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
@@ -561,25 +558,19 @@ public class ParDoTranslation {
     ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn());
 
     WindowingStrategy<?, ?> windowingStrategy = pCollection.getWindowingStrategy().fixDefaults();
-    Coder<Iterable<WindowedValue<?>>> coder =
-        (Coder)
-            IterableCoder.of(
-                FullWindowedValueCoder.of(
-                    pCollection.getCoder(),
-                    pCollection.getWindowingStrategy().getWindowFn().windowCoder()));
     checkArgument(
-        sideInput.getAccessPattern().getUrn().equals(Materializations.ITERABLE_MATERIALIZATION_URN),
+        sideInput.getAccessPattern().getUrn().equals(Materializations.MULTIMAP_MATERIALIZATION_URN),
         "Unknown View Materialization URN %s",
         sideInput.getAccessPattern().getUrn());
 
     PCollectionView<?> view =
         new RunnerPCollectionView<>(
             pCollection,
-            (TupleTag<Iterable<WindowedValue<?>>>) tag,
-            (ViewFn<Iterable<WindowedValue<?>>, ?>) viewFn,
+            (TupleTag) tag,
+            (ViewFn) viewFn,
             windowMappingFn,
             windowingStrategy,
-            coder);
+            (Coder) pCollection.getCoder());
     return view;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
index df659a8..690e3ca 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
@@ -23,12 +23,14 @@ import static org.junit.Assert.assertThat;
 import com.google.common.collect.ImmutableList;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PCollectionViews;
@@ -63,12 +65,11 @@ public class CreatePCollectionViewTranslationTest {
                   testPCollection.getWindowingStrategy(),
                   false,
                   null,
-                  testPCollection.getCoder())),
+                  StringUtf8Coder.of())),
           CreatePCollectionView.of(
               PCollectionViews.listView(
                   testPCollection,
-                  testPCollection.getWindowingStrategy(),
-                  testPCollection.getCoder())));
+                  testPCollection.getWindowingStrategy())));
     }
 
     @Parameter(0)
@@ -76,7 +77,8 @@ public class CreatePCollectionViewTranslationTest {
 
     public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
 
-    private static final PCollection<String> testPCollection = p.apply(Create.of("one"));
+    private static final PCollection<KV<Void, String>> testPCollection =
+        p.apply(Create.of(KV.of((Void) null, "one")));
 
     @Test
     public void testEncodedProto() throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 324e38d..c2dab4c 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -54,8 +54,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Materialization;
-import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -67,7 +65,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -376,9 +373,8 @@ public class PTransformMatchersTest implements Serializable {
   @Test
   public void createViewWithViewFn() {
     PCollection<Integer> input = p.apply(Create.of(1));
-    PCollectionView<Iterable<Integer>> view =
-        PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
-    ViewFn<Iterable<WindowedValue<?>>, Iterable<Integer>> viewFn = view.getViewFn();
+    PCollectionView<Iterable<Integer>> view = input.apply(View.<Integer>asIterable());
+    ViewFn<?, ?> viewFn = view.getViewFn();
     CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view);
 
     PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass());
@@ -388,23 +384,10 @@ public class PTransformMatchersTest implements Serializable {
   @Test
   public void createViewWithViewFnDifferentViewFn() {
     PCollection<Integer> input = p.apply(Create.of(1));
-    PCollectionView<Iterable<Integer>> view =
-        PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
-    ViewFn<Iterable<WindowedValue<?>>, Iterable<Integer>> viewFn =
-        new ViewFn<Iterable<WindowedValue<?>>, Iterable<Integer>>() {
-          @Override
-          public Materialization<Iterable<WindowedValue<?>>> getMaterialization() {
-            @SuppressWarnings({"rawtypes", "unchecked"})
-            Materialization<Iterable<WindowedValue<?>>> materialization =
-                (Materialization) Materializations.iterable();
-            return materialization;
-          }
-
-          @Override
-          public Iterable<Integer> apply(Iterable<WindowedValue<?>> contents) {
-            return Collections.emptyList();
-          }
-        };
+    PCollectionView<Iterable<Integer>> view = input.apply(View.<Integer>asIterable());
+
+    // Purposely create a subclass to get a different class than what was expected.
+    ViewFn<?, ?> viewFn = new PCollectionViews.IterableViewFn() {};
     CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view);
 
     PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass());
@@ -414,9 +397,7 @@ public class PTransformMatchersTest implements Serializable {
   @Test
   public void createViewWithViewFnNotCreatePCollectionView() {
     PCollection<Integer> input = p.apply(Create.of(1));
-    PCollectionView<Iterable<Integer>> view =
-        PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
-
+    PCollectionView<Iterable<Integer>> view = input.apply(View.<Integer>asIterable());
     PTransformMatcher matcher =
         PTransformMatchers.createViewWithViewFn(view.getViewFn().getClass());
     assertThat(matcher.matches(getAppliedTransform(View.asIterable())), is(false));

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
index b79947e..83594f1 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -29,6 +29,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -166,7 +167,8 @@ public class ParDoTranslationTest {
                 view.getPCollection(),
                 protoTransform,
                 rehydratedComponents);
-        assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal()));
+        assertThat(restoredView.getTagInternal(),
+            Matchers.<TupleTag<?>>equalTo(view.getTagInternal()));
         assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass()));
         assertThat(
             restoredView.getWindowMappingFn(), instanceOf(view.getWindowMappingFn().getClass()));
@@ -174,7 +176,8 @@ public class ParDoTranslationTest {
             restoredView.getWindowingStrategyInternal(),
             Matchers.<WindowingStrategy<?, ?>>equalTo(
                 view.getWindowingStrategyInternal().fixDefaults()));
-        assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal()));
+        assertThat(restoredView.getCoderInternal(),
+            Matchers.<Coder<?>>equalTo(view.getCoderInternal()));
       }
       String mainInputId = sdkComponents.registerPCollection(mainInput);
       assertThat(

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java
new file mode 100644
index 0000000..b451547
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+  */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * An in-memory representation of {@link MultimapView}.
+ */
+public class InMemoryMultimapSideInputView<K, V> implements Materializations.MultimapView<K, V> {
+
+  /**
+   * Creates a {@link MultimapView} from the provided values. The provided {@link Coder} is used
+   * to guarantee structural equality for keys instead of assuming Java object equality.
+   */
+  public static <K, V> MultimapView<K, V> fromIterable(
+      Coder<K> keyCoder, Iterable<KV<K, V>> values) {
+    // We specifically use an array list multimap to allow for:
+    //  * null keys
+    //  * null values
+    //  * duplicate values
+    Multimap<Object, Object> multimap = ArrayListMultimap.create();
+    for (KV<K, V> value : values) {
+      multimap.put(keyCoder.structuralValue(value.getKey()), value.getValue());
+    }
+    return new InMemoryMultimapSideInputView(keyCoder, Multimaps.unmodifiableMultimap(multimap));
+  }
+
+  private final Coder<K> keyCoder;
+  private final Multimap<Object, V> structuralKeyToValuesMap;
+
+  private InMemoryMultimapSideInputView(Coder<K> keyCoder, Multimap<Object, V> data) {
+    this.keyCoder = keyCoder;
+    this.structuralKeyToValuesMap = data;
+  }
+
+  @Override
+  public Iterable<V> get(K k) {
+    return structuralKeyToValuesMap.get(keyCoder.structuralValue(k));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index 3b37702..3ff4c94 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -17,22 +17,28 @@
  */
 package org.apache.beam.runners.core;
 
-import java.util.ArrayList;
+import static com.google.common.base.Preconditions.checkArgument;
+
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.state.CombiningState;
 import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 
 /**
@@ -58,7 +64,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
 
   /**
    * State internals that are scoped not to the key of a value but are global. The state can still
-   * be keep locally but if side inputs are broadcast to all parallel operators then all will
+   * be kept locally but if side inputs are broadcast to all parallel operators then all will
    * have the same view of the state.
    */
   private final StateInternals stateInternals;
@@ -80,7 +86,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
    */
   private final Map<
       PCollectionView<?>,
-      StateTag<ValueState<Iterable<WindowedValue<?>>>>> sideInputContentsTags;
+      StateTag<ValueState<Iterable<?>>>> sideInputContentsTags;
 
   /**
    * Creates a new {@code SideInputHandler} for the given side inputs that uses
@@ -94,7 +100,15 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
     this.availableWindowsTags = new HashMap<>();
     this.sideInputContentsTags = new HashMap<>();
 
-    for (PCollectionView<?> sideInput: sideInputs) {
+    for (PCollectionView<?> sideInput : sideInputs) {
+      checkArgument(
+          Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+              sideInput.getViewFn().getMaterialization().getUrn()),
+          "This handler is only capable of dealing with %s materializations "
+              + "but was asked to handle %s for PCollectionView with tag %s.",
+          Materializations.MULTIMAP_MATERIALIZATION_URN,
+          sideInput.getViewFn().getMaterialization().getUrn(),
+          sideInput.getTagInternal().getId());
 
       @SuppressWarnings("unchecked")
       Coder<BoundedWindow> windowCoder =
@@ -114,9 +128,9 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
 
       availableWindowsTags.put(sideInput, availableTag);
 
-      Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal();
-      StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
-          StateTags.value("side-input-data-" + sideInput.getTagInternal().getId(), coder);
+      StateTag<ValueState<Iterable<?>>> stateTag =
+          StateTags.value("side-input-data-" + sideInput.getTagInternal().getId(),
+              (Coder) IterableCoder.of(sideInput.getCoderInternal()));
       sideInputContentsTags.put(sideInput, stateTag);
     }
   }
@@ -129,7 +143,6 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
   public void addSideInputValue(
       PCollectionView<?> sideInput,
       WindowedValue<Iterable<?>> value) {
-
     @SuppressWarnings("unchecked")
     Coder<BoundedWindow> windowCoder =
         (Coder<BoundedWindow>) sideInput
@@ -137,19 +150,13 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
             .getWindowFn()
             .windowCoder();
 
-    // reify the WindowedValue
-    List<WindowedValue<?>> inputWithReifiedWindows = new ArrayList<>();
-    for (Object e: value.getValue()) {
-      inputWithReifiedWindows.add(value.withValue(e));
-    }
-
-    StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
+    StateTag<ValueState<Iterable<?>>> stateTag =
         sideInputContentsTags.get(sideInput);
 
-    for (BoundedWindow window: value.getWindows()) {
+    for (BoundedWindow window : value.getWindows()) {
       stateInternals
           .state(StateNamespaces.window(windowCoder, window), stateTag)
-          .write(inputWithReifiedWindows);
+          .write(value.getValue());
 
       stateInternals
           .state(StateNamespaces.global(), availableWindowsTags.get(sideInput))
@@ -159,28 +166,32 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
 
   @Nullable
   @Override
-  public <T> T get(PCollectionView<T> sideInput, BoundedWindow window) {
-
+  public <T> T get(PCollectionView<T> view, BoundedWindow window) {
     @SuppressWarnings("unchecked")
     Coder<BoundedWindow> windowCoder =
-        (Coder<BoundedWindow>) sideInput
+        (Coder<BoundedWindow>) view
             .getWindowingStrategyInternal()
             .getWindowFn()
             .windowCoder();
 
-    StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
-        sideInputContentsTags.get(sideInput);
+    StateTag<ValueState<Iterable<?>>> stateTag =
+        sideInputContentsTags.get(view);
 
-    ValueState<Iterable<WindowedValue<?>>> state =
+    ValueState<Iterable<?>> state =
         stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag);
 
-    @Nullable Iterable<WindowedValue<?>> elements = state.read();
+    // TODO: Add support for choosing which representation is contained based upon the
+    // side input materialization. We currently can assume that we always have a multimap
+    // materialization as that is the only supported type within the Java SDK.
+    @Nullable Iterable<KV<?, ?>> elements = (Iterable<KV<?, ?>>) state.read();
 
     if (elements == null) {
       elements = Collections.emptyList();
     }
 
-    return sideInput.getViewFn().apply(elements);
+    ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+    Coder<?> keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+    return viewFn.apply(InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) elements));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java
new file mode 100644
index 0000000..6840355
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link InMemoryMultimapSideInputView}. */
+@RunWith(JUnit4.class)
+public class InMemoryMultimapSideInputViewTest {
+  @Test
+  public void testStructuralKeyEquality() {
+    MultimapView<byte[], Integer> view = InMemoryMultimapSideInputView.fromIterable(
+        ByteArrayCoder.of(),
+        ImmutableList.of(KV.of(new byte[]{ 0x00 }, 0), KV.of(new byte[]{ 0x01 }, 1)));
+    assertEquals(view.get(new byte[]{ 0x00 }), ImmutableList.of(0));
+    assertEquals(view.get(new byte[]{ 0x01 }), ImmutableList.of(1));
+    assertEquals(view.get(new byte[]{ 0x02 }), ImmutableList.of());
+  }
+
+  @Test
+  public void testValueGrouping() {
+    MultimapView<String, String> view = InMemoryMultimapSideInputView.fromIterable(
+        StringUtf8Coder.of(),
+        ImmutableList.of(KV.of("A", "a1"), KV.of("A", "a2"), KV.of("B", "b1")));
+    assertEquals(view.get("A"), ImmutableList.of("a1", "a2"));
+    assertEquals(view.get("B"), ImmutableList.of("b1"));
+    assertEquals(view.get("C"), ImmutableList.of());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
index f9e0aaf..7cbd1b0 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
@@ -17,24 +17,28 @@
  */
 package org.apache.beam.runners.core;
 
+import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor;
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.PCollectionViewTesting;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
-import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -47,26 +51,19 @@ public class SideInputHandlerTest {
 
   private static final long WINDOW_MSECS_1 = 100;
   private static final long WINDOW_MSECS_2 = 500;
-
-  private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 =
-      WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1)));
-
-  private PCollectionView<Iterable<String>> view1 =
-      PCollectionViewTesting.testingView(
-          new TupleTag<Iterable<WindowedValue<String>>>() {},
-          new PCollectionViewTesting.IdentityViewFn<String>(),
-          StringUtf8Coder.of(),
-          windowingStrategy1);
-
-  private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 =
-      WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2)));
-
-  private PCollectionView<Iterable<String>> view2 =
-      PCollectionViewTesting.testingView(
-          new TupleTag<Iterable<WindowedValue<String>>>() {},
-          new PCollectionViewTesting.IdentityViewFn<String>(),
-          StringUtf8Coder.of(),
-          windowingStrategy2);
+  private PCollectionView<Iterable<String>> view1;
+  private PCollectionView<Iterable<String>> view2;
+
+  /** Rebuilds both iterable views before each test; they differ only in fixed-window size. */
+  @Before
+  public void setUp() {
+    PCollection<String> source = Pipeline.create().apply(Create.of("1"));
+    FixedWindows windowFn1 = FixedWindows.of(new Duration(WINDOW_MSECS_1));
+    FixedWindows windowFn2 = FixedWindows.of(new Duration(WINDOW_MSECS_2));
+    view1 = source.apply(Window.<String>into(windowFn1)).apply(View.<String>asIterable());
+    view2 = source.apply(Window.<String>into(windowFn2)).apply(View.<String>asIterable());
+  }
 
   @Test
   public void testIsEmpty() {
@@ -113,7 +110,9 @@ public class SideInputHandlerTest {
     // add a value for view1
     sideInputHandler.addSideInputValue(
         view1,
-        valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow));
+        valuesInWindow(
+            materializeValuesFor(View.asIterable(), "Hello"),
+            new Instant(0), firstWindow));
 
     // now side input should be ready
     assertTrue(sideInputHandler.isReady(view1, firstWindow));
@@ -139,16 +138,20 @@ public class SideInputHandlerTest {
     // add a first value for view1
     sideInputHandler.addSideInputValue(
         view1,
-        valuesInWindow(ImmutableList.of("Hello"), new Instant(0), window));
+        valuesInWindow(
+            materializeValuesFor(View.asIterable(), "Hello"),
+            new Instant(0), window));
 
-    Assert.assertThat(sideInputHandler.get(view1, window), contains("Hello"));
+    assertThat(sideInputHandler.get(view1, window), contains("Hello"));
 
     // subsequent values should replace existing values
     sideInputHandler.addSideInputValue(
         view1,
-        valuesInWindow(ImmutableList.of("Ciao", "Buongiorno"), new Instant(0), window));
+        valuesInWindow(
+            materializeValuesFor(View.asIterable(), "Ciao", "Buongiorno"),
+            new Instant(0), window));
 
-    Assert.assertThat(sideInputHandler.get(view1, window), contains("Ciao", "Buongiorno"));
+    assertThat(sideInputHandler.get(view1, window), contains("Ciao", "Buongiorno"));
   }
 
   @Test
@@ -166,19 +169,21 @@ public class SideInputHandlerTest {
     // add a first value for view1 in the first window
     sideInputHandler.addSideInputValue(
         view1,
-        valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow));
+        valuesInWindow(materializeValuesFor(View.asIterable(), "Hello"),
+            new Instant(0), firstWindow));
 
-    Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+    assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
 
     // add something for second window of view1
     sideInputHandler.addSideInputValue(
         view1,
-        valuesInWindow(ImmutableList.of("Arrivederci"), new Instant(0), secondWindow));
+        valuesInWindow(materializeValuesFor(View.asIterable(), "Arrivederci"),
+    new Instant(0), secondWindow));
 
-    Assert.assertThat(sideInputHandler.get(view1, secondWindow), contains("Arrivederci"));
+    assertThat(sideInputHandler.get(view1, secondWindow), contains("Arrivederci"));
 
     // contents for first window should be unaffected
-    Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+    assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
   }
 
   @Test
@@ -194,9 +199,10 @@ public class SideInputHandlerTest {
     // add value for view1 in the first window
     sideInputHandler.addSideInputValue(
         view1,
-        valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow));
+        valuesInWindow(materializeValuesFor(View.asIterable(), "Hello"),
+            new Instant(0), firstWindow));
 
-    Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+    assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
 
     // view2 should not have any data
     assertFalse(sideInputHandler.isReady(view2, firstWindow));
@@ -204,18 +210,19 @@ public class SideInputHandlerTest {
     // also add some data for view2
     sideInputHandler.addSideInputValue(
         view2,
-        valuesInWindow(ImmutableList.of("Salut"), new Instant(0), firstWindow));
+        valuesInWindow(materializeValuesFor(View.asIterable(), "Salut"),
+            new Instant(0), firstWindow));
 
     assertTrue(sideInputHandler.isReady(view2, firstWindow));
-    Assert.assertThat(sideInputHandler.get(view2, firstWindow), contains("Salut"));
+    assertThat(sideInputHandler.get(view2, firstWindow), contains("Salut"));
 
     // view1 should not be affected by that
-    Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+    assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
   }
 
   @SuppressWarnings({"unchecked", "rawtypes"})
   private WindowedValue<Iterable<?>> valuesInWindow(
-      Iterable<?> values, Instant timestamp, BoundedWindow window) {
+      List<Object> values, Instant timestamp, BoundedWindow window) {
     return (WindowedValue) WindowedValue.of(values, timestamp, window, PaneInfo.NO_FIRING);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
index 43da92f..ea8f168 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
 import com.google.common.cache.CacheBuilder;
@@ -35,11 +36,18 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
 import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
 import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.WindowingStrategy;
 
@@ -60,6 +68,16 @@ class SideInputContainer {
    */
   public static SideInputContainer create(
       final EvaluationContext context, Collection<PCollectionView<?>> containedViews) {
+    for (PCollectionView<?> pCollectionView : containedViews) {
+      checkArgument(
+          Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+              pCollectionView.getViewFn().getMaterialization().getUrn()),
+          "This handler is only capable of dealing with %s materializations "
+              + "but was asked to handle %s for PCollectionView with tag %s.",
+          Materializations.MULTIMAP_MATERIALIZATION_URN,
+          pCollectionView.getViewFn().getMaterialization().getUrn(),
+          pCollectionView.getTagInternal().getId());
+    }
     LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
         viewByWindows = CacheBuilder.newBuilder().build(new CallbackSchedulingLoader(context));
     return new SideInputContainer(containedViews, viewByWindows);
@@ -239,11 +257,21 @@ class SideInputContainer {
           "calling get() on PCollectionView %s that is not ready in window %s",
           view,
           window);
-      // Safe covariant cast
-      @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> values =
-          (Iterable<WindowedValue<?>>) viewContents.getUnchecked(PCollectionViewWindow.of(view,
-              window)).get();
-      return view.getViewFn().apply(values);
+      // Safe covariant cast since we know that the view only contains KVs.
+      @SuppressWarnings("unchecked") Iterable<KV<?, ?>> elements = Iterables.transform(
+          (Iterable<WindowedValue<KV<?, ?>>>) viewContents.getUnchecked(
+              PCollectionViewWindow.of(view, window)).get(),
+          new Function<WindowedValue<KV<?, ?>>, KV<?, ?>>() {
+            @Override
+            public KV<?, ?> apply(WindowedValue<KV<?, ?>> windowedValue) {
+              return windowedValue.getValue();
+            }
+          });
+
+      ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+      Coder<?> keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+      return viewFn.apply(
+          InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) elements));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index cc9ce60..0a1ffe7 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
@@ -28,7 +29,6 @@ import static org.junit.Assert.assertThat;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.core.SideInputReader;
@@ -41,8 +41,10 @@ import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -126,33 +128,47 @@ public class EvaluationContextTest {
 
   @Test
   public void writeToViewWriterThenReadReads() {
-    PCollectionViewWriter<Integer, Iterable<Integer>> viewWriter =
+    PCollectionViewWriter<?, Iterable<Integer>> viewWriter =
         context.createPCollectionViewWriter(
             PCollection.createPrimitiveOutputInternal(
                 p,
                 WindowingStrategy.globalDefault(),
                 IsBounded.BOUNDED,
-                IterableCoder.of(VarIntCoder.of())),
+                IterableCoder.of(KvCoder.of(VoidCoder.of(), VarIntCoder.of()))),
             view);
     BoundedWindow window = new TestBoundedWindow(new Instant(1024L));
     BoundedWindow second = new TestBoundedWindow(new Instant(899999L));
-    WindowedValue<Integer> firstValue =
-        WindowedValue.of(1, new Instant(1222), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    WindowedValue<Integer> secondValue =
-        WindowedValue.of(
-            2, new Instant(8766L), second, PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0));
-    Iterable<WindowedValue<Integer>> values = ImmutableList.of(firstValue, secondValue);
-    viewWriter.add(values);
+    ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asIterable(), 1)) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(1222),
+          window,
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    for (Object materializedValue : materializeValuesFor(View.asIterable(), 2)) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(8766L),
+          second,
+          PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)));
+    }
+    viewWriter.add((Iterable) valuesBuilder.build());
 
     SideInputReader reader =
         context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view));
     assertThat(reader.get(view, window), containsInAnyOrder(1));
     assertThat(reader.get(view, second), containsInAnyOrder(2));
 
-    WindowedValue<Integer> overrittenSecondValue =
-        WindowedValue.of(
-            4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1));
-    viewWriter.add(Collections.singleton(overrittenSecondValue));
+    ImmutableList.Builder<WindowedValue<?>> overwrittenValuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asIterable(), 4444)) {
+      overwrittenValuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(8677L),
+          second,
+          PaneInfo.createPane(false, true, Timing.LATE, 1, 1)));
+    }
+    viewWriter.add((Iterable) overwrittenValuesBuilder.build());
     assertThat(reader.get(view, second), containsInAnyOrder(2));
     // The cached value is served in the earlier reader
     reader = context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view));

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
index 5e7c799..91255e0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
@@ -34,8 +35,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
 import org.apache.beam.runners.core.SideInputReader;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Mean;
@@ -49,9 +48,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
@@ -134,13 +131,22 @@ public class SideInputContainerTest {
 
   @Test
   public void getAfterWriteReturnsPaneInWindow() throws Exception {
-    WindowedValue<KV<String, Integer>> one =
-        WindowedValue.of(
-            KV.of("one", 1), new Instant(1L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    WindowedValue<KV<String, Integer>> two =
-        WindowedValue.of(
-            KV.of("two", 2), new Instant(20L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+    ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(1L),
+          FIRST_WINDOW,
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("two", 2))) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(20L),
+          FIRST_WINDOW,
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    container.write(mapView, valuesBuilder.build());
 
     Map<String, Integer> viewContents =
         container
@@ -153,19 +159,22 @@ public class SideInputContainerTest {
 
   @Test
   public void getReturnsLatestPaneInWindow() throws Exception {
-    WindowedValue<KV<String, Integer>> one =
-        WindowedValue.of(
-            KV.of("one", 1),
-            new Instant(1L),
-            SECOND_WINDOW,
-            PaneInfo.createPane(true, false, Timing.EARLY));
-    WindowedValue<KV<String, Integer>> two =
-        WindowedValue.of(
-            KV.of("two", 2),
-            new Instant(20L),
-            SECOND_WINDOW,
-            PaneInfo.createPane(true, false, Timing.EARLY));
-    container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+    ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(1L),
+          SECOND_WINDOW,
+          PaneInfo.createPane(true, false, Timing.EARLY)));
+    }
+    for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("two", 2))) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(20L),
+          SECOND_WINDOW,
+          PaneInfo.createPane(true, false, Timing.EARLY)));
+    }
+    container.write(mapView, valuesBuilder.build());
 
     Map<String, Integer> viewContents =
         container
@@ -175,13 +184,15 @@ public class SideInputContainerTest {
     assertThat(viewContents, hasEntry("two", 2));
     assertThat(viewContents.size(), is(2));
 
-    WindowedValue<KV<String, Integer>> three =
-        WindowedValue.of(
-            KV.of("three", 3),
-            new Instant(300L),
-            SECOND_WINDOW,
-            PaneInfo.createPane(false, false, Timing.EARLY, 1, -1));
-    container.write(mapView, ImmutableList.<WindowedValue<?>>of(three));
+    ImmutableList.Builder<WindowedValue<?>> overwriteValuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("three", 3))) {
+      overwriteValuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(300L),
+          SECOND_WINDOW,
+          PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)));
+    }
+    container.write(mapView, overwriteValuesBuilder.build());
 
     Map<String, Integer> overwrittenViewContents =
         container
@@ -209,10 +220,7 @@ public class SideInputContainerTest {
     PCollection<KV<String, String>> input =
         pipeline.apply(Create.empty(new TypeDescriptor<KV<String, String>>() {}));
     PCollectionView<Map<String, Iterable<String>>> newView =
-        PCollectionViews.multimapView(
-            input,
-            WindowingStrategy.globalDefault(),
-            KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+        input.apply(View.<String, String>asMultimap());
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("unknown views");
@@ -232,19 +240,22 @@ public class SideInputContainerTest {
 
   @Test
   public void writeForMultipleElementsInDifferentWindowsSucceeds() throws Exception {
-    WindowedValue<Double> firstWindowedValue =
-        WindowedValue.of(
-            2.875,
-            FIRST_WINDOW.maxTimestamp().minus(200L),
-            FIRST_WINDOW,
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    WindowedValue<Double> secondWindowedValue =
-        WindowedValue.of(
-            4.125,
-            SECOND_WINDOW.maxTimestamp().minus(2_000_000L),
-            SECOND_WINDOW,
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    container.write(singletonView, ImmutableList.of(firstWindowedValue, secondWindowedValue));
+    ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asSingleton(), 2.875)) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          FIRST_WINDOW.maxTimestamp().minus(200L),
+          FIRST_WINDOW,
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    for (Object materializedValue : materializeValuesFor(View.asSingleton(), 4.125)) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          SECOND_WINDOW.maxTimestamp().minus(2_000_000L),
+          SECOND_WINDOW,
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    container.write(singletonView, valuesBuilder.build());
     assertThat(
         container
             .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
@@ -259,20 +270,15 @@ public class SideInputContainerTest {
 
   @Test
   public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Exception {
-    WindowedValue<Integer> firstValue =
-        WindowedValue.of(
-            44,
-            FIRST_WINDOW.maxTimestamp().minus(200L),
-            FIRST_WINDOW,
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    WindowedValue<Integer> secondValue =
-        WindowedValue.of(
-            44,
-            FIRST_WINDOW.maxTimestamp().minus(200L),
-            FIRST_WINDOW,
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-
-    container.write(iterableView, ImmutableList.of(firstValue, secondValue));
+    ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asIterable(), 44, 44)) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          FIRST_WINDOW.maxTimestamp().minus(200L),
+          FIRST_WINDOW,
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    container.write(iterableView, valuesBuilder.build());
 
     assertThat(
         container
@@ -283,13 +289,15 @@ public class SideInputContainerTest {
 
   @Test
   public void writeForElementInMultipleWindowsSucceeds() throws Exception {
-    WindowedValue<Double> multiWindowedValue =
-        WindowedValue.of(
-            2.875,
-            FIRST_WINDOW.maxTimestamp().minus(200L),
-            ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW),
-            PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    container.write(singletonView, ImmutableList.of(multiWindowedValue));
+    ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asSingleton(), 2.875)) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          FIRST_WINDOW.maxTimestamp().minus(200L),
+          ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW),
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    container.write(singletonView, valuesBuilder.build());
     assertThat(
         container
             .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
@@ -304,19 +312,22 @@ public class SideInputContainerTest {
 
   @Test
   public void finishDoesNotOverwriteWrittenElements() throws Exception {
-    WindowedValue<KV<String, Integer>> one =
-        WindowedValue.of(
-            KV.of("one", 1),
-            new Instant(1L),
-            SECOND_WINDOW,
-            PaneInfo.createPane(true, false, Timing.EARLY));
-    WindowedValue<KV<String, Integer>> two =
-        WindowedValue.of(
-            KV.of("two", 2),
-            new Instant(20L),
-            SECOND_WINDOW,
-            PaneInfo.createPane(true, false, Timing.EARLY));
-    container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+    ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(1L),
+          SECOND_WINDOW,
+          PaneInfo.createPane(true, false, Timing.EARLY)));
+    }
+    for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("two", 2))) {
+      valuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          new Instant(20L),
+          SECOND_WINDOW,
+          PaneInfo.createPane(true, false, Timing.EARLY)));
+    }
+    container.write(mapView, valuesBuilder.build());
 
     immediatelyInvokeCallback(mapView, SECOND_WINDOW);
 
@@ -362,14 +373,15 @@ public class SideInputContainerTest {
    */
   @Test
   public void isReadyForSomeNotReadyViewsFalseUntilElements() {
-    container.write(
-        mapView,
-        ImmutableList.of(
-            WindowedValue.of(
-                KV.of("one", 1),
-                SECOND_WINDOW.maxTimestamp().minus(100L),
-                SECOND_WINDOW,
-                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+    ImmutableList.Builder<WindowedValue<?>> mapValuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) {
+      mapValuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          SECOND_WINDOW.maxTimestamp().minus(100L),
+          SECOND_WINDOW,
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    container.write(mapView, mapValuesBuilder.build());
 
     ReadyCheckingSideInputReader reader =
         container.createReaderForViews(ImmutableList.of(mapView, singletonView));
@@ -378,25 +390,27 @@ public class SideInputContainerTest {
 
     assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false));
 
-    container.write(
-        mapView,
-        ImmutableList.of(
-            WindowedValue.of(
-                KV.of("too", 2),
-                FIRST_WINDOW.maxTimestamp().minus(100L),
-                FIRST_WINDOW,
-                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+    ImmutableList.Builder<WindowedValue<?>> newMapValuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("too", 2))) {
+      newMapValuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          FIRST_WINDOW.maxTimestamp().minus(100L),
+          FIRST_WINDOW,
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    container.write(mapView, newMapValuesBuilder.build());
     // Cached value is false
     assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false));
 
-    container.write(
-        singletonView,
-        ImmutableList.of(
-            WindowedValue.of(
-                1.25,
-                SECOND_WINDOW.maxTimestamp().minus(100L),
-                SECOND_WINDOW,
-                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+    ImmutableList.Builder<WindowedValue<?>> singletonValuesBuilder = ImmutableList.builder();
+    for (Object materializedValue : materializeValuesFor(View.asSingleton(), 1.25)) {
+      singletonValuesBuilder.add(WindowedValue.of(
+          materializedValue,
+          SECOND_WINDOW.maxTimestamp().minus(100L),
+          SECOND_WINDOW,
+          PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+    container.write(singletonView, singletonValuesBuilder.build());
     assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
     assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index 5bc48b7..3716ec8 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -32,11 +32,11 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionViews;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
@@ -56,10 +56,7 @@ public class ViewEvaluatorFactoryTest {
   @Test
   public void testInMemoryEvaluator() throws Exception {
     PCollection<String> input = p.apply(Create.of("foo", "bar"));
-    CreatePCollectionView<String, Iterable<String>> createView =
-        CreatePCollectionView.of(
-            PCollectionViews.iterableView(
-                input, input.getWindowingStrategy(), StringUtf8Coder.of()));
+    PCollectionView<Iterable<String>> pCollectionView = input.apply(View.<String>asIterable());
     PCollection<Iterable<String>> concat =
         input.apply(WithKeys.<Void, String>of((Void) null))
             .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))
@@ -67,11 +64,11 @@ public class ViewEvaluatorFactoryTest {
             .apply(Values.<Iterable<String>>create());
     PCollection<Iterable<String>> view =
         concat.apply(
-            new ViewOverrideFactory.WriteView<String, Iterable<String>>(createView.getView()));
+            new ViewOverrideFactory.WriteView<String, Iterable<String>>(pCollectionView));
 
     EvaluationContext context = mock(EvaluationContext.class);
     TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>();
-    when(context.createPCollectionViewWriter(concat, createView.getView())).thenReturn(viewWriter);
+    when(context.createPCollectionViewWriter(concat, pCollectionView)).thenReturn(viewWriter);
 
     CommittedBundle<String> inputBundle = bundleFactory.createBundle(input).commit(Instant.now());
     AppliedPTransform<?, ?, ?> producer = DirectGraphs.getProducer(view);

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
index 94d8d70..556cac5 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -34,13 +34,13 @@ import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
-import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.sdk.values.TupleTag;
 import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
@@ -59,8 +59,7 @@ public class ViewOverrideFactoryTest implements Serializable {
   @Test
   public void replacementGetViewReturnsOriginal() {
     final PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
-    final PCollectionView<List<Integer>> view =
-        PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder());
+    final PCollectionView<List<Integer>> view = ints.apply(View.<Integer>asList());
     PTransformReplacement<PCollection<Integer>, PCollection<Integer>> replacement =
         factory.getReplacementTransform(
             AppliedPTransform
@@ -89,7 +88,7 @@ public class ViewOverrideFactoryTest implements Serializable {
               // so not asserted one way or the other
               assertThat(
                   replacementView.getTagInternal(),
-                  equalTo(view.getTagInternal()));
+                  equalTo((TupleTag) view.getTagInternal()));
               assertThat(
                   replacementView.getViewFn(),
                   Matchers.<ViewFn<?, ?>>equalTo(view.getViewFn()));

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 79a23cc..cffcc5a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -38,7 +38,6 @@ import java.util.List;
 import java.util.UUID;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn;
-import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.DynamicFileDestinations;
 import org.apache.beam.sdk.io.FileBasedSink;
@@ -56,15 +55,14 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -217,9 +215,8 @@ public class WriteWithShardingFactoryTest implements Serializable {
   public void keyBasedOnCountFnFewElementsExtraShards() throws Exception {
     long countValue = (long) WriteWithShardingFactory.MIN_SHARDS_FOR_LOG + 3;
     PCollection<Long> inputCount = p.apply(Create.of(countValue));
-    PCollectionView<Long> elementCountView =
-        PCollectionViews.singletonView(
-            inputCount, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
+    PCollectionView<Long> elementCountView = inputCount.apply(
+        View.<Long>asSingleton().withDefaultValue(countValue));
     CalculateShardsFn fn = new CalculateShardsFn(3);
     DoFnTester<Long, Integer> fnTester = DoFnTester.of(fn);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index cec01f8..aa5cc39 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -307,7 +307,6 @@ class FlinkStreamingTransformTranslators {
       intToViewMapping.put(count, sideInput);
       tagToIntMapping.put(tag, count);
       count++;
-      Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal();
     }
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
index f275290..fb3f375 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.Collections;
@@ -24,8 +25,10 @@ import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -35,6 +38,13 @@ import org.apache.flink.api.common.functions.RuntimeContext;
  * A {@link SideInputReader} for the Flink Batch Runner.
  */
 public class FlinkSideInputReader implements SideInputReader {
+  /** A {@link MultimapView} which always returns an empty iterable. */
+  private static final MultimapView EMPTY_MULTMAP_VIEW = new MultimapView() {
+    @Override
+    public Iterable get(Object o) {
+      return Collections.EMPTY_LIST;
+    }
+  };
 
   private final Map<TupleTag<?>, WindowingStrategy<?, ?>> sideInputs;
 
@@ -42,6 +52,16 @@ public class FlinkSideInputReader implements SideInputReader {
 
   public FlinkSideInputReader(Map<PCollectionView<?>, WindowingStrategy<?, ?>> indexByView,
                               RuntimeContext runtimeContext) {
+    for (PCollectionView<?> view : indexByView.keySet()) {
+      checkArgument(
+          Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+              view.getViewFn().getMaterialization().getUrn()),
+          "This handler is only capable of dealing with %s materializations "
+              + "but was asked to handle %s for PCollectionView with tag %s.",
+          Materializations.MULTIMAP_MATERIALIZATION_URN,
+          view.getViewFn().getMaterialization().getUrn(),
+          view.getTagInternal().getId());
+    }
     sideInputs = new HashMap<>();
     for (Map.Entry<PCollectionView<?>, WindowingStrategy<?, ?>> entry : indexByView.entrySet()) {
       sideInputs.put(entry.getKey().getTagInternal(), entry.getValue());
@@ -53,7 +73,7 @@ public class FlinkSideInputReader implements SideInputReader {
   @Override
   public <T> T get(PCollectionView<T> view, BoundedWindow window) {
     checkNotNull(view, "View passed to sideInput cannot be null");
-    TupleTag<Iterable<WindowedValue<?>>> tag = view.getTagInternal();
+    TupleTag<?> tag = view.getTagInternal();
     checkNotNull(
         sideInputs.get(tag),
         "Side input for " + view + " not available.");
@@ -63,7 +83,8 @@ public class FlinkSideInputReader implements SideInputReader {
             tag.getId(), new SideInputInitializer<>(view));
     T result = sideInputs.get(window);
     if (result == null) {
-      result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList());
+      ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+      result = viewFn.apply(EMPTY_MULTMAP_VIEW);
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
index 12222b4..782f72a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
@@ -17,12 +17,23 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 
@@ -30,24 +41,33 @@ import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
  * {@link BroadcastVariableInitializer} that initializes the broadcast input as a {@code Map}
  * from window to side input.
  */
-public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow>
-    implements BroadcastVariableInitializer<WindowedValue<ElemT>, Map<BoundedWindow, ViewT>> {
+public class SideInputInitializer<ViewT>
+    implements BroadcastVariableInitializer<WindowedValue<?>, Map<BoundedWindow, ViewT>> {
 
   PCollectionView<ViewT> view;
 
   public SideInputInitializer(PCollectionView<ViewT> view) {
+    checkArgument(
+        Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+            view.getViewFn().getMaterialization().getUrn()),
+        "This handler is only capable of dealing with %s materializations "
+            + "but was asked to handle %s for PCollectionView with tag %s.",
+        Materializations.MULTIMAP_MATERIALIZATION_URN,
+        view.getViewFn().getMaterialization().getUrn(),
+        view.getTagInternal().getId());
     this.view = view;
   }
 
   @Override
   public Map<BoundedWindow, ViewT> initializeBroadcastVariable(
-      Iterable<WindowedValue<ElemT>> inputValues) {
+      Iterable<WindowedValue<?>> inputValues) {
 
     // first partition into windows
-    Map<BoundedWindow, List<WindowedValue<ElemT>>> partitionedElements = new HashMap<>();
-    for (WindowedValue<ElemT> value: inputValues) {
+    Map<BoundedWindow, List<WindowedValue<KV<?, ?>>>> partitionedElements = new HashMap<>();
+    for (WindowedValue<KV<?, ?>> value
+        : (Iterable<WindowedValue<KV<?, ?>>>) (Iterable) inputValues) {
       for (BoundedWindow window: value.getWindows()) {
-        List<WindowedValue<ElemT>> windowedValues = partitionedElements.get(window);
+        List<WindowedValue<KV<?, ?>>> windowedValues = partitionedElements.get(window);
         if (windowedValues == null) {
           windowedValues = new ArrayList<>();
           partitionedElements.put(window, windowedValues);
@@ -58,14 +78,20 @@ public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow>
 
     Map<BoundedWindow, ViewT> resultMap = new HashMap<>();
 
-    for (Map.Entry<BoundedWindow, List<WindowedValue<ElemT>>> elements:
+    for (Map.Entry<BoundedWindow, List<WindowedValue<KV<?, ?>>>> elements:
         partitionedElements.entrySet()) {
 
-      @SuppressWarnings("unchecked")
-      Iterable<WindowedValue<?>> elementsIterable =
-          (List<WindowedValue<?>>) (List<?>) elements.getValue();
-
-      resultMap.put(elements.getKey(), view.getViewFn().apply(elementsIterable));
+      ViewFn<MultimapView, ViewT> viewFn = (ViewFn<MultimapView, ViewT>) view.getViewFn();
+      Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+      resultMap.put(elements.getKey(), viewFn.apply(InMemoryMultimapSideInputView.fromIterable(
+          keyCoder,
+          (Iterable) Iterables.transform(elements.getValue(),
+              new Function<WindowedValue<KV<?, ?>>, KV<?, ?>>() {
+                @Override
+                public KV<?, ?> apply(WindowedValue<KV<?, ?>> windowedValue) {
+                  return windowedValue.getValue();
+                }
+              }))));
     }
 
     return resultMap;

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index ad17de8..33ac024f 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -47,16 +48,19 @@ import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.state.TimerSpec;
 import org.apache.beam.sdk.state.TimerSpecs;
 import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.testing.PCollectionViewTesting;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -71,6 +75,7 @@ import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
 import org.apache.flink.util.OutputTag;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -84,26 +89,19 @@ public class DoFnOperatorTest {
   // views and windows for testing side inputs
   private static final long WINDOW_MSECS_1 = 100;
   private static final long WINDOW_MSECS_2 = 500;
-
-  private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 =
-      WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1)));
-
-  private PCollectionView<Iterable<String>> view1 =
-      PCollectionViewTesting.testingView(
-          new TupleTag<Iterable<WindowedValue<String>>>() {},
-          new PCollectionViewTesting.IdentityViewFn<String>(),
-          StringUtf8Coder.of(),
-          windowingStrategy1);
-
-  private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 =
-      WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2)));
-
-  private PCollectionView<Iterable<String>> view2 =
-      PCollectionViewTesting.testingView(
-          new TupleTag<Iterable<WindowedValue<String>>>() {},
-          new PCollectionViewTesting.IdentityViewFn<String>(),
-          StringUtf8Coder.of(),
-          windowingStrategy2);
+  private PCollectionView<Iterable<String>> view1;
+  private PCollectionView<Iterable<String>> view2;
+
+  @Before
+  public void setUp() {
+    PCollection<String> pc = Pipeline.create().apply(Create.of("1"));
+    view1 = pc
+        .apply(Window.<String>into(FixedWindows.of(new Duration(WINDOW_MSECS_1))))
+        .apply(View.<String>asIterable());
+    view2 = pc
+        .apply(Window.<String>into(FixedWindows.of(new Duration(WINDOW_MSECS_2))))
+        .apply(View.<String>asIterable());
+  }
 
   @Test
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 81e7a97..cc43e27 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -991,15 +991,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertAllStepOutputsHaveUniqueIds(job);
 
     List<Step> steps = job.getSteps();
-    assertEquals(9, steps.size());
+    assertEquals(10, steps.size());
 
     @SuppressWarnings("unchecked")
     List<Map<String, Object>> toIsmRecordOutputs =
-        (List<Map<String, Object>>) steps.get(7).getProperties().get(PropertyNames.OUTPUT_INFO);
+        (List<Map<String, Object>>) steps.get(8).getProperties().get(PropertyNames.OUTPUT_INFO);
     assertTrue(
         Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
 
-    Step collectionToSingletonStep = steps.get(8);
+    Step collectionToSingletonStep = steps.get(9);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
   }
 
@@ -1023,16 +1023,16 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertAllStepOutputsHaveUniqueIds(job);
 
     List<Step> steps = job.getSteps();
-    assertEquals(3, steps.size());
+    assertEquals(4, steps.size());
 
     @SuppressWarnings("unchecked")
     List<Map<String, Object>> toIsmRecordOutputs =
-        (List<Map<String, Object>>) steps.get(1).getProperties().get(PropertyNames.OUTPUT_INFO);
+        (List<Map<String, Object>>) steps.get(2).getProperties().get(PropertyNames.OUTPUT_INFO);
     assertTrue(
         Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
 
 
-    Step collectionToSingletonStep = steps.get(2);
+    Step collectionToSingletonStep = steps.get(3);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 7cb8628..68e3e3c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -40,6 +40,7 @@ import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.Combine;
@@ -527,7 +528,11 @@ public final class TransformTranslator {
         Iterable<? extends WindowedValue<?>> iter =
             context.getWindowedValues(context.getInput(transform));
         PCollectionView<WriteT> output = transform.getView();
-        Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
+        Coder<Iterable<WindowedValue<?>>> coderInternal =
+            (Coder) IterableCoder.of(
+                WindowedValue.getFullCoder(
+                    output.getCoderInternal(),
+                    output.getWindowingStrategyInternal().getWindowFn().windowCoder()));
 
         @SuppressWarnings("unchecked")
         Iterable<WindowedValue<?>> iterCast =  (Iterable<WindowedValue<?>>) iter;