You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/13 00:40:40 UTC
[09/50] [abbrv] incubator-beam git commit: Add LeaderBoardTest
Add LeaderBoardTest
This test exercises the PTransforms that make up the LeaderBoard
example. This includes speculative and late trigger firings to produce
team and individual scores on a global and fixed window basis.
Refactor LeaderBoard to expose the team and user score calculations as
composite PTransforms to enable this testing.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/00b4e951
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/00b4e951
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/00b4e951
Branch: refs/heads/gearpump-runner
Commit: 00b4e95148eb98d7fea5877274f2fcf2252ac432
Parents: 74d0195
Author: Thomas Groh <tg...@google.com>
Authored: Fri Aug 5 14:20:56 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Sep 12 17:40:11 2016 -0700
----------------------------------------------------------------------
.../examples/complete/game/LeaderBoard.java | 113 ++++--
.../examples/complete/game/LeaderBoardTest.java | 362 +++++++++++++++++++
2 files changed, 440 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00b4e951/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 8dd4e39..13bbf44 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.examples.complete.game;
+import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
@@ -32,6 +33,7 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -68,7 +70,7 @@ import org.joda.time.format.DateTimeFormatter;
* here we're using an unbounded data source, which lets us provide speculative results, and allows
* handling of late data, at much lower latency. We can use the early/speculative results to keep a
* 'leaderboard' updated in near-realtime. Our handling of late data lets us generate correct
- * results, e.g. for 'team prizes'. We're now outputing window results as they're
+ * results, e.g. for 'team prizes'. We're now outputting window results as they're
* calculated, giving us much lower latency than with the previous batch examples.
*
* <p> Run {@link injector.Injector} to generate pubsub data for this pipeline. The Injector
@@ -186,50 +188,91 @@ public class LeaderBoard extends HourlyTeamScore {
.apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
- // [START DocInclude_WindowAndTrigger]
- // Extract team/score pairs from the event stream, using hour-long windows by default.
- gameEvents
- .apply("LeaderboardTeamFixedWindows", Window.<GameActionInfo>into(
- FixedWindows.of(Duration.standardMinutes(options.getTeamWindowDuration())))
- // We will get early (speculative) results as well as cumulative
- // processing of late data.
- .triggering(
- AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(FIVE_MINUTES))
- .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(TEN_MINUTES)))
- .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness()))
- .accumulatingFiredPanes())
- // Extract and sum teamname/score pairs from the event data.
- .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
+ gameEvents.apply("CalculateTeamScores",
+ new CalculateTeamScores(
+ Duration.standardMinutes(options.getTeamWindowDuration()),
+ Duration.standardMinutes(options.getAllowedLateness())))
// Write the results to BigQuery.
.apply("WriteTeamScoreSums",
new WriteWindowedToBigQuery<KV<String, Integer>>(
options.getTableName() + "_team", configureWindowedTableWrite()));
- // [END DocInclude_WindowAndTrigger]
-
- // [START DocInclude_ProcTimeTrigger]
- // Extract user/score pairs from the event stream using processing time, via global windowing.
- // Get periodic updates on all users' running scores.
gameEvents
- .apply("LeaderboardUserGlobalWindow", Window.<GameActionInfo>into(new GlobalWindows())
- // Get periodic results every ten minutes.
- .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(TEN_MINUTES)))
- .accumulatingFiredPanes()
- .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness())))
- // Extract and sum username/score pairs from the event data.
- .apply("ExtractUserScore", new ExtractAndSumScore("user"))
+ .apply(
+ "CalculateUserScores",
+ new CalculateUserScores(Duration.standardMinutes(options.getAllowedLateness())))
// Write the results to BigQuery.
- .apply("WriteUserScoreSums",
- new WriteToBigQuery<KV<String, Integer>>(
- options.getTableName() + "_user", configureGlobalWindowBigQueryWrite()));
- // [END DocInclude_ProcTimeTrigger]
+ .apply(
+ "WriteUserScoreSums",
+ new WriteToBigQuery<KV<String, Integer>>(
+ options.getTableName() + "_user", configureGlobalWindowBigQueryWrite()));
// Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
// command line.
PipelineResult result = pipeline.run();
exampleUtils.waitToFinish(result);
}
+
+  /**
+   * Composite transform that sums scores per team over fixed event-time windows.
+   *
+   * <p>Events are placed into fixed windows of {@code teamWindowDuration}. Each window fires when
+   * the watermark passes its end, with processing-time driven early firings before that point and
+   * late firings after it, for data arriving within {@code allowedLateness}. Fired panes
+   * accumulate, so each pane reflects everything seen so far for the window.
+   */
+  // [START DocInclude_WindowAndTrigger]
+  // Extract team/score pairs from the event stream, windowed by the configured team duration.
+  @VisibleForTesting
+  static class CalculateTeamScores
+      extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
+    private final Duration teamWindowDuration;
+    private final Duration allowedLateness;
+
+    CalculateTeamScores(Duration teamWindowDuration, Duration allowedLateness) {
+      this.teamWindowDuration = teamWindowDuration;
+      this.allowedLateness = allowedLateness;
+    }
+
+    @Override
+    public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> infos) {
+      // Step 1: window the events. Early (speculative) panes fire FIVE_MINUTES after the first
+      // element of a pane, late panes fire TEN_MINUTES after the first late element, and all
+      // panes are cumulative.
+      PCollection<GameActionInfo> windowedInfos =
+          infos.apply(
+              "LeaderboardTeamFixedWindows",
+              Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
+                  .triggering(
+                      AfterWatermark.pastEndOfWindow()
+                          .withEarlyFirings(
+                              AfterProcessingTime.pastFirstElementInPane()
+                                  .plusDelayOf(FIVE_MINUTES))
+                          .withLateFirings(
+                              AfterProcessingTime.pastFirstElementInPane()
+                                  .plusDelayOf(TEN_MINUTES)))
+                  .withAllowedLateness(allowedLateness)
+                  .accumulatingFiredPanes());
+
+      // Step 2: extract and sum teamname/score pairs from the windowed event data.
+      return windowedInfos.apply("ExtractTeamScore", new ExtractAndSumScore("team"));
+    }
+  }
+ // [END DocInclude_WindowAndTrigger]
+
+ // [START DocInclude_ProcTimeTrigger]
+  /**
+   * Composite transform that sums scores per user across the whole stream.
+   *
+   * <p>All events are placed in the global window, and a repeating processing-time trigger emits
+   * periodic, cumulative updates of every user's running score.
+   */
+  @VisibleForTesting
+  static class CalculateUserScores
+      extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
+    private final Duration allowedLateness;
+
+    CalculateUserScores(Duration allowedLateness) {
+      this.allowedLateness = allowedLateness;
+    }
+
+    @Override
+    public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> input) {
+      // Step 1: use a single global window that fires repeatedly, TEN_MINUTES after the first
+      // element of each pane, accumulating results across firings.
+      PCollection<GameActionInfo> globallyWindowed =
+          input.apply(
+              "LeaderboardUserGlobalWindow",
+              Window.<GameActionInfo>into(new GlobalWindows())
+                  .triggering(
+                      Repeatedly.forever(
+                          AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_MINUTES)))
+                  .accumulatingFiredPanes()
+                  .withAllowedLateness(allowedLateness));
+
+      // Step 2: extract and sum username/score pairs from the event data.
+      return globallyWindowed.apply("ExtractUserScore", new ExtractAndSumScore("user"));
+    }
+  }
+ // [END DocInclude_ProcTimeTrigger]
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00b4e951/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
new file mode 100644
index 0000000..40cac36
--- /dev/null
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.examples.complete.game;
+
+import static org.apache.beam.sdk.testing.PAssert.that;
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.Serializable;
+import org.apache.beam.examples.complete.game.LeaderBoard.CalculateTeamScores;
+import org.apache.beam.examples.complete.game.LeaderBoard.CalculateUserScores;
+import org.apache.beam.examples.complete.game.UserScore.GameActionInfo;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link LeaderBoard}.
+ */
+@RunWith(JUnit4.class)
+public class LeaderBoardTest implements Serializable {
+ private static final Duration ALLOWED_LATENESS = Duration.standardHours(1);
+ private static final Duration TEAM_WINDOW_DURATION = Duration.standardMinutes(20);
+ private Instant baseTime = new Instant(0);
+
+  /**
+   * Sample players used to build test events: two on the red team and two on the blue team.
+   */
+  private enum TestUser {
+    RED_ONE("scarlet", "red"),
+    RED_TWO("burgundy", "red"),
+    BLUE_ONE("navy", "blue"),
+    BLUE_TWO("sky", "blue");
+
+    private final String user;
+    private final String team;
+
+    TestUser(String user, String team) {
+      this.user = user;
+      this.team = team;
+    }
+
+    /** Returns this player's user name. */
+    public String getUser() {
+      return user;
+    }
+
+    /** Returns the name of the team this player belongs to. */
+    public String getTeam() {
+      return team;
+    }
+  }
+
+  /**
+   * Verifies {@link CalculateTeamScores} when every element arrives on time (ahead of the
+   * watermark): the on-time pane of the first window must hold the full sum for each team.
+   */
+  @Test
+  public void testTeamScoresOnTime() {
+    TestPipeline pipeline = TestPipeline.create();
+    IntervalWindow firstWindow = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION);
+    String blueTeam = TestUser.BLUE_ONE.getTeam();
+    String redTeam = TestUser.RED_ONE.getTeam();
+
+    TestStream<GameActionInfo> events =
+        TestStream.create(AvroCoder.of(GameActionInfo.class))
+            // The watermark starts at the epoch.
+            .advanceWatermarkTo(baseTime)
+            // First batch: all timestamps are ahead of the watermark.
+            .addElements(
+                event(TestUser.BLUE_ONE, 3, Duration.standardSeconds(3)),
+                event(TestUser.BLUE_ONE, 2, Duration.standardMinutes(1)),
+                event(TestUser.RED_TWO, 3, Duration.standardSeconds(22)),
+                event(TestUser.BLUE_TWO, 5, Duration.standardMinutes(3)))
+            // Move the watermark forward a little, staying inside the window.
+            .advanceWatermarkTo(baseTime.plus(Duration.standardMinutes(3)))
+            // Second batch: still on time.
+            .addElements(
+                event(TestUser.RED_ONE, 1, Duration.standardMinutes(4)),
+                event(TestUser.BLUE_ONE, 2, Duration.standardSeconds(270)))
+            // Closing the window should produce the ON_TIME pane.
+            .advanceWatermarkToInfinity();
+
+    PCollection<KV<String, Integer>> teamScores =
+        pipeline
+            .apply(events)
+            .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
+
+    // blue: 3 + 2 + 5 + 2 = 12, red: 3 + 1 = 4
+    PAssert.that(teamScores)
+        .inOnTimePane(firstWindow)
+        .containsInAnyOrder(KV.of(blueTeam, 12), KV.of(redTeam, 4));
+
+    pipeline.run();
+  }
+
+  /**
+   * Verifies {@link CalculateTeamScores} when all elements are on time and processing time
+   * advances far enough between batches for speculative (early) panes to be emitted.
+   */
+  @Test
+  public void testTeamScoresSpeculative() {
+    TestPipeline pipeline = TestPipeline.create();
+    IntervalWindow window = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION);
+    String blueTeam = TestUser.BLUE_ONE.getTeam();
+    String redTeam = TestUser.RED_ONE.getTeam();
+
+    TestStream<GameActionInfo> events =
+        TestStream.create(AvroCoder.of(GameActionInfo.class))
+            // The watermark starts at the epoch.
+            .advanceWatermarkTo(baseTime)
+            .addElements(
+                event(TestUser.BLUE_ONE, 3, Duration.standardSeconds(3)),
+                event(TestUser.BLUE_ONE, 2, Duration.standardMinutes(1)))
+            // Processing time passes in the runner, so a speculative pane with the blue team's
+            // score so far is emitted.
+            .advanceProcessingTime(Duration.standardMinutes(10))
+            .addElements(event(TestUser.RED_TWO, 5, Duration.standardMinutes(3)))
+            // More processing time passes, producing a speculative pane for the red team.
+            .advanceProcessingTime(Duration.standardMinutes(12))
+            .addElements(event(TestUser.BLUE_TWO, 3, Duration.standardSeconds(22)))
+            // Again time passes, and a speculative pane with a refined value for the blue team
+            // is emitted.
+            .advanceProcessingTime(Duration.standardMinutes(10))
+            // A final batch of events arrives.
+            .addElements(
+                event(TestUser.RED_ONE, 4, Duration.standardMinutes(4)),
+                event(TestUser.BLUE_TWO, 2, Duration.standardMinutes(2)))
+            // The window closes; the ON_TIME pane contains all of the updates.
+            .advanceWatermarkToInfinity();
+
+    PCollection<KV<String, Integer>> teamScores =
+        pipeline
+            .apply(events)
+            .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
+
+    // Across the whole window we expect the speculative panes alongside the on-time panes.
+    PAssert.that(teamScores)
+        .inWindow(window)
+        .containsInAnyOrder(
+            KV.of(blueTeam, 10) /* The on-time blue pane */,
+            KV.of(redTeam, 9) /* The on-time red pane */,
+            KV.of(blueTeam, 5) /* The first blue speculative pane */,
+            KV.of(blueTeam, 8) /* The second blue speculative pane */,
+            KV.of(redTeam, 5) /* The red speculative pane */);
+    // The on-time pane alone carries only the final totals.
+    PAssert.that(teamScores)
+        .inOnTimePane(window)
+        .containsInAnyOrder(KV.of(blueTeam, 10), KV.of(redTeam, 9));
+
+    pipeline.run();
+  }
+
+  /**
+   * Verifies that elements arriving behind the watermark (late data) but before the end of the
+   * window are still included in the on-time pane.
+   */
+  @Test
+  public void testTeamScoresUnobservablyLate() {
+    TestPipeline pipeline = TestPipeline.create();
+    BoundedWindow window = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION);
+    Instant windowEnd = baseTime.plus(TEAM_WINDOW_DURATION);
+    String blueTeam = TestUser.BLUE_ONE.getTeam();
+    String redTeam = TestUser.RED_ONE.getTeam();
+
+    TestStream<GameActionInfo> events =
+        TestStream.create(AvroCoder.of(GameActionInfo.class))
+            .advanceWatermarkTo(baseTime)
+            .addElements(
+                event(TestUser.BLUE_ONE, 3, Duration.standardSeconds(3)),
+                event(TestUser.BLUE_TWO, 5, Duration.standardMinutes(8)),
+                event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
+                event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(5)))
+            // Stop one minute short of the end of the window.
+            .advanceWatermarkTo(windowEnd.minus(Duration.standardMinutes(1)))
+            // These timestamps are behind the watermark, but the window has not closed yet, so
+            // the elements still land in the on-time pane.
+            .addElements(
+                event(TestUser.RED_TWO, 2, Duration.ZERO),
+                event(TestUser.RED_TWO, 5, Duration.standardMinutes(1)),
+                event(TestUser.BLUE_TWO, 2, Duration.standardSeconds(90)),
+                event(TestUser.RED_TWO, 3, Duration.standardMinutes(3)))
+            .advanceWatermarkTo(windowEnd.plus(Duration.standardMinutes(1)))
+            .advanceWatermarkToInfinity();
+
+    PCollection<KV<String, Integer>> teamScores =
+        pipeline
+            .apply(events)
+            .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
+
+    // The on-time pane includes the late elements that arrived before the end of the window:
+    // red: 4 + 2 + 5 + 3 = 14, blue: 3 + 5 + 3 + 2 = 13
+    PAssert.that(teamScores)
+        .inOnTimePane(window)
+        .containsInAnyOrder(KV.of(redTeam, 14), KV.of(blueTeam, 13));
+
+    pipeline.run();
+  }
+
+  /**
+   * Verifies handling of elements that arrive behind the watermark after it has passed the end of
+   * the window, but within the maximum allowed lateness. Such elements are emitted in a late pane.
+   */
+  @Test
+  public void testTeamScoresObservablyLate() {
+    TestPipeline pipeline = TestPipeline.create();
+    String blueTeam = TestUser.BLUE_ONE.getTeam();
+    String redTeam = TestUser.RED_ONE.getTeam();
+
+    // End of the first window plus the allowed lateness.
+    Instant firstWindowCloses = baseTime.plus(ALLOWED_LATENESS).plus(TEAM_WINDOW_DURATION);
+    TestStream<GameActionInfo> events =
+        TestStream.create(AvroCoder.of(GameActionInfo.class))
+            .advanceWatermarkTo(baseTime)
+            .addElements(
+                event(TestUser.BLUE_ONE, 3, Duration.standardSeconds(3)),
+                event(TestUser.BLUE_TWO, 5, Duration.standardMinutes(8)))
+            .advanceProcessingTime(Duration.standardMinutes(10))
+            .advanceWatermarkTo(baseTime.plus(Duration.standardMinutes(3)))
+            .addElements(
+                event(TestUser.RED_ONE, 3, Duration.standardMinutes(1)),
+                event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
+                event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(5)))
+            .advanceWatermarkTo(firstWindowCloses.minus(Duration.standardMinutes(1)))
+            // Late, but expected to show up in a late pane.
+            .addElements(
+                event(TestUser.RED_TWO, 2, Duration.ZERO),
+                event(TestUser.RED_TWO, 5, Duration.standardMinutes(1)),
+                event(TestUser.RED_TWO, 3, Duration.standardMinutes(3)))
+            // Advancing processing time emits a late refinement; the window is still open
+            // because the watermark has not moved.
+            .advanceProcessingTime(Duration.standardMinutes(12))
+            // Expected to show up in the final pane.
+            .addElements(
+                event(TestUser.RED_TWO, 9, Duration.standardMinutes(1)),
+                event(TestUser.RED_TWO, 1, Duration.standardMinutes(3)))
+            .advanceWatermarkToInfinity();
+
+    PCollection<KV<String, Integer>> teamScores =
+        pipeline
+            .apply(events)
+            .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
+
+    BoundedWindow window = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION);
+    PAssert.that(teamScores)
+        .inWindow(window)
+        .satisfies(
+            (SerializableFunction<Iterable<KV<String, Integer>>, Void>)
+                scores -> {
+                  // The final sums may be spread over different panes, but both must be present
+                  // somewhere in the output PCollection.
+                  assertThat(scores, hasItem(KV.of(blueTeam, 11)));
+                  assertThat(scores, hasItem(KV.of(redTeam, 27)));
+                  return null;
+                });
+    // A map of both teams is checked against the on-time pane: the closing behavior of
+    // CalculateTeamScores precludes an inFinalPane matcher here.
+    PAssert.thatMap(teamScores)
+        .inOnTimePane(window)
+        .isEqualTo(
+            ImmutableMap.<String, Integer>builder()
+                .put(redTeam, 7)
+                .put(blueTeam, 11)
+                .build());
+
+    // The blue team gets no final pane, because every blue update was already reflected in an
+    // earlier pane; only the red team's final sum appears there.
+    PAssert.that(teamScores).inFinalPane(window).containsInAnyOrder(KV.of(redTeam, 27));
+
+    pipeline.run();
+  }
+
+  /**
+   * A test where elements arrive beyond the maximum allowed lateness. These elements are dropped
+   * within {@link CalculateTeamScores} and do not impact the final result.
+   */
+  @Test
+  public void testTeamScoresDroppablyLate() {
+    TestPipeline p = TestPipeline.create();
+
+    BoundedWindow window = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION);
+    TestStream<GameActionInfo> infos = TestStream.create(AvroCoder.of(GameActionInfo.class))
+        .addElements(event(TestUser.BLUE_ONE, 12, Duration.ZERO),
+            event(TestUser.RED_ONE, 3, Duration.ZERO))
+        .advanceWatermarkTo(window.maxTimestamp())
+        .addElements(event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
+            event(TestUser.BLUE_TWO, 3, Duration.ZERO),
+            event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(3)))
+        // Move the watermark past the end of the allowed lateness plus the end of the window
+        .advanceWatermarkTo(baseTime.plus(ALLOWED_LATENESS)
+            .plus(TEAM_WINDOW_DURATION).plus(Duration.standardMinutes(1)))
+        // These elements within the expired window are droppably late, and will not appear in the
+        // output
+        .addElements(
+            event(TestUser.BLUE_TWO, 3, TEAM_WINDOW_DURATION.minus(Duration.standardSeconds(5))),
+            event(TestUser.RED_ONE, 7, Duration.standardMinutes(4)))
+        .advanceWatermarkToInfinity();
+    PCollection<KV<String, Integer>> teamScores = p.apply(infos)
+        .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
+
+    String blueTeam = TestUser.BLUE_ONE.getTeam();
+    String redTeam = TestUser.RED_ONE.getTeam();
+    // Only one on-time pane and no late panes should be emitted:
+    // red: 3 + 4 = 7, blue: 12 + 3 + 3 = 18 (the droppably late 3 and 7 are excluded)
+    PAssert.that(teamScores)
+        .inWindow(window)
+        .containsInAnyOrder(KV.of(redTeam, 7), KV.of(blueTeam, 18));
+    // No elements are added before the watermark passes the end of the window plus the allowed
+    // lateness, so no refinement should be emitted
+    PAssert.that(teamScores).inFinalPane(window).empty();
+
+    // Execute the pipeline: PAssert checks are only evaluated when the pipeline runs, so without
+    // this call the test would pass without verifying anything.
+    p.run();
+  }
+
+  /**
+   * Verifies {@link CalculateUserScores}, which emits output into the {@link GlobalWindow}, with a
+   * mix of on-time and late elements. Every element that arrives must be counted, even when it
+   * arrives later than the maximum allowed lateness.
+   */
+  @Test
+  public void testUserScore() {
+    TestPipeline pipeline = TestPipeline.create();
+    String blueOne = TestUser.BLUE_ONE.getUser();
+    String blueTwo = TestUser.BLUE_TWO.getUser();
+    String redOne = TestUser.RED_ONE.getUser();
+
+    TestStream<GameActionInfo> events =
+        TestStream.create(AvroCoder.of(GameActionInfo.class))
+            .addElements(
+                event(TestUser.BLUE_ONE, 12, Duration.ZERO),
+                event(TestUser.RED_ONE, 3, Duration.ZERO))
+            .advanceProcessingTime(Duration.standardMinutes(7))
+            .addElements(
+                event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
+                event(TestUser.BLUE_TWO, 3, Duration.ZERO),
+                event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(3)))
+            .advanceProcessingTime(Duration.standardMinutes(5))
+            .advanceWatermarkTo(baseTime.plus(ALLOWED_LATENESS).plus(Duration.standardHours(12)))
+            // Within the global window late elements are always observable: they arrive before
+            // the window closes, so they end up in a pane and are counted together with the
+            // on-time elements, even when they arrive after the allowed lateness.
+            .addElements(
+                event(TestUser.RED_ONE, 3, Duration.standardMinutes(7)),
+                event(TestUser.RED_ONE, 2, ALLOWED_LATENESS.plus(Duration.standardHours(13))))
+            .advanceProcessingTime(Duration.standardMinutes(6))
+            .addElements(event(TestUser.BLUE_TWO, 5, Duration.standardMinutes(12)))
+            .advanceProcessingTime(Duration.standardMinutes(20))
+            .advanceWatermarkToInfinity();
+
+    PCollection<KV<String, Integer>> userScores =
+        pipeline.apply(events).apply(new CalculateUserScores(ALLOWED_LATENESS));
+
+    // User scores come out in speculative panes of the Global Window. Matching only the early
+    // panes excludes panes emitted by the watermark advancing to positive infinity, which does
+    // not occur outside of tests.
+    PAssert.that(userScores)
+        .inEarlyGlobalWindowPanes()
+        .containsInAnyOrder(
+            KV.of(blueOne, 15),
+            KV.of(redOne, 7),
+            KV.of(redOne, 12),
+            KV.of(blueTwo, 3),
+            KV.of(blueTwo, 8));
+
+    pipeline.run();
+  }
+
+  /**
+   * Builds a timestamped {@link GameActionInfo} for {@code user} with the given {@code score}.
+   * The event time is {@code baseTime} plus {@code baseTimeOffset}; it is used both as the
+   * timestamp stored in the event and as the element's timestamp.
+   */
+  private TimestampedValue<GameActionInfo> event(
+      TestUser user, int score, Duration baseTimeOffset) {
+    Instant eventTime = baseTime.plus(baseTimeOffset);
+    GameActionInfo info =
+        new GameActionInfo(user.getUser(), user.getTeam(), score, eventTime.getMillis());
+    return TimestampedValue.of(info, eventTime);
+  }
+}