You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/05 18:00:55 UTC
[2/2] beam git commit: Add WindowMappingFn to PCollectionView
Add WindowMappingFn to PCollectionView
This exposes the explicit way in which Windows should be mapped.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/73133587
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/73133587
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/73133587
Branch: refs/heads/master
Commit: 73133587217a6a8d1f55a254a59af39409000b31
Parents: e32a025
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 5 09:24:36 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Apr 5 11:00:44 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/util/PCollectionViews.java | 41 +++++++++++++++---
.../apache/beam/sdk/values/PCollectionView.java | 10 +++++
.../sdk/testing/PCollectionViewTesting.java | 45 ++++++++++++++++++--
3 files changed, 86 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/73133587/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
index c2e3153..7617253 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -64,9 +65,10 @@ public class PCollectionViews {
boolean hasDefault,
@Nullable T defaultValue,
Coder<T> valueCoder) {
- return new SimplePCollectionView<>(
+ return new SimplePCollectionView<>(
pCollection,
new SingletonViewFn<>(hasDefault, defaultValue, valueCoder),
+ windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy,
valueCoder);
}
@@ -80,7 +82,11 @@ public class PCollectionViews {
WindowingStrategy<?, W> windowingStrategy,
Coder<T> valueCoder) {
return new SimplePCollectionView<>(
- pCollection, new IterableViewFn<T>(), windowingStrategy, valueCoder);
+ pCollection,
+ new IterableViewFn<T>(),
+ windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+ windowingStrategy,
+ valueCoder);
}
/**
@@ -91,8 +97,12 @@ public class PCollectionViews {
PCollection<T> pCollection,
WindowingStrategy<?, W> windowingStrategy,
Coder<T> valueCoder) {
- return new SimplePCollectionView<>(
- pCollection, new ListViewFn<T>(), windowingStrategy, valueCoder);
+ return new SimplePCollectionView<>(
+ pCollection,
+ new ListViewFn<T>(),
+ windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+ windowingStrategy,
+ valueCoder);
}
/**
@@ -104,7 +114,11 @@ public class PCollectionViews {
WindowingStrategy<?, W> windowingStrategy,
Coder<KV<K, V>> valueCoder) {
return new SimplePCollectionView<>(
- pCollection, new MapViewFn<K, V>(), windowingStrategy, valueCoder);
+ pCollection,
+ new MapViewFn<K, V>(),
+ windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+ windowingStrategy,
+ valueCoder);
}
/**
@@ -116,7 +130,11 @@ public class PCollectionViews {
WindowingStrategy<?, W> windowingStrategy,
Coder<KV<K, V>> valueCoder) {
return new SimplePCollectionView<>(
- pCollection, new MultimapViewFn<K, V>(), windowingStrategy, valueCoder);
+ pCollection,
+ new MultimapViewFn<K, V>(),
+ windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+ windowingStrategy,
+ valueCoder);
}
/**
@@ -308,6 +326,8 @@ public class PCollectionViews {
/** A unique tag for the view, typed according to the elements underlying the view. */
private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
+ private WindowMappingFn<W> windowMappingFn;
+
/** The windowing strategy for the PCollection underlying the view. */
private WindowingStrategy<?, W> windowingStrategy;
@@ -327,6 +347,7 @@ public class PCollectionViews {
PCollection<ElemT> pCollection,
TupleTag<Iterable<WindowedValue<ElemT>>> tag,
ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+ WindowMappingFn<W> windowMappingFn,
WindowingStrategy<?, W> windowingStrategy,
Coder<ElemT> valueCoder) {
super(pCollection.getPipeline());
@@ -334,6 +355,7 @@ public class PCollectionViews {
if (windowingStrategy.getWindowFn() instanceof InvalidWindows) {
throw new IllegalArgumentException("WindowFn of PCollectionView cannot be InvalidWindows");
}
+ this.windowMappingFn = windowMappingFn;
this.tag = tag;
this.windowingStrategy = windowingStrategy;
this.viewFn = viewFn;
@@ -349,12 +371,14 @@ public class PCollectionViews {
private SimplePCollectionView(
PCollection<ElemT> pCollection,
ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+ WindowMappingFn<W> windowMappingFn,
WindowingStrategy<?, W> windowingStrategy,
Coder<ElemT> valueCoder) {
this(
pCollection,
new TupleTag<Iterable<WindowedValue<ElemT>>>(),
viewFn,
+ windowMappingFn,
windowingStrategy,
valueCoder);
}
@@ -378,6 +402,11 @@ public class PCollectionViews {
}
@Override
+ public WindowMappingFn<?> getWindowMappingFn() {
+ return windowMappingFn;
+ }
+
+ @Override
public PCollection<?> getPCollection() {
return pCollection;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/73133587/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
index f2ddf55..d65912b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
@@ -19,10 +19,13 @@ package org.apache.beam.sdk.values;
import java.io.Serializable;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -69,6 +72,13 @@ public interface PCollectionView<T> extends PValue, Serializable {
ViewFn<Iterable<WindowedValue<?>>, T> getViewFn();
/**
+ * Returns the {@link WindowMappingFn} used to map windows from a main input to the side input of
+ * this {@link PCollectionView}.
+ */
+ @Experimental(Kind.CORE_RUNNERS_ONLY)
+ WindowMappingFn<?> getWindowMappingFn();
+
+ /**
* @deprecated this method will be removed entirely. The {@link PCollection} underlying a side
* input, including its {@link WindowingStrategy}, is part of the side input's specification
* with a {@link ParDo} transform, which will obtain that information via a package-private
http://git-wip-us.apache.org/repos/asf/beam/blob/73133587/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
index b544812..e6b13c1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
@@ -155,12 +156,38 @@ public final class PCollectionViewTesting {
}
/**
- * A {@link PCollectionView} explicitly built from its {@link TupleTag},
- * {@link WindowingStrategy}, {@link Coder}, and conversion function.
+ * A {@link PCollectionView} explicitly built from its {@link TupleTag}, {@link
+ * WindowingStrategy}, {@link Coder}, and conversion function.
*
* <p>This method is only recommended for use by runner implementors to test their
- * implementations. It is very easy to construct a {@link PCollectionView} that does
- * not respect the invariants required for proper functioning.
+ * implementations. It is very easy to construct a {@link PCollectionView} that does not respect
+ * the invariants required for proper functioning.
+ *
+ * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed
+ * values provided to the view during execution, results are unpredictable.
+ */
+ public static <ElemT, ViewT> PCollectionView<ViewT> testingView(
+ PCollection<ElemT> pCollection,
+ TupleTag<Iterable<WindowedValue<ElemT>>> tag,
+ ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+ Coder<ElemT> elemCoder,
+ WindowingStrategy<?, ?> windowingStrategy) {
+ return testingView(
+ pCollection,
+ tag,
+ viewFn,
+ windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+ elemCoder,
+ windowingStrategy);
+ }
+
+ /**
+ * A {@link PCollectionView} explicitly built from its {@link TupleTag}, {@link
+ * WindowingStrategy}, {@link Coder}, {@link ViewFn} and {@link WindowMappingFn}.
+ *
+ * <p>This method is only recommended for use by runner implementors to test their
+ * implementations. It is very easy to construct a {@link PCollectionView} that does not respect
+ * the invariants required for proper functioning.
*
* <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed
* values provided to the view during execution, results are unpredictable.
@@ -169,12 +196,14 @@ public final class PCollectionViewTesting {
PCollection<ElemT> pCollection,
TupleTag<Iterable<WindowedValue<ElemT>>> tag,
ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+ WindowMappingFn<?> windowMappingFn,
Coder<ElemT> elemCoder,
WindowingStrategy<?, ?> windowingStrategy) {
return new PCollectionViewFromParts<>(
pCollection,
tag,
viewFn,
+ windowMappingFn,
windowingStrategy,
IterableCoder.of(
WindowedValue.getFullCoder(elemCoder, windowingStrategy.getWindowFn().windowCoder())));
@@ -226,6 +255,7 @@ public final class PCollectionViewTesting {
private PCollection<ElemT> pCollection;
private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn;
+ private WindowMappingFn<?> windowMappingFn;
private WindowingStrategy<?, ?> windowingStrategy;
private Coder<Iterable<WindowedValue<ElemT>>> coder;
@@ -233,11 +263,13 @@ public final class PCollectionViewTesting {
PCollection<ElemT> pCollection,
TupleTag<Iterable<WindowedValue<ElemT>>> tag,
ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+ WindowMappingFn<?> windowMappingFn,
WindowingStrategy<?, ?> windowingStrategy,
Coder<Iterable<WindowedValue<ElemT>>> coder) {
this.pCollection = pCollection;
this.tag = tag;
this.viewFn = viewFn;
+ this.windowMappingFn = windowMappingFn;
this.windowingStrategy = windowingStrategy;
this.coder = coder;
}
@@ -262,6 +294,11 @@ public final class PCollectionViewTesting {
}
@Override
+ public WindowMappingFn<?> getWindowMappingFn() {
+ return windowMappingFn;
+ }
+
+ @Override
public WindowingStrategy<?, ?> getWindowingStrategyInternal() {
return windowingStrategy;
}