You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/03/11 00:08:57 UTC
[1/6] incubator-beam git commit: Only remove window from active window set if it is still active
Repository: incubator-beam
Updated Branches:
refs/heads/master b2b5f429f -> de91b8014
Only remove window from active window set if it is still active
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c415be87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c415be87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c415be87
Branch: refs/heads/master
Commit: c415be870d03fd9491982cc8e1100165e5c8323c
Parents: 0442a24
Author: Mark Shields <ma...@google.com>
Authored: Wed Mar 9 14:02:35 2016 -0800
Committer: Mark Shields <ma...@google.com>
Committed: Wed Mar 9 14:06:36 2016 -0800
----------------------------------------------------------------------
.../dataflow/sdk/util/MergingActiveWindowSet.java | 13 +++++--------
.../google/cloud/dataflow/sdk/util/ReduceFnRunner.java | 9 ++++++---
2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c415be87/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
index 95e378d..5af4ea5 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
@@ -72,9 +72,7 @@ import javax.annotation.Nullable;
*/
public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> {
private final WindowFn<Object, W> windowFn;
-
- @Nullable
- private Map<W, Set<W>> activeWindowToStateAddressWindows;
+ private final Map<W, Set<W>> activeWindowToStateAddressWindows;
/**
* As above, but only for EPHEMERAL windows. Does not need to be persisted.
@@ -94,16 +92,14 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
* MERGED. Otherwise W1 is EPHEMERAL.
* </ul>
*/
- @Nullable
- private Map<W, W> windowToActiveWindow;
+ private final Map<W, W> windowToActiveWindow;
/**
* Deep clone of {@link #activeWindowToStateAddressWindows} as of last commit.
*
* <p>Used to avoid writing to state if no changes have been made during the work unit.
*/
- @Nullable
- private Map<W, Set<W>> originalActiveWindowToStateAddressWindows;
+ private final Map<W, Set<W>> originalActiveWindowToStateAddressWindows;
/**
* Handle representing our state in the backend.
@@ -195,6 +191,7 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
@Override
public void remove(W window) {
+ Preconditions.checkState(isActive(window), "Window %s is not active", window);
for (W stateAddressWindow : activeWindowToStateAddressWindows.get(window)) {
windowToActiveWindow.remove(stateAddressWindow);
}
@@ -522,7 +519,7 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
private static <W> Map<W, Set<W>> deepCopy(Map<W, Set<W>> multimap) {
Map<W, Set<W>> newMultimap = new HashMap<>();
for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
- newMultimap.put(entry.getKey(), new LinkedHashSet<W>(entry.getValue()));
+ newMultimap.put(entry.getKey(), new LinkedHashSet<>(entry.getValue()));
}
return newMultimap;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c415be87/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index 1a009bb..2b6e0d4 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -523,7 +523,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
// - The trigger may implement isClosed as constant false.
// - If the window function does not support windowing then all windows will be considered
// active.
- // So we must combine the above.
+ // So we must take conjunction of activeWindows and triggerRunner state.
boolean windowIsActive =
activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state());
@@ -602,7 +602,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
boolean windowIsActive)
throws Exception {
if (windowIsActive) {
- // Since window is still active the trigger has not closed.
+ // Since window was still active the trigger may not have closed.
reduceFn.clearState(renamedContext);
watermarkHold.clearHolds(renamedContext);
nonEmptyPanes.clearPane(renamedContext.state());
@@ -622,7 +622,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
}
}
paneInfoTracker.clear(directContext.state());
- activeWindows.remove(directContext.window());
+ if (activeWindows.isActive(directContext.window())) {
+ // Don't need to track address state windows anymore
+ activeWindows.remove(directContext.window());
+ }
// We'll never need to test for the trigger being closed again.
triggerRunner.clearFinished(directContext.state());
}
[3/6] incubator-beam git commit: Whitespace
Posted by ke...@apache.org.
Whitespace
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bab23a96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bab23a96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bab23a96
Branch: refs/heads/master
Commit: bab23a96f72bbc61aefcb693dd2fcfffb76c372f
Parents: 045471c
Author: Mark Shields <ma...@google.com>
Authored: Thu Mar 10 08:49:47 2016 -0800
Committer: Mark Shields <ma...@google.com>
Committed: Thu Mar 10 08:49:47 2016 -0800
----------------------------------------------------------------------
.../java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bab23a96/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
index e1348f7..c60af85 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
@@ -723,7 +723,6 @@ public class ReduceFnRunnerTest {
equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
}
-
/**
* Tests that when data is assigned to multiple windows but some of those windows have
* had their triggers finish, then the data is dropped and counted accurately.
[4/6] incubator-beam git commit: remove is idempotent
Posted by ke...@apache.org.
remove is idempotent
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/34d0d446
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/34d0d446
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/34d0d446
Branch: refs/heads/master
Commit: 34d0d44680d2653d3fa2b81011061700c59509f5
Parents: bab23a9
Author: Mark Shields <ma...@google.com>
Authored: Thu Mar 10 14:11:20 2016 -0800
Committer: Mark Shields <ma...@google.com>
Committed: Thu Mar 10 14:11:20 2016 -0800
----------------------------------------------------------------------
examples/pom.xml | 2 +-
pom.xml | 2 +-
sdk/pom.xml | 2 +-
.../cloud/dataflow/sdk/util/MergingActiveWindowSet.java | 8 ++++++--
4 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34d0d446/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 2218367..c15f73f 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-parent</artifactId>
- <version>1.5.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
</parent>
<groupId>com.google.cloud.dataflow</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34d0d446/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7e5e078..6fb0b32 100644
--- a/pom.xml
+++ b/pom.xml
@@ -35,7 +35,7 @@
<url>http://cloud.google.com/dataflow</url>
<inceptionYear>2013</inceptionYear>
- <version>1.5.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
<licenses>
<license>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34d0d446/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/sdk/pom.xml b/sdk/pom.xml
index f782b78..d7e10a5 100644
--- a/sdk/pom.xml
+++ b/sdk/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-parent</artifactId>
- <version>1.5.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
</parent>
<groupId>com.google.cloud.dataflow</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34d0d446/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
index 5af4ea5..96629b1 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
@@ -191,8 +191,12 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
@Override
public void remove(W window) {
- Preconditions.checkState(isActive(window), "Window %s is not active", window);
- for (W stateAddressWindow : activeWindowToStateAddressWindows.get(window)) {
+ Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
+ if (stateAddressWindows == null) {
+ // Window is no longer active.
+ return;
+ }
+ for (W stateAddressWindow : stateAddressWindows) {
windowToActiveWindow.remove(stateAddressWindow);
}
activeWindowToStateAddressWindows.remove(window);
[6/6] incubator-beam git commit: This closes #38
Posted by ke...@apache.org.
This closes #38
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/de91b801
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/de91b801
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/de91b801
Branch: refs/heads/master
Commit: de91b801499c60bc781713de129b562163f856d6
Parents: b2b5f42 a1e3d86
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Mar 10 15:07:52 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Mar 10 15:07:52 2016 -0800
----------------------------------------------------------------------
.../sdk/util/MergingActiveWindowSet.java | 19 +++++-----
.../cloud/dataflow/sdk/util/ReduceFnRunner.java | 10 ++++--
.../dataflow/sdk/util/ReduceFnRunnerTest.java | 37 ++++++++++++++++++++
3 files changed, 54 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
[2/6] incubator-beam git commit: Add unit test.
Posted by ke...@apache.org.
Add unit test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/045471c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/045471c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/045471c1
Branch: refs/heads/master
Commit: 045471c1dc7ffeecc8ea8b6c0695498261aa631b
Parents: c415be8
Author: Mark Shields <ma...@google.com>
Authored: Wed Mar 9 16:50:16 2016 -0800
Committer: Mark Shields <ma...@google.com>
Committed: Wed Mar 9 16:50:16 2016 -0800
----------------------------------------------------------------------
.../cloud/dataflow/sdk/util/ReduceFnRunner.java | 5 +--
.../dataflow/sdk/util/ReduceFnRunnerTest.java | 38 ++++++++++++++++++++
2 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045471c1/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index 2b6e0d4..2e2d1f6 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -602,7 +602,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
boolean windowIsActive)
throws Exception {
if (windowIsActive) {
- // Since window was still active the trigger may not have closed.
+ // Since both the window is in the active window set AND the trigger was not yet closed,
+ // it is possible we still have state.
reduceFn.clearState(renamedContext);
watermarkHold.clearHolds(renamedContext);
nonEmptyPanes.clearPane(renamedContext.state());
@@ -623,7 +624,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
}
paneInfoTracker.clear(directContext.state());
if (activeWindows.isActive(directContext.window())) {
- // Don't need to track address state windows anymore
+ // Don't need to track address state windows anymore.
activeWindows.remove(directContext.window());
}
// We'll never need to test for the trigger being closed again.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045471c1/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
index 4fb3e37..e1348f7 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
@@ -687,6 +687,44 @@ public class ReduceFnRunnerTest {
}
/**
+ * It is possible for a session window's trigger to be closed at the point at which
+ * the (merged) session window is garbage collected. Make sure we don't accidentally
+ * assume the window is still active.
+ */
+ @Test
+ public void testMergingWithCloseBeforeGC() throws Exception {
+ ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+ ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+ AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
+ ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+ // Two elements in two overlapping session windows.
+ tester.injectElements(
+ TimestampedValue.of(1, new Instant(1)), // in [1, 11)
+ TimestampedValue.of(10, new Instant(10))); // in [10, 20)
+
+ // Close the trigger, but the garbage collection timer is still pending.
+ when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTrigger);
+ tester.advanceInputWatermark(new Instant(30));
+
+ // Now the garbage collection timer will fire, finding the trigger already closed.
+ tester.advanceInputWatermark(new Instant(100));
+
+ List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+ assertThat(output.size(), equalTo(1));
+ assertThat(output.get(0),
+ isSingleWindowedValue(containsInAnyOrder(1, 10),
+ 1, // timestamp
+ 1, // window start
+ 20)); // window end
+ assertThat(
+ output.get(0).getPane(),
+ equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
+ }
+
+
+ /**
* Tests that when data is assigned to multiple windows but some of those windows have
* had their triggers finish, then the data is dropped and counted accurately.
*/
[5/6] incubator-beam git commit: Undo
Posted by ke...@apache.org.
Undo
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1e3d86d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1e3d86d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1e3d86d
Branch: refs/heads/master
Commit: a1e3d86d942617efd03d63ff2375d4a4f0f1578d
Parents: 34d0d44
Author: Mark Shields <ma...@google.com>
Authored: Thu Mar 10 14:16:55 2016 -0800
Committer: Mark Shields <ma...@google.com>
Committed: Thu Mar 10 14:16:55 2016 -0800
----------------------------------------------------------------------
examples/pom.xml | 2 +-
pom.xml | 2 +-
sdk/pom.xml | 2 +-
3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1e3d86d/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index c15f73f..2218367 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-parent</artifactId>
- <version>1.6.0-SNAPSHOT</version>
+ <version>1.5.0-SNAPSHOT</version>
</parent>
<groupId>com.google.cloud.dataflow</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1e3d86d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6fb0b32..7e5e078 100644
--- a/pom.xml
+++ b/pom.xml
@@ -35,7 +35,7 @@
<url>http://cloud.google.com/dataflow</url>
<inceptionYear>2013</inceptionYear>
- <version>1.6.0-SNAPSHOT</version>
+ <version>1.5.0-SNAPSHOT</version>
<licenses>
<license>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1e3d86d/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/sdk/pom.xml b/sdk/pom.xml
index d7e10a5..f782b78 100644
--- a/sdk/pom.xml
+++ b/sdk/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-parent</artifactId>
- <version>1.6.0-SNAPSHOT</version>
+ <version>1.5.0-SNAPSHOT</version>
</parent>
<groupId>com.google.cloud.dataflow</groupId>