You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/11/15 17:00:37 UTC

[3/7] beam git commit: [BEAM-2926] Migrate to using a trivial multimap materialization within the Java SDK.

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
index 6c91088..932ccd6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
@@ -20,11 +20,17 @@ package org.apache.beam.runners.spark.util;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
 import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -34,7 +40,7 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 
 
 /**
- * A {@link SideInputReader} for thw SparkRunner.
+ * A {@link SideInputReader} for the SparkRunner.
  */
 public class SparkSideInputReader implements SideInputReader {
   private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
@@ -60,26 +66,30 @@ public class SparkSideInputReader implements SideInputReader {
     //--- match the appropriate sideInput window.
     // a tag will point to all matching sideInputs, that is all windows.
     // now that we've obtained the appropriate sideInputWindow, all that's left is to filter by it.
-    Iterable<WindowedValue<?>> availableSideInputs =
-        (Iterable<WindowedValue<?>>) windowedBroadcastHelper.getValue().getValue();
-    Iterable<WindowedValue<?>> sideInputForWindow =
-        Iterables.filter(availableSideInputs, new Predicate<WindowedValue<?>>() {
-          @Override
-          public boolean apply(@Nullable WindowedValue<?> sideInputCandidate) {
-            if (sideInputCandidate == null) {
-              return false;
-            }
-            // first match of a sideInputWindow to the elementWindow is good enough.
-            for (BoundedWindow sideInputCandidateWindow: sideInputCandidate.getWindows()) {
-              if (sideInputCandidateWindow.equals(sideInputWindow)) {
-                return true;
+    Iterable<WindowedValue<KV<?, ?>>> availableSideInputs =
+        (Iterable<WindowedValue<KV<?, ?>>>) windowedBroadcastHelper.getValue().getValue();
+    Iterable<KV<?, ?>> sideInputForWindow =
+        Iterables.transform(
+            Iterables.filter(availableSideInputs, new Predicate<WindowedValue<?>>() {
+              @Override
+              public boolean apply(@Nullable WindowedValue<?> sideInputCandidate) {
+                if (sideInputCandidate == null) {
+                  return false;
+                }
+                return Iterables.contains(sideInputCandidate.getWindows(), sideInputWindow);
               }
-            }
-            // no match found.
-            return false;
-          }
-        });
-    return view.getViewFn().apply(sideInputForWindow);
+            }),
+            new Function<WindowedValue<KV<?, ?>>, KV<?, ?>>() {
+              @Override
+              public KV<?, ?> apply(WindowedValue<KV<?, ?>> windowedValue) {
+                return windowedValue.getValue();
+              }
+            });
+
+    ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+    Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+    return viewFn.apply(
+        InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) sideInputForWindow));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 3c5b55b..f86e9cc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
 import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.View.VoidKeyToMultimapMaterialization;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -1274,14 +1275,16 @@ public class Combine {
     public PCollectionView<OutputT> expand(PCollection<InputT> input) {
       PCollection<OutputT> combined =
           input.apply(Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
-      PCollectionView<OutputT> view =
-          PCollectionViews.singletonView(
-              combined,
+      PCollection<KV<Void, OutputT>> materializationInput =
+          combined.apply(new VoidKeyToMultimapMaterialization<OutputT>());
+      PCollectionView<OutputT> view = PCollectionViews.singletonView(
+          materializationInput,
               input.getWindowingStrategy(),
               insertDefault,
               insertDefault ? fn.defaultValue() : null,
-              combined.getCoder());
-      combined.apply(CreatePCollectionView.<OutputT, OutputT>of(view));
+          combined.getCoder());
+      materializationInput.apply(
+          CreatePCollectionView.<KV<Void, OutputT>, OutputT>of(view));
       return view;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 6168710..d71f0fd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -602,7 +601,24 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
           return windowValue;
         }
       }
-      return view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList());
+      // Fallback to returning the default materialization if no data was supplied.
+      // This is really to support singleton views with default values.
+
+      // TODO: Update this to supply a materialization dependent on actual URN of materialization.
+      // Currently the SDK only supports the multimap materialization and it expects a
+      // mapping function.
+      checkState(Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+          view.getViewFn().getMaterialization().getUrn()),
+          "Only materializations of type %s supported, received %s",
+          Materializations.MULTIMAP_MATERIALIZATION_URN,
+          view.getViewFn().getMaterialization().getUrn());
+      return ((ViewFn<Materializations.MultimapView, T>) view.getViewFn()).apply(
+          new Materializations.MultimapView<Object, Object>() {
+            @Override
+            public Iterable<Object> get(Object o) {
+              return Collections.emptyList();
+            }
+          });
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
index 6e4f83d..e606919 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
@@ -21,7 +21,6 @@ package org.apache.beam.sdk.transforms;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.util.WindowedValue;
 
 /**
  * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
@@ -32,29 +31,37 @@ import org.apache.beam.sdk.util.WindowedValue;
 @Internal
 public class Materializations {
   /**
-   * The URN for a {@link Materialization} where the primitive view type is an iterable of fully
+   * The URN for a {@link Materialization} where the primitive view type is an multimap of fully
    * specified windowed values.
    */
   @Experimental(Kind.CORE_RUNNERS_ONLY)
-  public static final String ITERABLE_MATERIALIZATION_URN =
-      "urn:beam:sideinput:materialization:iterable:0.1";
+  public static final String MULTIMAP_MATERIALIZATION_URN =
+      "urn:beam:sideinput:materialization:multimap:0.1";
+
+  /**
+   * Represents the {@code PrimitiveViewT} supplied to the {@link ViewFn} when it declares to
+   * use the {@link Materializations#MULTIMAP_MATERIALIZATION_URN multimap materialization}.
+   */
+  public interface MultimapView<K, V> {
+    Iterable<V> get(K k);
+  }
 
   /**
    * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
    *
-   * <p>A {@link Materialization} where the primitive view type is an iterable of fully specified
-   * windowed values.
+   * <p>A {@link Materialization} where the primitive view type is a multimap with fully
+   * specified windowed keys.
    */
   @Internal
-  public static <T> Materialization<Iterable<WindowedValue<T>>> iterable() {
-    return new IterableMaterialization<>();
+  public static <K, V> Materialization<MultimapView<K, V>> multimap() {
+    return new MultimapMaterialization<>();
   }
 
-  private static class IterableMaterialization<T>
-      implements Materialization<Iterable<WindowedValue<T>>> {
+  private static class MultimapMaterialization<K, V>
+      implements Materialization<MultimapView<K, V>> {
     @Override
     public String getUrn() {
-      return ITERABLE_MATERIALIZATION_URN;
+      return MULTIMAP_MATERIALIZATION_URN;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index eaa7925..ec8233e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -24,6 +24,8 @@ import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
@@ -258,9 +260,13 @@ public class View {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
-      PCollectionView<List<T>> view =
-          PCollectionViews.listView(input, input.getWindowingStrategy(), input.getCoder());
-      input.apply(CreatePCollectionView.<T, List<T>>of(view));
+      PCollection<KV<Void, T>> materializationInput =
+          input.apply(new VoidKeyToMultimapMaterialization<T>());
+      PCollectionView<List<T>> view = PCollectionViews.listView(
+          materializationInput,
+          materializationInput.getWindowingStrategy());
+      materializationInput.apply(
+          CreatePCollectionView.<KV<Void, T>, List<T>>of(view));
       return view;
     }
   }
@@ -285,9 +291,13 @@ public class View {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
-      PCollectionView<Iterable<T>> view =
-          PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
-      input.apply(CreatePCollectionView.<T, Iterable<T>>of(view));
+      PCollection<KV<Void, T>> materializationInput =
+          input.apply(new VoidKeyToMultimapMaterialization<T>());
+      PCollectionView<Iterable<T>> view = PCollectionViews.iterableView(
+          materializationInput,
+          materializationInput.getWindowingStrategy());
+      materializationInput.apply(
+          CreatePCollectionView.<KV<Void, T>, Iterable<T>>of(view));
       return view;
     }
   }
@@ -428,9 +438,13 @@ public class View {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
-      PCollectionView<Map<K, Iterable<V>>> view =
-          PCollectionViews.multimapView(input, input.getWindowingStrategy(), input.getCoder());
-      input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+      PCollection<KV<Void, KV<K, V>>> materializationInput =
+          input.apply(new VoidKeyToMultimapMaterialization<KV<K, V>>());
+      PCollectionView<Map<K, Iterable<V>>> view = PCollectionViews.multimapView(
+          materializationInput,
+          materializationInput.getWindowingStrategy());
+      materializationInput.apply(
+          CreatePCollectionView.<KV<Void, KV<K, V>>, Map<K, Iterable<V>>>of(view));
       return view;
     }
   }
@@ -463,9 +477,13 @@ public class View {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
-      PCollectionView<Map<K, V>> view =
-          PCollectionViews.mapView(input, input.getWindowingStrategy(), input.getCoder());
-      input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of(view));
+      PCollection<KV<Void, KV<K, V>>> materializationInput =
+          input.apply(new VoidKeyToMultimapMaterialization<KV<K, V>>());
+      PCollectionView<Map<K, V>> view = PCollectionViews.mapView(
+          materializationInput,
+          materializationInput.getWindowingStrategy());
+      materializationInput.apply(
+          CreatePCollectionView.<KV<Void, KV<K, V>>, Map<K, V>>of(view));
       return view;
     }
   }
@@ -474,6 +492,31 @@ public class View {
   // Internal details below
 
   /**
+   * A {@link PTransform} which converts all values into {@link KV}s with {@link Void} keys.
+   *
+   * <p>TODO: Replace this materialization with specializations that optimize the various SDK
+   * requested views.
+   */
+  @Internal
+  static class VoidKeyToMultimapMaterialization<T>
+      extends PTransform<PCollection<T>, PCollection<KV<Void, T>>> {
+
+    private static class VoidKeyToMultimapMaterializationDoFn<T> extends DoFn<T, KV<Void, T>> {
+      @ProcessElement
+      public void processElement(ProcessContext ctxt) {
+        ctxt.output(KV.of((Void) null, ctxt.element()));
+      }
+    }
+
+    @Override
+    public PCollection<KV<Void, T>> expand(PCollection<T> input) {
+      PCollection output = input.apply(ParDo.of(new VoidKeyToMultimapMaterializationDoFn<>()));
+      output.setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()));
+      return output;
+    }
+  }
+
+  /**
    * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
    *
    * <p>Creates a primitive {@link PCollectionView}.

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java
index d51a917..9291bc6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.values.PCollectionView;
  * {@link View#asIterable()}, and {@link View#asMap()} for more detail on specific views
  * available in the SDK.
  *
- * @param <PrimitiveViewT> the type of the underlying primitive view, provided by the runner
- *        {@code <ViewT>} the type of the value(s) accessible via this {@link PCollectionView}
+ * @param <PrimitiveViewT> the type of the underlying primitive view required
+ * @param <ViewT> the type of the value(s) accessible via this {@link PCollectionView}
  */
 @Internal
 public abstract class ViewFn<PrimitiveViewT, ViewT> implements Serializable {
@@ -49,5 +49,5 @@ public abstract class ViewFn<PrimitiveViewT, ViewT> implements Serializable {
   /**
    * A function to adapt a primitive view type to a desired view type.
    */
-  public abstract ViewT apply(PrimitiveViewT contents);
+  public abstract ViewT apply(PrimitiveViewT primitiveViewT);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
index 7d87412..c212c34 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.apache.beam.sdk.util.WindowedValue;
 
 /**
  * A {@link PCollectionView PCollectionView&lt;T&gt;} is an immutable view of a {@link PCollection}
@@ -72,7 +71,7 @@ public interface PCollectionView<T> extends PValue, Serializable {
    */
   @Deprecated
   @Internal
-  TupleTag<Iterable<WindowedValue<?>>> getTagInternal();
+  TupleTag<?> getTagInternal();
 
   /**
    * <b>For internal use only.</b>
@@ -83,7 +82,7 @@ public interface PCollectionView<T> extends PValue, Serializable {
    */
   @Deprecated
   @Internal
-  ViewFn<Iterable<WindowedValue<?>>, T> getViewFn();
+  ViewFn<?, T> getViewFn();
 
   /**
    * <b>For internal use only.</b>
@@ -116,5 +115,5 @@ public interface PCollectionView<T> extends PValue, Serializable {
    */
   @Deprecated
   @Internal
-  Coder<Iterable<WindowedValue<?>>> getCoderInternal();
+  Coder<?> getCoderInternal();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index ed8fb76..30277f0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -17,14 +17,13 @@
  */
 package org.apache.beam.sdk.values;
 
-import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -36,16 +35,15 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.transforms.Materialization;
 import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.WindowedValue;
 
 /**
  * <b>For internal use only; no backwards compatibility guarantees.</b>
@@ -56,88 +54,79 @@ import org.apache.beam.sdk.util.WindowedValue;
 public class PCollectionViews {
 
   /**
-   * Returns a {@code PCollectionView<T>} capable of processing elements encoded using the provided
-   * {@link Coder} and windowed using the provided * {@link WindowingStrategy}.
+   * Returns a {@code PCollectionView<T>} capable of processing elements windowed
+   * using the provided {@link WindowingStrategy}.
    *
    * <p>If {@code hasDefault} is {@code true}, then the view will take on the value
    * {@code defaultValue} for any empty windows.
    */
   public static <T, W extends BoundedWindow> PCollectionView<T> singletonView(
-      PCollection<T> pCollection,
+      PCollection<KV<Void, T>> pCollection,
       WindowingStrategy<?, W> windowingStrategy,
       boolean hasDefault,
       @Nullable T defaultValue,
-      Coder<T> valueCoder) {
+      Coder<T> defaultValueCoder) {
     return new SimplePCollectionView<>(
         pCollection,
-        new SingletonViewFn<>(hasDefault, defaultValue, valueCoder),
+        new SingletonViewFn<>(hasDefault, defaultValue, defaultValueCoder),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
-        windowingStrategy,
-        valueCoder);
+        windowingStrategy);
   }
 
   /**
-   * Returns a {@code PCollectionView<Iterable<T>>} capable of processing elements encoded using the
-   * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+   * Returns a {@code PCollectionView<Iterable<T>>} capable of processing elements windowed
+   * using the provided {@link WindowingStrategy}.
    */
   public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterableView(
-      PCollection<T> pCollection,
-      WindowingStrategy<?, W> windowingStrategy,
-      Coder<T> valueCoder) {
+      PCollection<KV<Void, T>> pCollection,
+      WindowingStrategy<?, W> windowingStrategy) {
     return new SimplePCollectionView<>(
         pCollection,
         new IterableViewFn<T>(),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
-        windowingStrategy,
-        valueCoder);
+        windowingStrategy);
   }
 
   /**
-   * Returns a {@code PCollectionView<List<T>>} capable of processing elements encoded using the
-   * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+   * Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed
+   * using the provided {@link WindowingStrategy}.
    */
   public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
-      PCollection<T> pCollection,
-      WindowingStrategy<?, W> windowingStrategy,
-      Coder<T> valueCoder) {
+      PCollection<KV<Void, T>> pCollection,
+      WindowingStrategy<?, W> windowingStrategy) {
     return new SimplePCollectionView<>(
         pCollection,
         new ListViewFn<T>(),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
-        windowingStrategy,
-        valueCoder);
+        windowingStrategy);
   }
 
   /**
-   * Returns a {@code PCollectionView<Map<K, V>>} capable of processing elements encoded using the
-   * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+   * Returns a {@code PCollectionView<Map<K, V>>} capable of processing elements windowed
+   * using the provided {@link WindowingStrategy}.
    */
   public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> mapView(
-      PCollection<KV<K, V>> pCollection,
-      WindowingStrategy<?, W> windowingStrategy,
-      Coder<KV<K, V>> valueCoder) {
+      PCollection<KV<Void, KV<K, V>>> pCollection,
+      WindowingStrategy<?, W> windowingStrategy) {
     return new SimplePCollectionView<>(
         pCollection,
         new MapViewFn<K, V>(),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
-        windowingStrategy,
-        valueCoder);
+        windowingStrategy);
   }
 
   /**
-   * Returns a {@code PCollectionView<Map<K, Iterable<V>>>} capable of processing elements encoded
-   * using the provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+   * Returns a {@code PCollectionView<Map<K, Iterable<V>>>} capable of processing elements windowed
+   * using the provided {@link WindowingStrategy}.
    */
   public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> multimapView(
-      PCollection<KV<K, V>> pCollection,
-      WindowingStrategy<?, W> windowingStrategy,
-      Coder<KV<K, V>> valueCoder) {
+      PCollection<KV<Void, KV<K, V>>> pCollection,
+      WindowingStrategy<?, W> windowingStrategy) {
     return new SimplePCollectionView<>(
         pCollection,
         new MultimapViewFn<K, V>(),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
-        windowingStrategy,
-        valueCoder);
+        windowingStrategy);
   }
 
   /**
@@ -153,18 +142,15 @@ public class PCollectionViews {
   }
 
   /**
-   * Implementation of conversion of singleton {@code Iterable<WindowedValue<T>>} to {@code T}.
+   * Implementation which is able to adapt a multimap materialization to a {@code T}.
    *
    * <p>For internal use only.
    *
    * <p>Instantiate via {@link PCollectionViews#singletonView}.
-   *
-   * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
-   *     view type.
    */
-  @Deprecated
   @Experimental(Kind.CORE_RUNNERS_ONLY)
-  public static class SingletonViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, T> {
+  public static class SingletonViewFn<T>
+      extends ViewFn<MultimapView<Void, T>, T> {
     @Nullable private byte[] encodedDefaultValue;
     @Nullable private transient T defaultValue;
     @Nullable private Coder<T> valueCoder;
@@ -204,9 +190,12 @@ public class PCollectionViews {
       }
       // Lazily decode the default value once
       synchronized (this) {
-        if (encodedDefaultValue != null && defaultValue == null) {
+        if (encodedDefaultValue != null) {
           try {
             defaultValue = CoderUtils.decodeFromByteArray(valueCoder, encodedDefaultValue);
+            // Clear the encoded default value to free the reference once we have the object
+            // version. Also, this will guarantee that the value will only be decoded once.
+            encodedDefaultValue = null;
           } catch (IOException e) {
             throw new RuntimeException("Unexpected IOException: ", e);
           }
@@ -216,84 +205,67 @@ public class PCollectionViews {
     }
 
     @Override
-    public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
-      return Materializations.iterable();
+    public Materialization<MultimapView<Void, T>> getMaterialization() {
+      return Materializations.multimap();
     }
 
     @Override
-    public T apply(Iterable<WindowedValue<T>> contents) {
+    public T apply(MultimapView<Void, T> primitiveViewT) {
       try {
-        return Iterables.getOnlyElement(contents).getValue();
+        return Iterables.getOnlyElement(primitiveViewT.get(null));
       } catch (NoSuchElementException exc) {
         return getDefaultValue();
       } catch (IllegalArgumentException exc) {
         throw new IllegalArgumentException(
-            "PCollection with more than one element "
-                + "accessed as a singleton view.");
+            "PCollection with more than one element accessed as a singleton view.");
       }
     }
   }
 
   /**
-   * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code Iterable<T>}.
+   * Implementation which is able to adapt a multimap materialization to a {@code Iterable<T>}.
    *
    * <p>For internal use only.
    *
    * <p>Instantiate via {@link PCollectionViews#iterableView}.
-   *
-   * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
-   *     view type.
    */
-  @Deprecated
   @Experimental(Kind.CORE_RUNNERS_ONLY)
   public static class IterableViewFn<T>
-      extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> {
+      extends ViewFn<MultimapView<Void, T>, Iterable<T>> {
+
     @Override
-    public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
-      return Materializations.iterable();
+    public Materialization<MultimapView<Void, T>> getMaterialization() {
+      return Materializations.multimap();
     }
 
     @Override
-    public Iterable<T> apply(Iterable<WindowedValue<T>> contents) {
-      return Iterables.unmodifiableIterable(
-          Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
-        @SuppressWarnings("unchecked")
-        @Override
-        public T apply(WindowedValue<T> input) {
-          return input.getValue();
-        }
-      }));
+    public Iterable<T> apply(MultimapView<Void, T> primitiveViewT) {
+      return Iterables.unmodifiableIterable(primitiveViewT.get(null));
     }
   }
 
   /**
-   * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code List<T>}.
+   * Implementation which is able to adapt a multimap materialization to a {@code List<T>}.
    *
    * <p>For internal use only.
    *
    * <p>Instantiate via {@link PCollectionViews#listView}.
-   *
-   * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
-   *     view type.
    */
-  @Deprecated
   @Experimental(Kind.CORE_RUNNERS_ONLY)
-  public static class ListViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, List<T>> {
+  public static class ListViewFn<T>
+      extends ViewFn<MultimapView<Void, T>, List<T>> {
     @Override
-    public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
-      return Materializations.iterable();
+    public Materialization<MultimapView<Void, T>> getMaterialization() {
+      return Materializations.multimap();
     }
 
     @Override
-    public List<T> apply(Iterable<WindowedValue<T>> contents) {
-      return ImmutableList.copyOf(
-          Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
-            @SuppressWarnings("unchecked")
-            @Override
-            public T apply(WindowedValue<T> input) {
-              return input.getValue();
+    public List<T> apply(MultimapView<Void, T> primitiveViewT) {
+      List<T> list = new ArrayList<>();
+      for (T t : primitiveViewT.get(null)) {
+        list.add(t);
             }
-          }));
+      return Collections.unmodifiableList(list);
     }
 
     @Override
@@ -308,27 +280,29 @@ public class PCollectionViews {
   }
 
   /**
-   * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>>}
-   * to {@code Map<K, Iterable<V>>}.
+   * Implementation which is able to adapt a multimap materialization to a
+   * {@code Map<K, Iterable<V>>}.
+   *
+   * <p>For internal use only.
    *
-   * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
-   *     view type.
+   * <p>Instantiate via {@link PCollectionViews#multimapView}.
    */
-  @Deprecated
   @Experimental(Kind.CORE_RUNNERS_ONLY)
   public static class MultimapViewFn<K, V>
-      extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, Iterable<V>>> {
+      extends ViewFn<MultimapView<Void, KV<K, V>>, Map<K, Iterable<V>>> {
     @Override
-    public Materialization<Iterable<WindowedValue<KV<K, V>>>> getMaterialization() {
-      return Materializations.iterable();
+    public Materialization<MultimapView<Void, KV<K, V>>> getMaterialization() {
+      return Materializations.multimap();
     }
 
     @Override
-    public Map<K, Iterable<V>> apply(Iterable<WindowedValue<KV<K, V>>> elements) {
+    public Map<K, Iterable<V>> apply(
+        MultimapView<Void, KV<K, V>> primitiveViewT) {
+      // TODO: BEAM-3071 - fix this so that we aren't relying on Java equality and are
+      // using structural value equality.
       Multimap<K, V> multimap = HashMultimap.create();
-      for (WindowedValue<KV<K, V>> elem : elements) {
-        KV<K, V> kv = elem.getValue();
-        multimap.put(kv.getKey(), kv.getValue());
+      for (KV<K, V> elem : primitiveViewT.get(null)) {
+        multimap.put(elem.getKey(), elem.getValue());
       }
       // Safe covariant cast that Java cannot express without rawtypes, even with unchecked casts
       @SuppressWarnings({"unchecked", "rawtypes"})
@@ -338,32 +312,31 @@ public class PCollectionViews {
   }
 
   /**
-   * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>} with one value per key to
-   * {@code Map<K, V>}.
+   * Implementation which is able to adapt a multimap materialization to a {@code Map<K, V>}.
+   *
+   * <p>For internal use only.
    *
-   * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
-   *     view type.
+   * <p>Instantiate via {@link PCollectionViews#mapView}.
    */
-  @Deprecated
   @Experimental(Kind.CORE_RUNNERS_ONLY)
-  public static class MapViewFn<K, V> extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, V>> {
+  public static class MapViewFn<K, V>
+      extends ViewFn<MultimapView<Void, KV<K, V>>, Map<K, V>> {
+
     @Override
-    public Materialization<Iterable<WindowedValue<KV<K, V>>>> getMaterialization() {
-      return Materializations.iterable();
+    public Materialization<MultimapView<Void, KV<K, V>>> getMaterialization() {
+      return Materializations.multimap();
     }
 
-    /**
-     * Input iterable must actually be {@code Iterable<WindowedValue<KV<K, V>>>}.
-     */
     @Override
-    public Map<K, V> apply(Iterable<WindowedValue<KV<K, V>>> elements) {
+    public Map<K, V> apply(MultimapView<Void, KV<K, V>> primitiveViewT) {
+      // TODO: BEAM-3071 - fix this so that we aren't relying on Java equality and are
+      // using structural value equality.
       Map<K, V> map = new HashMap<>();
-      for (WindowedValue<KV<K, V>> elem : elements) {
-        KV<K, V> kv = elem.getValue();
-        if (map.containsKey(kv.getKey())) {
-          throw new IllegalArgumentException("Duplicate values for " + kv.getKey());
+      for (KV<K, V> elem : primitiveViewT.get(null)) {
+        if (map.containsKey(elem.getKey())) {
+          throw new IllegalArgumentException("Duplicate values for " + elem.getKey());
         }
-        map.put(kv.getKey(), kv.getValue());
+        map.put(elem.getKey(), elem.getValue());
       }
       return Collections.unmodifiableMap(map);
     }
@@ -375,14 +348,14 @@ public class PCollectionViews {
    *
    * <p>For internal use only.
    */
-  public static class SimplePCollectionView<ElemT, ViewT, W extends BoundedWindow>
+  public static class SimplePCollectionView<ElemT, PrimitiveViewT, ViewT, W extends BoundedWindow>
       extends PValueBase
       implements PCollectionView<ViewT> {
     /** The {@link PCollection} this view was originally created from. */
     private transient PCollection<ElemT> pCollection;
 
     /** A unique tag for the view, typed according to the elements underlying the view. */
-    private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
+    private TupleTag<PrimitiveViewT> tag;
 
     private WindowMappingFn<W> windowMappingFn;
 
@@ -390,12 +363,12 @@ public class PCollectionViews {
     private WindowingStrategy<?, W> windowingStrategy;
 
     /** The coder for the elements underlying the view. */
-    private @Nullable Coder<Iterable<WindowedValue<ElemT>>> coder;
+    private @Nullable Coder<ElemT> coder;
 
     /**
      * The typed {@link ViewFn} for this view.
      */
-    private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn;
+    private ViewFn<PrimitiveViewT, ViewT> viewFn;
 
     /**
      * Call this constructor to initialize the fields for which this base class provides
@@ -403,11 +376,10 @@ public class PCollectionViews {
      */
     private SimplePCollectionView(
         PCollection<ElemT> pCollection,
-        TupleTag<Iterable<WindowedValue<ElemT>>> tag,
-        ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+        TupleTag<PrimitiveViewT> tag,
+        ViewFn<PrimitiveViewT, ViewT> viewFn,
         WindowMappingFn<W> windowMappingFn,
-        WindowingStrategy<?, W> windowingStrategy,
-        Coder<ElemT> valueCoder) {
+        WindowingStrategy<?, W> windowingStrategy) {
       super(pCollection.getPipeline());
       this.pCollection = pCollection;
       if (windowingStrategy.getWindowFn() instanceof InvalidWindows) {
@@ -417,9 +389,7 @@ public class PCollectionViews {
       this.tag = tag;
       this.windowingStrategy = windowingStrategy;
       this.viewFn = viewFn;
-      this.coder =
-          IterableCoder.of(WindowedValue.getFullCoder(
-              valueCoder, windowingStrategy.getWindowFn().windowCoder()));
+      this.coder = pCollection.getCoder();
     }
 
     /**
@@ -428,27 +398,20 @@ public class PCollectionViews {
      */
     private SimplePCollectionView(
         PCollection<ElemT> pCollection,
-        ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+        ViewFn<PrimitiveViewT, ViewT> viewFn,
         WindowMappingFn<W> windowMappingFn,
-        WindowingStrategy<?, W> windowingStrategy,
-        Coder<ElemT> valueCoder) {
+        WindowingStrategy<?, W> windowingStrategy) {
       this(
           pCollection,
-          new TupleTag<Iterable<WindowedValue<ElemT>>>(),
+          new TupleTag<PrimitiveViewT>(),
           viewFn,
           windowMappingFn,
-          windowingStrategy,
-          valueCoder);
+          windowingStrategy);
     }
 
     @Override
-    public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() {
-      // Safe cast: it is required that the rest of the SDK maintain the invariant
-      // that a PCollectionView is only provided an iterable for the elements of an
-      // appropriately typed PCollection.
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn) viewFn;
-      return untypedViewFn;
+    public ViewFn<PrimitiveViewT, ViewT> getViewFn() {
+      return viewFn;
     }
 
     @Override
@@ -467,13 +430,8 @@ public class PCollectionViews {
      * <p>For internal use only by runner implementors.
      */
     @Override
-    public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() {
-      // Safe cast: It is required that the rest of the SDK maintain the invariant that
-      // this tag is only used to access the contents of an appropriately typed underlying
-      // PCollection
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      TupleTag<Iterable<WindowedValue<?>>> untypedTag = (TupleTag) tag;
-      return untypedTag;
+    public TupleTag<?> getTagInternal() {
+      return tag;
     }
 
     /**
@@ -488,12 +446,8 @@ public class PCollectionViews {
     }
 
     @Override
-    public Coder<Iterable<WindowedValue<?>>> getCoderInternal() {
-      // Safe cast: It is required that the rest of the SDK only use this untyped coder
-      // for the elements of an appropriately typed underlying PCollection.
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      Coder<Iterable<WindowedValue<?>>> untypedCoder = (Coder) coder;
-      return untypedCoder;
+    public Coder<?> getCoderInternal() {
+      return coder;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
index aaf8b91..e7fd9b8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
@@ -18,344 +18,57 @@
 
 package org.apache.beam.sdk.testing;
 
-import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.transforms.Materialization;
-import org.apache.beam.sdk.transforms.Materializations;
-import org.apache.beam.sdk.transforms.ViewFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.PValueBase;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
 
 /**
- * Methods for creating and using {@link PCollectionView} instances.
+ * Methods for testing {@link PCollectionView}s.
  */
 public final class PCollectionViewTesting {
-
-  // Do not instantiate; static methods only
-  private PCollectionViewTesting() { }
-
-  /**
-   * The length of the default window, which is an {@link IntervalWindow}, but kept encapsulated
-   * as it is not for the user to know what sort of window it is.
-   */
-  private static final long DEFAULT_WINDOW_MSECS = 1000 * 60 * 60;
-
-  /**
-   * A default windowing strategy. Tests that are not concerned with the windowing
-   * strategy should not specify it, and all views will use this.
-   */
-  public static final WindowingStrategy<?, ?> DEFAULT_WINDOWING_STRATEGY =
-      WindowingStrategy.of(FixedWindows.of(new Duration(DEFAULT_WINDOW_MSECS)));
-
-  /**
-   * A default window into which test elements will be placed, if the window is
-   * not explicitly overridden.
-   */
-  public static final BoundedWindow DEFAULT_NONEMPTY_WINDOW =
-      new IntervalWindow(new Instant(0), new Instant(DEFAULT_WINDOW_MSECS));
-
-  /**
-   * A timestamp in the {@link #DEFAULT_NONEMPTY_WINDOW}.
-   */
-  public static final Instant DEFAULT_TIMESTAMP = DEFAULT_NONEMPTY_WINDOW.maxTimestamp().minus(1);
-
-  /**
-   * A window into which no element will be placed by methods in this class, unless explicitly
-   * requested.
-   */
-  public static final BoundedWindow DEFAULT_EMPTY_WINDOW = new IntervalWindow(
-      DEFAULT_NONEMPTY_WINDOW.maxTimestamp(),
-      DEFAULT_NONEMPTY_WINDOW.maxTimestamp().plus(DEFAULT_WINDOW_MSECS));
-
-  /**
-   * A {@link ViewFn} that returns the provided contents as a fully lazy iterable.
-   */
-  public static class IdentityViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> {
-    @Override
-    public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
-      return Materializations.iterable();
-    }
-
-    @Override
-    public Iterable<T> apply(Iterable<WindowedValue<T>> contents) {
-      return Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
-        @Override
-        public T apply(WindowedValue<T> windowedValue) {
-          return windowedValue.getValue();
-        }
-      });
-    }
-  }
-
-  /**
-   * A {@link ViewFn} that traverses the whole iterable eagerly and returns the number of elements.
-   *
-   * <p>Only for use in testing scenarios with small collections. If there are more elements
-   * provided than {@code Integer.MAX_VALUE} then behavior is unpredictable.
-   */
-  public static class LengthViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, Long> {
-    @Override
-    public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
-      return Materializations.iterable();
-    }
-
-    @Override
-    public Long apply(Iterable<WindowedValue<T>> contents) {
-      return (long) Iterables.size(contents);
-    }
-  }
-
-  /**
-   * A {@link ViewFn} that always returns the value with which it is instantiated.
-   */
-  public static class ConstantViewFn<ElemT, ViewT>
-      extends ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> {
-    private ViewT value;
-
-    public ConstantViewFn(ViewT value) {
-      this.value = value;
-    }
-
-    @Override
-    public Materialization<Iterable<WindowedValue<ElemT>>> getMaterialization() {
-      return Materializations.iterable();
-    }
-
-    @Override
-    public ViewT apply(Iterable<WindowedValue<ElemT>> contents) {
-      return value;
-    }
-  }
-
-  /**
-   * A {@link PCollectionView} explicitly built from a {@link TupleTag}
-   * and conversion {@link ViewFn}, and an element coder, using the
-   * {@link #DEFAULT_WINDOWING_STRATEGY}.
-   *
-   * <p>This method is only recommended for use by runner implementors to test their
-   * implementations. It is very easy to construct a {@link PCollectionView} that does
-   * not respect the invariants required for proper functioning.
-   *
-   * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed
-   * values provided to the view during execution, results are unpredictable. It is recommended
-   * that the values be prepared via {@link #contentsInDefaultWindow}.
-   */
-  public static <ElemT, ViewT> PCollectionView<ViewT> testingView(
-      TupleTag<Iterable<WindowedValue<ElemT>>> tag,
-      ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
-      Coder<ElemT> elemCoder,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    return testingView(null, tag, viewFn, elemCoder, windowingStrategy);
-  }
-
-  /**
-   * The default {@link Coder} used for windowed values, given an element {@link Coder}.
-   */
-  public static <T> Coder<WindowedValue<T>> defaultWindowedValueCoder(Coder<T> elemCoder) {
-    return WindowedValue.getFullCoder(
-        elemCoder, DEFAULT_WINDOWING_STRATEGY.getWindowFn().windowCoder());
-  }
-
-  /**
-   * A {@link PCollectionView} explicitly built from its {@link TupleTag}, {@link
-   * WindowingStrategy}, {@link Coder}, and conversion function.
-   *
-   * <p>This method is only recommended for use by runner implementors to test their
-   * implementations. It is very easy to construct a {@link PCollectionView} that does not respect
-   * the invariants required for proper functioning.
-   *
-   * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed
-   * values provided to the view during execution, results are unpredictable.
-   */
-  public static <ElemT, ViewT> PCollectionView<ViewT> testingView(
-      PCollection<ElemT> pCollection,
-      TupleTag<Iterable<WindowedValue<ElemT>>> tag,
-      ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
-      Coder<ElemT> elemCoder,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    return testingView(
-        pCollection,
-        tag,
-        viewFn,
-        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
-        elemCoder,
-        windowingStrategy);
-  }
-
-  /**
-   * A {@link PCollectionView} explicitly built from its {@link TupleTag}, {@link
-   * WindowingStrategy}, {@link Coder}, {@link ViewFn} and {@link WindowMappingFn}.
-   *
-   * <p>This method is only recommended for use by runner implementors to test their
-   * implementations. It is very easy to construct a {@link PCollectionView} that does not respect
-   * the invariants required for proper functioning.
-   *
-   * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed
-   * values provided to the view during execution, results are unpredictable.
-   */
-  public static <ElemT, ViewT> PCollectionView<ViewT> testingView(
-      PCollection<ElemT> pCollection,
-      TupleTag<Iterable<WindowedValue<ElemT>>> tag,
-      ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
-      WindowMappingFn<?> windowMappingFn,
-      Coder<ElemT> elemCoder,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    return new PCollectionViewFromParts<>(
-        pCollection,
-        tag,
-        viewFn,
-        windowMappingFn,
-        windowingStrategy,
-        IterableCoder.of(
-            WindowedValue.getFullCoder(elemCoder, windowingStrategy.getWindowFn().windowCoder())));
-  }
-
-  /**
-   * Places the given {@code value} in the {@link #DEFAULT_NONEMPTY_WINDOW}.
-   */
-  public static <T> WindowedValue<T> valueInDefaultWindow(T value) {
-    return WindowedValue.of(value, DEFAULT_TIMESTAMP, DEFAULT_NONEMPTY_WINDOW, PaneInfo.NO_FIRING);
-  }
-
-  /**
-   * Prepares {@code values} for reading as the contents of a {@link PCollectionView} side input.
-   */
-  @SafeVarargs
-  public static <T> Iterable<WindowedValue<T>> contentsInDefaultWindow(T... values)
-      throws Exception {
-    List<WindowedValue<T>> windowedValues = Lists.newArrayList();
-    for (T value : values) {
-      windowedValues.add(valueInDefaultWindow(value));
-    }
-    return windowedValues;
-  }
-
-  /**
-   * Prepares {@code values} for reading as the contents of a {@link PCollectionView} side input.
-   */
-  public static <T> Iterable<WindowedValue<T>> contentsInDefaultWindow(Iterable<T> values)
-      throws Exception {
-    List<WindowedValue<T>> windowedValues = Lists.newArrayList();
-    for (T value : values) {
-      windowedValues.add(valueInDefaultWindow(value));
-    }
-    return windowedValues;
-  }
-
-  // Internal details below here
-
-  /**
-   * A {@link PCollectionView} explicitly built from its {@link TupleTag},
-   * {@link WindowingStrategy}, and conversion function.
-   *
-   * <p>Instantiate via {@link #testingView}.
-   */
-  private static class PCollectionViewFromParts<ElemT, ViewT>
-      extends PValueBase
-      implements PCollectionView<ViewT> {
-    private PCollection<ElemT> pCollection;
-    private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
-    private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn;
-    private WindowMappingFn<?> windowMappingFn;
-    private WindowingStrategy<?, ?> windowingStrategy;
-    private Coder<Iterable<WindowedValue<ElemT>>> coder;
-
-    public PCollectionViewFromParts(
-        PCollection<ElemT> pCollection,
-        TupleTag<Iterable<WindowedValue<ElemT>>> tag,
-        ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
-        WindowMappingFn<?> windowMappingFn,
-        WindowingStrategy<?, ?> windowingStrategy,
-        Coder<Iterable<WindowedValue<ElemT>>> coder) {
-      this.pCollection = pCollection;
-      this.tag = tag;
-      this.viewFn = viewFn;
-      this.windowMappingFn = windowMappingFn;
-      this.windowingStrategy = windowingStrategy;
-      this.coder = coder;
-    }
-
-    @Override
-    public PCollection<?> getPCollection() {
-      return pCollection;
-    }
-
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    @Override
-    public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() {
-      return (TupleTag) tag;
-    }
-
-    @Override
-    public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() {
-      // Safe cast; runners must maintain type safety
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn) viewFn;
-      return untypedViewFn;
-    }
-
-    @Override
-    public WindowMappingFn<?> getWindowMappingFn() {
-      return windowMappingFn;
-    }
-
-    @Override
-    public WindowingStrategy<?, ?> getWindowingStrategyInternal() {
-      return windowingStrategy;
-    }
-
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    @Override
-    public Coder<Iterable<WindowedValue<?>>> getCoderInternal() {
-      return (Coder) coder;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(tag);
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (!(other instanceof PCollectionView)) {
-        return false;
+  public static List<Object> materializeValuesFor(
+      PTransform<?, ? extends PCollectionView<?>> viewTransformClass, Object ... values) {
+    List<Object> rval = new ArrayList<>();
+    // Currently all view materializations are the same where the data is shared underneath
+    // the void/null key. Once this changes, these materializations will differ but test code
+    // should not worry about what these look like if they are relying on the ViewFn to "undo"
+    // the conversion.
+    if (View.AsSingleton.class.equals(viewTransformClass.getClass())) {
+      for (Object value : values) {
+        rval.add(KV.of(null, value));
       }
-      @SuppressWarnings("unchecked")
-      PCollectionView<?> otherView = (PCollectionView<?>) other;
-      return tag.equals(otherView.getTagInternal());
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("tag", tag)
-          .add("viewFn", viewFn)
-          .toString();
-    }
-
-    @Override
-    public Map<TupleTag<?>, PValue> expand() {
-      return Collections.<TupleTag<?>, PValue>singletonMap(tag, pCollection);
-    }
+    } else if (View.AsIterable.class.equals(viewTransformClass.getClass())) {
+      for (Object value : values) {
+        rval.add(KV.of(null, value));
+      }
+    } else if (View.AsList.class.equals(viewTransformClass.getClass())) {
+      for (Object value : values) {
+        rval.add(KV.of(null, value));
+      }
+    } else if (View.AsMap.class.equals(viewTransformClass.getClass())) {
+      for (Object value : values) {
+        rval.add(KV.of(null, value));
+      }
+    } else if (View.AsMultimap.class.equals(viewTransformClass.getClass())) {
+      for (Object value : values) {
+        rval.add(KV.of(null, value));
+      }
+    } else {
+      throw new IllegalArgumentException(String.format(
+          "Unknown type of view %s. Supported views are %s.",
+          viewTransformClass.getClass(),
+          ImmutableSet.of(
+              View.AsSingleton.class,
+              View.AsIterable.class,
+              View.AsList.class,
+              View.AsMap.class,
+              View.AsMultimap.class)));
+    }
+    return Collections.unmodifiableList(rval);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 5cb9e18..cff6b2d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -37,9 +37,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -297,9 +295,8 @@ public class DoFnTesterTest {
   @Test
   public void fnWithSideInputDefault() throws Exception {
     PCollection<Integer> pCollection = p.apply(Create.empty(VarIntCoder.of()));
-    final PCollectionView<Integer> value =
-        PCollectionViews.singletonView(
-            pCollection, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
+    final PCollectionView<Integer> value = pCollection.apply(
+        View.<Integer>asSingleton().withDefaultValue(0));
 
     try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) {
       tester.processElement(1);
@@ -313,9 +310,8 @@ public class DoFnTesterTest {
   @Test
   public void fnWithSideInputExplicit() throws Exception {
     PCollection<Integer> pCollection = p.apply(Create.of(-2));
-    final PCollectionView<Integer> value =
-        PCollectionViews.singletonView(
-            pCollection, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
+    final PCollectionView<Integer> value = pCollection.apply(
+        View.<Integer>asSingleton().withDefaultValue(0));
 
     try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) {
       tester.setSideInput(value, GlobalWindow.INSTANCE, -2);

http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 1ccd5d6..7d20532 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -209,7 +209,7 @@ class BatchLoads<DestinationT>
     checkArgument(numFileShards > 0);
     Pipeline p = input.getPipeline();
     final PCollectionView<String> jobIdTokenView = createJobIdView(p);
-    final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(jobIdTokenView);
+    final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(p, jobIdTokenView);
     // The user-supplied triggeringDuration is often chosen to to control how many BigQuery load
     // jobs are generated, to prevent going over BigQuery's daily quota for load jobs. If this
     // is set to a large value, currently we have to buffer all the data unti the trigger fires.
@@ -295,7 +295,7 @@ class BatchLoads<DestinationT>
   public WriteResult expandUntriggered(PCollection<KV<DestinationT, TableRow>> input) {
     Pipeline p = input.getPipeline();
     final PCollectionView<String> jobIdTokenView = createJobIdView(p);
-    final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(jobIdTokenView);
+    final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(p, jobIdTokenView);
     PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow =
         input.apply(
             "rewindowIntoGlobal",
@@ -364,8 +364,10 @@ class BatchLoads<DestinationT>
   }
 
   // Generate the temporary-file prefix.
-  private PCollectionView<String> createTempFilePrefixView(PCollectionView<String> jobIdView) {
-    return ((PCollection<String>) jobIdView.getPCollection())
+  private PCollectionView<String> createTempFilePrefixView(
+      Pipeline p, final PCollectionView<String> jobIdView) {
+    return p
+        .apply(Create.of(""))
         .apply(
             "GetTempFilePrefix",
             ParDo.of(
@@ -382,13 +384,13 @@ class BatchLoads<DestinationT>
                         resolveTempLocation(
                             tempLocationRoot,
                             "BigQueryWriteTemp",
-                            c.element());
+                            c.sideInput(jobIdView));
                     LOG.info(
                         "Writing BigQuery temporary files to {} before loading them.",
                         tempLocation);
                     c.output(tempLocation);
                   }
-                }))
+                }).withSideInputs(jobIdView))
         .apply("TempFilePrefixView", View.<String>asSingleton());
   }