You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/01 17:45:42 UTC
[1/2] incubator-beam git commit: Short-circuit side input window
checks in PushbackDoFnRunner
Repository: incubator-beam
Updated Branches:
refs/heads/master fab7b2402 -> 68623e91f
Short-circuit side input window checks in PushbackDoFnRunner
This consults the collection of not-ready windows first, so the per-view side
input readiness checks are skipped when the answer must be false.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5bf23ac0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5bf23ac0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5bf23ac0
Branch: refs/heads/master
Commit: 5bf23ac0d3f41a3b3e2088024996b1247d246131
Parents: fab7b24
Author: Thomas Groh <tg...@google.com>
Authored: Fri Oct 28 09:21:51 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Nov 1 10:44:56 2016 -0700
----------------------------------------------------------------------
.../core/PushbackSideInputDoFnRunner.java | 28 +++++++++++---------
1 file changed, 15 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5bf23ac0/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index deeac3c..8c169da 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -23,7 +23,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
-
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
@@ -78,18 +77,7 @@ public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<
ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
- boolean isReady = !notReadyWindows.contains(mainInputWindow);
- for (PCollectionView<?> view : views) {
- BoundedWindow sideInputWindow =
- view.getWindowingStrategyInternal()
- .getWindowFn()
- .getSideInputWindow(mainInputWindow);
- if (!sideInputReader.isReady(view, sideInputWindow)) {
- isReady = false;
- break;
- }
- }
- if (isReady) {
+ if (isReady(mainInputWindow)) {
processElement(windowElem);
} else {
notReadyWindows.add(mainInputWindow);
@@ -99,6 +87,20 @@ public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<
return pushedBack.build();
}
+ private boolean isReady(BoundedWindow mainInputWindow) {
+ if (notReadyWindows.contains(mainInputWindow)) {
+ return false;
+ }
+ for (PCollectionView<?> view : views) {
+ BoundedWindow sideInputWindow =
+ view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
+ if (!sideInputReader.isReady(view, sideInputWindow)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
@Override
public void processElement(WindowedValue<InputT> elem) {
underlying.processElement(elem);
[2/2] incubator-beam git commit: [BEAM-447] Remove Window.Unbound
Posted by tg...@apache.org.
[BEAM-447] Remove Window.Unbound
This closes #1188
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/68623e91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/68623e91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/68623e91
Branch: refs/heads/master
Commit: 68623e91fe46bf1aa87ce057e7cffc221d687c1f
Parents: fab7b24 5bf23ac
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 1 10:45:25 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Nov 1 10:45:25 2016 -0700
----------------------------------------------------------------------
.../core/PushbackSideInputDoFnRunner.java | 28 +++++++++++---------
1 file changed, 15 insertions(+), 13 deletions(-)
----------------------------------------------------------------------