You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/05 18:00:55 UTC

[2/2] beam git commit: Add WindowMappingFn to PCollectionView

Add WindowMappingFn to PCollectionView

This exposes, on each PCollectionView, the WindowMappingFn used to map windows from a main input to the windows of the side input.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/73133587
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/73133587
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/73133587

Branch: refs/heads/master
Commit: 73133587217a6a8d1f55a254a59af39409000b31
Parents: e32a025
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 5 09:24:36 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Apr 5 11:00:44 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/PCollectionViews.java  | 41 +++++++++++++++---
 .../apache/beam/sdk/values/PCollectionView.java | 10 +++++
 .../sdk/testing/PCollectionViewTesting.java     | 45 ++++++++++++++++++--
 3 files changed, 86 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/73133587/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
index c2e3153..7617253 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -64,9 +65,10 @@ public class PCollectionViews {
       boolean hasDefault,
       @Nullable T defaultValue,
       Coder<T> valueCoder) {
-     return new SimplePCollectionView<>(
+    return new SimplePCollectionView<>(
         pCollection,
         new SingletonViewFn<>(hasDefault, defaultValue, valueCoder),
+        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
         windowingStrategy,
         valueCoder);
   }
@@ -80,7 +82,11 @@ public class PCollectionViews {
       WindowingStrategy<?, W> windowingStrategy,
       Coder<T> valueCoder) {
     return new SimplePCollectionView<>(
-        pCollection, new IterableViewFn<T>(), windowingStrategy, valueCoder);
+        pCollection,
+        new IterableViewFn<T>(),
+        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+        windowingStrategy,
+        valueCoder);
   }
 
   /**
@@ -91,8 +97,12 @@ public class PCollectionViews {
       PCollection<T> pCollection,
       WindowingStrategy<?, W> windowingStrategy,
       Coder<T> valueCoder) {
-     return new SimplePCollectionView<>(
-        pCollection, new ListViewFn<T>(), windowingStrategy, valueCoder);
+    return new SimplePCollectionView<>(
+        pCollection,
+        new ListViewFn<T>(),
+        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+        windowingStrategy,
+        valueCoder);
   }
 
   /**
@@ -104,7 +114,11 @@ public class PCollectionViews {
       WindowingStrategy<?, W> windowingStrategy,
       Coder<KV<K, V>> valueCoder) {
     return new SimplePCollectionView<>(
-        pCollection, new MapViewFn<K, V>(), windowingStrategy, valueCoder);
+        pCollection,
+        new MapViewFn<K, V>(),
+        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+        windowingStrategy,
+        valueCoder);
   }
 
   /**
@@ -116,7 +130,11 @@ public class PCollectionViews {
       WindowingStrategy<?, W> windowingStrategy,
       Coder<KV<K, V>> valueCoder) {
     return new SimplePCollectionView<>(
-        pCollection, new MultimapViewFn<K, V>(), windowingStrategy, valueCoder);
+        pCollection,
+        new MultimapViewFn<K, V>(),
+        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+        windowingStrategy,
+        valueCoder);
   }
 
   /**
@@ -308,6 +326,8 @@ public class PCollectionViews {
     /** A unique tag for the view, typed according to the elements underlying the view. */
     private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
 
+    private WindowMappingFn<W> windowMappingFn;
+
     /** The windowing strategy for the PCollection underlying the view. */
     private WindowingStrategy<?, W> windowingStrategy;
 
@@ -327,6 +347,7 @@ public class PCollectionViews {
         PCollection<ElemT> pCollection,
         TupleTag<Iterable<WindowedValue<ElemT>>> tag,
         ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+        WindowMappingFn<W> windowMappingFn,
         WindowingStrategy<?, W> windowingStrategy,
         Coder<ElemT> valueCoder) {
       super(pCollection.getPipeline());
@@ -334,6 +355,7 @@ public class PCollectionViews {
       if (windowingStrategy.getWindowFn() instanceof InvalidWindows) {
         throw new IllegalArgumentException("WindowFn of PCollectionView cannot be InvalidWindows");
       }
+      this.windowMappingFn = windowMappingFn;
       this.tag = tag;
       this.windowingStrategy = windowingStrategy;
       this.viewFn = viewFn;
@@ -349,12 +371,14 @@ public class PCollectionViews {
     private SimplePCollectionView(
         PCollection<ElemT> pCollection,
         ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+        WindowMappingFn<W> windowMappingFn,
         WindowingStrategy<?, W> windowingStrategy,
         Coder<ElemT> valueCoder) {
       this(
           pCollection,
           new TupleTag<Iterable<WindowedValue<ElemT>>>(),
           viewFn,
+          windowMappingFn,
           windowingStrategy,
           valueCoder);
     }
@@ -378,6 +402,11 @@ public class PCollectionViews {
     }
 
     @Override
+    public WindowMappingFn<?> getWindowMappingFn() {
+      return windowMappingFn;
+    }
+
+    @Override
     public PCollection<?> getPCollection() {
       return pCollection;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/73133587/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
index f2ddf55..d65912b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
@@ -19,10 +19,13 @@ package org.apache.beam.sdk.values;
 
 import java.io.Serializable;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 
@@ -69,6 +72,13 @@ public interface PCollectionView<T> extends PValue, Serializable {
   ViewFn<Iterable<WindowedValue<?>>, T> getViewFn();
 
   /**
+   * Returns the {@link WindowMappingFn} used to map windows from a main input to the side input of
+   * this {@link PCollectionView}.
+   */
+  @Experimental(Kind.CORE_RUNNERS_ONLY)
+  WindowMappingFn<?> getWindowMappingFn();
+
+  /**
    * @deprecated this method will be removed entirely. The {@link PCollection} underlying a side
    *     input, including its {@link WindowingStrategy}, is part of the side input's specification
    *     with a {@link ParDo} transform, which will obtain that information via a package-private

http://git-wip-us.apache.org/repos/asf/beam/blob/73133587/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
index b544812..e6b13c1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
@@ -155,12 +156,38 @@ public final class PCollectionViewTesting {
   }
 
   /**
-   * A {@link PCollectionView} explicitly built from its {@link TupleTag},
-   * {@link WindowingStrategy}, {@link Coder}, and conversion function.
+   * A {@link PCollectionView} explicitly built from its {@link TupleTag}, {@link
+   * WindowingStrategy}, {@link Coder}, and conversion function.
    *
    * <p>This method is only recommended for use by runner implementors to test their
-   * implementations. It is very easy to construct a {@link PCollectionView} that does
-   * not respect the invariants required for proper functioning.
+   * implementations. It is very easy to construct a {@link PCollectionView} that does not respect
+   * the invariants required for proper functioning.
+   *
+   * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed
+   * values provided to the view during execution, results are unpredictable.
+   */
+  public static <ElemT, ViewT> PCollectionView<ViewT> testingView(
+      PCollection<ElemT> pCollection,
+      TupleTag<Iterable<WindowedValue<ElemT>>> tag,
+      ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+      Coder<ElemT> elemCoder,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    return testingView(
+        pCollection,
+        tag,
+        viewFn,
+        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+        elemCoder,
+        windowingStrategy);
+  }
+
+  /**
+   * A {@link PCollectionView} explicitly built from its {@link TupleTag}, {@link
+   * WindowingStrategy}, {@link Coder}, {@link ViewFn} and {@link WindowMappingFn}.
+   *
+   * <p>This method is only recommended for use by runner implementors to test their
+   * implementations. It is very easy to construct a {@link PCollectionView} that does not respect
+   * the invariants required for proper functioning.
    *
    * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed
    * values provided to the view during execution, results are unpredictable.
@@ -169,12 +196,14 @@ public final class PCollectionViewTesting {
       PCollection<ElemT> pCollection,
       TupleTag<Iterable<WindowedValue<ElemT>>> tag,
       ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+      WindowMappingFn<?> windowMappingFn,
       Coder<ElemT> elemCoder,
       WindowingStrategy<?, ?> windowingStrategy) {
     return new PCollectionViewFromParts<>(
         pCollection,
         tag,
         viewFn,
+        windowMappingFn,
         windowingStrategy,
         IterableCoder.of(
             WindowedValue.getFullCoder(elemCoder, windowingStrategy.getWindowFn().windowCoder())));
@@ -226,6 +255,7 @@ public final class PCollectionViewTesting {
     private PCollection<ElemT> pCollection;
     private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
     private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn;
+    private WindowMappingFn<?> windowMappingFn;
     private WindowingStrategy<?, ?> windowingStrategy;
     private Coder<Iterable<WindowedValue<ElemT>>> coder;
 
@@ -233,11 +263,13 @@ public final class PCollectionViewTesting {
         PCollection<ElemT> pCollection,
         TupleTag<Iterable<WindowedValue<ElemT>>> tag,
         ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+        WindowMappingFn<?> windowMappingFn,
         WindowingStrategy<?, ?> windowingStrategy,
         Coder<Iterable<WindowedValue<ElemT>>> coder) {
       this.pCollection = pCollection;
       this.tag = tag;
       this.viewFn = viewFn;
+      this.windowMappingFn = windowMappingFn;
       this.windowingStrategy = windowingStrategy;
       this.coder = coder;
     }
@@ -262,6 +294,11 @@ public final class PCollectionViewTesting {
     }
 
     @Override
+    public WindowMappingFn<?> getWindowMappingFn() {
+      return windowMappingFn;
+    }
+
+    @Override
     public WindowingStrategy<?, ?> getWindowingStrategyInternal() {
       return windowingStrategy;
     }