You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/12/01 00:26:45 UTC

[1/2] incubator-beam git commit: This closes #1471

Repository: incubator-beam
Updated Branches:
  refs/heads/master 565e99fbf -> a20bc4793


This closes #1471


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a20bc479
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a20bc479
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a20bc479

Branch: refs/heads/master
Commit: a20bc479347b2a10307ada45dc7220ee00671fac
Parents: 565e99f 38f0b11
Author: Thomas Groh <tg...@google.com>
Authored: Wed Nov 30 16:26:33 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 30 16:26:33 2016 -0800

----------------------------------------------------------------------
 .../core/PushbackSideInputDoFnRunner.java       | 20 ++++++++++++++++----
 .../core/PushbackSideInputDoFnRunnerTest.java   | 18 +++++++++++-------
 2 files changed, 27 insertions(+), 11 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Preserves compressed windows in PushbackSideInputDoFnRunner

Posted by tg...@apache.org.
Preserves compressed windows in PushbackSideInputDoFnRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/38f0b11c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/38f0b11c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/38f0b11c

Branch: refs/heads/master
Commit: 38f0b11cc9028cf347e3c96b6e6116e5a5a9972d
Parents: 565e99f
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Nov 30 14:28:51 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 30 16:26:33 2016 -0800

----------------------------------------------------------------------
 .../core/PushbackSideInputDoFnRunner.java       | 20 ++++++++++++++++----
 .../core/PushbackSideInputDoFnRunnerTest.java   | 18 +++++++++++-------
 2 files changed, 27 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/38f0b11c/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index 8c169da..460154d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -74,17 +74,29 @@ public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<
       processElement(elem);
       return Collections.emptyList();
     }
-    ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
+    ImmutableList.Builder<BoundedWindow> readyWindowsBuilder = ImmutableList.builder();
+    ImmutableList.Builder<BoundedWindow> pushedBackWindowsBuilder = ImmutableList.builder();
     for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
       BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
       if (isReady(mainInputWindow)) {
-        processElement(windowElem);
+        readyWindowsBuilder.add(mainInputWindow);
       } else {
         notReadyWindows.add(mainInputWindow);
-        pushedBack.add(windowElem);
+        pushedBackWindowsBuilder.add(mainInputWindow);
       }
     }
-    return pushedBack.build();
+    ImmutableList<BoundedWindow> readyWindows = readyWindowsBuilder.build();
+    ImmutableList<BoundedWindow> pushedBackWindows = pushedBackWindowsBuilder.build();
+    if (!readyWindows.isEmpty()) {
+      processElement(
+          WindowedValue.of(
+              elem.getValue(), elem.getTimestamp(), readyWindows, elem.getPane()));
+    }
+    return pushedBackWindows.isEmpty()
+        ? ImmutableList.<WindowedValue<InputT>>of()
+        : ImmutableList.of(
+            WindowedValue.of(
+                elem.getValue(), elem.getTimestamp(), pushedBackWindows, elem.getPane()));
   }
 
   private boolean isReady(BoundedWindow mainInputWindow) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/38f0b11c/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
index 59a7c92..f8f4604 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.runners.core;
 
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.when;
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Sum;
@@ -131,7 +130,7 @@ public class PushbackSideInputDoFnRunnerTest {
             PaneInfo.ON_TIME_AND_ONLY_FIRING);
     Iterable<WindowedValue<Integer>> multiWindowPushback =
         runner.processElementInReadyWindows(multiWindow);
-    assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
+    assertThat(multiWindowPushback, contains(multiWindow));
     assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
   }
 
@@ -162,9 +161,14 @@ public class PushbackSideInputDoFnRunnerTest {
     assertThat(
         multiWindowPushback,
         containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L))));
-    assertThat(underlying.inputElems,
-        containsInAnyOrder(WindowedValue.of(2, new Instant(-2), littleWindow, PaneInfo.NO_FIRING),
-            WindowedValue.of(2, new Instant(-2), bigWindow, PaneInfo.NO_FIRING)));
+    assertThat(
+        underlying.inputElems,
+        containsInAnyOrder(
+            WindowedValue.of(
+                2,
+                new Instant(-2),
+                ImmutableList.of(littleWindow, bigWindow),
+                PaneInfo.NO_FIRING)));
   }
 
   @Test
@@ -188,7 +192,7 @@ public class PushbackSideInputDoFnRunnerTest {
         runner.processElementInReadyWindows(multiWindow);
     assertThat(multiWindowPushback, emptyIterable());
     assertThat(underlying.inputElems,
-        containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
+        containsInAnyOrder(ImmutableList.of(multiWindow).toArray()));
   }
 
   @Test