You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/12/01 00:26:45 UTC
[1/2] incubator-beam git commit: This closes #1471
Repository: incubator-beam
Updated Branches:
refs/heads/master 565e99fbf -> a20bc4793
This closes #1471
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a20bc479
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a20bc479
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a20bc479
Branch: refs/heads/master
Commit: a20bc479347b2a10307ada45dc7220ee00671fac
Parents: 565e99f 38f0b11
Author: Thomas Groh <tg...@google.com>
Authored: Wed Nov 30 16:26:33 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 30 16:26:33 2016 -0800
----------------------------------------------------------------------
.../core/PushbackSideInputDoFnRunner.java | 20 ++++++++++++++++----
.../core/PushbackSideInputDoFnRunnerTest.java | 18 +++++++++++-------
2 files changed, 27 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: Preserves compressed windows in
PushbackSideInputDoFnRunner
Posted by tg...@apache.org.
Preserves compressed windows in PushbackSideInputDoFnRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/38f0b11c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/38f0b11c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/38f0b11c
Branch: refs/heads/master
Commit: 38f0b11cc9028cf347e3c96b6e6116e5a5a9972d
Parents: 565e99f
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Nov 30 14:28:51 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 30 16:26:33 2016 -0800
----------------------------------------------------------------------
.../core/PushbackSideInputDoFnRunner.java | 20 ++++++++++++++++----
.../core/PushbackSideInputDoFnRunnerTest.java | 18 +++++++++++-------
2 files changed, 27 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/38f0b11c/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index 8c169da..460154d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -74,17 +74,29 @@ public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<
processElement(elem);
return Collections.emptyList();
}
- ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
+ ImmutableList.Builder<BoundedWindow> readyWindowsBuilder = ImmutableList.builder();
+ ImmutableList.Builder<BoundedWindow> pushedBackWindowsBuilder = ImmutableList.builder();
for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
if (isReady(mainInputWindow)) {
- processElement(windowElem);
+ readyWindowsBuilder.add(mainInputWindow);
} else {
notReadyWindows.add(mainInputWindow);
- pushedBack.add(windowElem);
+ pushedBackWindowsBuilder.add(mainInputWindow);
}
}
- return pushedBack.build();
+ ImmutableList<BoundedWindow> readyWindows = readyWindowsBuilder.build();
+ ImmutableList<BoundedWindow> pushedBackWindows = pushedBackWindowsBuilder.build();
+ if (!readyWindows.isEmpty()) {
+ processElement(
+ WindowedValue.of(
+ elem.getValue(), elem.getTimestamp(), readyWindows, elem.getPane()));
+ }
+ return pushedBackWindows.isEmpty()
+ ? ImmutableList.<WindowedValue<InputT>>of()
+ : ImmutableList.of(
+ WindowedValue.of(
+ elem.getValue(), elem.getTimestamp(), pushedBackWindows, elem.getPane()));
}
private boolean isReady(BoundedWindow mainInputWindow) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/38f0b11c/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
index 59a7c92..f8f4604 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
@@ -17,9 +17,9 @@
*/
package org.apache.beam.runners.core;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
-
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
@@ -131,7 +130,7 @@ public class PushbackSideInputDoFnRunnerTest {
PaneInfo.ON_TIME_AND_ONLY_FIRING);
Iterable<WindowedValue<Integer>> multiWindowPushback =
runner.processElementInReadyWindows(multiWindow);
- assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
+ assertThat(multiWindowPushback, contains(multiWindow));
assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
}
@@ -162,9 +161,14 @@ public class PushbackSideInputDoFnRunnerTest {
assertThat(
multiWindowPushback,
containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L))));
- assertThat(underlying.inputElems,
- containsInAnyOrder(WindowedValue.of(2, new Instant(-2), littleWindow, PaneInfo.NO_FIRING),
- WindowedValue.of(2, new Instant(-2), bigWindow, PaneInfo.NO_FIRING)));
+ assertThat(
+ underlying.inputElems,
+ containsInAnyOrder(
+ WindowedValue.of(
+ 2,
+ new Instant(-2),
+ ImmutableList.of(littleWindow, bigWindow),
+ PaneInfo.NO_FIRING)));
}
@Test
@@ -188,7 +192,7 @@ public class PushbackSideInputDoFnRunnerTest {
runner.processElementInReadyWindows(multiWindow);
assertThat(multiWindowPushback, emptyIterable());
assertThat(underlying.inputElems,
- containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
+ containsInAnyOrder(ImmutableList.of(multiWindow).toArray()));
}
@Test