You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/09 17:12:04 UTC
[1/2] incubator-beam git commit: Make WindowingStrategy combine
WindowFn with OutputTimeFn
Repository: incubator-beam
Updated Branches:
refs/heads/master 199ec2e10 -> 7725a47a7
Make WindowingStrategy combine WindowFn with OutputTimeFn
Previously:
- Any user-specified OutputTimeFn completely replaced the output times assigned by WindowFn#getOutputTime
- WindowFn#getOutputTimeFn provided a default OutputTimeFn
- The default varied by WindowFn, ranging from "earliest" to "end of window"
Now:
- The user-specified OutputTimeFn is used to combine the WindowFn's
assigned output timestamps.
- The WindowFn does not provide the default.
- The default is always to output at end of window.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3755c557
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3755c557
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3755c557
Branch: refs/heads/master
Commit: 3755c5579de3c46202a32b3ac2774ab440cf42f3
Parents: e63311f
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 5 19:33:16 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon May 9 10:10:57 2016 -0700
----------------------------------------------------------------------
.../FlinkGroupAlsoByWindowWrapper.java | 2 +-
.../flink/streaming/GroupAlsoByWindowTest.java | 7 +-
.../beam/sdk/testing/WindowFnTestUtils.java | 4 +-
.../sdk/transforms/windowing/OutputTimeFn.java | 1 -
.../beam/sdk/transforms/windowing/Sessions.java | 8 --
.../transforms/windowing/SlidingWindows.java | 23 ++----
.../beam/sdk/transforms/windowing/WindowFn.java | 82 ++++----------------
.../apache/beam/sdk/util/WindowingStrategy.java | 55 ++++++++++++-
.../sdk/transforms/join/CoGroupByKeyTest.java | 7 +-
.../sdk/transforms/windowing/WindowTest.java | 31 +++++---
.../sdk/transforms/windowing/WindowingTest.java | 9 ++-
.../sdk/util/GroupAlsoByWindowsProperties.java | 65 +++-------------
...oupAlsoByWindowsViaOutputBufferDoFnTest.java | 8 +-
.../beam/sdk/util/ReduceFnRunnerTest.java | 6 ++
.../apache/beam/sdk/util/ReduceFnTester.java | 9 ++-
.../CopyOnAccessInMemoryStateInternalsTest.java | 6 +-
16 files changed, 136 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index 0306aa1..9d2cad8 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -412,7 +412,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
FlinkStateInternals<K> stateInternals = perKeyStateInternals.get(key);
if (stateInternals == null) {
Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
- OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getWindowFn().getOutputTimeFn();
+ OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getOutputTimeFn();
stateInternals = new FlinkStateInternals<>(key, inputKvCoder.getKeyCoder(), windowCoder, outputTimeFn);
perKeyStateInternals.put(key, stateInternals);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
index f3ceba7..c76af65 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Sessions;
@@ -58,16 +59,19 @@ public class GroupAlsoByWindowTest {
private final WindowingStrategy slidingWindowWithAfterWatermarkTriggerStrategy =
WindowingStrategy.of(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5)))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withTrigger(AfterWatermark.pastEndOfWindow()).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
private final WindowingStrategy sessionWindowingStrategy =
WindowingStrategy.of(Sessions.withGapDuration(Duration.standardSeconds(2)))
.withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withAllowedLateness(Duration.standardSeconds(100));
private final WindowingStrategy fixedWindowingStrategy =
- WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10)));
+ WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10)))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
private final WindowingStrategy fixedWindowWithCountTriggerStrategy =
fixedWindowingStrategy.withTrigger(AfterPane.elementCountAtLeast(5));
@@ -94,6 +98,7 @@ public class GroupAlsoByWindowTest {
public void testWithLateness() throws Exception {
WindowingStrategy strategy = WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(2)))
.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withAllowedLateness(Duration.millis(1000));
long initialTime = 0L;
Pipeline pipeline = FlinkTestPipeline.createForStreaming();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
index 2566a12..a4130df 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
@@ -201,7 +201,7 @@ public class WindowFnTestUtils {
Instant instant = new Instant(timestamp);
for (W window : windows) {
- Instant outputTimestamp = windowFn.getOutputTimeFn().assignOutputTime(instant, window);
+ Instant outputTimestamp = windowFn.getOutputTime(instant, window);
assertFalse("getOutputTime must be greater than or equal to input timestamp",
outputTimestamp.isBefore(instant));
assertFalse("getOutputTime must be less than or equal to the max timestamp",
@@ -232,7 +232,7 @@ public class WindowFnTestUtils {
Instant instant = new Instant(timestamp);
Instant endOfPrevious = null;
for (W window : sortedWindows) {
- Instant outputTimestamp = windowFn.getOutputTimeFn().assignOutputTime(instant, window);
+ Instant outputTimestamp = windowFn.getOutputTime(instant, window);
if (endOfPrevious == null) {
// If this is the first window, the output timestamp can be anything, as long as it is in
// the valid range.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
index 3deea56..7cf870a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java
@@ -61,7 +61,6 @@ public abstract class OutputTimeFn<W extends BoundedWindow> implements Serializa
* Returns the output timestamp to use for data depending on the given
* {@code inputTimestamp} in the specified {@code window}.
*
- *
* <p>The result of this method must be between {@code inputTimestamp} and
* {@code window.maxTimestamp()} (inclusive on both sides).
*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java
index 8e8a005..788566e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.transforms.windowing;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -88,12 +86,6 @@ public class Sessions extends WindowFn<Object, IntervalWindow> {
throw new UnsupportedOperationException("Sessions is not allowed in side inputs");
}
- @Experimental(Kind.OUTPUT_TIME)
- @Override
- public OutputTimeFn<? super IntervalWindow> getOutputTimeFn() {
- return OutputTimeFns.outputAtEarliestInputTimestamp();
- }
-
public Duration getGapDuration() {
return gapDuration;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
index 4153e21..62c2738 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
@@ -185,26 +185,17 @@ public class SlidingWindows extends NonMergingWindowFn<Object, IntervalWindow> {
/**
* Ensures that later sliding windows have an output time that is past the end of earlier windows.
*
- * <p>If this is the earliest sliding window containing {@code inputTimestamp}, that's fine.
+ * <p>
+ * If this is the earliest sliding window containing {@code inputTimestamp}, that's fine.
* Otherwise, we pick the earliest time that doesn't overlap with earlier windows.
*/
@Experimental(Kind.OUTPUT_TIME)
@Override
- public OutputTimeFn<? super IntervalWindow> getOutputTimeFn() {
- return new OutputTimeFn.Defaults<BoundedWindow>() {
- @Override
- public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) {
- Instant startOfLastSegment = window.maxTimestamp().minus(period);
- return startOfLastSegment.isBefore(inputTimestamp)
- ? inputTimestamp
- : startOfLastSegment.plus(1);
- }
-
- @Override
- public boolean dependsOnlyOnEarliestInputTimestamp() {
- return true;
- }
- };
+ public Instant getOutputTime(Instant inputTimestamp, IntervalWindow window) {
+ Instant startOfLastSegment = window.maxTimestamp().minus(period);
+ return startOfLastSegment.isBefore(inputTimestamp)
+ ? inputTimestamp
+ : startOfLastSegment.plus(1);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
index e291bee..41833f8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
@@ -22,9 +22,6 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.util.WindowingStrategy;
-
-import com.google.common.collect.Ordering;
import org.joda.time.Instant;
@@ -136,34 +133,24 @@ public abstract class WindowFn<T, W extends BoundedWindow>
public abstract W getSideInputWindow(final BoundedWindow window);
/**
- * @deprecated Implement {@link #getOutputTimeFn} to return one of the appropriate
- * {@link OutputTimeFns}, or a custom {@link OutputTimeFn} extending
- * {@link OutputTimeFn.Defaults}.
- */
- @Deprecated
- @Experimental(Kind.OUTPUT_TIME)
- public Instant getOutputTime(Instant inputTimestamp, W window) {
- return getOutputTimeFn().assignOutputTime(inputTimestamp, window);
- }
-
- /**
- * Provides a default implementation for {@link WindowingStrategy#getOutputTimeFn()}.
- * See the full specification there.
+ * Returns the output timestamp to use for data depending on the given
+ * {@code inputTimestamp} in the specified {@code window}.
*
- * <p>If this {@link WindowFn} doesn't produce overlapping windows, this need not (and probably
- * should not) override any of the default implementations in {@link OutputTimeFn.Defaults}.
+ * <p>The result of this method must be between {@code inputTimestamp} and
+ * {@code window.maxTimestamp()} (inclusive on both sides).
*
- * <p>If this {@link WindowFn} does produce overlapping windows that can be predicted here, it is
- * suggested that the result in later overlapping windows is past the end of earlier windows so
- * that the later windows don't prevent the watermark from progressing past the end of the earlier
- * window.
+ * <p>This function must be monotonic across input timestamps. Specifically, if {@code A < B},
+ * then {@code getOutputTime(A, window) <= getOutputTime(B, window)}.
*
- * <p>For example, a timestamp in a sliding window should be moved past the beginning of the next
- * sliding window. See {@link SlidingWindows#getOutputTimeFn}.
+ * <p>For a {@link WindowFn} that doesn't produce overlapping windows, this can (and typically
+ * should) just return {@code inputTimestamp}. In the presence of overlapping windows, it is
+ * suggested that the result in later overlapping windows is past the end of earlier windows
+ * so that the later windows don't prevent the watermark from
+ * progressing past the end of the earlier window.
*/
@Experimental(Kind.OUTPUT_TIME)
- public OutputTimeFn<? super W> getOutputTimeFn() {
- return new OutputAtEarliestAssignedTimestamp<>(this);
+ public Instant getOutputTime(Instant inputTimestamp, W window) {
+ return inputTimestamp;
}
/**
@@ -189,47 +176,4 @@ public abstract class WindowFn<T, W extends BoundedWindow>
@Override
public void populateDisplayData(DisplayData.Builder builder) {
}
-
- /**
- * A compatibility adapter that will return the assigned timestamps according to the
- * {@link WindowFn}, which was the prior policy. Specifying the assigned output timestamps
- * on the {@link WindowFn} is now deprecated.
- */
- private static class OutputAtEarliestAssignedTimestamp<W extends BoundedWindow>
- extends OutputTimeFn.Defaults<W> {
-
- private final WindowFn<?, W> windowFn;
-
- public OutputAtEarliestAssignedTimestamp(WindowFn<?, W> windowFn) {
- this.windowFn = windowFn;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return the result of {@link WindowFn#getOutputTime windowFn.getOutputTime()}.
- */
- @Override
- @SuppressWarnings("deprecation") // this is an adapter for the deprecated behavior
- public Instant assignOutputTime(Instant timestamp, W window) {
- return windowFn.getOutputTime(timestamp, window);
- }
-
- @Override
- public Instant combine(Instant outputTime, Instant otherOutputTime) {
- return Ordering.natural().min(outputTime, otherOutputTime);
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true}. When the {@link OutputTimeFn} is not overridden by {@link WindowFn}
- * or {@link WindowingStrategy}, the minimum output timestamp is taken, which depends
- * only on the minimum input timestamp by monotonicity of {@link #assignOutputTime}.
- */
- @Override
- public boolean dependsOnlyOnEarliestInputTimestamp() {
- return true;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
index a82f2b3..d98793f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -30,8 +31,10 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
import com.google.common.base.MoreObjects;
import org.joda.time.Duration;
+import org.joda.time.Instant;
import java.io.Serializable;
+import java.util.Collections;
import java.util.Objects;
/**
@@ -99,7 +102,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
ExecutableTrigger.create(DefaultTrigger.<W>of()), false,
AccumulationMode.DISCARDING_FIRED_PANES, false,
DEFAULT_ALLOWED_LATENESS, false,
- windowFn.getOutputTimeFn(), false,
+ OutputTimeFns.outputAtEndOfWindow(), false,
ClosingBehavior.FIRE_IF_NON_EMPTY);
}
@@ -182,7 +185,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
// The onus of type correctness falls on the callee.
@SuppressWarnings("unchecked")
OutputTimeFn<? super W> newOutputTimeFn = (OutputTimeFn<? super W>)
- (outputTimeFnSpecified ? outputTimeFn : typedWindowFn.getOutputTimeFn());
+ new CombineWindowFnOutputTimes<W>(outputTimeFn, typedWindowFn);
return new WindowingStrategy<T, W>(
typedWindowFn,
@@ -223,12 +226,15 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
@SuppressWarnings("unchecked")
OutputTimeFn<? super W> typedOutputTimeFn = (OutputTimeFn<? super W>) outputTimeFn;
+ OutputTimeFn<? super W> newOutputTimeFn =
+ new CombineWindowFnOutputTimes<W>(typedOutputTimeFn, windowFn);
+
return new WindowingStrategy<T, W>(
windowFn,
trigger, triggerSpecified,
mode, modeSpecified,
allowedLateness, allowedLatenessSpecified,
- typedOutputTimeFn, true,
+ newOutputTimeFn, true,
closingBehavior);
}
@@ -265,4 +271,47 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
return Objects.hash(triggerSpecified, allowedLatenessSpecified, modeSpecified,
windowFn, trigger, mode, allowedLateness, closingBehavior);
}
+
+ /**
+ * An {@link OutputTimeFn} that uses {@link WindowFn#getOutputTime} to assign initial timestamps
+ * but then combines and merges according to a given {@link OutputTimeFn}.
+ *
+ * <ul>
+ * <li>The {@link WindowFn#getOutputTime} allows adjustments such as that whereby
+ * {@link SlidingWindows#getOutputTime} moves elements later in time to avoid holding up
+ * progress downstream.</li>
+ * <li>Then, when multiple elements are buffered for output, the output timestamp of the
+ * result is calculated using {@link OutputTimeFn#combine}.</li>
+ * <li>In the case of a merging {@link WindowFn}, the output timestamp when windows merge
+ * is calculated using {@link OutputTimeFn#merge}.</li>
+ * </ul>
+ */
+ private static class CombineWindowFnOutputTimes<W extends BoundedWindow>
+ extends OutputTimeFn.Defaults<W> {
+
+ private final OutputTimeFn<? super W> outputTimeFn;
+ private final WindowFn<?, W> windowFn;
+
+ public CombineWindowFnOutputTimes(
+ OutputTimeFn<? super W> outputTimeFn, WindowFn<?, W> windowFn) {
+ this.outputTimeFn = outputTimeFn;
+ this.windowFn = windowFn;
+ }
+
+ @Override
+ public Instant assignOutputTime(Instant inputTimestamp, W window) {
+ return outputTimeFn.merge(
+ window, Collections.singleton(windowFn.getOutputTime(inputTimestamp, window)));
+ }
+
+ @Override
+ public Instant combine(Instant timestamp, Instant otherTimestamp) {
+ return outputTimeFn.combine(timestamp, otherTimestamp);
+ }
+
+ @Override
+ public Instant merge(W newWindow, Iterable<? extends Instant> timestamps) {
+ return outputTimeFn.merge(newWindow, timestamps);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
index 609f454..f4ce2ca 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -239,7 +240,8 @@ public class CoGroupByKeyTest implements Serializable {
idToClick,
Arrays.asList(0L, 2L, 4L, 6L, 8L))
.apply("WindowClicks", Window.<KV<Integer, String>>into(
- FixedWindows.of(new Duration(4))));
+ FixedWindows.of(new Duration(4)))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()));
PCollection<KV<Integer, String>> purchasesTable =
createInput("CreatePurchases",
@@ -247,7 +249,8 @@ public class CoGroupByKeyTest implements Serializable {
idToPurchases,
Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L))
.apply("WindowPurchases", Window.<KV<Integer, String>>into(
- FixedWindows.of(new Duration(4))));
+ FixedWindows.of(new Duration(4)))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()));
PCollection<KV<Integer, CoGbkResult>> coGbkResults =
KeyedPCollectionTuple.of(clicksTag, clicksTable)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 91bd846..8ad590d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -183,25 +183,34 @@ public class WindowTest implements Serializable {
/**
* Tests that when two elements are combined via a GroupByKey their output timestamp agrees
- * with the windowing function default, the earlier of the two values.
+ * with the windowing function default, the end of the window.
*/
@Test
@Category(RunnableOnService.class)
public void testOutputTimeFnDefault() {
Pipeline pipeline = TestPipeline.create();
- pipeline.apply(
- Create.timestamped(
- TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
- TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
+ pipeline
+ .apply(
+ Create.timestamped(
+ TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
+ TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
.apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10))))
.apply(GroupByKey.<Integer, String>create())
- .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- assertThat(c.timestamp(), equalTo(new Instant(0)));
- }
- }));
+ .apply(
+ ParDo.of(
+ new DoFn<KV<Integer, Iterable<String>>, Void>() {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ assertThat(
+ c.timestamp(),
+ equalTo(
+ new IntervalWindow(
+ new Instant(0),
+ new Instant(0).plus(Duration.standardMinutes(10)))
+ .maxTimestamp()));
+ }
+ }));
pipeline.run();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 5cbf044..65adac1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -72,11 +72,12 @@ public class WindowingTest implements Serializable {
}
@Override
public PCollection<String> apply(PCollection<String> in) {
- return in
- .apply(Window.named("Window").<String>into(windowFn))
+ return in.apply(
+ Window.named("Window")
+ .<String>into(windowFn)
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
.apply(Count.<String>perElement())
- .apply(ParDo
- .named("FormatCounts").of(new FormatCountsDoFn()))
+ .apply(ParDo.named("FormatCounts").of(new FormatCountsDoFn()))
.setCoder(StringUtf8Coder.of());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
index d5aa0da..4518f9f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
@@ -29,7 +29,6 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
@@ -123,12 +122,12 @@ public class GroupAlsoByWindowsProperties {
WindowedValue<KV<String, Iterable<String>>> item0 = result.get(0);
assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
- assertThat(item0.getTimestamp(), equalTo(new Instant(1)));
+ assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp()));
assertThat(item0.getWindows(), contains(window(0, 10)));
WindowedValue<KV<String, Iterable<String>>> item1 = result.get(1);
assertThat(item1.getValue().getValue(), contains("v3"));
- assertThat(item1.getTimestamp(), equalTo(new Instant(13)));
+ assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp()));
assertThat(item1.getWindows(),
contains(window(10, 20)));
}
@@ -139,12 +138,13 @@ public class GroupAlsoByWindowsProperties {
*
* <p>In the input here, each element occurs in multiple windows.
*/
- public static void groupsElementsIntoSlidingWindows(
+ public static void groupsElementsIntoSlidingWindowsWithMinTimestamp(
GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
throws Exception {
WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(
- SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)));
+ SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
List<WindowedValue<KV<String, Iterable<String>>>> result =
runGABW(gabwFactory, windowingStrategy, "key",
@@ -271,13 +271,13 @@ public class GroupAlsoByWindowsProperties {
WindowedValue<KV<String, Iterable<String>>> item0 = result.get(0);
assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v3"));
- assertThat(item0.getTimestamp(), equalTo(new Instant(1)));
+ assertThat(item0.getTimestamp(), equalTo(window(1, 5).maxTimestamp()));
assertThat(item0.getWindows(),
contains(window(0, 5)));
WindowedValue<KV<String, Iterable<String>>> item1 = result.get(1);
assertThat(item1.getValue().getValue(), contains("v2"));
- assertThat(item1.getTimestamp(), equalTo(new Instant(4)));
+ assertThat(item1.getTimestamp(), equalTo(window(0, 5).maxTimestamp()));
assertThat(item1.getWindows(),
contains(window(1, 5)));
}
@@ -314,13 +314,13 @@ public class GroupAlsoByWindowsProperties {
WindowedValue<KV<String, Iterable<String>>> item0 = result.get(0);
assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
- assertThat(item0.getTimestamp(), equalTo(new Instant(0)));
+ assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
assertThat(item0.getWindows(),
contains(window(0, 15)));
WindowedValue<KV<String, Iterable<String>>> item1 = result.get(1);
assertThat(item1.getValue().getValue(), contains("v3"));
- assertThat(item1.getTimestamp(), equalTo(new Instant(15)));
+ assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
assertThat(item1.getWindows(),
contains(window(15, 25)));
}
@@ -421,53 +421,6 @@ public class GroupAlsoByWindowsProperties {
/**
* Tests that for a simple sequence of elements on the same key, the given GABW implementation
* correctly groups them according to fixed windows and also sets the output timestamp
- * according to a custom {@link OutputTimeFn}.
- */
- public static void groupsElementsIntoFixedWindowsWithCustomTimestamp(
- GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
- throws Exception {
- WindowingStrategy<?, IntervalWindow> windowingStrategy =
- WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
- .withOutputTimeFn(new OutputTimeFn.Defaults<IntervalWindow>() {
- @Override
- public Instant assignOutputTime(Instant inputTimestamp, IntervalWindow window) {
- return inputTimestamp.isBefore(window.maxTimestamp())
- ? inputTimestamp.plus(1) : window.maxTimestamp();
- }
-
- @Override
- public Instant combine(Instant outputTime, Instant otherOutputTime) {
- return outputTime.isBefore(otherOutputTime) ? outputTime : otherOutputTime;
- }
-
- @Override
- public boolean dependsOnlyOnEarliestInputTimestamp() {
- return true;
- }
- });
-
- List<WindowedValue<KV<String, Iterable<String>>>> result = runGABW(gabwFactory,
- windowingStrategy, "key",
- WindowedValue.of("v1", new Instant(1), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
- WindowedValue.of("v2", new Instant(2), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
- WindowedValue.of("v3", new Instant(13), Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING));
-
- assertThat(result.size(), equalTo(2));
-
- WindowedValue<KV<String, Iterable<String>>> item0 = result.get(0);
- assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
- assertThat(item0.getWindows(), contains(window(0, 10)));
- assertThat(item0.getTimestamp(), equalTo(new Instant(2)));
-
- WindowedValue<KV<String, Iterable<String>>> item1 = result.get(1);
- assertThat(item1.getValue().getValue(), contains("v3"));
- assertThat(item1.getWindows(), contains(window(10, 20)));
- assertThat(item1.getTimestamp(), equalTo(new Instant(14)));
- }
-
- /**
- * Tests that for a simple sequence of elements on the same key, the given GABW implementation
- * correctly groups them according to fixed windows and also sets the output timestamp
* according to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}.
*/
public static void groupsElementsIntoFixedWindowsWithLatestTimestamp(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
index d9c786d..4ac6164 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
@@ -64,7 +64,7 @@ public class GroupAlsoByWindowsViaOutputBufferDoFnTest {
@Test
public void testGroupsElementsIntoSlidingWindows() throws Exception {
- GroupAlsoByWindowsProperties.groupsElementsIntoSlidingWindows(
+ GroupAlsoByWindowsProperties.groupsElementsIntoSlidingWindowsWithMinTimestamp(
new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
}
@@ -93,12 +93,6 @@ public class GroupAlsoByWindowsViaOutputBufferDoFnTest {
}
@Test
- public void testGroupsElementsIntoFixedWindowsWithCustomTimestamp() throws Exception {
- GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithCustomTimestamp(
- new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
- }
-
- @Test
public void testGroupsElementsIntoSessionsWithEndOfWindowTimestamp() throws Exception {
GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithEndOfWindowTimestamp(
new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
index f2036eb..41c1710 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
@@ -258,6 +259,7 @@ public class ReduceFnRunnerTest {
.of(FixedWindows.of(Duration.millis(10)))
.withTrigger(mockTrigger)
.withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withAllowedLateness(Duration.millis(100));
TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
@@ -504,6 +506,7 @@ public class ReduceFnRunnerTest {
AfterWatermark.pastEndOfWindow())))
.withMode(AccumulationMode.DISCARDING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
tester.advanceInputWatermark(new Instant(0));
@@ -556,6 +559,7 @@ public class ReduceFnRunnerTest {
AfterWatermark.pastEndOfWindow())))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY));
tester.advanceInputWatermark(new Instant(0));
@@ -582,6 +586,7 @@ public class ReduceFnRunnerTest {
AfterWatermark.pastEndOfWindow())))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
tester.advanceInputWatermark(new Instant(0));
@@ -610,6 +615,7 @@ public class ReduceFnRunnerTest {
AfterWatermark.pastEndOfWindow())))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
tester.advanceInputWatermark(new Instant(0));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
index f296d65..9916c5c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.TriggerBuilder;
@@ -134,6 +135,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
Duration allowedDataLateness, ClosingBehavior closingBehavior) throws Exception {
WindowingStrategy<?, W> strategy =
WindowingStrategy.of(windowFn)
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withTrigger(trigger.buildTrigger())
.withMode(mode)
.withAllowedLateness(allowedDataLateness)
@@ -185,8 +187,11 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
Duration allowedDataLateness) throws Exception {
WindowingStrategy<?, W> strategy =
- WindowingStrategy.of(windowFn).withTrigger(trigger).withMode(mode).withAllowedLateness(
- allowedDataLateness);
+ WindowingStrategy.of(windowFn)
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTrigger(trigger)
+ .withMode(mode)
+ .withAllowedLateness(allowedDataLateness);
return combining(strategy, combineFn, outputCoder);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java
index f17f64c..b7388ee 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
@@ -220,9 +219,8 @@ public class CopyOnAccessInMemoryStateInternalsTest {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
- @SuppressWarnings("unchecked")
- OutputTimeFn<BoundedWindow> outputTimeFn = (OutputTimeFn<BoundedWindow>)
- TestPipeline.create().apply(Create.of("foo")).getWindowingStrategy().getOutputTimeFn();
+ OutputTimeFn<BoundedWindow> outputTimeFn =
+ OutputTimeFns.outputAtEarliestInputTimestamp();
StateNamespace namespace = new StateNamespaceForTest("foo");
StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag =
[2/2] incubator-beam git commit: This closes #296
Posted by ke...@apache.org.
This closes #296
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7725a47a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7725a47a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7725a47a
Branch: refs/heads/master
Commit: 7725a47a7918018b834a535baf2c5a5049bb9c6e
Parents: 199ec2e 3755c55
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 9 10:11:34 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon May 9 10:11:34 2016 -0700
----------------------------------------------------------------------
.../FlinkGroupAlsoByWindowWrapper.java | 2 +-
.../flink/streaming/GroupAlsoByWindowTest.java | 7 +-
.../beam/sdk/testing/WindowFnTestUtils.java | 4 +-
.../sdk/transforms/windowing/OutputTimeFn.java | 1 -
.../beam/sdk/transforms/windowing/Sessions.java | 8 --
.../transforms/windowing/SlidingWindows.java | 23 ++----
.../beam/sdk/transforms/windowing/WindowFn.java | 82 ++++----------------
.../apache/beam/sdk/util/WindowingStrategy.java | 55 ++++++++++++-
.../sdk/transforms/join/CoGroupByKeyTest.java | 7 +-
.../sdk/transforms/windowing/WindowTest.java | 31 +++++---
.../sdk/transforms/windowing/WindowingTest.java | 9 ++-
.../sdk/util/GroupAlsoByWindowsProperties.java | 65 +++-------------
...oupAlsoByWindowsViaOutputBufferDoFnTest.java | 8 +-
.../beam/sdk/util/ReduceFnRunnerTest.java | 6 ++
.../apache/beam/sdk/util/ReduceFnTester.java | 9 ++-
.../CopyOnAccessInMemoryStateInternalsTest.java | 6 +-
16 files changed, 136 insertions(+), 187 deletions(-)
----------------------------------------------------------------------