You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/23 17:01:44 UTC

[1/5] incubator-beam git commit: Explode windows in DirectRunner's Window.into evaluator

Repository: incubator-beam
Updated Branches:
  refs/heads/master f2d2ce5f4 -> 82ae661c5


Explode windows in DirectRunner's Window.into evaluator


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a3aa4c7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a3aa4c7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a3aa4c7f

Branch: refs/heads/master
Commit: a3aa4c7f1bd54122358c8e41e984a0d0000c160b
Parents: edf11fa
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jun 20 11:37:45 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 21 20:58:31 2016 -0700

----------------------------------------------------------------------
 .../runners/direct/WindowEvaluatorFactory.java  |  12 +-
 .../direct/WindowEvaluatorFactoryTest.java      | 174 ++++++++++++-------
 .../org/apache/beam/sdk/WindowMatchers.java     |  80 ++++++++-
 3 files changed, 193 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3aa4c7f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index b07b58a..6045912 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -82,11 +82,13 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
     }
 
     @Override
-    public void processElement(WindowedValue<InputT> element) throws Exception {
-      Collection<? extends BoundedWindow> windows = assignWindows(windowFn, element);
-      outputBundle.add(
-          WindowedValue.<InputT>of(
-              element.getValue(), element.getTimestamp(), windows, PaneInfo.NO_FIRING));
+    public void processElement(WindowedValue<InputT> compressedElement) throws Exception {
+      for (WindowedValue<InputT> element : compressedElement.explodeWindows()) {
+        Collection<? extends BoundedWindow> windows = assignWindows(windowFn, element);
+        outputBundle.add(
+            WindowedValue.<InputT>of(
+                element.getValue(), element.getTimestamp(), windows, PaneInfo.NO_FIRING));
+      }
     }
 
     private <W extends BoundedWindow> Collection<? extends BoundedWindow> assignWindows(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3aa4c7f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index 71abcca..c5faa5a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.runners.direct;
 
+import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue;
+import static org.apache.beam.sdk.WindowMatchers.isWindowedValue;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
@@ -73,23 +76,30 @@ public class WindowEvaluatorFactoryTest {
 
   private BundleFactory bundleFactory;
 
-  private WindowedValue<Long> first =
+  private WindowedValue<Long> valueInGlobalWindow =
       WindowedValue.timestampedValueInGlobalWindow(3L, new Instant(2L));
-  private WindowedValue<Long> second =
-      WindowedValue.of(Long.valueOf(1L),
-          EPOCH.plus(Duration.standardDays(3)),
-          ImmutableList.of(GlobalWindow.INSTANCE,
-              new IntervalWindow(EPOCH, BoundedWindow.TIMESTAMP_MAX_VALUE),
-              new IntervalWindow(EPOCH.plus(Duration.standardDays(3)),
-                  EPOCH.plus(Duration.standardDays(6)))),
-          PaneInfo.NO_FIRING);
-  private WindowedValue<Long> third =
+
+  private WindowedValue<Long> valueInIntervalWindow =
       WindowedValue.of(
           Long.valueOf(2L),
           new Instant(-10L),
           new IntervalWindow(new Instant(-100), EPOCH),
           PaneInfo.NO_FIRING);
 
+  private IntervalWindow intervalWindow1 =
+      new IntervalWindow(EPOCH, BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+  private IntervalWindow intervalWindow2 =
+      new IntervalWindow(
+          EPOCH.plus(Duration.standardDays(3)), EPOCH.plus(Duration.standardDays(6)));
+
+  private WindowedValue<Long> valueInGlobalAndTwoIntervalWindows =
+      WindowedValue.of(
+          Long.valueOf(1L),
+          EPOCH.plus(Duration.standardDays(3)),
+          ImmutableList.of(GlobalWindow.INSTANCE, intervalWindow1, intervalWindow2),
+          PaneInfo.NO_FIRING);
+
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
@@ -118,7 +128,10 @@ public class WindowEvaluatorFactoryTest {
         Iterables.getOnlyElement(result.getOutputBundles()),
         Matchers.<UncommittedBundle<?>>equalTo(outputBundle));
     CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
-    assertThat(committed.getElements(), containsInAnyOrder(third, first, second));
+    assertThat(
+        committed.getElements(),
+        containsInAnyOrder(
+            valueInIntervalWindow, valueInGlobalWindow, valueInGlobalAndTwoIntervalWindows));
   }
 
   @Test
@@ -141,16 +154,22 @@ public class WindowEvaluatorFactoryTest {
         Matchers.<UncommittedBundle<?>>equalTo(outputBundle));
     CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
 
-    WindowedValue<Long> expectedNewFirst =
-        WindowedValue.of(3L, new Instant(2L), firstSecondWindow, PaneInfo.NO_FIRING);
-    WindowedValue<Long> expectedNewSecond =
-        WindowedValue.of(
-            1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING);
-    WindowedValue<Long> expectedNewThird =
-        WindowedValue.of(2L, new Instant(-10L), thirdWindow, PaneInfo.NO_FIRING);
     assertThat(
         committed.getElements(),
-        containsInAnyOrder(expectedNewFirst, expectedNewSecond, expectedNewThird));
+        containsInAnyOrder(
+            // value in global window
+            isSingleWindowedValue(3L, new Instant(2L), firstSecondWindow, PaneInfo.NO_FIRING),
+
+            // value in just interval window
+            isSingleWindowedValue(2L, new Instant(-10L), thirdWindow, PaneInfo.NO_FIRING),
+
+            // value in global window and two interval windows
+            isSingleWindowedValue(
+                1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING),
+            isSingleWindowedValue(
+                1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING),
+            isSingleWindowedValue(
+                1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING)));
   }
 
   @Test
@@ -177,24 +196,39 @@ public class WindowEvaluatorFactoryTest {
     BoundedWindow wMinusSlide =
         new IntervalWindow(EPOCH.minus(windowDuration).plus(slidingBy), EPOCH.plus(slidingBy));
 
-    WindowedValue<Long> expectedFirst =
-        WindowedValue.of(
-            first.getValue(),
-            first.getTimestamp(),
-            ImmutableSet.of(w1, wMinusSlide),
-            PaneInfo.NO_FIRING);
-    WindowedValue<Long> expectedSecond =
-        WindowedValue.of(
-            second.getValue(), second.getTimestamp(), ImmutableSet.of(w1, w2), PaneInfo.NO_FIRING);
-    WindowedValue<Long> expectedThird =
-        WindowedValue.of(
-            third.getValue(),
-            third.getTimestamp(),
-            ImmutableSet.of(wMinus1, wMinusSlide),
-            PaneInfo.NO_FIRING);
-
     assertThat(
-        committed.getElements(), containsInAnyOrder(expectedFirst, expectedSecond, expectedThird));
+        committed.getElements(),
+        containsInAnyOrder(
+            // Value in global window mapped to one windowed value in multiple windows
+            isWindowedValue(
+                valueInGlobalWindow.getValue(),
+                valueInGlobalWindow.getTimestamp(),
+                ImmutableSet.of(w1, wMinusSlide),
+                PaneInfo.NO_FIRING),
+
+            // Value in interval window mapped to one windowed value in multiple windows
+            isWindowedValue(
+                valueInIntervalWindow.getValue(),
+                valueInIntervalWindow.getTimestamp(),
+                ImmutableSet.of(wMinus1, wMinusSlide),
+                PaneInfo.NO_FIRING),
+
+            // Value in three windows mapped to three windowed values in the same multiple windows
+            isWindowedValue(
+                valueInGlobalAndTwoIntervalWindows.getValue(),
+                valueInGlobalAndTwoIntervalWindows.getTimestamp(),
+                ImmutableSet.of(w1, w2),
+                PaneInfo.NO_FIRING),
+            isWindowedValue(
+                valueInGlobalAndTwoIntervalWindows.getValue(),
+                valueInGlobalAndTwoIntervalWindows.getTimestamp(),
+                ImmutableSet.of(w1, w2),
+                PaneInfo.NO_FIRING),
+            isWindowedValue(
+                valueInGlobalAndTwoIntervalWindows.getValue(),
+                valueInGlobalAndTwoIntervalWindows.getTimestamp(),
+                ImmutableSet.of(w1, w2),
+                PaneInfo.NO_FIRING)));
   }
 
   @Test
@@ -212,34 +246,54 @@ public class WindowEvaluatorFactoryTest {
         Matchers.<UncommittedBundle<?>>equalTo(outputBundle));
     CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
 
-    WindowedValue<Long> expectedFirst =
-        WindowedValue.of(
-            first.getValue(),
-            first.getTimestamp(),
-            new IntervalWindow(first.getTimestamp(), first.getTimestamp().plus(1L)),
-            PaneInfo.NO_FIRING);
-    WindowedValue<Long> expectedSecond = WindowedValue.of(second.getValue(),
-        second.getTimestamp(),
-        new IntervalWindow(second.getTimestamp(), second.getTimestamp().plus(1L)),
-        PaneInfo.NO_FIRING);
-    WindowedValue<Long> expectedThird =
-        WindowedValue.of(
-            third.getValue(),
-            third.getTimestamp(),
-            third.getWindows(),
-            PaneInfo.NO_FIRING);
-
     assertThat(
-        committed.getElements(), containsInAnyOrder(expectedFirst, expectedSecond, expectedThird));
+        committed.getElements(),
+        containsInAnyOrder(
+            // Value in global window mapped to [timestamp, timestamp+1)
+            isSingleWindowedValue(
+                valueInGlobalWindow.getValue(),
+                valueInGlobalWindow.getTimestamp(),
+                new IntervalWindow(
+                    valueInGlobalWindow.getTimestamp(),
+                    valueInGlobalWindow.getTimestamp().plus(1L)),
+                PaneInfo.NO_FIRING),
+
+            // Value in interval window mapped to the same window
+            isWindowedValue(
+                valueInIntervalWindow.getValue(),
+                valueInIntervalWindow.getTimestamp(),
+                valueInIntervalWindow.getWindows(),
+                PaneInfo.NO_FIRING),
+
+            // Value in global window and two interval windows exploded and mapped in both ways
+            isSingleWindowedValue(
+                valueInGlobalAndTwoIntervalWindows.getValue(),
+                valueInGlobalAndTwoIntervalWindows.getTimestamp(),
+                new IntervalWindow(
+                    valueInGlobalAndTwoIntervalWindows.getTimestamp(),
+                    valueInGlobalAndTwoIntervalWindows.getTimestamp().plus(1L)),
+                PaneInfo.NO_FIRING),
+
+            isSingleWindowedValue(
+                valueInGlobalAndTwoIntervalWindows.getValue(),
+                valueInGlobalAndTwoIntervalWindows.getTimestamp(),
+                intervalWindow1,
+                PaneInfo.NO_FIRING),
+
+            isSingleWindowedValue(
+                valueInGlobalAndTwoIntervalWindows.getValue(),
+                valueInGlobalAndTwoIntervalWindows.getTimestamp(),
+                intervalWindow2,
+                PaneInfo.NO_FIRING)));
   }
 
   private CommittedBundle<Long> createInputBundle() {
     CommittedBundle<Long> inputBundle =
         bundleFactory
             .createRootBundle(input)
-            .add(first)
-            .add(second)
-            .add(third)
+            .add(valueInGlobalWindow)
+            .add(valueInGlobalAndTwoIntervalWindows)
+            .add(valueInIntervalWindow)
             .commit(Instant.now());
     return inputBundle;
   }
@@ -262,9 +316,9 @@ public class WindowEvaluatorFactoryTest {
             inputBundle,
             evaluationContext);
 
-    evaluator.processElement(first);
-    evaluator.processElement(second);
-    evaluator.processElement(third);
+    evaluator.processElement(valueInGlobalWindow);
+    evaluator.processElement(valueInGlobalAndTwoIntervalWindows);
+    evaluator.processElement(valueInIntervalWindow);
     TransformResult result = evaluator.finishBundle();
     return result;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3aa4c7f/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
index 7a5e2fb..48c2589 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
@@ -22,6 +22,8 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 
+import com.google.common.collect.Lists;
+
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
@@ -37,19 +39,51 @@ import java.util.Objects;
 public class WindowMatchers {
 
   public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
-      Matcher<? super T> valueMatcher, Matcher<? super Instant> timestampMatcher,
+      T value,
+      Instant timestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo paneInfo) {
+
+    Collection<Matcher<? super BoundedWindow>> windowMatchers =
+        Lists.newArrayListWithCapacity(windows.size());
+    for (BoundedWindow window : windows) {
+      windowMatchers.add(Matchers.equalTo(window));
+    }
+
+    return isWindowedValue(
+        Matchers.equalTo(value),
+        Matchers.equalTo(timestamp),
+        Matchers.containsInAnyOrder(windowMatchers),
+        Matchers.equalTo(paneInfo));
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
+      Matcher<? super T> valueMatcher,
+      Matcher<? super Instant> timestampMatcher,
+      Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher,
+      Matcher<? super PaneInfo> paneInfoMatcher) {
+    return new WindowedValueMatcher<>(
+        valueMatcher, timestampMatcher, windowsMatcher, paneInfoMatcher);
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
+      Matcher<? super T> valueMatcher,
+      Matcher<? super Instant> timestampMatcher,
       Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher) {
-    return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, windowsMatcher);
+    return new WindowedValueMatcher<>(
+        valueMatcher, timestampMatcher, windowsMatcher, Matchers.anything());
   }
 
   public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
       Matcher<? super T> valueMatcher, Matcher<? super Instant> timestampMatcher) {
-    return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything());
+    return new WindowedValueMatcher<>(
+        valueMatcher, timestampMatcher, Matchers.anything(), Matchers.anything());
   }
 
   public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
       Matcher<? super T> valueMatcher) {
-    return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), Matchers.anything());
+    return new WindowedValueMatcher<>(
+        valueMatcher, Matchers.anything(), Matchers.anything(), Matchers.anything());
   }
 
   public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
@@ -59,20 +93,46 @@ public class WindowMatchers {
   }
 
   public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
+      T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
+    return WindowMatchers.<T>isSingleWindowedValue(
+        Matchers.equalTo(value),
+        Matchers.equalTo(timestamp),
+        Matchers.equalTo(window),
+        Matchers.equalTo(paneInfo));
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
+      T value, Instant timestamp, BoundedWindow window) {
+    return WindowMatchers.<T>isSingleWindowedValue(
+        Matchers.equalTo(value), Matchers.equalTo(timestamp), Matchers.equalTo(window));
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
       Matcher<T> valueMatcher, long timestamp, long windowStart, long windowEnd) {
     IntervalWindow intervalWindow =
         new IntervalWindow(new Instant(windowStart), new Instant(windowEnd));
     return WindowMatchers.<T>isSingleWindowedValue(
         valueMatcher,
         Matchers.describedAs("%0", Matchers.equalTo(new Instant(timestamp)), timestamp),
-        Matchers.<BoundedWindow>equalTo(intervalWindow));
+        Matchers.<BoundedWindow>equalTo(intervalWindow),
+        Matchers.anything());
   }
 
   public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
-      Matcher<? super T> valueMatcher, Matcher<? super Instant> timestampMatcher,
+      Matcher<? super T> valueMatcher,
+      Matcher<? super Instant> timestampMatcher,
       Matcher<? super BoundedWindow> windowMatcher) {
     return new WindowedValueMatcher<T>(
-        valueMatcher, timestampMatcher, Matchers.contains(windowMatcher));
+        valueMatcher, timestampMatcher, Matchers.contains(windowMatcher), Matchers.anything());
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
+      Matcher<? super T> valueMatcher,
+      Matcher<? super Instant> timestampMatcher,
+      Matcher<? super BoundedWindow> windowMatcher,
+      Matcher<? super PaneInfo> paneInfoMatcher) {
+    return new WindowedValueMatcher<T>(
+        valueMatcher, timestampMatcher, Matchers.contains(windowMatcher), paneInfoMatcher);
   }
 
   public static Matcher<IntervalWindow> intervalWindow(long start, long end) {
@@ -114,14 +174,17 @@ public class WindowMatchers {
     private Matcher<? super T> valueMatcher;
     private Matcher<? super Instant> timestampMatcher;
     private Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher;
+    private Matcher<? super PaneInfo> paneInfoMatcher;
 
     private WindowedValueMatcher(
         Matcher<? super T> valueMatcher,
         Matcher<? super Instant> timestampMatcher,
-        Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher) {
+        Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher,
+        Matcher<? super PaneInfo> paneInfoMatcher) {
       this.valueMatcher = valueMatcher;
       this.timestampMatcher = timestampMatcher;
       this.windowsMatcher = windowsMatcher;
+      this.paneInfoMatcher = paneInfoMatcher;
     }
 
     @Override
@@ -130,6 +193,7 @@ public class WindowMatchers {
           .appendText("a WindowedValue(").appendValue(valueMatcher)
           .appendText(", ").appendValue(timestampMatcher)
           .appendText(", ").appendValue(windowsMatcher)
+          .appendText(", ").appendValue(paneInfoMatcher)
           .appendText(")");
     }
 


[4/5] incubator-beam git commit: Remove references to multi-window representation from model

Posted by ke...@apache.org.
Remove references to multi-window representation from model

Some areas of the Beam model in the SDK allude to the use of a
compressed representation of an element along with the set
of windows it is assigned to. However, the model itself views
elements in different windows as fully independent, so the SDK
should not oblige the runner or the user to use a particular
representation.

This change removes those places in the SDK where an element
is treated as being in multiple windows at once.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/08104410
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/08104410
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/08104410

Branch: refs/heads/master
Commit: 08104410177063b1095bd91b24b40f9961c92cf2
Parents: a3aa4c7
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 9 12:17:09 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 23 09:35:44 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/AssignWindowsDoFn.java | 15 ++++---
 .../apache/beam/sdk/util/DoFnRunnerBase.java    |  2 +-
 .../beam/sdk/util/ReduceFnRunnerTest.java       |  3 +-
 .../apache/beam/sdk/util/ReduceFnTester.java    | 46 +++++++++++---------
 .../runners/direct/WindowEvaluatorFactory.java  |  6 ++-
 .../direct/WindowEvaluatorFactoryTest.java      |  4 +-
 .../FlinkStreamingTransformTranslators.java     |  5 ++-
 .../functions/FlinkAssignContext.java           | 15 ++++++-
 .../functions/FlinkNoElementAssignContext.java  |  4 +-
 .../streaming/FlinkAbstractParDoWrapper.java    |  4 +-
 .../flink/streaming/GroupAlsoByWindowTest.java  |  2 +-
 .../beam/sdk/testing/WindowFnTestUtils.java     |  5 ++-
 .../sdk/transforms/windowing/GlobalWindows.java |  5 ---
 .../windowing/PartitioningWindowFn.java         |  5 ---
 .../beam/sdk/transforms/windowing/WindowFn.java | 11 +----
 .../apache/beam/sdk/util/GatherAllPanes.java    |  3 +-
 .../apache/beam/sdk/util/IdentityWindowFn.java  | 20 +++------
 .../org/apache/beam/sdk/util/Reshuffle.java     |  3 +-
 .../sdk/util/IdentitySideInputWindowFn.java     |  3 +-
 .../sdk/util/MergingActiveWindowSetTest.java    |  6 +--
 .../org/apache/beam/sdk/util/TriggerTester.java | 14 +++---
 21 files changed, 89 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
index caec40e..d40b007 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
@@ -20,22 +20,27 @@ package org.apache.beam.sdk.util;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 
+import com.google.common.collect.Iterables;
+
 import org.joda.time.Instant;
 
 import java.util.Collection;
 
 /**
- * {@link DoFn} that tags elements of a PCollection with windows, according
- * to the provided {@link WindowFn}.
+ * {@link DoFn} that tags elements of a {@link PCollection} with windows, according to the provided
+ * {@link WindowFn}.
+ *
  * @param <T> Type of elements being windowed
  * @param <W> Window type
  */
 @SystemDoFnInternal
-public class AssignWindowsDoFn<T, W extends BoundedWindow> extends DoFn<T, T> {
+public class AssignWindowsDoFn<T, W extends BoundedWindow> extends DoFn<T, T>
+    implements RequiresWindowAccess {
   private WindowFn<? super T, W> fn;
 
   public AssignWindowsDoFn(WindowFn<? super T, W> fn) {
@@ -64,8 +69,8 @@ public class AssignWindowsDoFn<T, W extends BoundedWindow> extends DoFn<T, T> {
                 }
 
                 @Override
-                public Collection<? extends BoundedWindow> windows() {
-                  return c.windowingInternals().windows();
+                public BoundedWindow window() {
+                  return Iterables.getOnlyElement(c.windowingInternals().windows());
                 }
               });
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
index 1ebe72b..a849eb2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
@@ -254,7 +254,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
             }
 
             @Override
-            public Collection<? extends BoundedWindow> windows() {
+            public W window() {
               throw new UnsupportedOperationException(
                   "WindowFn attempted to access input windows when none were available");
             }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
index b7ec540..64fcae3 100644
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
@@ -82,7 +82,6 @@ import org.mockito.MockitoAnnotations;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
@@ -258,7 +257,7 @@ public class ReduceFnRunnerTest {
               }
 
               @Override
-              public Collection<? extends BoundedWindow> windows() {
+              public BoundedWindow window() {
                 throw new UnsupportedOperationException();
               }
             }));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
index 9916c5c..e897f54 100644
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
@@ -401,21 +401,25 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
       WindowTracing.trace("TriggerTester.injectElements: {}", value);
     }
     ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
-    runner.processElements(Iterables.transform(
-        Arrays.asList(values), new Function<TimestampedValue<InputT>, WindowedValue<InputT>>() {
-          @Override
-          public WindowedValue<InputT> apply(TimestampedValue<InputT> input) {
-            try {
-              InputT value = input.getValue();
-              Instant timestamp = input.getTimestamp();
-              Collection<W> windows = windowFn.assignWindows(new TestAssignContext<W>(
-                  windowFn, value, timestamp, Arrays.asList(GlobalWindow.INSTANCE)));
-              return WindowedValue.of(value, timestamp, windows, PaneInfo.NO_FIRING);
-            } catch (Exception e) {
-              throw new RuntimeException(e);
-            }
-          }
-        }));
+    runner.processElements(
+        Iterables.transform(
+            Arrays.asList(values),
+            new Function<TimestampedValue<InputT>, WindowedValue<InputT>>() {
+              @Override
+              public WindowedValue<InputT> apply(TimestampedValue<InputT> input) {
+                try {
+                  InputT value = input.getValue();
+                  Instant timestamp = input.getTimestamp();
+                  Collection<W> windows =
+                      windowFn.assignWindows(
+                          new TestAssignContext<W>(
+                              windowFn, value, timestamp, GlobalWindow.INSTANCE));
+                  return WindowedValue.of(value, timestamp, windows, PaneInfo.NO_FIRING);
+                } catch (Exception e) {
+                  throw new RuntimeException(e);
+                }
+              }
+            }));
 
     // Persist after each bundle.
     runner.persist();
@@ -538,14 +542,14 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
       extends WindowFn<Object, W>.AssignContext {
     private Object element;
     private Instant timestamp;
-    private Collection<? extends BoundedWindow> windows;
+    private BoundedWindow window;
 
-    public TestAssignContext(WindowFn<Object, W> windowFn, Object element, Instant timestamp,
-        Collection<? extends BoundedWindow> windows) {
+    public TestAssignContext(
+        WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) {
       windowFn.super();
       this.element = element;
       this.timestamp = timestamp;
-      this.windows = windows;
+      this.window = window;
     }
 
     @Override
@@ -559,8 +563,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     }
 
     @Override
-    public Collection<? extends BoundedWindow> windows() {
-      return windows;
+    public BoundedWindow window() {
+      return window;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index 6045912..67c2f17 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -29,6 +29,8 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 
+import com.google.common.collect.Iterables;
+
 import org.joda.time.Instant;
 
 import java.util.Collection;
@@ -125,8 +127,8 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
     }
 
     @Override
-    public Collection<? extends BoundedWindow> windows() {
-      return value.getWindows();
+    public BoundedWindow window() {
+      return Iterables.getOnlyElement(value.getWindows());
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index c5faa5a..65dcfeb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -326,11 +326,11 @@ public class WindowEvaluatorFactoryTest {
   private static class EvaluatorTestWindowFn extends NonMergingWindowFn<Long, BoundedWindow> {
     @Override
     public Collection<BoundedWindow> assignWindows(AssignContext c) throws Exception {
-      if (c.windows().contains(GlobalWindow.INSTANCE)) {
+      if (c.window().equals(GlobalWindow.INSTANCE)) {
         return Collections.<BoundedWindow>singleton(new IntervalWindow(c.timestamp(),
             c.timestamp().plus(1L)));
       }
-      return (Collection<BoundedWindow>) c.windows();
+      return Collections.singleton(c.window());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index b3fed99..5d04068 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -59,6 +59,7 @@ import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
 import com.google.api.client.util.Maps;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import org.apache.flink.api.common.functions.FilterFunction;
@@ -359,8 +360,8 @@ public class FlinkStreamingTransformTranslators {
                 }
 
                 @Override
-                public Collection<? extends BoundedWindow> windows() {
-                  return c.windowingInternals().windows();
+                public BoundedWindow window() {
+                  return Iterables.getOnlyElement(c.windowingInternals().windows());
                 }
               });
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
index 7ea8c20..6abb8ff 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
@@ -17,10 +17,14 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 
+import com.google.common.collect.Iterables;
+
 import org.joda.time.Instant;
 
 import java.util.Collection;
@@ -35,6 +39,13 @@ class FlinkAssignContext<InputT, W extends BoundedWindow>
 
   FlinkAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
     fn.super();
+    // Use checkArgument's lazy %s template so the message is only built when the check fails.
+    checkArgument(
+        Iterables.size(value.getWindows()) == 1,
+        "%s passed to window assignment must be in a single window, but it was in %s: %s",
+        WindowedValue.class.getSimpleName(),
+        Iterables.size(value.getWindows()),
+        value.getWindows());
     this.value = value;
   }
 
@@ -49,8 +60,8 @@ class FlinkAssignContext<InputT, W extends BoundedWindow>
   }
 
   @Override
-  public Collection<? extends BoundedWindow> windows() {
-    return value.getWindows();
+  public BoundedWindow window() {
+    return Iterables.getOnlyElement(value.getWindows());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
index 892f7a1..d49821b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
@@ -65,7 +65,7 @@ class FlinkNoElementAssignContext<InputT, W extends BoundedWindow>
   }
 
   @Override
-  public Collection<? extends BoundedWindow> windows() {
-    throw new UnsupportedOperationException("No windows available.");
+  public BoundedWindow window() {
+    throw new UnsupportedOperationException("No window available.");
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index 3c37aa9..f68a519 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -245,9 +245,9 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
           }
 
           @Override
-          public Collection<? extends BoundedWindow> windows() {
+          public BoundedWindow window() {
             throw new UnsupportedOperationException(
-                "WindowFn attempted to access input windows when none were available");
+                "WindowFn attempted to access input window when none was available");
           }
         });
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
index 3e5a17d..207fb5a 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
@@ -508,7 +508,7 @@ public class GroupAlsoByWindowTest extends StreamingMultipleProgramsTestBase {
           }
 
           @Override
-          public Collection<? extends BoundedWindow> windows() {
+          public BoundedWindow window() {
             throw new UnsupportedOperationException(
                 "WindowFn attempted to access input windows when none were available");
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
index a4130df..127721a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
@@ -115,8 +116,8 @@ public class WindowFnTestUtils {
     }
 
     @Override
-    public Collection<? extends BoundedWindow> windows() {
-      return null;
+    public BoundedWindow window() {
+      return GlobalWindow.INSTANCE;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
index 499ffeb..002bf2e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
@@ -53,11 +53,6 @@ public class GlobalWindows extends NonMergingWindowFn<Object, GlobalWindow> {
   }
 
   @Override
-  public boolean assignsToSingleWindow() {
-    return true;
-  }
-
-  @Override
   public Instant getOutputTime(Instant inputTimestamp, GlobalWindow window) {
     return inputTimestamp;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
index b0dd8b9..da2f38c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
@@ -51,11 +51,6 @@ public abstract class PartitioningWindowFn<T, W extends BoundedWindow>
   }
 
   @Override
-  public boolean assignsToSingleWindow() {
-    return true;
-  }
-
-  @Override
   public Instant getOutputTime(Instant inputTimestamp, W window) {
     return inputTimestamp;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
index 41833f8..d84866b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
@@ -65,10 +65,10 @@ public abstract class WindowFn<T, W extends BoundedWindow>
     public abstract Instant timestamp();
 
     /**
-     * Returns the windows the current element was in, prior to this
+     * Returns the window of the current element prior to this
      * {@code WindowFn} being called.
      */
-    public abstract Collection<? extends BoundedWindow> windows();
+    public abstract BoundedWindow window();
   }
 
   /**
@@ -161,13 +161,6 @@ public abstract class WindowFn<T, W extends BoundedWindow>
   }
 
   /**
-   * Returns true if this {@code WindowFn} assigns each element to a single window.
-   */
-  public boolean assignsToSingleWindow() {
-    return false;
-  }
-
-  /**
    * {@inheritDoc}
    *
    * <p>By default, does not register any display data. Implementors may override this method

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
index 5a01c28..ab40678 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
@@ -62,8 +62,7 @@ public class GatherAllPanes<T>
         .apply(
             Window.into(
                     new IdentityWindowFn<KV<Void, WindowedValue<T>>>(
-                        originalWindowFn.windowCoder(),
-                        input.getWindowingStrategy().getWindowFn().assignsToSingleWindow()))
+                        originalWindowFn.windowCoder()))
                 .triggering(Never.ever()))
         // all values have the same key so they all appear as a single output element
         .apply(GroupByKey.<Void, WindowedValue<T>>create())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
index 91e5609..a3477e9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Instant;
 
 import java.util.Collection;
+import java.util.Collections;
 
 /**
  * A {@link WindowFn} that leaves all associations between elements and windows unchanged.
@@ -55,25 +56,21 @@ class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> {
    * these windows.
    */
   private final Coder<BoundedWindow> coder;
-  private final boolean assignsToSingleWindow;
 
-  public IdentityWindowFn(Coder<? extends BoundedWindow> coder, boolean assignsToSingleWindow) {
+  public IdentityWindowFn(Coder<? extends BoundedWindow> coder) {
     // Safe because it is only used privately here.
     // At every point where a window is returned or accepted, it has been provided
-    // by priorWindowFn, so it is of the expected type.
+    // by the prior WindowFn, so it is of the expected type.
     @SuppressWarnings("unchecked")
     Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) coder;
     this.coder = windowCoder;
-    this.assignsToSingleWindow = assignsToSingleWindow;
   }
 
   @Override
   public Collection<BoundedWindow> assignWindows(WindowFn<T, BoundedWindow>.AssignContext c)
       throws Exception {
-    // The windows are provided by priorWindowFn, which also provides the coder for them
-    @SuppressWarnings("unchecked")
-    Collection<BoundedWindow> priorWindows = (Collection<BoundedWindow>) c.windows();
-    return priorWindows;
+    // The window is provided by the prior WindowFn, which also provides the coder for it
+    return Collections.singleton(c.window());
   }
 
   @Override
@@ -88,17 +85,12 @@ class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> {
 
   @Override
   public Coder<BoundedWindow> windowCoder() {
-    // Safe because the previous WindowFn provides both the windows and the coder.
+    // Safe because the prior WindowFn provides both the windows and the coder.
     // The Coder is _not_ actually a coder for an arbitrary BoundedWindow.
     return coder;
   }
 
   @Override
-  public boolean assignsToSingleWindow() {
-    return assignsToSingleWindow;
-  }
-
-  @Override
   public BoundedWindow getSideInputWindow(BoundedWindow window) {
     throw new UnsupportedOperationException(
         String.format(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index 5c91326..c0d159b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -58,8 +58,7 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
     Window.Bound<KV<K, V>> rewindow =
         Window.<KV<K, V>>into(
                 new IdentityWindowFn<>(
-                    originalStrategy.getWindowFn().windowCoder(),
-                    originalStrategy.getWindowFn().assignsToSingleWindow()))
+                    originalStrategy.getWindowFn().windowCoder()))
             .triggering(new ReshuffleTrigger<>())
             .discardingFiredPanes()
             .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
index db6f425..705003e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 
 import java.util.Collection;
+import java.util.Collections;
 
 /**
  * A {@link WindowFn} for use during tests that returns the input window for calls to
@@ -33,7 +34,7 @@ public class IdentitySideInputWindowFn extends NonMergingWindowFn<Integer, Bound
   @Override
   public Collection<BoundedWindow> assignWindows(WindowFn<Integer, BoundedWindow>.AssignContext c)
       throws Exception {
-    return (Collection<BoundedWindow>) c.windows();
+    return Collections.singleton(c.window());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
index 84699d6..4750af1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.util.state.InMemoryStateInternals;
@@ -39,7 +40,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -87,8 +87,8 @@ public class MergingActiveWindowSetTest {
         }
 
         @Override
-        public Collection<? extends BoundedWindow> windows() {
-          return ImmutableList.of();
+        public BoundedWindow window() {
+          return GlobalWindow.INSTANCE;
         }
       };
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
index a1e376e..c495712 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
@@ -245,7 +245,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
         InputT value = input.getValue();
         Instant timestamp = input.getTimestamp();
         Collection<W> assignedWindows = windowFn.assignWindows(new TestAssignContext<W>(
-            windowFn, value, timestamp, Arrays.asList(GlobalWindow.INSTANCE)));
+            windowFn, value, timestamp, GlobalWindow.INSTANCE));
 
         for (W window : assignedWindows) {
           activeWindows.addActiveForTesting(window);
@@ -401,14 +401,14 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
       extends WindowFn<Object, W>.AssignContext {
     private Object element;
     private Instant timestamp;
-    private Collection<? extends BoundedWindow> windows;
+    private BoundedWindow window;
 
-    public TestAssignContext(WindowFn<Object, W> windowFn, Object element, Instant timestamp,
-        Collection<? extends BoundedWindow> windows) {
+    public TestAssignContext(
+        WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) {
       windowFn.super();
       this.element = element;
       this.timestamp = timestamp;
-      this.windows = windows;
+      this.window = window;
     }
 
     @Override
@@ -422,8 +422,8 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
     }
 
     @Override
-    public Collection<? extends BoundedWindow> windows() {
-      return windows;
+    public BoundedWindow window() {
+      return window;
     }
   }
 


[2/5] incubator-beam git commit: Add basic WindowMatchersTest

Posted by ke...@apache.org.
Add basic WindowMatchersTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/edf11fa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/edf11fa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/edf11fa7

Branch: refs/heads/master
Commit: edf11fa782e119989bbb5413287c34edcac8c3f5
Parents: d9bca25
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 21 20:13:35 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 21 20:58:31 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/WindowMatchersTest.java | 84 ++++++++++++++++++++
 1 file changed, 84 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edf11fa7/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchersTest.java
new file mode 100644
index 0000000..8b108cd
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchersTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk;
+
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link WindowMatchers}.
+ */
+@RunWith(JUnit4.class)
+public class WindowMatchersTest {
+
+  @Test
+  public void testIsWindowedValueExact() {
+    long timestamp = 100;
+    long windowStart = 0;
+    long windowEnd = 200;
+    // A value in a single window is compared against an expectation listing that same window.
+    assertThat(
+        WindowedValue.of(
+            "hello",
+            new Instant(timestamp),
+            new IntervalWindow(new Instant(windowStart), new Instant(windowEnd)),
+            PaneInfo.NO_FIRING),
+        WindowMatchers.isWindowedValue(
+            "hello",
+            new Instant(timestamp),
+            ImmutableList.of(new IntervalWindow(new Instant(windowStart), new Instant(windowEnd))),
+            PaneInfo.NO_FIRING));
+  }
+
+  @Test
+  public void testIsWindowedValueReorderedWindows() {
+    long timestamp = 100;
+    long windowStart = 0;
+    long windowEnd = 200;
+    long windowStart2 = 50;
+    long windowEnd2 = 150;
+    // The expected windows are deliberately listed in the opposite order from the actual value.
+    assertThat(
+        WindowedValue.of(
+            "hello",
+            new Instant(timestamp),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(windowStart), new Instant(windowEnd)),
+                new IntervalWindow(new Instant(windowStart2), new Instant(windowEnd2))),
+            PaneInfo.NO_FIRING),
+        WindowMatchers.isWindowedValue(
+            "hello",
+            new Instant(timestamp),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(windowStart2), new Instant(windowEnd2)),
+                new IntervalWindow(new Instant(windowStart), new Instant(windowEnd))),
+            PaneInfo.NO_FIRING));
+  }
+}
+
+


[5/5] incubator-beam git commit: This closes #308

Posted by ke...@apache.org.
This closes #308


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/82ae661c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/82ae661c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/82ae661c

Branch: refs/heads/master
Commit: 82ae661c5b76b0d8e10caa5599d821f2a10a0064
Parents: f2d2ce5 0810441
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 23 10:01:14 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 23 10:01:14 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/AssignWindowsDoFn.java |  15 +-
 .../apache/beam/sdk/util/DoFnRunnerBase.java    |   2 +-
 .../beam/sdk/util/ReduceFnRunnerTest.java       |   3 +-
 .../apache/beam/sdk/util/ReduceFnTester.java    |  46 ++---
 .../runners/direct/WindowEvaluatorFactory.java  |  18 +-
 .../direct/WindowEvaluatorFactoryTest.java      | 178 ++++++++++++-------
 .../FlinkStreamingTransformTranslators.java     |   5 +-
 .../functions/FlinkAssignContext.java           |  15 +-
 .../functions/FlinkNoElementAssignContext.java  |   4 +-
 .../streaming/FlinkAbstractParDoWrapper.java    |   4 +-
 .../flink/streaming/GroupAlsoByWindowTest.java  |   2 +-
 .../beam/sdk/testing/WindowFnTestUtils.java     |   5 +-
 .../sdk/transforms/windowing/GlobalWindows.java |   5 -
 .../beam/sdk/transforms/windowing/PaneInfo.java |   4 +
 .../windowing/PartitioningWindowFn.java         |   5 -
 .../beam/sdk/transforms/windowing/WindowFn.java |  11 +-
 .../apache/beam/sdk/util/GatherAllPanes.java    |   3 +-
 .../apache/beam/sdk/util/IdentityWindowFn.java  |  20 +--
 .../org/apache/beam/sdk/util/Reshuffle.java     |   3 +-
 .../org/apache/beam/sdk/WindowMatchers.java     |  80 ++++++++-
 .../org/apache/beam/sdk/WindowMatchersTest.java |  84 +++++++++
 .../sdk/util/IdentitySideInputWindowFn.java     |   3 +-
 .../sdk/util/MergingActiveWindowSetTest.java    |   6 +-
 .../org/apache/beam/sdk/util/TriggerTester.java |  14 +-
 24 files changed, 370 insertions(+), 165 deletions(-)
----------------------------------------------------------------------



[3/5] incubator-beam git commit: Simple PaneInfo.toString() for interned constants

Posted by ke...@apache.org.
Simple PaneInfo.toString() for interned constants


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d9bca25a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d9bca25a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d9bca25a

Branch: refs/heads/master
Commit: d9bca25a0e7824819425b97200dcebb23f5f4e54
Parents: e255cd6
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 21 20:13:14 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 21 20:58:31 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java  | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9bca25a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
index 266c4d3..fff5001 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
@@ -292,6 +292,10 @@ public final class PaneInfo {
 
   @Override
   public String toString() {
+    if (this == PaneInfo.NO_FIRING) {
+      return "PaneInfo.NO_FIRING";
+    }
+
     return MoreObjects.toStringHelper(getClass())
         .omitNullValues()
         .add("isFirst", isFirst ? true : null)