You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/13 23:48:02 UTC
[1/2] incubator-beam git commit: [BEAM-1149] Explode windows when fn uses side inputs
Repository: incubator-beam
Updated Branches:
refs/heads/release-0.4.0-incubating 9f3b06359 -> b2780881a
[BEAM-1149] Explode windows when fn uses side inputs
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8d89bfce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8d89bfce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8d89bfce
Branch: refs/heads/release-0.4.0-incubating
Commit: 8d89bfce251d27c7fc6ff522a2cfeb676ec2c633
Parents: 9f3b063
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Dec 13 14:35:33 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 13 15:17:18 2016 -0800
----------------------------------------------------------------------
.../core/PushbackSideInputDoFnRunner.java | 23 +++-------
.../core/PushbackSideInputDoFnRunnerTest.java | 16 +++----
.../apache/beam/sdk/transforms/ParDoTest.java | 45 ++++++++++++++++++++
3 files changed, 60 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d89bfce/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index 460154d..0bb9153 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -71,32 +71,23 @@ public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<
*/
public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
if (views.isEmpty()) {
+ // When there are no side inputs, we can preserve the compressed representation.
processElement(elem);
return Collections.emptyList();
}
- ImmutableList.Builder<BoundedWindow> readyWindowsBuilder = ImmutableList.builder();
- ImmutableList.Builder<BoundedWindow> pushedBackWindowsBuilder = ImmutableList.builder();
+ ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
if (isReady(mainInputWindow)) {
- readyWindowsBuilder.add(mainInputWindow);
+ // When there are any side inputs, we have to process the element in each window
+ // individually, to disambiguate access to per-window side inputs.
+ processElement(windowElem);
} else {
notReadyWindows.add(mainInputWindow);
- pushedBackWindowsBuilder.add(mainInputWindow);
+ pushedBack.add(windowElem);
}
}
- ImmutableList<BoundedWindow> readyWindows = readyWindowsBuilder.build();
- ImmutableList<BoundedWindow> pushedBackWindows = pushedBackWindowsBuilder.build();
- if (!readyWindows.isEmpty()) {
- processElement(
- WindowedValue.of(
- elem.getValue(), elem.getTimestamp(), readyWindows, elem.getPane()));
- }
- return pushedBackWindows.isEmpty()
- ? ImmutableList.<WindowedValue<InputT>>of()
- : ImmutableList.of(
- WindowedValue.of(
- elem.getValue(), elem.getTimestamp(), pushedBackWindows, elem.getPane()));
+ return pushedBack.build();
}
private boolean isReady(BoundedWindow mainInputWindow) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d89bfce/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
index f8f4604..176ab26 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
@@ -17,9 +17,9 @@
*/
package org.apache.beam.runners.core;
-import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
@@ -130,7 +130,7 @@ public class PushbackSideInputDoFnRunnerTest {
PaneInfo.ON_TIME_AND_ONLY_FIRING);
Iterable<WindowedValue<Integer>> multiWindowPushback =
runner.processElementInReadyWindows(multiWindow);
- assertThat(multiWindowPushback, contains(multiWindow));
+ assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
}
@@ -165,10 +165,8 @@ public class PushbackSideInputDoFnRunnerTest {
underlying.inputElems,
containsInAnyOrder(
WindowedValue.of(
- 2,
- new Instant(-2),
- ImmutableList.of(littleWindow, bigWindow),
- PaneInfo.NO_FIRING)));
+ 2, new Instant(-2), ImmutableList.of(littleWindow), PaneInfo.NO_FIRING),
+ WindowedValue.of(2, new Instant(-2), ImmutableList.of(bigWindow), PaneInfo.NO_FIRING)));
}
@Test
@@ -191,8 +189,9 @@ public class PushbackSideInputDoFnRunnerTest {
Iterable<WindowedValue<Integer>> multiWindowPushback =
runner.processElementInReadyWindows(multiWindow);
assertThat(multiWindowPushback, emptyIterable());
- assertThat(underlying.inputElems,
- containsInAnyOrder(ImmutableList.of(multiWindow).toArray()));
+ assertThat(
+ underlying.inputElems,
+ containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
}
@Test
@@ -212,6 +211,7 @@ public class PushbackSideInputDoFnRunnerTest {
Iterable<WindowedValue<Integer>> multiWindowPushback =
runner.processElementInReadyWindows(multiWindow);
assertThat(multiWindowPushback, emptyIterable());
+ // Should preserve the compressed representation when there's no side inputs.
assertThat(underlying.inputElems, containsInAnyOrder(multiWindow));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d89bfce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 2d118e4..4a3e2dd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -69,6 +69,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.Timer;
@@ -88,6 +89,7 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.joda.time.MutableDateTime;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@@ -724,6 +726,49 @@ public class ParDoTest implements Serializable {
pipeline.run();
}
+ private static class FnWithSideInputs extends DoFn<String, String> {
+ private final PCollectionView<Integer> view;
+
+ private FnWithSideInputs(PCollectionView<Integer> view) {
+ this.view = view;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(c.element() + ":" + c.sideInput(view));
+ }
+ }
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void testSideInputsWithMultipleWindows() {
+ // Tests that the runner can safely run a DoFn that uses side inputs
+ // on an input where the element is in multiple windows. The complication is
+ // that side inputs are per-window, so the runner has to make sure
+ // to process each window individually.
+ Pipeline p = TestPipeline.create();
+
+ MutableDateTime mutableNow = Instant.now().toMutableDateTime();
+ mutableNow.setMillisOfSecond(0);
+ Instant now = mutableNow.toInstant();
+
+ SlidingWindows windowFn =
+ SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
+ PCollectionView<Integer> view = p.apply(Create.of(1)).apply(View.<Integer>asSingleton());
+ PCollection<String> res =
+ p.apply(Create.timestamped(TimestampedValue.of("a", now)))
+ .apply(Window.<String>into(windowFn))
+ .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view));
+
+ for (int i = 0; i < 4; ++i) {
+ Instant base = now.minus(Duration.standardSeconds(i));
+ IntervalWindow window = new IntervalWindow(base, base.plus(Duration.standardSeconds(5)));
+ PAssert.that(res).inWindow(window).containsInAnyOrder("a:1");
+ }
+
+ p.run();
+ }
+
@Test
@Category(NeedsRunner.class)
public void testParDoWithErrorInStartBatch() {
[2/2] incubator-beam git commit: This closes #1602
Posted by ke...@apache.org.
This closes #1602
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b2780881
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b2780881
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b2780881
Branch: refs/heads/release-0.4.0-incubating
Commit: b2780881a6d735420a2d29c3270b781ea8a3b74f
Parents: 9f3b063 8d89bfc
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 13 15:47:41 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 13 15:47:41 2016 -0800
----------------------------------------------------------------------
.../core/PushbackSideInputDoFnRunner.java | 23 +++-------
.../core/PushbackSideInputDoFnRunnerTest.java | 16 +++----
.../apache/beam/sdk/transforms/ParDoTest.java | 45 ++++++++++++++++++++
3 files changed, 60 insertions(+), 24 deletions(-)
----------------------------------------------------------------------