You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/30 21:31:04 UTC

[02/50] beam git commit: Add WindowFn#assignsToOneWindow

Add WindowFn#assignsToOneWindow


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7fee4b93
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7fee4b93
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7fee4b93

Branch: refs/heads/gearpump-runner
Commit: 7fee4b93d5b548d390ab2511a91880b4c5e57a26
Parents: 2365e71
Author: Thomas Groh <tg...@google.com>
Authored: Tue Jun 27 14:23:22 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Jun 27 15:03:58 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/testing/StaticWindows.java  |  5 ++++
 .../sdk/transforms/windowing/GlobalWindows.java |  5 ++++
 .../windowing/PartitioningWindowFn.java         |  5 ++++
 .../transforms/windowing/SlidingWindows.java    |  5 ++++
 .../beam/sdk/transforms/windowing/WindowFn.java | 11 +++++++
 .../apache/beam/sdk/util/IdentityWindowFn.java  |  5 ++++
 .../windowing/SlidingWindowsTest.java           | 30 ++++++++++++++++----
 7 files changed, 61 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7fee4b93/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
index c11057a..eba6978 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
@@ -126,4 +126,9 @@ final class StaticWindows extends NonMergingWindowFn<Object, BoundedWindow> {
       }
     };
   }
+
+  @Override
+  public boolean assignsToOneWindow() {
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7fee4b93/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
index d48d26b..c68c497 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
@@ -79,6 +79,11 @@ public class GlobalWindows extends NonMergingWindowFn<Object, GlobalWindow> {
   }
 
   @Override
+  public boolean assignsToOneWindow() {
+    return true;
+  }
+
+  @Override
   public boolean equals(Object other) {
     return other instanceof GlobalWindows;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7fee4b93/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
index 40ee68a..341ba27 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
@@ -58,4 +58,9 @@ public abstract class PartitioningWindowFn<T, W extends BoundedWindow>
   public Instant getOutputTime(Instant inputTimestamp, W window) {
     return inputTimestamp;
   }
+
+  @Override
+  public final boolean assignsToOneWindow() {
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7fee4b93/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
index f657884..150b956 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
@@ -148,6 +148,11 @@ public class SlidingWindows extends NonMergingWindowFn<Object, IntervalWindow> {
   }
 
   @Override
+  public boolean assignsToOneWindow() {
+    return !this.period.isShorterThan(this.size);
+  }
+
+  @Override
   public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
     if (!this.isCompatible(other)) {
       throw new IncompatibleWindowException(

http://git-wip-us.apache.org/repos/asf/beam/blob/7fee4b93/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
index 001d630..ffe85f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
@@ -180,6 +180,17 @@ public abstract class WindowFn<T, W extends BoundedWindow>
   }
 
   /**
+   * Returns true if this {@link WindowFn} never assigns an element to more than one window.
+   *
+   * <p>If this varies per-element, or cannot be determined, conservatively return false.
+   *
+   * <p>By default, returns false.
+   */
+  public boolean assignsToOneWindow() {
+    return false;
+  }
+
+  /**
    * Returns a {@link TypeDescriptor} capturing what is known statically about the window type of
    * this {@link WindowFn} instance's most-derived class.
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/7fee4b93/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
index a4bfdda..ef6d833 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
@@ -116,4 +116,9 @@ public class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> {
   public Instant getOutputTime(Instant inputTimestamp, BoundedWindow window) {
     return inputTimestamp;
   }
+
+  @Override
+  public boolean assignsToOneWindow() {
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7fee4b93/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
index b14e221..bfd01f0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.testing.WindowFnTestUtils.runWindowFn;
 import static org.apache.beam.sdk.testing.WindowFnTestUtils.set;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -55,11 +56,12 @@ public class SlidingWindowsTest {
     expected.put(new IntervalWindow(new Instant(0), new Instant(10)), set(1, 2, 5, 9));
     expected.put(new IntervalWindow(new Instant(5), new Instant(15)), set(5, 9, 10, 11));
     expected.put(new IntervalWindow(new Instant(10), new Instant(20)), set(10, 11));
+    SlidingWindows windowFn = SlidingWindows.of(new Duration(10)).every(new Duration(5));
     assertEquals(
         expected,
-        runWindowFn(
-            SlidingWindows.of(new Duration(10)).every(new Duration(5)),
+        runWindowFn(windowFn,
             Arrays.asList(1L, 2L, 5L, 9L, 10L, 11L)));
+    assertThat(windowFn.assignsToOneWindow(), is(false));
   }
 
   @Test
@@ -69,11 +71,27 @@ public class SlidingWindowsTest {
     expected.put(new IntervalWindow(new Instant(0), new Instant(7)), set(1, 2, 5));
     expected.put(new IntervalWindow(new Instant(5), new Instant(12)), set(5, 9, 10, 11));
     expected.put(new IntervalWindow(new Instant(10), new Instant(17)), set(10, 11));
+    SlidingWindows windowFn = SlidingWindows.of(new Duration(7)).every(new Duration(5));
     assertEquals(
         expected,
-        runWindowFn(
-            SlidingWindows.of(new Duration(7)).every(new Duration(5)),
+        runWindowFn(windowFn,
             Arrays.asList(1L, 2L, 5L, 9L, 10L, 11L)));
+    assertThat(windowFn.assignsToOneWindow(), is(false));
+  }
+
+  @Test
+  public void testEqualSize() throws Exception {
+    Map<IntervalWindow, Set<String>> expected = new HashMap<>();
+    expected.put(new IntervalWindow(new Instant(0), new Instant(3)), set(1, 2));
+    expected.put(new IntervalWindow(new Instant(3), new Instant(6)), set(3, 4, 5));
+    expected.put(new IntervalWindow(new Instant(6), new Instant(9)), set(6, 7));
+    SlidingWindows windowFn = SlidingWindows.of(new Duration(3)).every(new Duration(3));
+    assertEquals(
+        expected,
+        runWindowFn(
+            windowFn,
+            Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L)));
+    assertThat(windowFn.assignsToOneWindow(), is(true));
   }
 
   @Test
@@ -82,12 +100,14 @@ public class SlidingWindowsTest {
     expected.put(new IntervalWindow(new Instant(0), new Instant(3)), set(1, 2));
     expected.put(new IntervalWindow(new Instant(10), new Instant(13)), set(10, 11));
     expected.put(new IntervalWindow(new Instant(100), new Instant(103)), set(100));
+    SlidingWindows windowFn = SlidingWindows.of(new Duration(3)).every(new Duration(10));
     assertEquals(
         expected,
         runWindowFn(
             // Only look at the first 3 millisecs of every 10-millisec interval.
-            SlidingWindows.of(new Duration(3)).every(new Duration(10)),
+            windowFn,
             Arrays.asList(1L, 2L, 3L, 5L, 9L, 10L, 11L, 100L)));
+    assertThat(windowFn.assignsToOneWindow(), is(true));
   }
 
   @Test