Posted to commits@beam.apache.org by ke...@apache.org on 2016/03/17 21:47:35 UTC
[3/8] incubator-beam git commit: Move Java 8 examples to their own module
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md
----------------------------------------------------------------------
diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md
new file mode 100644
index 0000000..79b55ce
--- /dev/null
+++ b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md
@@ -0,0 +1,113 @@
+
+# 'Gaming' examples
+
+
+This directory holds a series of example Dataflow pipelines in a simple 'mobile
+gaming' domain. They all require Java 8. Each pipeline successively introduces
+new concepts, and gives some examples of using Java 8 syntax in constructing
+Dataflow pipelines. Other than usage of Java 8 lambda expressions, the concepts
+that are used apply equally well in Java 7.
+
+In the gaming scenario, many users play, as members of different teams, over
+the course of a day, and their actions are logged for processing. Some of the
+logged game events may be late-arriving, if users play on mobile devices and go
+transiently offline for a period.
+
+The scenario includes not only "regular" users, but "robot users", which have a
+higher click rate than the regular users, and may move from team to team.
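The robots' higher click rate comes from `Injector.generateEvent`, which picks a team's robot when `random.nextInt(team.numMembers() / 2) == 0` and otherwise picks a uniformly random regular member through `getRandomUser()`. The sketch below works through that arithmetic. It is an analysis aid only, and the class and method names are hypothetical.

```java
/**
 * Sketch of the robot "click rate" arithmetic in Injector.generateEvent. The robot is
 * chosen when random.nextInt(numMembers / 2) == 0; otherwise one of numMembers regular
 * users is chosen uniformly. Hypothetical helper, not part of the Injector.
 */
final class RobotClickRateSketch {
  private RobotClickRateSketch() {}

  /** Chance that one event for a robot-carrying team is attributed to the robot. */
  static double robotShare(int numMembers) {
    if (numMembers < 2) {
      // The Injector's random.nextInt(numMembers / 2) would throw for a bound of 0.
      throw new IllegalArgumentException("numMembers must be at least 2");
    }
    return 1.0 / (numMembers / 2); // integer division, exactly as in the Injector
  }

  /** Chance that one particular regular member is attributed the event. */
  static double regularMemberShare(int numMembers, boolean teamHasRobot) {
    double remaining = teamHasRobot ? 1.0 - robotShare(numMembers) : 1.0;
    return remaining / numMembers; // getRandomUser() is uniform over numMembers users
  }
}
```

For a 10-member team with a robot, the robot gets about 20% of the team's events and each regular member about 8%, so the robot clicks 2.5 times as often. Over the 5 to 19 team sizes that `TeamInfo` generates, the ratio ranges from 2.25 to 5.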
+
+The first two pipelines in the series use pre-generated batch data samples. The
+last two pipelines read their input from a [PubSub](https://cloud.google.com/pubsub/)
+topic. For these examples, you will also need to run the
+`injector.Injector` program, which generates and publishes the gaming data to
+PubSub. The javadocs for each pipeline have more detailed information on how to
+run that pipeline.
+
+All of these pipelines write their results to BigQuery table(s).
+
+
+## The pipelines in the 'gaming' series
+
+### UserScore
+
+The first pipeline in the series is `UserScore`. This pipeline does batch
+processing of data collected from gaming events. It calculates the sum of
+scores per user, over an entire batch of gaming data (collected, say, for each
+day). The batch processing will not include any late data that arrives after
+the day's cutoff point.
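The core of `UserScore` is a parse step followed by a sum per key. The plain-Java sketch below mirrors that logic on an in-memory list of event lines in the `username,teamname,score,timestamp_in_ms,readable_time` format. It is an illustration of the aggregation only, not Dataflow SDK code, and `UserScoreSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of UserScore's per-user sum; not Dataflow SDK code. */
final class UserScoreSketch {
  private UserScoreSketch() {}

  /**
   * Sums scores per user. A line that fails the same checks as ParseEventFn (missing
   * fields, non-numeric score or timestamp) is skipped; the real pipeline also counts it.
   */
  static Map<String, Integer> sumScoresPerUser(List<String> lines) {
    Map<String, Integer> totals = new HashMap<>();
    for (String line : lines) {
      String[] parts = line.split(",");
      try {
        String user = parts[0].trim();
        int score = Integer.parseInt(parts[2].trim());
        Long.parseLong(parts[3].trim()); // validate the timestamp, as ParseEventFn does
        totals.merge(user, score, Integer::sum);
      } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
        // Corrupt line: dropped from the sums.
      }
    }
    return totals;
  }
}
```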
+
+### HourlyTeamScore
+
+The next pipeline in the series is `HourlyTeamScore`. This pipeline also
+processes data collected from gaming events in batch. It builds on `UserScore`,
+but uses [fixed windows](https://cloud.google.com/dataflow/model/windowing), by
+default an hour in duration. It calculates the sum of scores per team, for each
+window, optionally allowing specification of two timestamps before and after
+which data is filtered out. This allows a model where late data collected after
+the intended analysis window can be included in the analysis, and any late-
+arriving data prior to the beginning of the analysis window can be removed as
+well.
+
+By using windowing and adding element timestamps, we can do finer-grained
+analysis than with the `UserScore` pipeline — we're now tracking scores for
+each hour rather than over the course of a whole day. However, our batch
+processing is high-latency, in that we don't get results from plays at the
+beginning of the batch's time period until the complete batch is processed.
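Fixed windowing assigns each event to the window whose start is its timestamp rounded down to a multiple of the window size, and the optional start/stop timestamps drop events outside the analysis range. The plain-Java sketch below shows both steps. The `Event` type and the half-open `[startMillis, stopMillis)` bound semantics are assumptions for illustration, not necessarily how `HourlyTeamScore` compares its bounds.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of fixed-window team sums; not Dataflow SDK code. */
final class HourlyTeamScoreSketch {
  private HourlyTeamScoreSketch() {}

  /** Default window size described above: one hour. */
  static final long WINDOW_MILLIS = 60L * 60 * 1000;

  /** A parsed game event (hypothetical helper type). */
  static final class Event {
    final String team;
    final int score;
    final long timeMillis;

    Event(String team, int score, long timeMillis) {
      this.team = team;
      this.score = score;
      this.timeMillis = timeMillis;
    }
  }

  /** Start of the fixed window that contains the given event time. */
  static long windowStart(long timeMillis, long windowMillis) {
    return timeMillis - Math.floorMod(timeMillis, windowMillis);
  }

  /** Sums scores per window start and team, dropping events outside [startMillis, stopMillis). */
  static Map<Long, Map<String, Integer>> sumPerWindowAndTeam(
      List<Event> events, long windowMillis, long startMillis, long stopMillis) {
    Map<Long, Map<String, Integer>> sums = new TreeMap<>();
    for (Event e : events) {
      if (e.timeMillis < startMillis || e.timeMillis >= stopMillis) {
        continue; // outside the analysis range
      }
      sums.computeIfAbsent(windowStart(e.timeMillis, windowMillis), w -> new HashMap<>())
          .merge(e.team, e.score, Integer::sum);
    }
    return sums;
  }
}
```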
+
+### LeaderBoard
+
+The third pipeline in the series is `LeaderBoard`. This pipeline processes an
+unbounded stream of 'game events' from a PubSub topic. The calculation of the
+team scores uses fixed windowing based on event time (the time of the game play
+event), not processing time (the time that an event is processed by the
+pipeline). The pipeline calculates the sum of scores per team, for each window.
+By default, the team scores are calculated using one-hour windows.
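The difference between event-time and processing-time windowing shows up most clearly for a late event. The sketch below, with a hypothetical class name and one-hour windows, assigns a play made at 09:58 that reaches the pipeline at 10:55: event-time windowing puts it in the 09:00 window (which has already ended), while processing-time windowing would put it in the 10:00 window.

```java
/** Sketch contrasting event-time and processing-time window assignment (hypothetical helper). */
final class EventTimeWindowingSketch {
  private EventTimeWindowingSketch() {}

  static final long HOUR_MILLIS = 60L * 60 * 1000;

  /** Start of the fixed window of the given size that contains timeMillis. */
  static long windowStart(long timeMillis, long sizeMillis) {
    return timeMillis - Math.floorMod(timeMillis, sizeMillis);
  }

  /** Event-time windowing: the window depends only on when the play happened. */
  static long eventTimeWindow(long eventTimeMillis) {
    return windowStart(eventTimeMillis, HOUR_MILLIS);
  }

  /** Processing-time windowing: the window depends on when the pipeline sees the event. */
  static long processingTimeWindow(long arrivalTimeMillis) {
    return windowStart(arrivalTimeMillis, HOUR_MILLIS);
  }

  /** True if the event reaches the pipeline after its event-time window has ended. */
  static boolean arrivesAfterWindowEnd(long eventTimeMillis, long arrivalTimeMillis) {
    return arrivalTimeMillis >= eventTimeWindow(eventTimeMillis) + HOUR_MILLIS;
  }
}
```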
+
+To demonstrate another windowing option, the user scores are instead calculated
+using a global window, which periodically (every ten minutes) emits cumulative
+user score sums.
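The behavior of a global window that periodically emits cumulative sums can be simulated in plain Java. The sketch below assumes a simplified firing rule: a pane fires ten minutes of processing time after its first element, and every pane carries all totals so far (accumulating) rather than only the new scores. This is an illustration under those assumptions, not the Dataflow trigger implementation, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified simulation of periodic cumulative output from a global window. */
final class CumulativeUserScoreSketch {
  private CumulativeUserScoreSketch() {}

  static final long TEN_MINUTES_MILLIS = 10L * 60 * 1000;

  /** An element as seen by the pipeline (hypothetical helper type). */
  static final class Arrival {
    final String user;
    final int score;
    final long processingTimeMillis;

    Arrival(String user, int score, long processingTimeMillis) {
      this.user = user;
      this.score = score;
      this.processingTimeMillis = processingTimeMillis;
    }
  }

  /** Returns the emitted panes; arrivals must be sorted by processing time. */
  static List<Map<String, Integer>> simulate(List<Arrival> arrivals, long periodMillis) {
    List<Map<String, Integer>> panes = new ArrayList<>();
    Map<String, Integer> totals = new HashMap<>();
    boolean pending = false; // is a firing scheduled?
    long fireAt = 0;
    for (Arrival a : arrivals) {
      if (pending && a.processingTimeMillis >= fireAt) {
        panes.add(new HashMap<>(totals)); // the timer fired before this element arrived
        pending = false;
      }
      totals.merge(a.user, a.score, Integer::sum); // cumulative: never reset
      if (!pending) {
        pending = true; // first element of a new pane schedules the next firing
        fireAt = a.processingTimeMillis + periodMillis;
      }
    }
    if (pending) {
      panes.add(new HashMap<>(totals)); // flush the last pane of this finite input
    }
    return panes;
  }
}
```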
+
+In contrast to the previous pipelines in the series, which used static, finite
+input data, here we're using an unbounded data source, which lets us provide
+_speculative_ results, and allows handling of late data, at much lower latency.
+For example, we could use the early/speculative results to keep a 'leaderboard'
+updated in near-realtime. Our handling of late data lets us generate correct
+results, e.g. for 'team prizes'. We're now outputting window results as they're
+calculated, giving us much lower latency than with the previous batch examples.
+
+### GameStats
+
+The fourth pipeline in the series is `GameStats`. This pipeline builds
+on the `LeaderBoard` functionality (supporting output of speculative and late
+data) and adds some "business intelligence" analysis: abuse detection. The
+pipeline derives the mean user score sum for a window, and uses that
+information to identify likely spammers/robots. (The injector is designed so
+that the "robots" have a higher click rate than the "real" users.) The robot
+users are then filtered out when calculating the team scores.
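The mean-based spammer detection and the subsequent filtering can be sketched in plain Java. Here a user is flagged when their score sum exceeds a multiplier times the mean of all users' sums; the `multiplier` parameter is hypothetical and not taken from `GameStats`, and the `{user, team, score}` row shape is an assumption for brevity.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Plain-Java sketch of mean-based spammer detection; not Dataflow SDK code. */
final class SpammerDetectionSketch {
  private SpammerDetectionSketch() {}

  /** Flags users whose score sum exceeds multiplier times the mean of all users' sums. */
  static Set<String> likelySpammers(Map<String, Integer> userSums, double multiplier) {
    if (userSums.isEmpty()) {
      return Collections.emptySet();
    }
    double mean = userSums.values().stream().mapToInt(Integer::intValue).average().getAsDouble();
    Set<String> flagged = new TreeSet<>();
    for (Map.Entry<String, Integer> e : userSums.entrySet()) {
      if (e.getValue() > multiplier * mean) {
        flagged.add(e.getKey());
      }
    }
    return flagged;
  }

  /** Sums scores per team from {user, team, score} rows, skipping flagged users. */
  static Map<String, Integer> teamScores(List<String[]> events, Set<String> spammers) {
    Map<String, Integer> sums = new HashMap<>();
    for (String[] ev : events) {
      if (!spammers.contains(ev[0])) {
        sums.merge(ev[1], Integer.parseInt(ev[2]), Integer::sum);
      }
    }
    return sums;
  }
}
```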
+
+Additionally, user sessions are tracked: that is, we find bursts of user
+activity using session windows. Then, the mean session duration information is
+recorded in the context of subsequent fixed windowing. (This could be used to
+tell us which games are giving us greater user retention.)
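Session windows group bursts of activity separated by inactivity. The sketch below computes session lengths for one user's event times, modeled on how session windows merge: each event opens an interval `[t, t + gap)`, and overlapping intervals merge into one session. The gap value is a parameter you choose here, not one taken from `GameStats`, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Plain-Java sketch of session-window merging for one user's events. */
final class SessionSketch {
  private SessionSketch() {}

  /** Returns the length in millis of each merged session window. */
  static List<Long> sessionLengths(List<Long> eventTimes, long gapMillis) {
    List<Long> sorted = new ArrayList<>(eventTimes);
    Collections.sort(sorted);
    List<Long> lengths = new ArrayList<>();
    if (sorted.isEmpty()) {
      return lengths;
    }
    long start = sorted.get(0);
    long end = start + gapMillis;
    for (int i = 1; i < sorted.size(); i++) {
      long t = sorted.get(i);
      if (t < end) {
        end = Math.max(end, t + gapMillis); // overlaps the current session: extend it
      } else {
        lengths.add(end - start); // gap reached: close the session, start a new one
        start = t;
        end = t + gapMillis;
      }
    }
    lengths.add(end - start);
    return lengths;
  }

  /** Mean session length, or 0 when there are no sessions. */
  static double meanLength(List<Long> lengths) {
    return lengths.stream().mapToLong(Long::longValue).average().orElse(0.0);
  }
}
```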
+
+### Running the PubSub Injector
+
+The `LeaderBoard` and `GameStats` example pipelines read unbounded data
+from a PubSub topic.
+
+Use the `injector.Injector` program to generate this data and publish to a
+PubSub topic. See the `Injector` javadocs for more information on how to run the
+injector. Set up the injector before you start one of these pipelines. Then,
+when you start the pipeline, pass as an argument the name of that PubSub topic.
+See the pipeline javadocs for the details.
+
+## Viewing the results in BigQuery
+
+All of the pipelines write their results to BigQuery. `UserScore` and
+`HourlyTeamScore` each write one table, and `LeaderBoard` and
+`GameStats` each write two. The pipelines have default table names that
+you can override when you start up the pipeline if those tables already exist.
+
+Depending on the windowing intervals defined in a given pipeline, you may have
+to wait for a while (more than an hour) before you start to see results written
+to the BigQuery tables.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java
new file mode 100644
index 0000000..de06ce3
--- /dev/null
+++ b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.complete.game.utils.WriteToBigQuery;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.AvroCoder;
+import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.apache.avro.reflect.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class is the first in a series of four pipelines that tell a story in a 'gaming' domain.
+ * Concepts: batch processing; reading input from Google Cloud Storage and writing output to
+ * BigQuery; using standalone DoFns; use of the sum by key transform; examples of
+ * Java 8 lambda syntax.
+ *
+ * <p> In this gaming scenario, many users play, as members of different teams, over the course of a
+ * day, and their actions are logged for processing. Some of the logged game events may be late-
+ * arriving, if users play on mobile devices and go transiently offline for a period.
+ *
+ * <p> This pipeline does batch processing of data collected from gaming events. It calculates the
+ * sum of scores per user, over an entire batch of gaming data (collected, say, for each day). The
+ * batch processing will not include any late data that arrives after the day's cutoff point.
+ *
+ * <p> To execute this pipeline using the Dataflow service and static example input data, specify
+ * the pipeline configuration like this:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ * --runner=BlockingDataflowPipelineRunner
+ * --dataset=YOUR-DATASET
+ * }
+ * </pre>
+ * where the BigQuery dataset you specify must already exist.
+ *
+ * <p> Optionally include the --input argument to specify a batch input file.
+ * See the --input default value for an example batch data file, or use {@link injector.Injector} to
+ * generate your own batch data.
+ */
+public class UserScore {
+
+ /**
+ * Class to hold info about a game event.
+ */
+ @DefaultCoder(AvroCoder.class)
+ static class GameActionInfo {
+ @Nullable String user;
+ @Nullable String team;
+ @Nullable Integer score;
+ @Nullable Long timestamp;
+
+ public GameActionInfo() {}
+
+ public GameActionInfo(String user, String team, Integer score, Long timestamp) {
+ this.user = user;
+ this.team = team;
+ this.score = score;
+ this.timestamp = timestamp;
+ }
+
+ public String getUser() {
+ return this.user;
+ }
+ public String getTeam() {
+ return this.team;
+ }
+ public Integer getScore() {
+ return this.score;
+ }
+ public String getKey(String keyname) {
+ if (keyname.equals("team")) {
+ return this.team;
+ } else { // return username as default
+ return this.user;
+ }
+ }
+ public Long getTimestamp() {
+ return this.timestamp;
+ }
+ }
+
+
+ /**
+ * Parses the raw game event info into GameActionInfo objects. Each event line has the following
+ * format: username,teamname,score,timestamp_in_ms,readable_time
+ * e.g.:
+ * user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
+ * The human-readable time string is not used here.
+ */
+ static class ParseEventFn extends DoFn<String, GameActionInfo> {
+
+ // Log and count parse errors.
+ private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class);
+ private final Aggregator<Long, Long> numParseErrors =
+ createAggregator("ParseErrors", new Sum.SumLongFn());
+
+ @Override
+ public void processElement(ProcessContext c) {
+ String[] components = c.element().split(",");
+ try {
+ String user = components[0].trim();
+ String team = components[1].trim();
+ Integer score = Integer.parseInt(components[2].trim());
+ Long timestamp = Long.parseLong(components[3].trim());
+ GameActionInfo gInfo = new GameActionInfo(user, team, score, timestamp);
+ c.output(gInfo);
+ } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
+ numParseErrors.addValue(1L);
+ LOG.info("Parse error on " + c.element() + ", " + e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * A transform to extract key/score information from GameActionInfo, and sum the scores. The
+ * constructor arg determines whether 'team' or 'user' info is extracted.
+ */
+ // [START DocInclude_USExtractXform]
+ public static class ExtractAndSumScore
+ extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
+
+ private final String field;
+
+ ExtractAndSumScore(String field) {
+ this.field = field;
+ }
+
+ @Override
+ public PCollection<KV<String, Integer>> apply(
+ PCollection<GameActionInfo> gameInfo) {
+
+ return gameInfo
+ .apply(MapElements
+ .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
+ .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
+ .apply(Sum.<String>integersPerKey());
+ }
+ }
+ // [END DocInclude_USExtractXform]
+
+
+ /**
+ * Options supported by {@link UserScore}.
+ */
+ public static interface Options extends PipelineOptions {
+
+ @Description("Path to the data file(s) containing game data.")
+ // The default maps to two large Google Cloud Storage files (each ~12GB) holding roughly two
+ // consecutive days' worth of data.
+ @Default.String("gs://dataflow-samples/game/gaming_data*.csv")
+ String getInput();
+ void setInput(String value);
+
+ @Description("BigQuery Dataset to write tables to. Must already exist.")
+ @Validation.Required
+ String getDataset();
+ void setDataset(String value);
+
+ @Description("The BigQuery table name. Should not already exist.")
+ @Default.String("user_score")
+ String getTableName();
+ void setTableName(String value);
+ }
+
+ /**
+ * Create a map of information that describes how to write pipeline output to BigQuery. This map
+ * is passed to the {@link WriteToBigQuery} constructor to write user score sums.
+ */
+ protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
+ configureBigQueryWrite() {
+ Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
+ new HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>();
+ tableConfigure.put("user",
+ new WriteToBigQuery.FieldInfo<KV<String, Integer>>("STRING", c -> c.element().getKey()));
+ tableConfigure.put("total_score",
+ new WriteToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER", c -> c.element().getValue()));
+ return tableConfigure;
+ }
+
+
+ /**
+ * Run a batch pipeline.
+ */
+ // [START DocInclude_USMain]
+ public static void main(String[] args) throws Exception {
+ // Begin constructing a pipeline configured by commandline flags.
+ Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+ Pipeline pipeline = Pipeline.create(options);
+
+ // Read events from a text file and parse them.
+ pipeline.apply(TextIO.Read.from(options.getInput()))
+ .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
+ // Extract and sum username/score pairs from the event data.
+ .apply("ExtractUserScore", new ExtractAndSumScore("user"))
+ .apply("WriteUserScoreSums",
+ new WriteToBigQuery<KV<String, Integer>>(options.getTableName(),
+ configureBigQueryWrite()));
+
+ // Run the batch pipeline.
+ pipeline.run();
+ }
+ // [END DocInclude_USMain]
+
+}
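`configureBigQueryWrite` maps each output column name to a BigQuery type and a function that extracts the value from a `KV<String, Integer>`. The same column-mapping pattern can be sketched without any Dataflow or BigQuery dependency; `FieldSpec` is a hypothetical stand-in for `WriteToBigQuery.FieldInfo`, and `Map.Entry` stands in for `KV`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Dependency-free sketch of the column-mapping pattern used by configureBigQueryWrite. */
final class RowMappingSketch {
  private RowMappingSketch() {}

  /** Hypothetical stand-in for WriteToBigQuery.FieldInfo: a column type plus an extractor. */
  static final class FieldSpec<T> {
    final String type;
    final Function<T, Object> extractor;

    FieldSpec(String type, Function<T, Object> extractor) {
      this.type = type;
      this.extractor = extractor;
    }
  }

  /** Column layout mirroring configureBigQueryWrite(): user STRING, total_score INTEGER. */
  static Map<String, FieldSpec<Map.Entry<String, Integer>>> userScoreColumns() {
    Map<String, FieldSpec<Map.Entry<String, Integer>>> cols = new LinkedHashMap<>();
    cols.put("user", new FieldSpec<Map.Entry<String, Integer>>("STRING", e -> e.getKey()));
    cols.put("total_score",
        new FieldSpec<Map.Entry<String, Integer>>("INTEGER", e -> e.getValue()));
    return cols;
  }

  /** Builds one output row (column name to value) by applying each column's extractor. */
  static <T> Map<String, Object> toRow(T element, Map<String, FieldSpec<T>> columns) {
    Map<String, Object> row = new LinkedHashMap<>();
    for (Map.Entry<String, FieldSpec<T>> column : columns.entrySet()) {
      row.put(column.getKey(), column.getValue().extractor.apply(element));
    }
    return row;
  }
}
```

Keeping the schema and the extractors in one map means a new column is a single `put` call, which is the design choice `configureBigQueryWrite` makes as well.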
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
----------------------------------------------------------------------
diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
new file mode 100644
index 0000000..1691c54
--- /dev/null
+++ b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
@@ -0,0 +1,415 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game.injector;
+
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.model.PublishRequest;
+import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.common.collect.ImmutableMap;
+
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.BufferedOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.TimeZone;
+
+
+/**
+ * This is a generator that simulates usage data from a mobile game, and either publishes the data
+ * to a pubsub topic or writes it to a file.
+ *
+ * <p> The general model used by the generator is the following. There is a set of teams with team
+ * members. Each member is scoring points for their team. After some period, a team will dissolve
+ * and a new one will be created in its place. There is also a set of 'Robots', or spammer users.
+ * They hop from team to team. The robots are set to have a higher 'click rate' (generate more
+ * events) than the regular team members.
+ *
+ * <p> Each generated line of data has the following form:
+ * username,teamname,score,timestamp_in_ms,readable_time
+ * e.g.:
+ * user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
+ *
+ * <p> The Injector writes either to a PubSub topic, or a file. It will use the PubSub topic if
+ * specified. It takes the following arguments:
+ * {@code Injector project-name (topic-name|none) (filename|none)}.
+ *
+ * <p> To run the Injector in the mode where it publishes to PubSub, you will need to authenticate
+ * locally using project-based service account credentials to avoid running over PubSub
+ * quota.
+ * See https://developers.google.com/identity/protocols/application-default-credentials
+ * for more information on using service account credentials. Set the GOOGLE_APPLICATION_CREDENTIALS
+ * environment variable to point to your downloaded service account credentials before starting the
+ * program, e.g.:
+ * {@code export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your/credentials-key.json}.
+ * If you do not do this, then your injector will only run for a few minutes on your
+ * 'user account' credentials before you will start to see quota error messages like:
+ * "Request throttled due to user QPS limit being reached", and see this exception:
+ * "com.google.api.client.googleapis.json.GoogleJsonResponseException: 429 Too Many Requests".
+ * Once you've set up your credentials, run the Injector like this:
+ * <pre>{@code
+ * Injector <project-name> <topic-name> none
+ * }
+ * </pre>
+ * The pubsub topic will be created if it does not exist.
+ *
+ * <p> To run the injector in write-to-file-mode, set the topic name to "none" and specify the
+ * filename:
+ * <pre>{@code
+ * Injector <project-name> none <filename>
+ * }
+ * </pre>
+ */
+class Injector {
+ private static Pubsub pubsub;
+ private static Random random = new Random();
+ private static String topic;
+ private static String project;
+ private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";
+
+ // QPS ranges from 800 to 1000.
+ private static final int MIN_QPS = 800;
+ private static final int QPS_RANGE = 200;
+ // How long to sleep, in ms, between creation of the threads that make API requests to PubSub.
+ private static final int THREAD_SLEEP_MS = 500;
+
+ // Lists used to generate random team names.
+ private static final ArrayList<String> COLORS =
+ new ArrayList<String>(Arrays.asList(
+ "Magenta", "AliceBlue", "Almond", "Amaranth", "Amber",
+ "Amethyst", "AndroidGreen", "AntiqueBrass", "Fuchsia", "Ruby", "AppleGreen",
+ "Apricot", "Aqua", "ArmyGreen", "Asparagus", "Auburn", "Azure", "Banana",
+ "Beige", "Bisque", "BarnRed", "BattleshipGrey"));
+
+ private static final ArrayList<String> ANIMALS =
+ new ArrayList<String>(Arrays.asList(
+ "Echidna", "Koala", "Wombat", "Marmot", "Quokka", "Kangaroo", "Dingo", "Numbat", "Emu",
+ "Wallaby", "CaneToad", "Bilby", "Possum", "Cassowary", "Kookaburra", "Platypus",
+ "Bandicoot", "Cockatoo", "Antechinus"));
+
+ // The list of live teams.
+ private static ArrayList<TeamInfo> liveTeams = new ArrayList<TeamInfo>();
+
+ private static DateTimeFormatter fmt =
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
+ .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
+
+
+ // The total number of robots in the system.
+ private static final int NUM_ROBOTS = 20;
+ // Determines the chance that a team will have a robot team member.
+ private static final int ROBOT_PROBABILITY = 3;
+ private static final int NUM_LIVE_TEAMS = 15;
+ private static final int BASE_MEMBERS_PER_TEAM = 5;
+ private static final int MEMBERS_PER_TEAM = 15;
+ private static final int MAX_SCORE = 20;
+ private static final int LATE_DATA_RATE = 5 * 60 * 2; // Every 600 loop iterations (at least ~5 minutes)
+ private static final int BASE_DELAY_IN_MILLIS = 5 * 60 * 1000; // 5-10 minute delay
+ private static final int FUZZY_DELAY_IN_MILLIS = 5 * 60 * 1000;
+
+ // The minimum time a 'team' can live.
+ private static final int BASE_TEAM_EXPIRATION_TIME_IN_MINS = 20;
+ private static final int TEAM_EXPIRATION_TIME_IN_MINS = 20;
+
+
+ /**
+ * A class for holding team info: the name of the team, when it started,
+ * and the current team members. Teams may but need not include one robot team member.
+ */
+ private static class TeamInfo {
+ String teamName;
+ long startTimeInMillis;
+ int expirationPeriod;
+ // The team might but need not include 1 robot. Will be non-null if so.
+ String robot;
+ int numMembers;
+
+ private TeamInfo(String teamName, long startTimeInMillis, String robot) {
+ this.teamName = teamName;
+ this.startTimeInMillis = startTimeInMillis;
+ // How long until this team is dissolved.
+ this.expirationPeriod = random.nextInt(TEAM_EXPIRATION_TIME_IN_MINS) +
+ BASE_TEAM_EXPIRATION_TIME_IN_MINS;
+ this.robot = robot;
+ // Determine the number of team members.
+ numMembers = random.nextInt(MEMBERS_PER_TEAM) + BASE_MEMBERS_PER_TEAM;
+ }
+
+ String getTeamName() {
+ return teamName;
+ }
+ String getRobot() {
+ return robot;
+ }
+
+ long getStartTimeInMillis() {
+ return startTimeInMillis;
+ }
+ long getEndTimeInMillis() {
+ return startTimeInMillis + (expirationPeriod * 60 * 1000);
+ }
+ String getRandomUser() {
+ int userNum = random.nextInt(numMembers);
+ return "user" + userNum + "_" + teamName;
+ }
+
+ int numMembers() {
+ return numMembers;
+ }
+
+ @Override
+ public String toString() {
+ return "(" + teamName + ", num members: " + numMembers() + ", starting at: "
+ + startTimeInMillis + ", expires in: " + expirationPeriod + ", robot: " + robot + ")";
+ }
+ }
+
+ /** Utility to grab a random element from a list of Strings. */
+ private static String randomElement(ArrayList<String> list) {
+ int index = random.nextInt(list.size());
+ return list.get(index);
+ }
+
+ /**
+ * Get and return a random team. If the selected team is too old w.r.t. its expiration, remove
+ * it, replacing it with a new team.
+ */
+ private static TeamInfo randomTeam(ArrayList<TeamInfo> list) {
+ int index = random.nextInt(list.size());
+ TeamInfo team = list.get(index);
+ // If the selected team is expired, remove it and return a new team.
+ long currTime = System.currentTimeMillis();
+ if ((team.getEndTimeInMillis() < currTime) || team.numMembers() == 0) {
+ System.out.println("\nteam " + team + " is too old; replacing.");
+ System.out.println("start time: " + team.getStartTimeInMillis() +
+ ", end time: " + team.getEndTimeInMillis() +
+ ", current time:" + currTime);
+ removeTeam(index);
+ // Add a new team in its stead.
+ return (addLiveTeam());
+ } else {
+ return team;
+ }
+ }
+
+ /**
+ * Create and add a team. Possibly add a robot to the team.
+ */
+ private static synchronized TeamInfo addLiveTeam() {
+ String teamName = randomElement(COLORS) + randomElement(ANIMALS);
+ String robot = null;
+ // Decide if we want to add a robot to the team.
+ if (random.nextInt(ROBOT_PROBABILITY) == 0) {
+ robot = "Robot-" + random.nextInt(NUM_ROBOTS);
+ }
+ // Create the new team.
+ TeamInfo newTeam = new TeamInfo(teamName, System.currentTimeMillis(), robot);
+ liveTeams.add(newTeam);
+ System.out.println("[+" + newTeam + "]");
+ return newTeam;
+ }
+
+ /**
+ * Remove a specific team.
+ */
+ private static synchronized void removeTeam(int teamIndex) {
+ TeamInfo removedTeam = liveTeams.remove(teamIndex);
+ System.out.println("[-" + removedTeam + "]");
+ }
+
+ /** Generate a user gaming event. */
+ private static String generateEvent(Long currTime, int delayInMillis) {
+ TeamInfo team = randomTeam(liveTeams);
+ String teamName = team.getTeamName();
+ String user;
+ final int parseErrorRate = 900000;
+
+ String robot = team.getRobot();
+ // If the team has an associated robot team member...
+ if (robot != null) {
+ // Then use that robot for the message with some probability.
+ // Set this probability to higher than that used to select any of the 'regular' team
+ // members, so that if there is a robot on the team, it has a higher click rate.
+ if (random.nextInt(team.numMembers() / 2) == 0) {
+ user = robot;
+ } else {
+ user = team.getRandomUser();
+ }
+ } else { // No robot.
+ user = team.getRandomUser();
+ }
+ String event = user + "," + teamName + "," + random.nextInt(MAX_SCORE);
+ // Randomly introduce occasional parse errors. You can see a custom counter tracking the number
+ // of such errors in the Dataflow Monitoring UI, as the example pipeline runs.
+ if (random.nextInt(parseErrorRate) == 0) {
+ System.out.println("Introducing a parse error.");
+ event = "THIS LINE REPRESENTS CORRUPT DATA AND WILL CAUSE A PARSE ERROR";
+ }
+ return addTimeInfoToEvent(event, currTime, delayInMillis);
+ }
+
+ /**
+ * Add time info to a generated gaming event.
+ */
+ private static String addTimeInfoToEvent(String message, Long currTime, int delayInMillis) {
+ String eventTimeString =
+ Long.toString((currTime - delayInMillis) / 1000 * 1000);
+ // Add a (redundant) 'human-readable' date string to make the data semantics more clear.
+ String dateString = fmt.print(currTime);
+ message = message + "," + eventTimeString + "," + dateString;
+ return message;
+ }
+
+ /**
+ * Publish 'numMessages' arbitrary events from live users with the provided delay, to a
+ * PubSub topic.
+ */
+ public static void publishData(int numMessages, int delayInMillis)
+ throws IOException {
+ List<PubsubMessage> pubsubMessages = new ArrayList<>();
+
+ for (int i = 0; i < Math.max(1, numMessages); i++) {
+ Long currTime = System.currentTimeMillis();
+ String message = generateEvent(currTime, delayInMillis);
+ PubsubMessage pubsubMessage = new PubsubMessage()
+ .encodeData(message.getBytes("UTF-8"));
+ pubsubMessage.setAttributes(
+ ImmutableMap.of(TIMESTAMP_ATTRIBUTE,
+ Long.toString((currTime - delayInMillis) / 1000 * 1000)));
+ if (delayInMillis != 0) {
+ System.out.println(pubsubMessage.getAttributes());
+ System.out.println("late data for: " + message);
+ }
+ pubsubMessages.add(pubsubMessage);
+ }
+
+ PublishRequest publishRequest = new PublishRequest();
+ publishRequest.setMessages(pubsubMessages);
+ pubsub.projects().topics().publish(topic, publishRequest).execute();
+ }
+
+ /**
+ * Publish generated events to a file.
+ */
+ public static void publishDataToFile(String fileName, int numMessages, int delayInMillis)
+ throws IOException {
+ PrintWriter out = new PrintWriter(new OutputStreamWriter(
+ new BufferedOutputStream(new FileOutputStream(fileName, true)), "UTF-8"));
+
+ try {
+ for (int i = 0; i < Math.max(1, numMessages); i++) {
+ Long currTime = System.currentTimeMillis();
+ String message = generateEvent(currTime, delayInMillis);
+ out.println(message);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (out != null) {
+ out.flush();
+ out.close();
+ }
+ }
+ }
+
+
+ public static void main(String[] args) throws IOException, InterruptedException {
+ if (args.length < 3) {
+ System.out.println("Usage: Injector project-name (topic-name|none) (filename|none)");
+ System.exit(1);
+ }
+ boolean writeToFile = false;
+ boolean writeToPubsub = true;
+ project = args[0];
+ String topicName = args[1];
+ String fileName = args[2];
+ // The Injector writes either to a PubSub topic, or a file. It will use the PubSub topic if
+ // specified; otherwise, it will try to write to a file.
+ if (topicName.equalsIgnoreCase("none")) {
+ writeToFile = true;
+ writeToPubsub = false;
+ }
+ if (writeToPubsub) {
+ // Create the PubSub client.
+ pubsub = InjectorUtils.getClient();
+ // Create the PubSub topic as necessary.
+ topic = InjectorUtils.getFullyQualifiedTopicName(project, topicName);
+ InjectorUtils.createTopic(pubsub, topic);
+ System.out.println("Injecting to topic: " + topic);
+ } else {
+ if (fileName.equalsIgnoreCase("none")) {
+ System.out.println("Filename not specified.");
+ System.exit(1);
+ }
+ System.out.println("Writing to file: " + fileName);
+ }
+ System.out.println("Starting Injector");
+
+ // Start off with some random live teams.
+ while (liveTeams.size() < NUM_LIVE_TEAMS) {
+ addLiveTeam();
+ }
+
+ // Publish messages at a rate determined by the QPS and Thread sleep settings.
+ for (int i = 0; true; i++) {
+ if (Thread.activeCount() > 10) {
+ System.err.println("I'm falling behind!");
+ }
+
+ // Decide if this should be a batch of late data.
+ final int numMessages;
+ final int delayInMillis;
+ if (i % LATE_DATA_RATE == 0) {
+ // Insert delayed data for one user (one message only)
+ delayInMillis = BASE_DELAY_IN_MILLIS + random.nextInt(FUZZY_DELAY_IN_MILLIS);
+ numMessages = 1;
+ System.out.println("DELAY(" + delayInMillis + ", " + numMessages + ")");
+ } else {
+ System.out.print(".");
+ delayInMillis = 0;
+ numMessages = MIN_QPS + random.nextInt(QPS_RANGE);
+ }
+
+ if (writeToFile) { // Won't use threading for the file write.
+ publishDataToFile(fileName, numMessages, delayInMillis);
+ } else { // Write to PubSub.
+ // Start a thread to inject some data.
+ new Thread(){
+ @Override
+ public void run() {
+ try {
+ publishData(numMessages, delayInMillis);
+ } catch (IOException e) {
+ System.err.println(e);
+ }
+ }
+ }.start();
+ }
+
+ // Wait before creating another injector thread.
+ Thread.sleep(THREAD_SLEEP_MS);
+ }
+ }
+}
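The Injector produces late data by shifting an event's timestamp back by a delay of `BASE_DELAY_IN_MILLIS + random.nextInt(FUZZY_DELAY_IN_MILLIS)` and truncating it to whole seconds, once every `LATE_DATA_RATE` iterations of its main loop. The sketch below reproduces that arithmetic with constants copied from `Injector`; the class and method names are hypothetical. Note that the human-readable date appended by `addTimeInfoToEvent` is formatted from `currTime`, so for late events it shows the injection time rather than the shifted event time.

```java
/** Sketch of the Injector's late-data arithmetic; constants copied from Injector. */
final class LateDataSketch {
  private LateDataSketch() {}

  static final int LATE_DATA_RATE = 5 * 60 * 2;
  static final int THREAD_SLEEP_MS = 500;
  static final int BASE_DELAY_IN_MILLIS = 5 * 60 * 1000;
  static final int FUZZY_DELAY_IN_MILLIS = 5 * 60 * 1000;

  /** Event time as the Injector computes it: shifted back by the delay, truncated to seconds. */
  static long eventTimestampMillis(long currTimeMillis, int delayInMillis) {
    return (currTimeMillis - delayInMillis) / 1000 * 1000;
  }

  /** Largest possible delay: random.nextInt(bound) returns at most bound - 1. */
  static int maxDelayMillis() {
    return BASE_DELAY_IN_MILLIS + FUZZY_DELAY_IN_MILLIS - 1;
  }

  /** Lower bound on seconds between late events, from the main loop's sleeps alone. */
  static long minSecondsBetweenLateEvents() {
    return (long) LATE_DATA_RATE * THREAD_SLEEP_MS / 1000;
  }
}
```

With these constants, late events are delayed by 5 to just under 10 minutes, and the main loop's 500 ms sleeps alone put at least about 300 seconds between late events.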
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java
----------------------------------------------------------------------
diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java
new file mode 100644
index 0000000..55982df
--- /dev/null
+++ b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game.injector;
+
+
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.googleapis.util.Utils;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.PubsubScopes;
+import com.google.api.services.pubsub.model.Topic;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+
+class InjectorUtils {
+
+ private static final String APP_NAME = "injector";
+
+ /**
+ * Builds a new Pubsub client and returns it.
+ */
+ public static Pubsub getClient(final HttpTransport httpTransport,
+ final JsonFactory jsonFactory)
+ throws IOException {
+ Preconditions.checkNotNull(httpTransport);
+ Preconditions.checkNotNull(jsonFactory);
+ GoogleCredential credential =
+ GoogleCredential.getApplicationDefault(httpTransport, jsonFactory);
+ if (credential.createScopedRequired()) {
+ credential = credential.createScoped(PubsubScopes.all());
+ }
+ if (credential.getClientAuthentication() != null) {
+ System.out.println("\n***Warning! You are not using service account credentials to "
+ + "authenticate.\nYou need to use service account credentials for this example,"
+ + "\nsince user-level credentials do not have enough PubSub quota,\nand so you will run "
+ + "out of PubSub quota very quickly.\nSee "
+ + "https://developers.google.com/identity/protocols/application-default-credentials.");
+ System.exit(1);
+ }
+ HttpRequestInitializer initializer =
+ new RetryHttpInitializerWrapper(credential);
+ return new Pubsub.Builder(httpTransport, jsonFactory, initializer)
+ .setApplicationName(APP_NAME)
+ .build();
+ }
+
+ /**
+ * Builds a new Pubsub client with default HttpTransport and
+ * JsonFactory and returns it.
+ */
+ public static Pubsub getClient() throws IOException {
+ return getClient(Utils.getDefaultTransport(),
+ Utils.getDefaultJsonFactory());
+ }
+
+
+ /**
+ * Returns the fully qualified topic name for Pub/Sub.
+ */
+ public static String getFullyQualifiedTopicName(
+ final String project, final String topic) {
+ return String.format("projects/%s/topics/%s", project, topic);
+ }
+
+ /**
+ * Create a topic if it doesn't exist.
+ */
+ public static void createTopic(Pubsub client, String fullTopicName)
+ throws IOException {
+ try {
+ client.projects().topics().get(fullTopicName).execute();
+ } catch (GoogleJsonResponseException e) {
+ if (e.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
+ Topic topic = client.projects().topics().create(fullTopicName, new Topic()).execute();
+ System.out.printf("Topic %s was created.%n", topic.getName());
+ } else {
+ throw e;
+ }
+ }
+ }
+}
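`createTopic` probes for the topic with a `get` call. It creates the topic only when that call fails with HTTP 404. Below is a minimal sketch of the same get-or-create pattern that runs without network access. `FakeTopics` and `StatusException` are hypothetical in-memory stand-ins for the Pub/Sub client and `GoogleJsonResponseException`, and the project and topic names are illustrative. In this sketch, any status other than 404 is rethrown, so unexpected failures such as permission or quota errors are not silently ignored.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the "get, and create on 404" pattern used by createTopic. */
public class TopicGetOrCreateSketch {

  /** Stand-in for an HTTP error carrying a status code (not the real Pub/Sub API). */
  static final class StatusException extends Exception {
    final int statusCode;

    StatusException(int statusCode) {
      super("HTTP " + statusCode);
      this.statusCode = statusCode;
    }
  }

  /** Minimal in-memory stand-in for the Pub/Sub topics resource. */
  static final class FakeTopics {
    final Set<String> topics = new HashSet<>();

    void get(String name) throws StatusException {
      if (!topics.contains(name)) {
        throw new StatusException(404);
      }
    }

    void create(String name) {
      topics.add(name);
    }
  }

  /** Same format string as InjectorUtils.getFullyQualifiedTopicName. */
  static String fullyQualifiedTopicName(String project, String topic) {
    return String.format("projects/%s/topics/%s", project, topic);
  }

  /** Returns true if the topic was created, false if it already existed. */
  static boolean createIfMissing(FakeTopics client, String fullTopicName)
      throws StatusException {
    try {
      client.get(fullTopicName);
      return false;
    } catch (StatusException e) {
      if (e.statusCode == 404) {
        client.create(fullTopicName);
        return true;
      }
      throw e; // Any other error is surfaced rather than swallowed.
    }
  }

  public static void main(String[] args) throws StatusException {
    FakeTopics client = new FakeTopics();
    String name = fullyQualifiedTopicName("my-project", "game-events");
    System.out.println(name + " created: " + createIfMissing(client, name));
    System.out.println(name + " created: " + createIfMissing(client, name));
  }
}
```

When you run `main`, the first call prints `created: true`. The second call prints `created: false` because the topic already exists by then.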
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java
----------------------------------------------------------------------
diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java
new file mode 100644
index 0000000..1437534
--- /dev/null
+++ b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game.injector;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.http.HttpBackOffIOExceptionHandler;
+import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
+import com.google.api.client.util.ExponentialBackOff;
+import com.google.api.client.util.Sleeper;
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.logging.Logger;
+
+/**
+ * RetryHttpInitializerWrapper will automatically retry upon RPC
+ * failures, preserving the auto-refresh behavior of the Google
+ * Credentials.
+ */
+public class RetryHttpInitializerWrapper implements HttpRequestInitializer {
+
+ /**
+ * A private logger.
+ */
+ private static final Logger LOG =
+ Logger.getLogger(RetryHttpInitializerWrapper.class.getName());
+
+ /**
+ * One minute in milliseconds.
+ */
+ private static final int ONEMINITUES = 60000;
+
+ /**
+ * Intercepts the request for filling in the "Authorization"
+ * header field, as well as recovering from certain unsuccessful
+ * error codes wherein the Credential must refresh its token for a
+ * retry.
+ */
+ private final Credential wrappedCredential;
+
+ /**
+ * A sleeper; you can replace it with a mock in your test.
+ */
+ private final Sleeper sleeper;
+
+ /**
+ * A constructor.
+ *
+ * @param wrappedCredential Credential which will be wrapped and
+ * used for providing auth header.
+ */
+ public RetryHttpInitializerWrapper(final Credential wrappedCredential) {
+ this(wrappedCredential, Sleeper.DEFAULT);
+ }
+
+ /**
+ * A package-private constructor, intended only for testing.
+ *
+ * @param wrappedCredential Credential which will be wrapped and
+ * used for providing auth header.
+ * @param sleeper Sleeper for easy testing.
+ */
+ RetryHttpInitializerWrapper(
+ final Credential wrappedCredential, final Sleeper sleeper) {
+ this.wrappedCredential = Preconditions.checkNotNull(wrappedCredential);
+ this.sleeper = sleeper;
+ }
+
+ /**
+ * Initializes the given request.
+ */
+ @Override
+ public final void initialize(final HttpRequest request) {
+ request.setReadTimeout(2 * ONEMINITUES); // 2 minutes read timeout
+ final HttpUnsuccessfulResponseHandler backoffHandler =
+ new HttpBackOffUnsuccessfulResponseHandler(
+ new ExponentialBackOff())
+ .setSleeper(sleeper);
+ request.setInterceptor(wrappedCredential);
+ request.setUnsuccessfulResponseHandler(
+ new HttpUnsuccessfulResponseHandler() {
+ @Override
+ public boolean handleResponse(
+ final HttpRequest request,
+ final HttpResponse response,
+ final boolean supportsRetry) throws IOException {
+ if (wrappedCredential.handleResponse(
+ request, response, supportsRetry)) {
+ // If credential decides it can handle it,
+ // the return code or message indicated
+ // something specific to authentication,
+ // and no backoff is desired.
+ return true;
+ } else if (backoffHandler.handleResponse(
+ request, response, supportsRetry)) {
+ // Otherwise, we defer to the judgement of
+ // our internal backoff handler.
+ LOG.info("Retrying "
+ + request.getUrl().toString());
+ return true;
+ } else {
+ return false;
+ }
+ }
+ });
+ request.setIOExceptionHandler(
+ new HttpBackOffIOExceptionHandler(new ExponentialBackOff())
+ .setSleeper(sleeper));
+ }
+}
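`initialize` installs a response handler with a fixed order. It first asks the credential, for example to refresh a token after an authentication failure, and only then falls back to exponential backoff. Below is a minimal sketch of that ordering. It makes several assumptions. A hypothetical `ResponseHandler` interface takes the place of `HttpUnsuccessfulResponseHandler`. The credential is assumed to handle only 401 responses, and the backoff only 5xx responses. The backoff intervals are deterministic, with an illustrative initial interval, multiplier, and cap, whereas the real `ExponentialBackOff` also randomizes each interval.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the "credential first, then backoff" retry ordering. */
public class RetryChainSketch {

  /** Stand-in for HttpUnsuccessfulResponseHandler: true means "retry the request". */
  interface ResponseHandler {
    boolean handleResponse(int statusCode);
  }

  /** Consults the credential first; the backoff handler only runs if it declines. */
  static ResponseHandler chain(ResponseHandler credential, ResponseHandler backoff) {
    return statusCode -> credential.handleResponse(statusCode)
        || backoff.handleResponse(statusCode);
  }

  /** Deterministic exponential backoff intervals, capped at maxMillis (no jitter). */
  static List<Long> backoffIntervals(long initialMillis, double multiplier, long maxMillis, int n) {
    List<Long> intervals = new ArrayList<>();
    double current = initialMillis;
    for (int i = 0; i < n; i++) {
      intervals.add(Math.min((long) current, maxMillis));
      current *= multiplier;
    }
    return intervals;
  }

  public static void main(String[] args) {
    // Assumption: the credential only "handles" 401 by refreshing its token.
    ResponseHandler credential = status -> status == 401;
    // Assumption: the backoff handler retries on server errors only.
    ResponseHandler backoff = status -> status >= 500;
    ResponseHandler handler = chain(credential, backoff);
    for (int status : new int[] {401, 503, 404}) {
      System.out.println(status + " -> retry: " + handler.handleResponse(status));
    }
    System.out.println(backoffIntervals(500, 1.5, 60_000, 5)); // [500, 750, 1125, 1687, 2531]
  }
}
```

Because the credential is consulted first, an authentication failure is retried right after the token refresh instead of after a backoff sleep. This matches the "no backoff is desired" comment in `handleResponse`.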
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java
----------------------------------------------------------------------
diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java
new file mode 100644
index 0000000..2cf719a
--- /dev/null
+++ b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java
@@ -0,0 +1,134 @@
+ /*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game.utils;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.examples.complete.game.UserScore;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
+import com.google.cloud.dataflow.sdk.options.GcpOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PDone;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Generate, format, and write BigQuery table row information. Use provided information about
+ * the field names and types, as well as lambda functions that describe how to generate their
+ * values.
+ */
+public class WriteToBigQuery<T>
+ extends PTransform<PCollection<T>, PDone> {
+
+ protected String tableName;
+ protected Map<String, FieldInfo<T>> fieldInfo;
+
+ public WriteToBigQuery() {
+ }
+
+ public WriteToBigQuery(String tableName,
+ Map<String, FieldInfo<T>> fieldInfo) {
+ this.tableName = tableName;
+ this.fieldInfo = fieldInfo;
+ }
+
+ /** Define a class to hold information about output table field definitions. */
+ public static class FieldInfo<T> implements Serializable {
+ // The BigQuery 'type' of the field
+ private String fieldType;
+ // A lambda function to generate the field value
+ private SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fieldFn;
+
+ public FieldInfo(String fieldType,
+ SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fieldFn) {
+ this.fieldType = fieldType;
+ this.fieldFn = fieldFn;
+ }
+
+ String getFieldType() {
+ return this.fieldType;
+ }
+
+ SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> getFieldFn() {
+ return this.fieldFn;
+ }
+ }
+ /** Convert each element into a BigQuery TableRow, computing each field with its fieldFn. */
+ protected class BuildRowFn extends DoFn<T, TableRow> {
+
+ @Override
+ public void processElement(ProcessContext c) {
+
+ TableRow row = new TableRow();
+ for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
+ String key = entry.getKey();
+ FieldInfo<T> fcnInfo = entry.getValue();
+ SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fcn =
+ fcnInfo.getFieldFn();
+ row.set(key, fcn.apply(c));
+ }
+ c.output(row);
+ }
+ }
+
+ /** Build the output table schema. */
+ protected TableSchema getSchema() {
+ List<TableFieldSchema> fields = new ArrayList<>();
+ for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
+ String key = entry.getKey();
+ FieldInfo<T> fcnInfo = entry.getValue();
+ String bqType = fcnInfo.getFieldType();
+ fields.add(new TableFieldSchema().setName(key).setType(bqType));
+ }
+ return new TableSchema().setFields(fields);
+ }
+
+ @Override
+ public PDone apply(PCollection<T> teamAndScore) {
+ return teamAndScore
+ .apply(ParDo.named("ConvertToRow").of(new BuildRowFn()))
+ .apply(BigQueryIO.Write
+ .to(getTable(teamAndScore.getPipeline(),
+ tableName))
+ .withSchema(getSchema())
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(WriteDisposition.WRITE_APPEND));
+ }
+
+ /** Utility to construct an output table reference. */
+ static TableReference getTable(Pipeline pipeline, String tableName) {
+ PipelineOptions options = pipeline.getOptions();
+ TableReference table = new TableReference();
+ table.setDatasetId(options.as(UserScore.Options.class).getDataset());
+ table.setProjectId(options.as(GcpOptions.class).getProject());
+ table.setTableId(tableName);
+ return table;
+ }
+}
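`WriteToBigQuery` derives both the table schema and every row from the same `Map<String, FieldInfo<T>>`. As a result, a field's BigQuery type and the lambda that produces its value are declared together. Below is a minimal plain-Java sketch of that idea. It substitutes `java.util.function.Function<T, Object>` for `SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object>`, so its field functions receive the element rather than the `ProcessContext`. Ordinary maps and strings stand in for `TableRow` and `TableFieldSchema`. The `TeamScore` type and the `team`/`total_score` field names are illustrative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch: one field map drives both the schema and each row's contents. */
public class FieldInfoSketch {

  /** Simplified FieldInfo: a BigQuery type name plus a function that computes the value. */
  static final class FieldInfo<T> {
    final String fieldType;
    final Function<T, Object> fieldFn;

    FieldInfo(String fieldType, Function<T, Object> fieldFn) {
      this.fieldType = fieldType;
      this.fieldFn = fieldFn;
    }
  }

  /** Illustrative element type (hypothetical). */
  static final class TeamScore {
    final String team;
    final int score;

    TeamScore(String team, int score) {
      this.team = team;
      this.score = score;
    }
  }

  /** Analogue of getSchema(): one "name:TYPE" entry per field, in map order. */
  static <T> List<String> schema(Map<String, FieldInfo<T>> fieldInfo) {
    List<String> fields = new ArrayList<>();
    for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
      fields.add(entry.getKey() + ":" + entry.getValue().fieldType);
    }
    return fields;
  }

  /** Analogue of BuildRowFn.processElement(): apply every field function to one element. */
  static <T> Map<String, Object> row(Map<String, FieldInfo<T>> fieldInfo, T element) {
    Map<String, Object> row = new LinkedHashMap<>();
    for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
      row.put(entry.getKey(), entry.getValue().fieldFn.apply(element));
    }
    return row;
  }

  public static void main(String[] args) {
    // LinkedHashMap keeps schema columns and row keys in insertion order.
    Map<String, FieldInfo<TeamScore>> fields = new LinkedHashMap<>();
    fields.put("team", new FieldInfo<TeamScore>("STRING", ts -> ts.team));
    fields.put("total_score", new FieldInfo<TeamScore>("INTEGER", ts -> ts.score));

    System.out.println(schema(fields)); // [team:STRING, total_score:INTEGER]
    System.out.println(row(fields, new TeamScore("AmberQuokka", 42))); // {team=AmberQuokka, total_score=42}
  }
}
```

`getSchema()` iterates the caller's map. If a caller passes a `HashMap`, the column order follows that map's iteration order; a `LinkedHashMap` keeps the order predictable.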
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java
----------------------------------------------------------------------
diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java
new file mode 100644
index 0000000..8433021
--- /dev/null
+++ b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -0,0 +1,76 @@
+ /*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game.utils;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PDone;
+
+import java.util.Map;
+
+/**
+ * Generate, format, and write BigQuery table row information. Subclasses {@link WriteToBigQuery}
+ * to require window access, so this subclass may be used for writes that need the
+ * context's window information.
+ */
+public class WriteWindowedToBigQuery<T>
+ extends WriteToBigQuery<T> {
+
+ public WriteWindowedToBigQuery(String tableName,
+ Map<String, FieldInfo<T>> fieldInfo) {
+ super(tableName, fieldInfo);
+ }
+
+ /** Convert each element into a BigQuery TableRow; field functions may read the window. */
+ protected class BuildRowFn extends DoFn<T, TableRow>
+ implements RequiresWindowAccess {
+
+ @Override
+ public void processElement(ProcessContext c) {
+
+ TableRow row = new TableRow();
+ for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
+ String key = entry.getKey();
+ FieldInfo<T> fcnInfo = entry.getValue();
+ SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fcn =
+ fcnInfo.getFieldFn();
+ row.set(key, fcn.apply(c));
+ }
+ c.output(row);
+ }
+ }
+
+ @Override
+ public PDone apply(PCollection<T> teamAndScore) {
+ return teamAndScore
+ .apply(ParDo.named("ConvertToRow").of(new BuildRowFn()))
+ .apply(BigQueryIO.Write
+ .to(getTable(teamAndScore.getPipeline(),
+ tableName))
+ .withSchema(getSchema())
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(WriteDisposition.WRITE_APPEND));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/test/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java
----------------------------------------------------------------------
diff --git a/java8examples/src/test/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java b/java8examples/src/test/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java
new file mode 100644
index 0000000..fcae41c
--- /dev/null
+++ b/java8examples/src/test/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.GcsOptions;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Filter;
+import com.google.cloud.dataflow.sdk.transforms.FlatMapElements;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.util.GcsUtil;
+import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+import com.google.common.collect.ImmutableList;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * To keep {@link MinimalWordCountJava8} simple, it is not factored or testable. This test
+ * file should be maintained with a copy of its code for a basic smoke test.
+ */
+@RunWith(JUnit4.class)
+public class MinimalWordCountJava8Test implements Serializable {
+
+ /**
+ * A basic smoke test that ensures there is no crash at pipeline construction time.
+ */
+ @Test
+ public void testMinimalWordCountJava8() throws Exception {
+ Pipeline p = TestPipeline.create();
+ p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil());
+
+ p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
+ .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
+ .withOutputType(new TypeDescriptor<String>() {}))
+ .apply(Filter.byPredicate((String word) -> !word.isEmpty()))
+ .apply(Count.<String>perElement())
+ .apply(MapElements
+ .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
+ .withOutputType(new TypeDescriptor<String>() {}))
+ .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
+ }
+
+ private GcsUtil buildMockGcsUtil() throws IOException {
+ GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);
+
+ // Any request to open gets a new bogus channel
+ Mockito
+ .when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
+ .then(new Answer<SeekableByteChannel>() {
+ @Override
+ public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
+ return FileChannel.open(
+ Files.createTempFile("channel-", ".tmp"),
+ StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
+ }
+ });
+
+ // Any request for expansion returns a list containing the original GcsPath.
+ // This is required to pass validation that occurs in TextIO during apply().
+ Mockito
+ .when(mockGcsUtil.expand(Mockito.any(GcsPath.class)))
+ .then(new Answer<List<GcsPath>>() {
+ @Override
+ public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
+ return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
+ }
+ });
+
+ return mockGcsUtil;
+ }
+}
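The smoke test only constructs the pipeline. It never runs it or checks any counts. To illustrate what the same chain of transforms computes, here is a hypothetical in-memory version using Java 8 streams. It splits each line on `[^a-zA-Z']+`, drops empty strings, counts occurrences per word, and formats each pair as `word: count`. It is an analogue for intuition only and does not replace the Dataflow pipeline. The sample lines are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical in-memory analogue of the MinimalWordCountJava8 transform chain. */
public class WordCountStreamsSketch {

  static List<String> countWords(List<String> lines) {
    Map<String, Long> counts = lines.stream()
        // FlatMapElements: split each line into words.
        .flatMap(line -> Arrays.stream(line.split("[^a-zA-Z']+")))
        // Filter.byPredicate: drop empty tokens (e.g. from leading punctuation).
        .filter(word -> !word.isEmpty())
        // Count.perElement: a TreeMap gives a stable, sorted order for printing.
        .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    // MapElements: format each (word, count) pair.
    return counts.entrySet().stream()
        .map(e -> e.getKey() + ": " + e.getValue())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList("To be, or not to be:", "that is the question.");
    countWords(lines).forEach(System.out::println);
  }
}
```

Case is preserved, so `To` and `to` are counted separately, just as the regex-only split in the test would count them.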
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java
----------------------------------------------------------------------
diff --git a/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java
new file mode 100644
index 0000000..f77d146
--- /dev/null
+++ b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.complete.game.GameStats.CalculateSpammyUsers;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests of GameStats.
+ * Because the pipeline was designed for easy readability and explanations, it lacks good
+ * modularity for testing. See our testing documentation for better ideas:
+ * https://cloud.google.com/dataflow/pipelines/testing-your-pipeline.
+ */
+@RunWith(JUnit4.class)
+public class GameStatsTest implements Serializable {
+
+ // User scores
+ static final List<KV<String, Integer>> USER_SCORES = Arrays.asList(
+ KV.of("Robot-2", 66), KV.of("Robot-1", 116), KV.of("user7_AndroidGreenKookaburra", 23),
+ KV.of("user7_AndroidGreenKookaburra", 1),
+ KV.of("user19_BisqueBilby", 14), KV.of("user13_ApricotQuokka", 15),
+ KV.of("user18_BananaEmu", 25), KV.of("user6_AmberEchidna", 8),
+ KV.of("user2_AmberQuokka", 6), KV.of("user0_MagentaKangaroo", 4),
+ KV.of("user0_MagentaKangaroo", 3), KV.of("user2_AmberCockatoo", 13),
+ KV.of("user7_AlmondWallaby", 15), KV.of("user6_AmberNumbat", 11),
+ KV.of("user6_AmberQuokka", 4));
+
+ // The expected list of 'spammers'.
+ static final List<KV<String, Integer>> SPAMMERS = Arrays.asList(
+ KV.of("Robot-2", 66), KV.of("Robot-1", 116));
+
+ /** Test the calculation of 'spammy users'. */
+ @Test
+ @Category(RunnableOnService.class)
+ public void testCalculateSpammyUsers() throws Exception {
+ Pipeline p = TestPipeline.create();
+
+ PCollection<KV<String, Integer>> input = p.apply(Create.of(USER_SCORES));
+ PCollection<KV<String, Integer>> output = input.apply(new CalculateSpammyUsers());
+
+ // Check the set of spammers.
+ DataflowAssert.that(output).containsInAnyOrder(SPAMMERS);
+
+ p.run();
+ }
+
+}
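One rule consistent with the expected `SPAMMERS` list is mean-based. Below is a hypothetical plain-Java sketch of such a rule. It assumes that `CalculateSpammyUsers` sums scores per user, takes the mean of those sums, and keeps users whose sum exceeds that mean times a weight. The weight of 2.5 is an assumption, because `GameStats` itself is not part of this excerpt. Applied to `USER_SCORES`, the 13 per-user sums total 324, so the mean is about 24.9 and the threshold about 62.3. Only `Robot-2` (66) and `Robot-1` (116) clear that threshold.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory sketch of a mean-based "spammy user" filter. */
public class SpammyUsersSketch {
  // Assumed weight; GameStats (not shown in this excerpt) defines the value actually used.
  static final double SCORE_WEIGHT = 2.5;

  static Map.Entry<String, Integer> kv(String user, int score) {
    return new SimpleEntry<>(user, score);
  }

  static Map<String, Integer> spammyUsers(List<Map.Entry<String, Integer>> userScores) {
    // 1. Sum the scores for each user, keeping first-seen order.
    Map<String, Integer> sums = new LinkedHashMap<>();
    for (Map.Entry<String, Integer> entry : userScores) {
      sums.merge(entry.getKey(), entry.getValue(), Integer::sum);
    }
    // 2. Global mean of the per-user sums.
    double mean = sums.values().stream().mapToInt(Integer::intValue).average().orElse(0.0);
    // 3. Keep users whose total is strictly above the weighted mean.
    Map<String, Integer> spammers = new LinkedHashMap<>();
    for (Map.Entry<String, Integer> sum : sums.entrySet()) {
      if (sum.getValue() > mean * SCORE_WEIGHT) {
        spammers.put(sum.getKey(), sum.getValue());
      }
    }
    return spammers;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> scores = Arrays.asList(
        kv("Robot-1", 116), kv("alice", 15), kv("bob", 15),
        kv("alice", 5), kv("carol", 10), kv("dave", 5));
    // Sums: 116, 20, 15, 10, 5 -> mean 33.2, threshold 83.0.
    System.out.println(spammyUsers(scores)); // {Robot-1=116}
  }
}
```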
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java
----------------------------------------------------------------------
diff --git a/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java
new file mode 100644
index 0000000..f77a5d4
--- /dev/null
+++ b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.complete.game.UserScore.GameActionInfo;
+import com.google.cloud.dataflow.examples.complete.game.UserScore.ParseEventFn;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.Filter;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests of HourlyTeamScore.
+ * Because the pipeline was designed for easy readability and explanations, it lacks good
+ * modularity for testing. See our testing documentation for better ideas:
+ * https://cloud.google.com/dataflow/pipelines/testing-your-pipeline.
+ */
+@RunWith(JUnit4.class)
+public class HourlyTeamScoreTest implements Serializable {
+
+ static final String[] GAME_EVENTS_ARRAY = new String[] {
+ "user0_MagentaKangaroo,MagentaKangaroo,3,1447955630000,2015-11-19 09:53:53.444",
+ "user13_ApricotQuokka,ApricotQuokka,15,1447955630000,2015-11-19 09:53:53.444",
+ "user6_AmberNumbat,AmberNumbat,11,1447955630000,2015-11-19 09:53:53.444",
+ "user7_AlmondWallaby,AlmondWallaby,15,1447955630000,2015-11-19 09:53:53.444",
+ "user7_AndroidGreenKookaburra,AndroidGreenKookaburra,12,1447955630000,2015-11-19 09:53:53.444",
+ "user7_AndroidGreenKookaburra,AndroidGreenKookaburra,11,1447955630000,2015-11-19 09:53:53.444",
+ "user19_BisqueBilby,BisqueBilby,6,1447955630000,2015-11-19 09:53:53.444",
+ "user19_BisqueBilby,BisqueBilby,8,1447955630000,2015-11-19 09:53:53.444",
+ // time gap...
+ "user0_AndroidGreenEchidna,AndroidGreenEchidna,0,1447965690000,2015-11-19 12:41:31.053",
+ "user0_MagentaKangaroo,MagentaKangaroo,4,1447965690000,2015-11-19 12:41:31.053",
+ "user2_AmberCockatoo,AmberCockatoo,13,1447965690000,2015-11-19 12:41:31.053",
+ "user18_BananaEmu,BananaEmu,7,1447965690000,2015-11-19 12:41:31.053",
+ "user3_BananaEmu,BananaEmu,17,1447965690000,2015-11-19 12:41:31.053",
+ "user18_BananaEmu,BananaEmu,1,1447965690000,2015-11-19 12:41:31.053",
+ "user18_ApricotCaneToad,ApricotCaneToad,14,1447965690000,2015-11-19 12:41:31.053"
+ };
+
+
+ static final List<String> GAME_EVENTS = Arrays.asList(GAME_EVENTS_ARRAY);
+
+
+ // Used to check the filtering.
+ static final KV[] FILTERED_EVENTS = new KV[] {
+ KV.of("user0_AndroidGreenEchidna", 0), KV.of("user0_MagentaKangaroo", 4),
+ KV.of("user2_AmberCockatoo", 13),
+ KV.of("user18_BananaEmu", 7), KV.of("user3_BananaEmu", 17),
+ KV.of("user18_BananaEmu", 1), KV.of("user18_ApricotCaneToad", 14)
+ };
+
+
+ /** Test the filtering. */
+ @Test
+ @Category(RunnableOnService.class)
+ public void testUserScoresFilter() throws Exception {
+ Pipeline p = TestPipeline.create();
+
+ final Instant startMinTimestamp = new Instant(1447965680000L);
+
+ PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));
+
+ PCollection<KV<String, Integer>> output = input
+ .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
+
+ .apply("FilterStartTime", Filter.byPredicate(
+ (GameActionInfo gInfo)
+ -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
+ // run a map to access the fields in the result.
+ .apply(MapElements
+ .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
+ .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
+
+ DataflowAssert.that(output).containsInAnyOrder(FILTERED_EVENTS);
+
+ p.run();
+ }
+
+}
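The filtering test parses each CSV event and keeps events stamped strictly after `startMinTimestamp`. It then projects the survivors to `(user, score)` pairs. Below is a hypothetical plain-Java sketch of the same steps. It assumes the field order visible in `GAME_EVENTS_ARRAY`: user, team, score, epoch-millis timestamp, readable time. The real parsing lives in `UserScore.ParseEventFn`, which is not shown here and may handle malformed lines differently. With the cutoff `1447965680000`, only the seven events stamped `1447965690000` pass, matching `FILTERED_EVENTS`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: parse CSV game events, filter by timestamp, keep (user, score). */
public class EventFilterSketch {

  static List<Map.Entry<String, Integer>> userScoresAfter(List<String> events, long minMillis) {
    List<Map.Entry<String, Integer>> out = new ArrayList<>();
    for (String line : events) {
      String[] parts = line.split(",");
      if (parts.length < 4) {
        continue; // Skip malformed lines (assumed behavior).
      }
      String user = parts[0].trim();
      int score = Integer.parseInt(parts[2].trim());
      long timestamp = Long.parseLong(parts[3].trim());
      // Strictly greater than, as in the test's Filter.byPredicate.
      if (timestamp > minMillis) {
        out.add(new SimpleEntry<>(user, score));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> events = Arrays.asList(
        "user0_MagentaKangaroo,MagentaKangaroo,3,1447955630000,2015-11-19 09:53:53.444",
        "user0_MagentaKangaroo,MagentaKangaroo,4,1447965690000,2015-11-19 12:41:31.053");
    System.out.println(userScoresAfter(events, 1447965680000L)); // [user0_MagentaKangaroo=4]
  }
}
```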
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java
----------------------------------------------------------------------
diff --git a/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java
new file mode 100644
index 0000000..641e2c3
--- /dev/null
+++ b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.complete.game.UserScore.ExtractAndSumScore;
+import com.google.cloud.dataflow.examples.complete.game.UserScore.GameActionInfo;
+import com.google.cloud.dataflow.examples.complete.game.UserScore.ParseEventFn;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFnTester;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests of UserScore.
+ */
+@RunWith(JUnit4.class)
+public class UserScoreTest implements Serializable {
+
+ static final String[] GAME_EVENTS_ARRAY = new String[] {
+ "user0_MagentaKangaroo,MagentaKangaroo,3,1447955630000,2015-11-19 09:53:53.444",
+ "user13_ApricotQuokka,ApricotQuokka,15,1447955630000,2015-11-19 09:53:53.444",
+ "user6_AmberNumbat,AmberNumbat,11,1447955630000,2015-11-19 09:53:53.444",
+ "user7_AlmondWallaby,AlmondWallaby,15,1447955630000,2015-11-19 09:53:53.444",
+ "user7_AndroidGreenKookaburra,AndroidGreenKookaburra,12,1447955630000,2015-11-19 09:53:53.444",
+ "user6_AliceBlueDingo,AliceBlueDingo,4,xxxxxxx,2015-11-19 09:53:53.444",
+ "user7_AndroidGreenKookaburra,AndroidGreenKookaburra,11,1447955630000,2015-11-19 09:53:53.444",
+ "THIS IS A PARSE ERROR,2015-11-19 09:53:53.444",
+ "user19_BisqueBilby,BisqueBilby,6,1447955630000,2015-11-19 09:53:53.444",
+ "user19_BisqueBilby,BisqueBilby,8,1447955630000,2015-11-19 09:53:53.444"
+ };
+
+ static final String[] GAME_EVENTS_ARRAY2 = new String[] {
+ "user6_AliceBlueDingo,AliceBlueDingo,4,xxxxxxx,2015-11-19 09:53:53.444",
+ "THIS IS A PARSE ERROR,2015-11-19 09:53:53.444",
+ "user13_BisqueBilby,BisqueBilby,xxx,1447955630000,2015-11-19 09:53:53.444"
+ };
+
+ static final List<String> GAME_EVENTS = Arrays.asList(GAME_EVENTS_ARRAY);
+ static final List<String> GAME_EVENTS2 = Arrays.asList(GAME_EVENTS_ARRAY2);
+
+ static final List<KV<String, Integer>> USER_SUMS = Arrays.asList(
+ KV.of("user0_MagentaKangaroo", 3), KV.of("user13_ApricotQuokka", 15),
+ KV.of("user6_AmberNumbat", 11), KV.of("user7_AlmondWallaby", 15),
+ KV.of("user7_AndroidGreenKookaburra", 23),
+ KV.of("user19_BisqueBilby", 14));
+
+ static final List<KV<String, Integer>> TEAM_SUMS = Arrays.asList(
+ KV.of("MagentaKangaroo", 3), KV.of("ApricotQuokka", 15),
+ KV.of("AmberNumbat", 11), KV.of("AlmondWallaby", 15),
+ KV.of("AndroidGreenKookaburra", 23),
+ KV.of("BisqueBilby", 14));
+
+ /** Test the ParseEventFn DoFn. */
+ @Test
+ public void testParseEventFn() {
+ DoFnTester<String, GameActionInfo> parseEventFn =
+ DoFnTester.of(new ParseEventFn());
+
+ List<GameActionInfo> results = parseEventFn.processBatch(GAME_EVENTS_ARRAY);
+ Assert.assertEquals(8, results.size());
+ Assert.assertEquals("user0_MagentaKangaroo", results.get(0).getUser());
+ Assert.assertEquals("MagentaKangaroo", results.get(0).getTeam());
+ Assert.assertEquals(Integer.valueOf(3), results.get(0).getScore());
+ }
+
+ /** Tests ExtractAndSumScore("user"). */
+ @Test
+ @Category(RunnableOnService.class)
+ public void testUserScoreSums() throws Exception {
+ Pipeline p = TestPipeline.create();
+
+ PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));
+
+ PCollection<KV<String, Integer>> output = input
+ .apply(ParDo.of(new ParseEventFn()))
+ // Extract and sum username/score pairs from the event data.
+ .apply("ExtractUserScore", new ExtractAndSumScore("user"));
+
+ // Check the user score sums.
+ DataflowAssert.that(output).containsInAnyOrder(USER_SUMS);
+
+ p.run();
+ }
+
+ /** Tests ExtractAndSumScore("team"). */
+ @Test
+ @Category(RunnableOnService.class)
+ public void testTeamScoreSums() throws Exception {
+ Pipeline p = TestPipeline.create();
+
+ PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));
+
+ PCollection<KV<String, Integer>> output = input
+ .apply(ParDo.of(new ParseEventFn()))
+ // Extract and sum teamname/score pairs from the event data.
+ .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
+
+ // Check the team score sums.
+ DataflowAssert.that(output).containsInAnyOrder(TEAM_SUMS);
+
+ p.run();
+ }
+
+ /** Test that bad input data is dropped appropriately. */
+ @Test
+ @Category(RunnableOnService.class)
+ public void testUserScoresBadInput() throws Exception {
+ Pipeline p = TestPipeline.create();
+
+ PCollection<String> input = p.apply(Create.of(GAME_EVENTS2).withCoder(StringUtf8Coder.of()));
+
+ PCollection<KV<String, Integer>> extract = input
+ .apply(ParDo.of(new ParseEventFn()))
+ .apply(
+ MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
+ .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
+
+ DataflowAssert.that(extract).empty();
+
+ p.run();
+ }
+}
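The following minimal plain-Java sketch shows where the expected values in USER_SUMS, TEAM_SUMS, and testUserScoresBadInput come from. It assumes the comma-separated layout user,team,score,timestamp,readable-time seen in the test data. It also assumes that a line with too few fields or a non-numeric score or timestamp is silently dropped, which matches the 8 results expected from the 10 lines of GAME_EVENTS_ARRAY. UserScoreSketch, its Event class, and its methods are hypothetical stand-ins for ParseEventFn, GameActionInfo, and ExtractAndSumScore, not the code in UserScore.java.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical plain-Java sketch of the parse-then-sum logic exercised by UserScoreTest. */
public class UserScoreSketch {

  /** Hypothetical stand-in for GameActionInfo. */
  static final class Event {
    final String user;
    final String team;
    final int score;
    final long timestamp;

    Event(String user, String team, int score, long timestamp) {
      this.user = user;
      this.team = team;
      this.score = score;
      this.timestamp = timestamp;
    }
  }

  /** Parses "user,team,score,timestamp,..." and returns empty for malformed lines. */
  static Optional<Event> parse(String line) {
    String[] parts = line.split(",");
    try {
      return Optional.of(new Event(
          parts[0].trim(),
          parts[1].trim(),
          Integer.parseInt(parts[2].trim()),
          Long.parseLong(parts[3].trim())));
    } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
      // Too few fields, or a non-numeric score/timestamp: drop the line.
      return Optional.empty();
    }
  }

  /** Sums scores per team if byTeam is true, otherwise per user. */
  static Map<String, Integer> sumScores(List<String> lines, boolean byTeam) {
    Map<String, Integer> sums = new TreeMap<>(); // sorted only to make output stable
    for (String line : lines) {
      parse(line).ifPresent(e -> sums.merge(byTeam ? e.team : e.user, e.score, Integer::sum));
    }
    return sums;
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList(
        "user7_AndroidGreenKookaburra,AndroidGreenKookaburra,12,1447955630000,2015-11-19 09:53:53.444",
        "user6_AliceBlueDingo,AliceBlueDingo,4,xxxxxxx,2015-11-19 09:53:53.444",
        "user7_AndroidGreenKookaburra,AndroidGreenKookaburra,11,1447955630000,2015-11-19 09:53:53.444",
        "THIS IS A PARSE ERROR,2015-11-19 09:53:53.444",
        "user19_BisqueBilby,BisqueBilby,6,1447955630000,2015-11-19 09:53:53.444",
        "user19_BisqueBilby,BisqueBilby,8,1447955630000,2015-11-19 09:53:53.444");
    System.out.println(sumScores(lines, false)); // prints {user19_BisqueBilby=14, user7_AndroidGreenKookaburra=23}
    System.out.println(sumScores(lines, true));  // prints {AndroidGreenKookaburra=23, BisqueBilby=14}
  }
}
```

The two malformed lines contribute nothing. The remaining lines reproduce the 23 and 14 totals listed in USER_SUMS and TEAM_SUMS. Every line in GAME_EVENTS_ARRAY2 fails to parse under these assumptions, which is why testUserScoresBadInput expects an empty output.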
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8a44ea9..1cd3a15 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,6 +109,15 @@
</modules>
</profile>
<profile>
+ <id>java8-examples</id>
+ <activation>
+ <jdk>[1.8,)</jdk>
+ </activation>
+ <modules>
+ <module>java8examples</module>
+ </modules>
+ </profile>
+ <profile>
<id>doclint-java8-disable</id>
<activation>
<jdk>[1.8,)</jdk>