You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/04/19 22:04:37 UTC
[1/2] incubator-beam git commit: Add WindowedValue#explodeWindows
Repository: incubator-beam
Updated Branches:
refs/heads/master 135cb733f -> 0952f4433
Add WindowedValue#explodeWindows
This takes an existing WindowedValue and returns an Iterable of
WindowedValues, one per window of the original, each of which is in exactly one window.
Use the explodeWindows implementation in DoFnRunnerBase
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/98c9d99d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/98c9d99d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/98c9d99d
Branch: refs/heads/master
Commit: 98c9d99d27224012637e96839aee0721200dc351
Parents: 135cb73
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 18 16:55:57 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 19 12:45:17 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/util/DoFnRunnerBase.java | 5 +-
.../org/apache/beam/sdk/util/WindowedValue.java | 13 +++++
.../apache/beam/sdk/util/WindowedValueTest.java | 53 ++++++++++++++++++++
3 files changed, 68 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/98c9d99d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
index e9202a2..75861fe 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
@@ -141,9 +141,8 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
} else {
// We could modify the windowed value (and the processContext) to
// avoid repeated allocations, but this is more straightforward.
- for (BoundedWindow window : elem.getWindows()) {
- invokeProcessElement(WindowedValue.of(
- elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
+ for (WindowedValue<InputT> windowedValue : elem.explodeWindows()) {
+ invokeProcessElement(windowedValue);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/98c9d99d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index f6e82cf..1bbdbd9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -175,6 +176,18 @@ public abstract class WindowedValue<T> {
public abstract Collection<? extends BoundedWindow> getWindows();
/**
+ * Returns a collection of {@link WindowedValue WindowedValues} identical to this one, except each
+ * is in exactly one of the windows that this {@link WindowedValue} is in.
+ */
+ public Iterable<WindowedValue<T>> explodeWindows() {
+ ImmutableList.Builder<WindowedValue<T>> windowedValues = ImmutableList.builder();
+ for (BoundedWindow w : getWindows()) {
+ windowedValues.add(of(getValue(), getTimestamp(), w, getPane()));
+ }
+ return windowedValues.build();
+ }
+
+ /**
* Returns the pane of this {@code WindowedValue} in its window.
*/
public PaneInfo getPane() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/98c9d99d/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
index c2c22c0..90969b7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
@@ -17,11 +17,21 @@
*/
package org.apache.beam.sdk.util;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
import org.joda.time.Instant;
import org.junit.Assert;
@@ -55,4 +65,47 @@ public class WindowedValueTest {
Assert.assertEquals(value.getTimestamp(), decodedValue.getTimestamp());
Assert.assertArrayEquals(value.getWindows().toArray(), decodedValue.getWindows().toArray());
}
+
+ @Test
+ public void testExplodeWindowsInNoWindowsEmptyIterable() {
+ WindowedValue<String> value =
+ WindowedValue.of(
+ "foo", Instant.now(), ImmutableList.<BoundedWindow>of(), PaneInfo.NO_FIRING);
+
+ assertThat(value.explodeWindows(), emptyIterable());
+ }
+
+ @Test
+ public void testExplodeWindowsInOneWindowEquals() {
+ Instant now = Instant.now();
+ BoundedWindow window = new IntervalWindow(now.minus(1000L), now.plus(1000L));
+ WindowedValue<String> value =
+ WindowedValue.of("foo", now, window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+
+ assertThat(Iterables.getOnlyElement(value.explodeWindows()), equalTo(value));
+ }
+
+ @Test
+ public void testExplodeWindowsManyWindowsMultipleWindowedValues() {
+ Instant now = Instant.now();
+ BoundedWindow centerWindow = new IntervalWindow(now.minus(1000L), now.plus(1000L));
+ BoundedWindow pastWindow = new IntervalWindow(now.minus(1500L), now.plus(500L));
+ BoundedWindow futureWindow = new IntervalWindow(now.minus(500L), now.plus(1500L));
+ BoundedWindow futureFutureWindow = new IntervalWindow(now, now.plus(2000L));
+ PaneInfo pane = PaneInfo.createPane(false, false, Timing.ON_TIME, 3L, 0L);
+ WindowedValue<String> value =
+ WindowedValue.of(
+ "foo",
+ now,
+ ImmutableList.of(pastWindow, centerWindow, futureWindow, futureFutureWindow),
+ pane);
+
+ assertThat(
+ value.explodeWindows(),
+ containsInAnyOrder(
+ WindowedValue.of("foo", now, futureFutureWindow, pane),
+ WindowedValue.of("foo", now, futureWindow, pane),
+ WindowedValue.of("foo", now, centerWindow, pane),
+ WindowedValue.of("foo", now, pastWindow, pane)));
+ }
}
[2/2] incubator-beam git commit: This closes #206
Posted by bc...@apache.org.
This closes #206
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0952f443
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0952f443
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0952f443
Branch: refs/heads/master
Commit: 0952f4433034b2c98dd2b5baf7276293db544d66
Parents: 135cb73 98c9d99
Author: bchambers <bc...@google.com>
Authored: Tue Apr 19 12:45:49 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 19 12:45:49 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/util/DoFnRunnerBase.java | 5 +-
.../org/apache/beam/sdk/util/WindowedValue.java | 13 +++++
.../apache/beam/sdk/util/WindowedValueTest.java | 53 ++++++++++++++++++++
3 files changed, 68 insertions(+), 3 deletions(-)
----------------------------------------------------------------------