You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/02 23:42:48 UTC
[4/4] incubator-beam git commit: Makes DoFnTester use new DoFn
internally.
Makes DoFnTester use new DoFn internally.
There were 2 remaining users of DoFnTester.of(OldDoFn):
- SplittableParDo.ProcessElements: this is fixed in
https://github.com/apache/incubator-beam/pull/1261
- GroupAlsoByWindowsProperties: this one is harder.
Various GABWDoFns use OldDoFn.windowingInternals,
and we can't pass that through a new DoFn.
So instead I removed usage of DoFnTester from
GroupAlsoByWindowsProperties in favor of a tiny
hand-coded solution.
So once #1261 is merged, DoFnTester.of(OldDoFn) can be deleted.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96455768
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96455768
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96455768
Branch: refs/heads/master
Commit: 96455768568616141a95833380f37c478a989397
Parents: e04cd47
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Nov 18 13:10:22 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Dec 2 15:42:33 2016 -0800
----------------------------------------------------------------------
.../GroupAlsoByWindowsViaOutputBufferDoFn.java | 6 +-
.../core/GroupByKeyViaGroupByKeyOnly.java | 22 +-
.../core/GroupAlsoByWindowsProperties.java | 590 +++++++++++--------
.../beam/sdk/transforms/DoFnAdapters.java | 2 +
.../apache/beam/sdk/transforms/DoFnTester.java | 130 ++--
.../sdk/transforms/reflect/DoFnInvokers.java | 11 -
.../beam/sdk/transforms/DoFnTesterTest.java | 4 +-
7 files changed, 394 insertions(+), 371 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index f8f6207..b4b366c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -21,7 +21,6 @@ import com.google.common.collect.Iterables;
import java.util.List;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.WindowedValue;
@@ -30,7 +29,6 @@ import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.util.state.TimerCallback;
-import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;
/**
@@ -55,9 +53,7 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
}
@Override
- public void processElement(
- OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>.ProcessContext c)
- throws Exception {
+ public void processElement(ProcessContext c) throws Exception {
K key = c.element().getKey();
// Used with Batch, we know that all the data is available for this key. We can't use the
// timer manager from the context because it doesn't exist. So we create one and emulate the
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
index 79d2252..43047ca 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
@@ -26,15 +26,13 @@ import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -135,10 +133,9 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
return input
.apply(
ParDo.of(
- new OldDoFn<
- KV<K, Iterable<WindowedValue<V>>>,
- KV<K, Iterable<WindowedValue<V>>>>() {
- @Override
+ new DoFn<KV<K, Iterable<WindowedValue<V>>>,
+ KV<K, Iterable<WindowedValue<V>>>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
K key = kvs.getKey();
@@ -251,16 +248,5 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
input.getPipeline(), windowingStrategy, input.isBounded())
.setCoder(outputKvCoder);
}
-
- private <W extends BoundedWindow>
- GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> groupAlsoByWindowsFn(
- WindowingStrategy<?, W> strategy,
- StateInternalsFactory<K> stateInternalsFactory,
- Coder<V> inputIterableElementValueCoder) {
- return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
- strategy,
- stateInternalsFactory,
- SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index d1e0c68..97b67c6 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -23,50 +23,60 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertThat;
+import com.google.common.base.Predicate;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.InMemoryStateInternals;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;
import org.joda.time.Instant;
/**
* Properties of {@link GroupAlsoByWindowsDoFn}.
*
- * <p>Some properties may not hold of some implementations, due to restrictions on the context
- * in which the implementation is applicable. For example, some {@code GroupAlsoByWindows} may not
+ * <p>Some properties may not hold of some implementations, due to restrictions on the context in
+ * which the implementation is applicable. For example, some {@code GroupAlsoByWindows} may not
* support merging windows.
*/
public class GroupAlsoByWindowsProperties {
/**
- * A factory of {@link GroupAlsoByWindowsDoFn} so that the various properties can provide
- * the appropriate windowing strategy under test.
+ * A factory of {@link GroupAlsoByWindowsDoFn} so that the various properties can provide the
+ * appropriate windowing strategy under test.
*/
public interface GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> {
- <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W>
- forStrategy(WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory);
+ <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> forStrategy(
+ WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory);
}
/**
@@ -76,8 +86,7 @@ public class GroupAlsoByWindowsProperties {
* <p>The input type is deliberately left as a wildcard, since it is not relevant.
*/
public static <K, InputT, OutputT> void emptyInputEmptyOutput(
- GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory)
- throws Exception {
+ GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory) throws Exception {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
@@ -87,13 +96,14 @@ public class GroupAlsoByWindowsProperties {
@SuppressWarnings("unchecked")
K fakeKey = (K) "this key should never be used";
- DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> result = runGABW(
- gabwFactory,
- windowingStrategy,
- fakeKey,
- Collections.<WindowedValue<InputT>>emptyList());
+ List<WindowedValue<KV<K, OutputT>>> result =
+ runGABW(
+ gabwFactory,
+ windowingStrategy,
+ fakeKey,
+ Collections.<WindowedValue<InputT>>emptyList());
- assertThat(result.peekOutputElements(), hasSize(0));
+ assertThat(result, hasSize(0));
}
/**
@@ -102,38 +112,32 @@ public class GroupAlsoByWindowsProperties {
*/
public static void groupsElementsIntoFixedWindows(
GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
- throws Exception {
+ throws Exception {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
- DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
- runGABW(gabwFactory, windowingStrategy, "key",
+ List<WindowedValue<KV<String, Iterable<String>>>> result =
+ runGABW(
+ gabwFactory,
+ windowingStrategy,
+ "key",
WindowedValue.of(
- "v1",
- new Instant(1),
- Arrays.asList(window(0, 10)),
- PaneInfo.NO_FIRING),
+ "v1", new Instant(1), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
WindowedValue.of(
- "v2",
- new Instant(2),
- Arrays.asList(window(0, 10)),
- PaneInfo.NO_FIRING),
+ "v2", new Instant(2), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
WindowedValue.of(
- "v3",
- new Instant(13),
- Arrays.asList(window(10, 20)),
- PaneInfo.NO_FIRING));
+ "v3", new Instant(13), Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING));
- assertThat(result.peekOutputElements(), hasSize(2));
+ assertThat(result, hasSize(2));
TimestampedValue<KV<String, Iterable<String>>> item0 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10)));
+ getOnlyElementInWindow(result, window(0, 10));
assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp()));
TimestampedValue<KV<String, Iterable<String>>> item1 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20)));
+ getOnlyElementInWindow(result, window(10, 20));
assertThat(item1.getValue().getValue(), contains("v3"));
assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp()));
}
@@ -146,14 +150,17 @@ public class GroupAlsoByWindowsProperties {
*/
public static void groupsElementsIntoSlidingWindowsWithMinTimestamp(
GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
- throws Exception {
+ throws Exception {
- WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(
- SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
+ WindowingStrategy<?, IntervalWindow> windowingStrategy =
+ WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
- DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
- runGABW(gabwFactory, windowingStrategy, "key",
+ List<WindowedValue<KV<String, Iterable<String>>>> result =
+ runGABW(
+ gabwFactory,
+ windowingStrategy,
+ "key",
WindowedValue.of(
"v1",
new Instant(5),
@@ -165,21 +172,21 @@ public class GroupAlsoByWindowsProperties {
Arrays.asList(window(0, 20), window(10, 30)),
PaneInfo.NO_FIRING));
- assertThat(result.peekOutputElements(), hasSize(3));
+ assertThat(result, hasSize(3));
TimestampedValue<KV<String, Iterable<String>>> item0 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(-10, 10)));
+ getOnlyElementInWindow(result, window(-10, 10));
assertThat(item0.getValue().getValue(), contains("v1"));
assertThat(item0.getTimestamp(), equalTo(new Instant(5)));
TimestampedValue<KV<String, Iterable<String>>> item1 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 20)));
+ getOnlyElementInWindow(result, window(0, 20));
assertThat(item1.getValue().getValue(), containsInAnyOrder("v1", "v2"));
// Timestamp adjusted by WindowFn to exceed the end of the prior sliding window
assertThat(item1.getTimestamp(), equalTo(new Instant(10)));
TimestampedValue<KV<String, Iterable<String>>> item2 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 30)));
+ getOnlyElementInWindow(result, window(10, 30));
assertThat(item2.getValue().getValue(), contains("v2"));
// Timestamp adjusted by WindowFn to exceed the end of the prior sliding window
assertThat(item2.getTimestamp(), equalTo(new Instant(20)));
@@ -194,14 +201,17 @@ public class GroupAlsoByWindowsProperties {
public static void combinesElementsInSlidingWindows(
GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory,
CombineFn<Long, ?, Long> combineFn)
- throws Exception {
+ throws Exception {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
.withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
- DoFnTester<KV<String, Iterable<WindowedValue<Long>>>, KV<String, Long>> result =
- runGABW(gabwFactory, windowingStrategy, "k",
+ List<WindowedValue<KV<String, Long>>> result =
+ runGABW(
+ gabwFactory,
+ windowingStrategy,
+ "k",
WindowedValue.of(
1L,
new Instant(5),
@@ -218,23 +228,20 @@ public class GroupAlsoByWindowsProperties {
Arrays.asList(window(0, 20), window(10, 30)),
PaneInfo.NO_FIRING));
- assertThat(result.peekOutputElements(), hasSize(3));
+ assertThat(result, hasSize(3));
- TimestampedValue<KV<String, Long>> item0 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(-10, 10)));
+ TimestampedValue<KV<String, Long>> item0 = getOnlyElementInWindow(result, window(-10, 10));
assertThat(item0.getValue().getKey(), equalTo("k"));
assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L))));
assertThat(item0.getTimestamp(), equalTo(new Instant(5L)));
- TimestampedValue<KV<String, Long>> item1 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 20)));
+ TimestampedValue<KV<String, Long>> item1 = getOnlyElementInWindow(result, window(0, 20));
assertThat(item1.getValue().getKey(), equalTo("k"));
assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L, 4L))));
// Timestamp adjusted by WindowFn to exceed the end of the prior sliding window
assertThat(item1.getTimestamp(), equalTo(new Instant(10L)));
- TimestampedValue<KV<String, Long>> item2 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 30)));
+ TimestampedValue<KV<String, Long>> item2 = getOnlyElementInWindow(result, window(10, 30));
assertThat(item2.getValue().getKey(), equalTo("k"));
assertThat(item2.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(2L, 4L))));
// Timestamp adjusted by WindowFn to exceed the end of the prior sliding window
@@ -247,79 +254,63 @@ public class GroupAlsoByWindowsProperties {
*/
public static void groupsIntoOverlappingNonmergingWindows(
GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
- throws Exception {
+ throws Exception {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
- DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
- runGABW(gabwFactory, windowingStrategy, "key",
- WindowedValue.of(
- "v1",
- new Instant(1),
- Arrays.asList(window(0, 5)),
- PaneInfo.NO_FIRING),
- WindowedValue.of(
- "v2",
- new Instant(4),
- Arrays.asList(window(1, 5)),
- PaneInfo.NO_FIRING),
+ List<WindowedValue<KV<String, Iterable<String>>>> result =
+ runGABW(
+ gabwFactory,
+ windowingStrategy,
+ "key",
+ WindowedValue.of("v1", new Instant(1), Arrays.asList(window(0, 5)), PaneInfo.NO_FIRING),
+ WindowedValue.of("v2", new Instant(4), Arrays.asList(window(1, 5)), PaneInfo.NO_FIRING),
WindowedValue.of(
- "v3",
- new Instant(4),
- Arrays.asList(window(0, 5)),
- PaneInfo.NO_FIRING));
+ "v3", new Instant(4), Arrays.asList(window(0, 5)), PaneInfo.NO_FIRING));
- assertThat(result.peekOutputElements(), hasSize(2));
+ assertThat(result, hasSize(2));
TimestampedValue<KV<String, Iterable<String>>> item0 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 5)));
+ getOnlyElementInWindow(result, window(0, 5));
assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v3"));
assertThat(item0.getTimestamp(), equalTo(window(1, 5).maxTimestamp()));
TimestampedValue<KV<String, Iterable<String>>> item1 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(1, 5)));
+ getOnlyElementInWindow(result, window(1, 5));
assertThat(item1.getValue().getValue(), contains("v2"));
assertThat(item1.getTimestamp(), equalTo(window(0, 5).maxTimestamp()));
}
- /**
- * Tests that the given GABW implementation correctly groups elements into merged sessions.
- */
+ /** Tests that the given GABW implementation correctly groups elements into merged sessions. */
public static void groupsElementsInMergedSessions(
GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
- throws Exception {
+ throws Exception {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)));
- DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
- runGABW(gabwFactory, windowingStrategy, "key",
+ List<WindowedValue<KV<String, Iterable<String>>>> result =
+ runGABW(
+ gabwFactory,
+ windowingStrategy,
+ "key",
WindowedValue.of(
- "v1",
- new Instant(0),
- Arrays.asList(window(0, 10)),
- PaneInfo.NO_FIRING),
+ "v1", new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
WindowedValue.of(
- "v2",
- new Instant(5),
- Arrays.asList(window(5, 15)),
- PaneInfo.NO_FIRING),
+ "v2", new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING),
WindowedValue.of(
- "v3",
- new Instant(15),
- Arrays.asList(window(15, 25)),
- PaneInfo.NO_FIRING));
+ "v3", new Instant(15), Arrays.asList(window(15, 25)), PaneInfo.NO_FIRING));
- assertThat(result.peekOutputElements(), hasSize(2));
+ assertThat(result, hasSize(2));
TimestampedValue<KV<String, Iterable<String>>> item0 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15)));
+ getOnlyElementInWindow(result, window(0, 15));
assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
TimestampedValue<KV<String, Iterable<String>>> item1 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25)));
+ getOnlyElementInWindow(result, window(15, 25));
assertThat(item1.getValue().getValue(), contains("v3"));
assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
}
@@ -331,39 +322,29 @@ public class GroupAlsoByWindowsProperties {
public static void combinesElementsPerSession(
GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory,
CombineFn<Long, ?, Long> combineFn)
- throws Exception {
+ throws Exception {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)));
- DoFnTester<KV<String, Iterable<WindowedValue<Long>>>, KV<String, Long>> result =
- runGABW(gabwFactory, windowingStrategy, "k",
- WindowedValue.of(
- 1L,
- new Instant(0),
- Arrays.asList(window(0, 10)),
- PaneInfo.NO_FIRING),
+ List<WindowedValue<KV<String, Long>>> result =
+ runGABW(
+ gabwFactory,
+ windowingStrategy,
+ "k",
+ WindowedValue.of(1L, new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
+ WindowedValue.of(2L, new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING),
WindowedValue.of(
- 2L,
- new Instant(5),
- Arrays.asList(window(5, 15)),
- PaneInfo.NO_FIRING),
- WindowedValue.of(
- 4L,
- new Instant(15),
- Arrays.asList(window(15, 25)),
- PaneInfo.NO_FIRING));
+ 4L, new Instant(15), Arrays.asList(window(15, 25)), PaneInfo.NO_FIRING));
- assertThat(result.peekOutputElements(), hasSize(2));
+ assertThat(result, hasSize(2));
- TimestampedValue<KV<String, Long>> item0 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15)));
+ TimestampedValue<KV<String, Long>> item0 = getOnlyElementInWindow(result, window(0, 15));
assertThat(item0.getValue().getKey(), equalTo("k"));
assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L))));
assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
- TimestampedValue<KV<String, Long>> item1 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25)));
+ TimestampedValue<KV<String, Long>> item1 = getOnlyElementInWindow(result, window(15, 25));
assertThat(item1.getValue().getKey(), equalTo("k"));
assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L))));
assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
@@ -371,176 +352,152 @@ public class GroupAlsoByWindowsProperties {
/**
* Tests that for a simple sequence of elements on the same key, the given GABW implementation
- * correctly groups them according to fixed windows and also sets the output timestamp
- * according to the policy {@link OutputTimeFns#outputAtEndOfWindow()}.
+ * correctly groups them according to fixed windows and also sets the output timestamp according
+ * to the policy {@link OutputTimeFns#outputAtEndOfWindow()}.
*/
public static void groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp(
GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
- throws Exception {
+ throws Exception {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
+ .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
- DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
- runGABW(gabwFactory, windowingStrategy, "key",
+ List<WindowedValue<KV<String, Iterable<String>>>> result =
+ runGABW(
+ gabwFactory,
+ windowingStrategy,
+ "key",
WindowedValue.of(
- "v1",
- new Instant(1),
- Arrays.asList(window(0, 10)),
- PaneInfo.NO_FIRING),
+ "v1", new Instant(1), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
WindowedValue.of(
- "v2",
- new Instant(2),
- Arrays.asList(window(0, 10)),
- PaneInfo.NO_FIRING),
+ "v2", new Instant(2), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
WindowedValue.of(
- "v3",
- new Instant(13),
- Arrays.asList(window(10, 20)),
- PaneInfo.NO_FIRING));
+ "v3", new Instant(13), Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING));
- assertThat(result.peekOutputElements(), hasSize(2));
+ assertThat(result, hasSize(2));
TimestampedValue<KV<String, Iterable<String>>> item0 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10)));
+ getOnlyElementInWindow(result, window(0, 10));
assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp()));
TimestampedValue<KV<String, Iterable<String>>> item1 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20)));
+ getOnlyElementInWindow(result, window(10, 20));
assertThat(item1.getValue().getValue(), contains("v3"));
assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp()));
}
/**
* Tests that for a simple sequence of elements on the same key, the given GABW implementation
- * correctly groups them according to fixed windows and also sets the output timestamp
- * according to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}.
+ * correctly groups them according to fixed windows and also sets the output timestamp according
+ * to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}.
*/
public static void groupsElementsIntoFixedWindowsWithLatestTimestamp(
GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
- throws Exception {
+ throws Exception {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
+ .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
- DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
- runGABW(gabwFactory, windowingStrategy, "k",
+ List<WindowedValue<KV<String, Iterable<String>>>> result =
+ runGABW(
+ gabwFactory,
+ windowingStrategy,
+ "k",
WindowedValue.of(
- "v1",
- new Instant(1),
- Arrays.asList(window(0, 10)),
- PaneInfo.NO_FIRING),
+ "v1", new Instant(1), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
WindowedValue.of(
- "v2",
- new Instant(2),
- Arrays.asList(window(0, 10)),
- PaneInfo.NO_FIRING),
+ "v2", new Instant(2), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
WindowedValue.of(
- "v3",
- new Instant(13),
- Arrays.asList(window(10, 20)),
- PaneInfo.NO_FIRING));
+ "v3", new Instant(13), Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING));
- assertThat(result.peekOutputElements(), hasSize(2));
+ assertThat(result, hasSize(2));
TimestampedValue<KV<String, Iterable<String>>> item0 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10)));
+ getOnlyElementInWindow(result, window(0, 10));
assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
assertThat(item0.getTimestamp(), equalTo(new Instant(2)));
TimestampedValue<KV<String, Iterable<String>>> item1 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20)));
+ getOnlyElementInWindow(result, window(10, 20));
assertThat(item1.getValue().getValue(), contains("v3"));
assertThat(item1.getTimestamp(), equalTo(new Instant(13)));
}
/**
- * Tests that the given GABW implementation correctly groups elements into merged sessions
- * with output timestamps at the end of the merged window.
+ * Tests that the given GABW implementation correctly groups elements into merged sessions with
+ * output timestamps at the end of the merged window.
*/
public static void groupsElementsInMergedSessionsWithEndOfWindowTimestamp(
GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
- throws Exception {
+ throws Exception {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
.withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
- DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
- runGABW(gabwFactory, windowingStrategy, "k",
+ List<WindowedValue<KV<String, Iterable<String>>>> result =
+ runGABW(
+ gabwFactory,
+ windowingStrategy,
+ "k",
WindowedValue.of(
- "v1",
- new Instant(0),
- Arrays.asList(window(0, 10)),
- PaneInfo.NO_FIRING),
+ "v1", new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
WindowedValue.of(
- "v2",
- new Instant(5),
- Arrays.asList(window(5, 15)),
- PaneInfo.NO_FIRING),
+ "v2", new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING),
WindowedValue.of(
- "v3",
- new Instant(15),
- Arrays.asList(window(15, 25)),
- PaneInfo.NO_FIRING));
+ "v3", new Instant(15), Arrays.asList(window(15, 25)), PaneInfo.NO_FIRING));
- assertThat(result.peekOutputElements(), hasSize(2));
+ assertThat(result, hasSize(2));
TimestampedValue<KV<String, Iterable<String>>> item0 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15)));
+ getOnlyElementInWindow(result, window(0, 15));
assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
TimestampedValue<KV<String, Iterable<String>>> item1 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25)));
+ getOnlyElementInWindow(result, window(15, 25));
assertThat(item1.getValue().getValue(), contains("v3"));
assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
}
/**
- * Tests that the given GABW implementation correctly groups elements into merged sessions
- * with output timestamps at the end of the merged window.
+ * Tests that the given GABW implementation correctly groups elements into merged sessions with
+ * output timestamps at the end of the merged window.
*/
public static void groupsElementsInMergedSessionsWithLatestTimestamp(
GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
- throws Exception {
+ throws Exception {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
.withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
BoundedWindow unmergedWindow = window(15, 25);
- DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
- runGABW(gabwFactory, windowingStrategy, "k",
+ List<WindowedValue<KV<String, Iterable<String>>>> result =
+ runGABW(
+ gabwFactory,
+ windowingStrategy,
+ "k",
WindowedValue.of(
- "v1",
- new Instant(0),
- Arrays.asList(window(0, 10)),
- PaneInfo.NO_FIRING),
+ "v1", new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
WindowedValue.of(
- "v2",
- new Instant(5),
- Arrays.asList(window(5, 15)),
- PaneInfo.NO_FIRING),
+ "v2", new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING),
WindowedValue.of(
- "v3",
- new Instant(15),
- Arrays.asList(unmergedWindow),
- PaneInfo.NO_FIRING));
+ "v3", new Instant(15), Arrays.asList(unmergedWindow), PaneInfo.NO_FIRING));
- assertThat(result.peekOutputElements(), hasSize(2));
+ assertThat(result, hasSize(2));
BoundedWindow mergedWindow = window(0, 15);
TimestampedValue<KV<String, Iterable<String>>> item0 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(mergedWindow));
+ getOnlyElementInWindow(result, mergedWindow);
assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
assertThat(item0.getTimestamp(), equalTo(new Instant(5)));
TimestampedValue<KV<String, Iterable<String>>> item1 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(unmergedWindow));
+ getOnlyElementInWindow(result, unmergedWindow);
assertThat(item1.getValue().getValue(), contains("v3"));
assertThat(item1.getTimestamp(), equalTo(new Instant(15)));
}
@@ -552,81 +509,66 @@ public class GroupAlsoByWindowsProperties {
public static void combinesElementsPerSessionWithEndOfWindowTimestamp(
GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory,
CombineFn<Long, ?, Long> combineFn)
- throws Exception {
+ throws Exception {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
- .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
+ .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
BoundedWindow secondWindow = window(15, 25);
- DoFnTester<?, KV<String, Long>> result =
- runGABW(gabwFactory, windowingStrategy, "k",
- WindowedValue.of(
- 1L,
- new Instant(0),
- Arrays.asList(window(0, 10)),
- PaneInfo.NO_FIRING),
- WindowedValue.of(
- 2L,
- new Instant(5),
- Arrays.asList(window(5, 15)),
- PaneInfo.NO_FIRING),
- WindowedValue.of(
- 4L,
- new Instant(15),
- Arrays.asList(secondWindow),
- PaneInfo.NO_FIRING));
+ List<WindowedValue<KV<String, Long>>> result =
+ runGABW(
+ gabwFactory,
+ windowingStrategy,
+ "k",
+ WindowedValue.of(1L, new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
+ WindowedValue.of(2L, new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING),
+ WindowedValue.of(4L, new Instant(15), Arrays.asList(secondWindow), PaneInfo.NO_FIRING));
- assertThat(result.peekOutputElements(), hasSize(2));
+ assertThat(result, hasSize(2));
BoundedWindow firstResultWindow = window(0, 15);
- TimestampedValue<KV<String, Long>> item0 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(firstResultWindow));
+ TimestampedValue<KV<String, Long>> item0 = getOnlyElementInWindow(result, firstResultWindow);
assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L))));
assertThat(item0.getTimestamp(), equalTo(firstResultWindow.maxTimestamp()));
- TimestampedValue<KV<String, Long>> item1 =
- Iterables.getOnlyElement(result.peekOutputElementsInWindow(secondWindow));
+ TimestampedValue<KV<String, Long>> item1 = getOnlyElementInWindow(result, secondWindow);
assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L))));
- assertThat(item1.getTimestamp(),
- equalTo(secondWindow.maxTimestamp()));
+ assertThat(item1.getTimestamp(), equalTo(secondWindow.maxTimestamp()));
}
@SafeVarargs
private static <K, InputT, OutputT, W extends BoundedWindow>
- DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> runGABW(
- GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory,
- WindowingStrategy<?, W> windowingStrategy,
- K key,
- WindowedValue<InputT>... values) throws Exception {
+ List<WindowedValue<KV<K, OutputT>>> runGABW(
+ GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory,
+ WindowingStrategy<?, W> windowingStrategy,
+ K key,
+ WindowedValue<InputT>... values)
+ throws Exception {
return runGABW(gabwFactory, windowingStrategy, key, Arrays.asList(values));
}
private static <K, InputT, OutputT, W extends BoundedWindow>
- DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> runGABW(
- GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory,
- WindowingStrategy<?, W> windowingStrategy,
- K key,
- Collection<WindowedValue<InputT>> values) throws Exception {
+ List<WindowedValue<KV<K, OutputT>>> runGABW(
+ GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory,
+ WindowingStrategy<?, W> windowingStrategy,
+ K key,
+ Collection<WindowedValue<InputT>> values)
+ throws Exception {
final StateInternalsFactory<K> stateInternalsCache = new CachingStateInternalsFactory<K>();
- DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> tester =
- DoFnTester.of(gabwFactory.forStrategy(windowingStrategy, stateInternalsCache));
-
- // Though we use a DoFnTester, the function itself is instantiated directly by the
- // runner and should not be serialized; it may not even be serializable.
- tester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
- tester.startBundle();
- tester.processElement(KV.<K, Iterable<WindowedValue<InputT>>>of(key, values));
- tester.finishBundle();
+ List<WindowedValue<KV<K, OutputT>>> output =
+ processElement(
+ gabwFactory.forStrategy(windowingStrategy, stateInternalsCache),
+ KV.<K, Iterable<WindowedValue<InputT>>>of(key, values));
// Sanity check for corruption
- for (KV<K, OutputT> elem : tester.peekOutputElements()) {
- assertThat(elem.getKey(), equalTo(key));
+ for (WindowedValue<KV<K, OutputT>> value : output) {
+ assertThat(value.getValue().getKey(), equalTo(key));
}
- return tester;
+ return output;
}
private static BoundedWindow window(long start, long end) {
@@ -657,4 +599,158 @@ public class GroupAlsoByWindowsProperties {
return InMemoryStateInternals.forKey(key);
}
}
+
+ private static <K, InputT, OutputT, W extends BoundedWindow>
+ List<WindowedValue<KV<K, OutputT>>> processElement(
+ GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> fn,
+ KV<K, Iterable<WindowedValue<InputT>>> element)
+ throws Exception {
+ TestProcessContext<K, InputT, OutputT, W> c = new TestProcessContext<>(fn, element);
+ fn.processElement(c);
+ return c.getOutput();
+ }
+
+ private static <K, OutputT> TimestampedValue<KV<K, OutputT>> getOnlyElementInWindow(
+ List<WindowedValue<KV<K, OutputT>>> output, final BoundedWindow window) {
+ WindowedValue<KV<K, OutputT>> res =
+ Iterables.getOnlyElement(
+ Iterables.filter(
+ output,
+ new Predicate<WindowedValue<KV<K, OutputT>>>() {
+ @Override
+ public boolean apply(@Nullable WindowedValue<KV<K, OutputT>> input) {
+ return input.getWindows().contains(window);
+ }
+ }));
+ return TimestampedValue.of(res.getValue(), res.getTimestamp());
+ }
+
+ /**
+ * A {@link GroupAlsoByWindowsDoFn.ProcessContext} providing just enough context for a {@link
+ * GroupAlsoByWindowsDoFn} - namely, information about the element and output via {@link
+ * WindowingInternals}, but no side inputs/outputs and no normal output.
+ */
+ private static class TestProcessContext<K, InputT, OutputT, W extends BoundedWindow>
+ extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W>.ProcessContext {
+ private final PipelineOptions options = PipelineOptionsFactory.create();
+ private final KV<K, Iterable<WindowedValue<InputT>>> element;
+ private final List<WindowedValue<KV<K, OutputT>>> output = new ArrayList<>();
+
+ private TestProcessContext(
+ GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> fn,
+ KV<K, Iterable<WindowedValue<InputT>>> element) {
+ fn.super();
+ this.element = element;
+ }
+
+ @Override
+ public KV<K, Iterable<WindowedValue<InputT>>> element() {
+ return element;
+ }
+
+ @Override
+ public Instant timestamp() {
+ return BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+
+ @Override
+ public BoundedWindow window() {
+ return GlobalWindow.INSTANCE;
+ }
+
+ @Override
+ public PaneInfo pane() {
+ return PaneInfo.NO_FIRING;
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public WindowingInternals<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>
+ windowingInternals() {
+ return new WindowingInternals<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>() {
+ @Override
+ public void outputWindowedValue(
+ KV<K, OutputT> output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ TestProcessContext.this.output.add(WindowedValue.of(output, timestamp, windows, pane));
+ }
+
+ @Override
+ public <SideOutputT> void sideOutputWindowedValue(
+ TupleTag<SideOutputT> tag,
+ SideOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public StateInternals<?> stateInternals() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TimerInternals timerInternals() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Collection<? extends BoundedWindow> windows() {
+ return ImmutableList.of(GlobalWindow.INSTANCE);
+ }
+
+ @Override
+ public PaneInfo pane() {
+ return PaneInfo.NO_FIRING;
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return options;
+ }
+
+ @Override
+ public void output(KV<K, OutputT> output) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void outputWithTimestamp(KV<K, OutputT> output, Instant timestamp) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+ String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ throw new UnsupportedOperationException();
+ }
+
+ public List<WindowedValue<KV<K, OutputT>>> getOutput() {
+ return output;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 1a74ae7..6ee42e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms;
import java.io.IOException;
import java.util.Collection;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn.Context;
@@ -185,6 +186,7 @@ public class DoFnAdapters {
* If the fn was created using {@link #toOldDoFn}, returns the original {@link DoFn}. Otherwise,
* returns {@code null}.
*/
+ @Nullable
public static <InputT, OutputT> DoFn<InputT, OutputT> getDoFn(OldDoFn<InputT, OutputT> fn) {
if (fn instanceof SimpleDoFnAdapter) {
return ((SimpleDoFnAdapter<InputT, OutputT>) fn).fn;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 17fa612..a9f93dd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -23,10 +23,10 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -36,6 +36,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ValueInSingleWindow;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -43,7 +45,6 @@ import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.state.InMemoryStateInternals;
import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
import org.apache.beam.sdk.util.state.StateInternals;
@@ -86,7 +87,8 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
*/
@SuppressWarnings("unchecked")
public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
- return new DoFnTester<>(DoFnAdapters.toOldDoFn(fn));
+ checkNotNull(fn, "fn can't be null");
+ return new DoFnTester<>(fn);
}
/**
@@ -96,9 +98,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
* @see #of(DoFn)
*/
@SuppressWarnings("unchecked")
- public static <InputT, OutputT> DoFnTester<InputT, OutputT>
+ @Deprecated
+ public static <InputT, OutputT> DoFnTester<InputT, OutputT>
of(OldDoFn<InputT, OutputT> fn) {
- return new DoFnTester<>(fn);
+ checkNotNull(fn, "fn can't be null");
+ return new DoFnTester<>(fn.toDoFn());
}
/**
@@ -238,7 +242,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
stateInternals = InMemoryStateInternals.forKey(new Object());
timerInternals = new InMemoryTimerInternals();
try {
- fn.startBundle(context);
+ fnInvoker.invokeStartBundle(context);
} catch (UserCodeException e) {
unwrapUserCodeException(e);
}
@@ -271,8 +275,8 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
/**
- * Calls {@link OldDoFn#processElement} on the {@code OldDoFn} under test, in a
- * context where {@link OldDoFn.ProcessContext#element} returns the
+ * Calls {@link DoFn.ProcessElement} on the {@code DoFn} under test, in a
+ * context where {@link DoFn.ProcessContext#element} returns the
* given element and timestamp.
*
* <p>Will call {@link #startBundle} automatically, if it hasn't
@@ -286,7 +290,13 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
startBundle();
}
try {
- fn.processElement(createProcessContext(element));
+ final TestProcessContext processContext = createProcessContext(element);
+ fnInvoker.invokeProcessElement(new DoFnInvoker.FakeArgumentProvider<InputT, OutputT>() {
+ @Override
+ public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+ return processContext;
+ }
+ });
} catch (UserCodeException e) {
unwrapUserCodeException(e);
}
@@ -308,13 +318,14 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
"Must be inside bundle to call finishBundle, but was: %s",
state);
try {
- fn.finishBundle(createContext(fn));
+ fnInvoker.invokeFinishBundle(createContext(fn));
} catch (UserCodeException e) {
unwrapUserCodeException(e);
}
if (cloningBehavior == CloningBehavior.CLONE_PER_BUNDLE) {
- fn.teardown();
+ fnInvoker.invokeTeardown();
fn = null;
+ fnInvoker = null;
state = State.UNINITIALIZED;
} else {
state = State.BUNDLE_FINISHED;
@@ -532,11 +543,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
return mainOutputTag;
}
- private TestContext createContext(OldDoFn<InputT, OutputT> fn) {
+ private TestContext createContext(DoFn<InputT, OutputT> fn) {
return new TestContext();
}
- private class TestContext extends OldDoFn<InputT, OutputT>.Context {
+ private class TestContext extends DoFn<InputT, OutputT>.Context {
TestContext() {
fn.super();
}
@@ -557,7 +568,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
@Override
- protected <AggInT, AggOutT> Aggregator<AggInT, AggOutT> createAggregatorInternal(
+ protected <AggInT, AggOutT> Aggregator<AggInT, AggOutT> createAggregator(
final String name, final CombineFn<AggInT, ?, AggOutT> combiner) {
return aggregator(name, combiner);
}
@@ -624,7 +635,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
elem.getValue(), elem.getTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
}
- private class TestProcessContext extends OldDoFn<InputT, OutputT>.ProcessContext {
+ private class TestProcessContext extends DoFn<InputT, OutputT>.ProcessContext {
private final TestContext context;
private final ValueInSingleWindow<InputT> element;
@@ -644,7 +655,9 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
Map<BoundedWindow, ?> viewValues = sideInputs.get(view);
if (viewValues != null) {
BoundedWindow sideInputWindow =
- view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(window());
+ view.getWindowingStrategyInternal()
+ .getWindowFn()
+ .getSideInputWindow(element.getWindow());
@SuppressWarnings("unchecked")
T windowValue = (T) viewValues.get(sideInputWindow);
if (windowValue != null) {
@@ -660,73 +673,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
@Override
- public BoundedWindow window() {
- return element.getWindow();
- }
-
- @Override
public PaneInfo pane() {
return element.getPane();
}
@Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
- return new WindowingInternals<InputT, OutputT>() {
- @Override
- public StateInternals<?> stateInternals() {
- return stateInternals;
- }
-
- @Override
- public void outputWindowedValue(
- OutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- for (BoundedWindow window : windows) {
- context.noteOutput(
- mainOutputTag, ValueInSingleWindow.of(output, timestamp, window, pane));
- }
- }
-
- @Override
- public <SideOutputT> void sideOutputWindowedValue(
- TupleTag<SideOutputT> tag,
- SideOutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- for (BoundedWindow window : windows) {
- context.noteOutput(
- tag, ValueInSingleWindow.of(output, timestamp, window, pane));
- }
- }
-
- @Override
- public TimerInternals timerInternals() {
- return timerInternals;
- }
-
- @Override
- public Collection<? extends BoundedWindow> windows() {
- return Collections.singleton(element.getWindow());
- }
-
- @Override
- public PaneInfo pane() {
- return element.getPane();
- }
-
- @Override
- public <T> T sideInput(
- PCollectionView<T> view, BoundedWindow sideInputWindow) {
- throw new UnsupportedOperationException(
- "SideInput from WindowingInternals is not supported in in the context of DoFnTester");
- }
- };
- }
-
- @Override
public PipelineOptions getPipelineOptions() {
return context.getPipelineOptions();
}
@@ -753,10 +704,10 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
@Override
- protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
throw new IllegalStateException("Aggregators should not be created within ProcessContext. "
- + "Instead, create an aggregator at OldDoFn construction time with"
+ + "Instead, create an aggregator at DoFn construction time with"
+ " createAggregator, and ensure they are set up by the time startBundle is"
+ " called with setupDelegateAggregators.");
}
@@ -768,8 +719,9 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
finishBundle();
}
if (state == State.BUNDLE_FINISHED) {
- fn.teardown();
+ fnInvoker.invokeTeardown();
fn = null;
+ fnInvoker = null;
}
state = State.TORN_DOWN;
}
@@ -786,8 +738,8 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
private final PipelineOptions options = PipelineOptionsFactory.create();
- /** The original {@link OldDoFn} under test. */
- private final OldDoFn<InputT, OutputT> origFn;
+ /** The original {@link DoFn} under test. */
+ private final DoFn<InputT, OutputT> origFn;
/**
* Whether to clone the original {@link DoFn} or just use it as-is.
@@ -805,8 +757,9 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
/** The output tags used by the {@link DoFn} under test. */
private TupleTag<OutputT> mainOutputTag = new TupleTag<>();
- /** The original OldDoFn under test, if started. */
- OldDoFn<InputT, OutputT> fn;
+ /** The original DoFn under test, if started. */
+ private DoFn<InputT, OutputT> fn;
+ private DoFnInvoker<InputT, OutputT> fnInvoker;
/** The outputs from the {@link DoFn} under test. */
private Map<TupleTag<?>, List<ValueInSingleWindow<?>>> outputs;
@@ -817,7 +770,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
/** The state of processing of the {@link DoFn} under test. */
private State state = State.UNINITIALIZED;
- private DoFnTester(OldDoFn<InputT, OutputT> origFn) {
+ private DoFnTester(DoFn<InputT, OutputT> origFn) {
this.origFn = origFn;
}
@@ -828,12 +781,13 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
if (cloningBehavior.equals(CloningBehavior.DO_NOT_CLONE)) {
fn = origFn;
} else {
- fn = (OldDoFn<InputT, OutputT>)
+ fn = (DoFn<InputT, OutputT>)
SerializableUtils.deserializeFromByteArray(
SerializableUtils.serializeToByteArray(origFn),
origFn.toString());
}
- fn.setup();
+ fnInvoker = DoFnInvokers.invokerFor(fn);
+ fnInvoker.invokeSetup();
outputs = new HashMap<>();
accumulators = new HashMap<>();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index 4ad7dad..50a7082 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -18,9 +18,6 @@
package org.apache.beam.sdk.transforms.reflect;
import java.io.Serializable;
-import java.lang.reflect.Constructor;
-import java.util.LinkedHashMap;
-import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.DoFn;
@@ -45,14 +42,6 @@ public class DoFnInvokers {
return ByteBuddyDoFnInvokerFactory.only().newByteBuddyInvoker(fn);
}
- /**
- * A cache of constructors of generated {@link DoFnInvoker} classes, keyed by {@link DoFn} class.
- * Needed because generating an invoker class is expensive, and to avoid generating an excessive
- * number of classes consuming PermGen memory.
- */
- private final Map<Class<?>, Constructor<?>> byteBuddyInvokerConstructorCache =
- new LinkedHashMap<>();
-
private DoFnInvokers() {}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index ac76b2e..ff8a9bc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -350,14 +350,14 @@ public class DoFnTesterTest {
}
}
- private static class SideInputDoFn extends OldDoFn<Integer, Integer> {
+ private static class SideInputDoFn extends DoFn<Integer, Integer> {
private final PCollectionView<Integer> value;
private SideInputDoFn(PCollectionView<Integer> value) {
this.value = value;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(c.sideInput(value));
}