You are viewing a plain-text version of this content. The link to its canonical version is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/03/11 00:08:57 UTC

[1/6] incubator-beam git commit: Only remove window from active window set if it is still active

Repository: incubator-beam
Updated Branches:
  refs/heads/master b2b5f429f -> de91b8014


Only remove window from active window set if it is still active


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c415be87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c415be87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c415be87

Branch: refs/heads/master
Commit: c415be870d03fd9491982cc8e1100165e5c8323c
Parents: 0442a24
Author: Mark Shields <ma...@google.com>
Authored: Wed Mar 9 14:02:35 2016 -0800
Committer: Mark Shields <ma...@google.com>
Committed: Wed Mar 9 14:06:36 2016 -0800

----------------------------------------------------------------------
 .../dataflow/sdk/util/MergingActiveWindowSet.java      | 13 +++++--------
 .../google/cloud/dataflow/sdk/util/ReduceFnRunner.java |  9 ++++++---
 2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c415be87/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
index 95e378d..5af4ea5 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
@@ -72,9 +72,7 @@ import javax.annotation.Nullable;
  */
 public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> {
   private final WindowFn<Object, W> windowFn;
-
-  @Nullable
-  private Map<W, Set<W>> activeWindowToStateAddressWindows;
+  private final Map<W, Set<W>> activeWindowToStateAddressWindows;
 
   /**
    * As above, but only for EPHEMERAL windows. Does not need to be persisted.
@@ -94,16 +92,14 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
    * MERGED. Otherwise W1 is EPHEMERAL.
    * </ul>
    */
-  @Nullable
-  private Map<W, W> windowToActiveWindow;
+  private final Map<W, W> windowToActiveWindow;
 
   /**
    * Deep clone of {@link #activeWindowToStateAddressWindows} as of last commit.
    *
    * <p>Used to avoid writing to state if no changes have been made during the work unit.
    */
-  @Nullable
-  private Map<W, Set<W>> originalActiveWindowToStateAddressWindows;
+  private final Map<W, Set<W>> originalActiveWindowToStateAddressWindows;
 
   /**
    * Handle representing our state in the backend.
@@ -195,6 +191,7 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
 
   @Override
   public void remove(W window) {
+    Preconditions.checkState(isActive(window), "Window %s is not active", window);
     for (W stateAddressWindow : activeWindowToStateAddressWindows.get(window)) {
       windowToActiveWindow.remove(stateAddressWindow);
     }
@@ -522,7 +519,7 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
   private static <W> Map<W, Set<W>> deepCopy(Map<W, Set<W>> multimap) {
     Map<W, Set<W>> newMultimap = new HashMap<>();
     for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
-      newMultimap.put(entry.getKey(), new LinkedHashSet<W>(entry.getValue()));
+      newMultimap.put(entry.getKey(), new LinkedHashSet<>(entry.getValue()));
     }
     return newMultimap;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c415be87/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index 1a009bb..2b6e0d4 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -523,7 +523,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     // - The trigger may implement isClosed as constant false.
     // - If the window function does not support windowing then all windows will be considered
     // active.
-    // So we must combine the above.
+    // So we must take conjunction of activeWindows and triggerRunner state.
     boolean windowIsActive =
         activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state());
 
@@ -602,7 +602,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       boolean windowIsActive)
           throws Exception {
     if (windowIsActive) {
-      // Since window is still active the trigger has not closed.
+      // Since window was still active the trigger may not have closed.
       reduceFn.clearState(renamedContext);
       watermarkHold.clearHolds(renamedContext);
       nonEmptyPanes.clearPane(renamedContext.state());
@@ -622,7 +622,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       }
     }
     paneInfoTracker.clear(directContext.state());
-    activeWindows.remove(directContext.window());
+    if (activeWindows.isActive(directContext.window())) {
+      // Don't need to track address state windows anymore
+      activeWindows.remove(directContext.window());
+    }
     // We'll never need to test for the trigger being closed again.
     triggerRunner.clearFinished(directContext.state());
   }


[3/6] incubator-beam git commit: Whitespace

Posted by ke...@apache.org.
Whitespace


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bab23a96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bab23a96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bab23a96

Branch: refs/heads/master
Commit: bab23a96f72bbc61aefcb693dd2fcfffb76c372f
Parents: 045471c
Author: Mark Shields <ma...@google.com>
Authored: Thu Mar 10 08:49:47 2016 -0800
Committer: Mark Shields <ma...@google.com>
Committed: Thu Mar 10 08:49:47 2016 -0800

----------------------------------------------------------------------
 .../java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bab23a96/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
index e1348f7..c60af85 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
@@ -723,7 +723,6 @@ public class ReduceFnRunnerTest {
         equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
   }
 
-
   /**
    * Tests that when data is assigned to multiple windows but some of those windows have
    * had their triggers finish, then the data is dropped and counted accurately.


[4/6] incubator-beam git commit: remove is idempotent

Posted by ke...@apache.org.
remove is idempotent


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/34d0d446
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/34d0d446
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/34d0d446

Branch: refs/heads/master
Commit: 34d0d44680d2653d3fa2b81011061700c59509f5
Parents: bab23a9
Author: Mark Shields <ma...@google.com>
Authored: Thu Mar 10 14:11:20 2016 -0800
Committer: Mark Shields <ma...@google.com>
Committed: Thu Mar 10 14:11:20 2016 -0800

----------------------------------------------------------------------
 examples/pom.xml                                             | 2 +-
 pom.xml                                                      | 2 +-
 sdk/pom.xml                                                  | 2 +-
 .../cloud/dataflow/sdk/util/MergingActiveWindowSet.java      | 8 ++++++--
 4 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34d0d446/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 2218367..c15f73f 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>com.google.cloud.dataflow</groupId>
     <artifactId>google-cloud-dataflow-java-sdk-parent</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <groupId>com.google.cloud.dataflow</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34d0d446/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7e5e078..6fb0b32 100644
--- a/pom.xml
+++ b/pom.xml
@@ -35,7 +35,7 @@
   <url>http://cloud.google.com/dataflow</url>
   <inceptionYear>2013</inceptionYear>
 
-  <version>1.5.0-SNAPSHOT</version>
+  <version>1.6.0-SNAPSHOT</version>
 
   <licenses>
     <license>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34d0d446/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/sdk/pom.xml b/sdk/pom.xml
index f782b78..d7e10a5 100644
--- a/sdk/pom.xml
+++ b/sdk/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>com.google.cloud.dataflow</groupId>
     <artifactId>google-cloud-dataflow-java-sdk-parent</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <groupId>com.google.cloud.dataflow</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34d0d446/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
index 5af4ea5..96629b1 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
@@ -191,8 +191,12 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
 
   @Override
   public void remove(W window) {
-    Preconditions.checkState(isActive(window), "Window %s is not active", window);
-    for (W stateAddressWindow : activeWindowToStateAddressWindows.get(window)) {
+    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
+    if (stateAddressWindows == null) {
+      // Window is no longer active.
+      return;
+    }
+    for (W stateAddressWindow : stateAddressWindows) {
       windowToActiveWindow.remove(stateAddressWindow);
     }
     activeWindowToStateAddressWindows.remove(window);


[6/6] incubator-beam git commit: This closes #38

Posted by ke...@apache.org.
This closes #38


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/de91b801
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/de91b801
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/de91b801

Branch: refs/heads/master
Commit: de91b801499c60bc781713de129b562163f856d6
Parents: b2b5f42 a1e3d86
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Mar 10 15:07:52 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Mar 10 15:07:52 2016 -0800

----------------------------------------------------------------------
 .../sdk/util/MergingActiveWindowSet.java        | 19 +++++-----
 .../cloud/dataflow/sdk/util/ReduceFnRunner.java | 10 ++++--
 .../dataflow/sdk/util/ReduceFnRunnerTest.java   | 37 ++++++++++++++++++++
 3 files changed, 54 insertions(+), 12 deletions(-)
----------------------------------------------------------------------



[2/6] incubator-beam git commit: Add unit test.

Posted by ke...@apache.org.
Add unit test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/045471c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/045471c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/045471c1

Branch: refs/heads/master
Commit: 045471c1dc7ffeecc8ea8b6c0695498261aa631b
Parents: c415be8
Author: Mark Shields <ma...@google.com>
Authored: Wed Mar 9 16:50:16 2016 -0800
Committer: Mark Shields <ma...@google.com>
Committed: Wed Mar 9 16:50:16 2016 -0800

----------------------------------------------------------------------
 .../cloud/dataflow/sdk/util/ReduceFnRunner.java |  5 +--
 .../dataflow/sdk/util/ReduceFnRunnerTest.java   | 38 ++++++++++++++++++++
 2 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045471c1/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index 2b6e0d4..2e2d1f6 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -602,7 +602,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       boolean windowIsActive)
           throws Exception {
     if (windowIsActive) {
-      // Since window was still active the trigger may not have closed.
+      // Since both the window is in the active window set AND the trigger was not yet closed,
+      // it is possible we still have state.
       reduceFn.clearState(renamedContext);
       watermarkHold.clearHolds(renamedContext);
       nonEmptyPanes.clearPane(renamedContext.state());
@@ -623,7 +624,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     }
     paneInfoTracker.clear(directContext.state());
     if (activeWindows.isActive(directContext.window())) {
-      // Don't need to track address state windows anymore
+      // Don't need to track address state windows anymore.
       activeWindows.remove(directContext.window());
     }
     // We'll never need to test for the trigger being closed again.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045471c1/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
index 4fb3e37..e1348f7 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
@@ -687,6 +687,44 @@ public class ReduceFnRunnerTest {
   }
 
   /**
+   * It is possible for a session window's trigger to be closed at the point at which
+   * the (merged) session window is garbage collected. Make sure we don't accidentally
+   * assume the window is still active.
+   */
+  @Test
+  public void testMergingWithCloseBeforeGC() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+            AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
+            ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    // Two elements in two overlapping session windows.
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(1)), // in [1, 11)
+        TimestampedValue.of(10, new Instant(10))); // in [10, 20)
+
+    // Close the trigger, but the gargbage collection timer is still pending.
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    tester.advanceInputWatermark(new Instant(30));
+
+    // Now the garbage collection timer will fire, finding the trigger already closed.
+    tester.advanceInputWatermark(new Instant(100));
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(output.size(), equalTo(1));
+    assertThat(output.get(0),
+        isSingleWindowedValue(containsInAnyOrder(1, 10),
+            1, // timestamp
+            1, // window start
+            20)); // window end
+    assertThat(
+        output.get(0).getPane(),
+        equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
+  }
+
+
+  /**
    * Tests that when data is assigned to multiple windows but some of those windows have
    * had their triggers finish, then the data is dropped and counted accurately.
    */


[5/6] incubator-beam git commit: Undo

Posted by ke...@apache.org.
Undo


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1e3d86d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1e3d86d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1e3d86d

Branch: refs/heads/master
Commit: a1e3d86d942617efd03d63ff2375d4a4f0f1578d
Parents: 34d0d44
Author: Mark Shields <ma...@google.com>
Authored: Thu Mar 10 14:16:55 2016 -0800
Committer: Mark Shields <ma...@google.com>
Committed: Thu Mar 10 14:16:55 2016 -0800

----------------------------------------------------------------------
 examples/pom.xml | 2 +-
 pom.xml          | 2 +-
 sdk/pom.xml      | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1e3d86d/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index c15f73f..2218367 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>com.google.cloud.dataflow</groupId>
     <artifactId>google-cloud-dataflow-java-sdk-parent</artifactId>
-    <version>1.6.0-SNAPSHOT</version>
+    <version>1.5.0-SNAPSHOT</version>
   </parent>
 
   <groupId>com.google.cloud.dataflow</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1e3d86d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6fb0b32..7e5e078 100644
--- a/pom.xml
+++ b/pom.xml
@@ -35,7 +35,7 @@
   <url>http://cloud.google.com/dataflow</url>
   <inceptionYear>2013</inceptionYear>
 
-  <version>1.6.0-SNAPSHOT</version>
+  <version>1.5.0-SNAPSHOT</version>
 
   <licenses>
     <license>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1e3d86d/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/sdk/pom.xml b/sdk/pom.xml
index d7e10a5..f782b78 100644
--- a/sdk/pom.xml
+++ b/sdk/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>com.google.cloud.dataflow</groupId>
     <artifactId>google-cloud-dataflow-java-sdk-parent</artifactId>
-    <version>1.6.0-SNAPSHOT</version>
+    <version>1.5.0-SNAPSHOT</version>
   </parent>
 
   <groupId>com.google.cloud.dataflow</groupId>