You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/01 17:45:42 UTC

[1/2] incubator-beam git commit: Short-circuit side input window checks in PushbackDoFnRunner

Repository: incubator-beam
Updated Branches:
  refs/heads/master fab7b2402 -> 68623e91f


Short-circuit side input window checks in PushbackDoFnRunner

This checks the set of windows already known to be not ready first, so the
side input reader is not queried when the answer must be false.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5bf23ac0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5bf23ac0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5bf23ac0

Branch: refs/heads/master
Commit: 5bf23ac0d3f41a3b3e2088024996b1247d246131
Parents: fab7b24
Author: Thomas Groh <tg...@google.com>
Authored: Fri Oct 28 09:21:51 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Nov 1 10:44:56 2016 -0700

----------------------------------------------------------------------
 .../core/PushbackSideInputDoFnRunner.java       | 28 +++++++++++---------
 1 file changed, 15 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5bf23ac0/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index deeac3c..8c169da 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -23,7 +23,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
-
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -78,18 +77,7 @@ public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<
     ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
     for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
       BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
-      boolean isReady = !notReadyWindows.contains(mainInputWindow);
-      for (PCollectionView<?> view : views) {
-        BoundedWindow sideInputWindow =
-            view.getWindowingStrategyInternal()
-                .getWindowFn()
-                .getSideInputWindow(mainInputWindow);
-        if (!sideInputReader.isReady(view, sideInputWindow)) {
-          isReady = false;
-          break;
-        }
-      }
-      if (isReady) {
+      if (isReady(mainInputWindow)) {
         processElement(windowElem);
       } else {
         notReadyWindows.add(mainInputWindow);
@@ -99,6 +87,20 @@ public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<
     return pushedBack.build();
   }
 
+  private boolean isReady(BoundedWindow mainInputWindow) {
+    if (notReadyWindows.contains(mainInputWindow)) {
+      return false;
+    }
+    for (PCollectionView<?> view : views) {
+      BoundedWindow sideInputWindow =
+          view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
+      if (!sideInputReader.isReady(view, sideInputWindow)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   @Override
   public void processElement(WindowedValue<InputT> elem) {
     underlying.processElement(elem);


[2/2] incubator-beam git commit: [BEAM-447] Remove Window.Unbound

Posted by tg...@apache.org.
[BEAM-447] Remove Window.Unbound

This closes #1188


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/68623e91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/68623e91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/68623e91

Branch: refs/heads/master
Commit: 68623e91fe46bf1aa87ce057e7cffc221d687c1f
Parents: fab7b24 5bf23ac
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 1 10:45:25 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Nov 1 10:45:25 2016 -0700

----------------------------------------------------------------------
 .../core/PushbackSideInputDoFnRunner.java       | 28 +++++++++++---------
 1 file changed, 15 insertions(+), 13 deletions(-)
----------------------------------------------------------------------