You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/31 16:51:59 UTC

[1/2] beam git commit: This closes #2372

Repository: beam
Updated Branches:
  refs/heads/master d9b053d3a -> 62473ae4b


This closes #2372


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62473ae4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62473ae4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62473ae4

Branch: refs/heads/master
Commit: 62473ae4bdd2cdbbebf47f9ca8a893e84e1732ce
Parents: d9b053d 4bcc641
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 31 09:47:55 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 31 09:47:55 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    |  6 +--
 .../beam/runners/core/SideInputHandlerTest.java | 22 ++++++-----
 .../runners/direct/SideInputContainerTest.java  | 19 ++--------
 .../direct/ViewEvaluatorFactoryTest.java        |  3 +-
 .../direct/WriteWithShardingFactoryTest.java    |  5 ++-
 .../flink/FlinkStreamingViewOverrides.java      | 10 ++---
 .../flink/streaming/DoFnOperatorTest.java       | 23 +++++------
 .../runners/dataflow/BatchViewOverrides.java    | 10 ++---
 .../org/apache/beam/sdk/transforms/Combine.java |  2 +-
 .../org/apache/beam/sdk/transforms/View.java    |  8 ++--
 .../apache/beam/sdk/util/PCollectionViews.java  | 40 ++++++++++++--------
 .../apache/beam/sdk/values/PCollectionView.java |  9 +++++
 .../sdk/testing/PCollectionViewTesting.java     | 20 +++++++---
 .../beam/sdk/transforms/DoFnTesterTest.java     |  7 +++-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 16 +++++---
 15 files changed, 114 insertions(+), 86 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: Include the creating PCollection in PCollectionView

Posted by tg...@apache.org.
Include the creating PCollection in PCollectionView

The creating PCollection is available on the client that constructed the view,
but may not be available elsewhere (the reference is transient and is not serialized).

Update signatures and callers to match.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4bcc6413
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4bcc6413
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4bcc6413

Branch: refs/heads/master
Commit: 4bcc64130fa842acdfd4ebb7168bea73b2d8313d
Parents: d9b053d
Author: Thomas Groh <tg...@google.com>
Authored: Thu Mar 30 10:47:12 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 31 09:47:55 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    |  6 +--
 .../beam/runners/core/SideInputHandlerTest.java | 22 ++++++-----
 .../runners/direct/SideInputContainerTest.java  | 19 ++--------
 .../direct/ViewEvaluatorFactoryTest.java        |  3 +-
 .../direct/WriteWithShardingFactoryTest.java    |  5 ++-
 .../flink/FlinkStreamingViewOverrides.java      | 10 ++---
 .../flink/streaming/DoFnOperatorTest.java       | 23 +++++------
 .../runners/dataflow/BatchViewOverrides.java    | 10 ++---
 .../org/apache/beam/sdk/transforms/Combine.java |  2 +-
 .../org/apache/beam/sdk/transforms/View.java    |  8 ++--
 .../apache/beam/sdk/util/PCollectionViews.java  | 40 ++++++++++++--------
 .../apache/beam/sdk/values/PCollectionView.java |  9 +++++
 .../sdk/testing/PCollectionViewTesting.java     | 20 +++++++---
 .../beam/sdk/transforms/DoFnTesterTest.java     |  7 +++-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 16 +++++---
 15 files changed, 114 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 79a2dd7..dfc8f63 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -242,7 +242,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
           .apply(Combine.globally(transform.getCombineFn())
               .withoutDefaults().withFanout(transform.getFanout()));
 
-      PCollectionView<OutputT> view = PCollectionViews.singletonView(combined.getPipeline(),
+      PCollectionView<OutputT> view = PCollectionViews.singletonView(combined,
           combined.getWindowingStrategy(), transform.getInsertDefault(),
           transform.getInsertDefault() ? transform.getCombineFn().defaultValue() : null,
               combined.getCoder());
@@ -338,8 +338,8 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
 
     @Override
     public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
-      PCollectionView<Iterable<T>> view = PCollectionViews.iterableView(input.getPipeline(),
-          input.getWindowingStrategy(), input.getCoder());
+      PCollectionView<Iterable<T>> view =
+          PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
 
       return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
           .apply(CreateApexPCollectionView.<T, Iterable<T>> of(view));

http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
index 3a5d346..335aede 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
@@ -51,20 +51,22 @@ public class SideInputHandlerTest {
   private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 =
       WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1)));
 
-  private PCollectionView<Iterable<String>> view1 = PCollectionViewTesting.testingView(
-      new TupleTag<Iterable<WindowedValue<String>>>() {},
-      new PCollectionViewTesting.IdentityViewFn<String>(),
-      StringUtf8Coder.of(),
-      windowingStrategy1);
+  private PCollectionView<Iterable<String>> view1 =
+      PCollectionViewTesting.testingView(
+          new TupleTag<Iterable<WindowedValue<String>>>() {},
+          new PCollectionViewTesting.IdentityViewFn<String>(),
+          StringUtf8Coder.of(),
+          windowingStrategy1);
 
   private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 =
       WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2)));
 
-  private PCollectionView<Iterable<String>> view2 = PCollectionViewTesting.testingView(
-      new TupleTag<Iterable<WindowedValue<String>>>() {},
-      new PCollectionViewTesting.IdentityViewFn<String>(),
-      StringUtf8Coder.of(),
-      windowingStrategy2);
+  private PCollectionView<Iterable<String>> view2 =
+      PCollectionViewTesting.testingView(
+          new TupleTag<Iterable<WindowedValue<String>>>() {},
+          new PCollectionViewTesting.IdentityViewFn<String>(),
+          StringUtf8Coder.of(),
+          windowingStrategy2);
 
   @Test
   public void testIsEmpty() {

http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
index 183decd..f4de883 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
@@ -204,24 +205,12 @@ public class SideInputContainerTest {
   }
 
   @Test
-  public void withPCollectionViewsErrorsForContainsNotInViews() {
-    PCollectionView<Map<String, Iterable<String>>> newView =
-        PCollectionViews.multimapView(
-            pipeline,
-            WindowingStrategy.globalDefault(),
-            KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("with unknown views " + ImmutableList.of(newView).toString());
-
-    container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView));
-  }
-
-  @Test
   public void withViewsForViewNotInContainerFails() {
+    PCollection<KV<String, String>> input =
+        pipeline.apply(Create.empty(new TypeDescriptor<KV<String, String>>() {}));
     PCollectionView<Map<String, Iterable<String>>> newView =
         PCollectionViews.multimapView(
-            pipeline,
+            input,
             WindowingStrategy.globalDefault(),
             KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index b094d17..b56bd74 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -63,7 +63,8 @@ public class ViewEvaluatorFactoryTest {
     PCollection<String> input = p.apply(Create.of("foo", "bar"));
     CreatePCollectionView<String, Iterable<String>> createView =
         CreatePCollectionView.of(
-            PCollectionViews.iterableView(p, input.getWindowingStrategy(), StringUtf8Coder.of()));
+            PCollectionViews.iterableView(
+                input, input.getWindowingStrategy(), StringUtf8Coder.of()));
     PCollection<Iterable<String>> concat =
         input.apply(WithKeys.<Void, String>of((Void) null))
             .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))

http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 16b6312..8720fd1 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -170,13 +170,14 @@ public class WriteWithShardingFactoryTest {
 
   @Test
   public void keyBasedOnCountFnFewElementsExtraShards() throws Exception {
+    long countValue = (long) WriteWithShardingFactory.MIN_SHARDS_FOR_LOG + 3;
+    PCollection<Long> inputCount = p.apply(Create.of(countValue));
     PCollectionView<Long> elementCountView =
         PCollectionViews.singletonView(
-            p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
+            inputCount, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
     CalculateShardsFn fn = new CalculateShardsFn(3);
     DoFnTester<Long, Integer> fnTester = DoFnTester.of(fn);
 
-    long countValue = (long) WriteWithShardingFactory.MIN_SHARDS_FOR_LOG + 3;
     fnTester.setSideInput(elementCountView, GlobalWindow.INSTANCE, countValue);
 
     List<Integer> kvs = fnTester.processBundle(10L);

http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
index 0ff6367..f955f2a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
@@ -59,7 +59,7 @@ class FlinkStreamingViewOverrides {
     public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
       PCollectionView<Map<K, V>> view =
           PCollectionViews.mapView(
-              input.getPipeline(),
+              input,
               input.getWindowingStrategy(),
               input.getCoder());
 
@@ -104,7 +104,7 @@ class FlinkStreamingViewOverrides {
     public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
       PCollectionView<Map<K, Iterable<V>>> view =
           PCollectionViews.multimapView(
-              input.getPipeline(),
+              input,
               input.getWindowingStrategy(),
               input.getCoder());
 
@@ -144,7 +144,7 @@ class FlinkStreamingViewOverrides {
     public PCollectionView<List<T>> expand(PCollection<T> input) {
       PCollectionView<List<T>> view =
           PCollectionViews.listView(
-              input.getPipeline(),
+              input,
               input.getWindowingStrategy(),
               input.getCoder());
 
@@ -175,7 +175,7 @@ class FlinkStreamingViewOverrides {
     public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
       PCollectionView<Iterable<T>> view =
           PCollectionViews.iterableView(
-              input.getPipeline(),
+              input,
               input.getWindowingStrategy(),
               input.getCoder());
 
@@ -272,7 +272,7 @@ class FlinkStreamingViewOverrides {
               .withFanout(transform.getFanout()));
 
       PCollectionView<OutputT> view = PCollectionViews.singletonView(
-          combined.getPipeline(),
+          combined,
           combined.getWindowingStrategy(),
           transform.getInsertDefault(),
           transform.getInsertDefault()

http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 25154fa..c1fdea3 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -58,7 +58,6 @@ import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -85,20 +84,22 @@ public class DoFnOperatorTest {
   private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 =
       WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1)));
 
-  private PCollectionView<Iterable<String>> view1 = PCollectionViewTesting.testingView(
-      new TupleTag<Iterable<WindowedValue<String>>>() {},
-      new PCollectionViewTesting.IdentityViewFn<String>(),
-      StringUtf8Coder.of(),
-      windowingStrategy1);
+  private PCollectionView<Iterable<String>> view1 =
+      PCollectionViewTesting.testingView(
+          new TupleTag<Iterable<WindowedValue<String>>>() {},
+          new PCollectionViewTesting.IdentityViewFn<String>(),
+          StringUtf8Coder.of(),
+          windowingStrategy1);
 
   private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 =
       WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2)));
 
-  private PCollectionView<Iterable<String>> view2 = PCollectionViewTesting.testingView(
-      new TupleTag<Iterable<WindowedValue<String>>>() {},
-      new PCollectionViewTesting.IdentityViewFn<String>(),
-      StringUtf8Coder.of(),
-      windowingStrategy2);
+  private PCollectionView<Iterable<String>> view2 =
+      PCollectionViewTesting.testingView(
+          new TupleTag<Iterable<WindowedValue<String>>>() {},
+          new PCollectionViewTesting.IdentityViewFn<String>(),
+          StringUtf8Coder.of(),
+          windowingStrategy2);
 
   @Test
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index 3689d3d..af96403 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -214,7 +214,7 @@ class BatchViewOverrides {
       KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
       try {
         PCollectionView<Map<K, V>> view = PCollectionViews.mapView(
-            input.getPipeline(), input.getWindowingStrategy(), inputCoder);
+            input, input.getWindowingStrategy(), inputCoder);
         return BatchViewAsMultimap.applyForMapLike(runner, input, view, true /* unique keys */);
       } catch (NonDeterministicException e) {
         runner.recordViewUsesNonDeterministicKeyCoder(this);
@@ -701,7 +701,7 @@ class BatchViewOverrides {
       KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
       try {
         PCollectionView<Map<K, Iterable<V>>> view = PCollectionViews.multimapView(
-            input.getPipeline(), input.getWindowingStrategy(), inputCoder);
+            input, input.getWindowingStrategy(), inputCoder);
 
         return applyForMapLike(runner, input, view, false /* unique keys not expected */);
       } catch (NonDeterministicException e) {
@@ -959,7 +959,7 @@ class BatchViewOverrides {
       @SuppressWarnings({"rawtypes", "unchecked"})
       PCollectionView<ViewT> view =
           (PCollectionView<ViewT>) PCollectionViews.<FinalT, W>singletonView(
-              input.getPipeline(),
+              (PCollection) input,
               (WindowingStrategy) input.getWindowingStrategy(),
               hasDefault,
               defaultValue,
@@ -1092,7 +1092,7 @@ class BatchViewOverrides {
     @Override
     public PCollectionView<List<T>> expand(PCollection<T> input) {
       PCollectionView<List<T>> view = PCollectionViews.listView(
-          input.getPipeline(), input.getWindowingStrategy(), input.getCoder());
+          input, input.getWindowingStrategy(), input.getCoder());
       return applyForIterableLike(runner, input, view);
     }
 
@@ -1177,7 +1177,7 @@ class BatchViewOverrides {
     @Override
     public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
       PCollectionView<Iterable<T>> view = PCollectionViews.iterableView(
-          input.getPipeline(), input.getWindowingStrategy(), input.getCoder());
+          input, input.getWindowingStrategy(), input.getCoder());
       return BatchViewAsList.applyForIterableLike(runner, input, view);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index b403691..8fe4831 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1601,7 +1601,7 @@ public class Combine {
       return combined.apply(
           CreatePCollectionView.<OutputT, OutputT>of(
               PCollectionViews.singletonView(
-                  input.getPipeline(),
+                  combined,
                   input.getWindowingStrategy(),
                   insertDefault,
                   insertDefault ? fn.defaultValue() : null,

http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 67a41e4..14035b0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -257,7 +257,7 @@ public class View {
     @Override
     public PCollectionView<List<T>> expand(PCollection<T> input) {
       return input.apply(CreatePCollectionView.<T, List<T>>of(PCollectionViews.listView(
-          input.getPipeline(), input.getWindowingStrategy(), input.getCoder())));
+          input, input.getWindowingStrategy(), input.getCoder())));
     }
   }
 
@@ -283,7 +283,7 @@ public class View {
     @Override
     public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
       return input.apply(CreatePCollectionView.<T, Iterable<T>>of(PCollectionViews.iterableView(
-          input.getPipeline(), input.getWindowingStrategy(), input.getCoder())));
+          input, input.getWindowingStrategy(), input.getCoder())));
     }
   }
 
@@ -427,7 +427,7 @@ public class View {
     public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
       return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(
           PCollectionViews.multimapView(
-              input.getPipeline(),
+              input,
               input.getWindowingStrategy(),
               input.getCoder())));
     }
@@ -464,7 +464,7 @@ public class View {
     public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
       return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of(
           PCollectionViews.mapView(
-              input.getPipeline(),
+              input,
               input.getWindowingStrategy(),
               input.getCoder())));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
index 0794703..c2e3153 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
@@ -31,7 +31,6 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
@@ -60,13 +59,13 @@ public class PCollectionViews {
    * {@code defaultValue} for any empty windows.
    */
   public static <T, W extends BoundedWindow> PCollectionView<T> singletonView(
-      Pipeline pipeline,
+      PCollection<T> pCollection,
       WindowingStrategy<?, W> windowingStrategy,
       boolean hasDefault,
       @Nullable T defaultValue,
       Coder<T> valueCoder) {
      return new SimplePCollectionView<>(
-        pipeline,
+        pCollection,
         new SingletonViewFn<>(hasDefault, defaultValue, valueCoder),
         windowingStrategy,
         valueCoder);
@@ -77,11 +76,11 @@ public class PCollectionViews {
    * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
    */
   public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterableView(
-      Pipeline pipeline,
+      PCollection<T> pCollection,
       WindowingStrategy<?, W> windowingStrategy,
       Coder<T> valueCoder) {
     return new SimplePCollectionView<>(
-        pipeline, new IterableViewFn<T>(), windowingStrategy, valueCoder);
+        pCollection, new IterableViewFn<T>(), windowingStrategy, valueCoder);
   }
 
   /**
@@ -89,11 +88,11 @@ public class PCollectionViews {
    * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
    */
   public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
-      Pipeline pipeline,
+      PCollection<T> pCollection,
       WindowingStrategy<?, W> windowingStrategy,
       Coder<T> valueCoder) {
      return new SimplePCollectionView<>(
-        pipeline, new ListViewFn<T>(), windowingStrategy, valueCoder);
+        pCollection, new ListViewFn<T>(), windowingStrategy, valueCoder);
   }
 
   /**
@@ -101,9 +100,11 @@ public class PCollectionViews {
    * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
    */
   public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> mapView(
-      Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<KV<K, V>> valueCoder) {
+      PCollection<KV<K, V>> pCollection,
+      WindowingStrategy<?, W> windowingStrategy,
+      Coder<KV<K, V>> valueCoder) {
     return new SimplePCollectionView<>(
-        pipeline, new MapViewFn<K, V>(), windowingStrategy, valueCoder);
+        pCollection, new MapViewFn<K, V>(), windowingStrategy, valueCoder);
   }
 
   /**
@@ -111,11 +112,11 @@ public class PCollectionViews {
    * using the provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
    */
   public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> multimapView(
-      Pipeline pipeline,
+      PCollection<KV<K, V>> pCollection,
       WindowingStrategy<?, W> windowingStrategy,
       Coder<KV<K, V>> valueCoder) {
     return new SimplePCollectionView<>(
-        pipeline, new MultimapViewFn<K, V>(), windowingStrategy, valueCoder);
+        pCollection, new MultimapViewFn<K, V>(), windowingStrategy, valueCoder);
   }
 
   /**
@@ -301,6 +302,9 @@ public class PCollectionViews {
   private static class SimplePCollectionView<ElemT, ViewT, W extends BoundedWindow>
       extends PValueBase
       implements PCollectionView<ViewT> {
+    /** The {@link PCollection} this view was originally created from. */
+    private transient PCollection<ElemT> pCollection;
+
     /** A unique tag for the view, typed according to the elements underlying the view. */
     private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
 
@@ -320,12 +324,13 @@ public class PCollectionViews {
      * boilerplate accessors.
      */
     private SimplePCollectionView(
-        Pipeline pipeline,
+        PCollection<ElemT> pCollection,
         TupleTag<Iterable<WindowedValue<ElemT>>> tag,
         ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
         WindowingStrategy<?, W> windowingStrategy,
         Coder<ElemT> valueCoder) {
-      super(pipeline);
+      super(pCollection.getPipeline());
+      this.pCollection = pCollection;
       if (windowingStrategy.getWindowFn() instanceof InvalidWindows) {
         throw new IllegalArgumentException("WindowFn of PCollectionView cannot be InvalidWindows");
       }
@@ -342,12 +347,12 @@ public class PCollectionViews {
      * boilerplate accessors, with an auto-generated tag.
      */
     private SimplePCollectionView(
-        Pipeline pipeline,
+        PCollection<ElemT> pCollection,
         ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
         WindowingStrategy<?, W> windowingStrategy,
         Coder<ElemT> valueCoder) {
       this(
-          pipeline,
+          pCollection,
           new TupleTag<Iterable<WindowedValue<ElemT>>>(),
           viewFn,
           windowingStrategy,
@@ -372,6 +377,11 @@ public class PCollectionViews {
       return untypedViewFn;
     }
 
+    @Override
+    public PCollection<?> getPCollection() {
+      return pCollection;
+    }
+
     /**
      * Returns a unique {@link TupleTag} identifying this {@link PCollectionView}.
      *

http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
index a351723..f2ddf55 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.values;
 
 import java.io.Serializable;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -44,6 +45,14 @@ import org.apache.beam.sdk.util.WindowingStrategy;
  */
 public interface PCollectionView<T> extends PValue, Serializable {
   /**
+   * Gets the {@link PCollection} this {@link PCollectionView} was created from.
+   *
+   * <p>The {@link PCollection} may not be available in all contexts.
+   */
+  @Nullable
+  PCollection<?> getPCollection();
+
+  /**
    * @deprecated this method will be removed entirely. The {@link PCollection} underlying a side
    *     input, is part of the side input's specification with a {@link ParDo} transform, which will
    *     obtain that information via a package-private channel.

http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
index 99fb1fb..b544812 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValueBase;
 import org.apache.beam.sdk.values.TupleTag;
@@ -140,12 +141,9 @@ public final class PCollectionViewTesting {
   public static <ElemT, ViewT> PCollectionView<ViewT> testingView(
       TupleTag<Iterable<WindowedValue<ElemT>>> tag,
       ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
-      Coder<ElemT> elemCoder) {
-    return testingView(
-        tag,
-        viewFn,
-        elemCoder,
-        DEFAULT_WINDOWING_STRATEGY);
+      Coder<ElemT> elemCoder,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    return testingView(null, tag, viewFn, elemCoder, windowingStrategy);
   }
 
   /**
@@ -168,11 +166,13 @@ public final class PCollectionViewTesting {
    * values provided to the view during execution, results are unpredictable.
    */
   public static <ElemT, ViewT> PCollectionView<ViewT> testingView(
+      PCollection<ElemT> pCollection,
       TupleTag<Iterable<WindowedValue<ElemT>>> tag,
       ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
       Coder<ElemT> elemCoder,
       WindowingStrategy<?, ?> windowingStrategy) {
     return new PCollectionViewFromParts<>(
+        pCollection,
         tag,
         viewFn,
         windowingStrategy,
@@ -223,22 +223,30 @@ public final class PCollectionViewTesting {
   private static class PCollectionViewFromParts<ElemT, ViewT>
       extends PValueBase
       implements PCollectionView<ViewT> {
+    private PCollection<ElemT> pCollection;
     private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
     private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn;
     private WindowingStrategy<?, ?> windowingStrategy;
     private Coder<Iterable<WindowedValue<ElemT>>> coder;
 
     public PCollectionViewFromParts(
+        PCollection<ElemT> pCollection,
         TupleTag<Iterable<WindowedValue<ElemT>>> tag,
         ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
         WindowingStrategy<?, ?> windowingStrategy,
         Coder<Iterable<WindowedValue<ElemT>>> coder) {
+      this.pCollection = pCollection;
       this.tag = tag;
       this.viewFn = viewFn;
       this.windowingStrategy = windowingStrategy;
       this.coder = coder;
     }
 
+    @Override
+    public PCollection<?> getPCollection() {
+      return pCollection;
+    }
+
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
     public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() {

http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 699687f..3b6fbfb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.hamcrest.Matchers;
@@ -324,9 +325,10 @@ public class DoFnTesterTest {
 
   @Test
   public void fnWithSideInputDefault() throws Exception {
+    PCollection<Integer> pCollection = p.apply(Create.empty(VarIntCoder.of()));
     final PCollectionView<Integer> value =
         PCollectionViews.singletonView(
-            p, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
+            pCollection, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
 
     try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) {
       tester.processElement(1);
@@ -339,9 +341,10 @@ public class DoFnTesterTest {
 
   @Test
   public void fnWithSideInputExplicit() throws Exception {
+    PCollection<Integer> pCollection = p.apply(Create.of(-2));
     final PCollectionView<Integer> value =
         PCollectionViews.singletonView(
-            p, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
+            pCollection, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
 
     try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) {
       tester.setSideInput(value, GlobalWindow.INSTANCE, -2);

http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index e7db8a2..5ef8b2c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -153,6 +153,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.Matchers;
@@ -2108,8 +2109,10 @@ public class BigQueryIOTest implements Serializable {
     TupleTag<KV<Long, List<String>>> singlePartitionTag =
         new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
 
+    PCollection<KV<String, Long>> filesPCollection =
+        p.apply(Create.of(files).withType(new TypeDescriptor<KV<String, Long>>() {}));
     PCollectionView<Iterable<KV<String, Long>>> filesView = PCollectionViews.iterableView(
-        p,
+        filesPCollection,
         WindowingStrategy.globalDefault(),
         KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()));
 
@@ -2173,8 +2176,9 @@ public class BigQueryIOTest implements Serializable {
       expectedTempTables.add(String.format("{\"tableId\":\"%s_%05d\"}", jobIdToken, i));
     }
 
+    PCollection<String> expectedTempTablesPCollection = p.apply(Create.of(expectedTempTables));
     PCollectionView<Iterable<String>> tempTablesView = PCollectionViews.iterableView(
-        p,
+        expectedTempTablesPCollection,
         WindowingStrategy.globalDefault(),
         StringUtf8Coder.of());
     PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", Create.of("jobId"));
@@ -2251,10 +2255,10 @@ public class BigQueryIOTest implements Serializable {
       tempTables.add(String.format("{\"tableId\":\"%s_%05d\"}", jobIdToken, i));
     }
 
-    PCollectionView<Iterable<String>> tempTablesView = PCollectionViews.iterableView(
-        p,
-        WindowingStrategy.globalDefault(),
-        StringUtf8Coder.of());
+    PCollection<String> tempTablesPCollection = p.apply(Create.of(tempTables));
+    PCollectionView<Iterable<String>> tempTablesView =
+        PCollectionViews.iterableView(
+            tempTablesPCollection, WindowingStrategy.globalDefault(), StringUtf8Coder.of());
     PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", Create.of("jobId"));
     PCollectionView<String> jobIdTokenView =
         jobIdTokenCollection.apply(View.<String>asSingleton());