Posted to commits@beam.apache.org by ke...@apache.org on 2016/03/17 21:47:35 UTC

[3/8] incubator-beam git commit: Move Java 8 examples to their own module

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md
----------------------------------------------------------------------
diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md
new file mode 100644
index 0000000..79b55ce
--- /dev/null
+++ b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md
@@ -0,0 +1,113 @@
+
+# 'Gaming' examples
+
+
+This directory holds a series of example Dataflow pipelines in a simple 'mobile
+gaming' domain. They all require Java 8.  Each pipeline successively introduces
+new concepts, and gives some examples of using Java 8 syntax in constructing
+Dataflow pipelines. Other than usage of Java 8 lambda expressions, the concepts
+that are used apply equally well in Java 7.
+
+In the gaming scenario, many users play, as members of different teams, over
+the course of a day, and their actions are logged for processing. Some of the
+logged game events may be late-arriving, if users play on mobile devices and go
+transiently offline for a period.
+
+The scenario includes not only "regular" users, but also "robot" users, which
+have a higher click rate than the regular users and may move from team to team.
+
+The first two pipelines in the series use pre-generated batch data samples. The
+other two pipelines read from a [PubSub](https://cloud.google.com/pubsub/)
+topic. For these examples, you will also need to run the
+`injector.Injector` program, which generates and publishes the gaming data to
+PubSub. The javadocs for each pipeline have more detailed information on how to
+run that pipeline.
+
+All of these pipelines write their results to BigQuery table(s).
+
+
+## The pipelines in the 'gaming' series
+
+### UserScore
+
+The first pipeline in the series is `UserScore`. This pipeline does batch
+processing of data collected from gaming events. It calculates the sum of
+scores per user, over an entire batch of gaming data (collected, say, for each
+day). The batch processing will not include any late data that arrives after
+the day's cutoff point.
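+
+The heart of this computation is a per-user sum. A minimal sketch, mirroring the
+`ExtractAndSumScore` transform in `UserScore.java` below (the `gameEvents`
+collection name is illustrative, and this is a fragment rather than a complete
+program):
+
+```java
+// Extract (user, score) pairs from the parsed events, then sum scores per user.
+PCollection<KV<String, Integer>> userScores = gameEvents
+    .apply(MapElements
+        .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
+        .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
+    .apply(Sum.<String>integersPerKey());
+```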
+
+### HourlyTeamScore
+
+The next pipeline in the series is `HourlyTeamScore`. This pipeline also
+processes data collected from gaming events in batch. It builds on `UserScore`,
+but uses [fixed windows](https://cloud.google.com/dataflow/model/windowing), by
+default an hour in duration. It calculates the sum of scores per team, for each
+window. You can optionally specify two timestamps: data with event times before
+the first or after the second is filtered out. This supports a model where late
+data whose event time falls within the intended analysis window can still be
+included in the analysis, while any data from before the start of that window
+is excluded.
+
+By using windowing and adding element timestamps, we can do finer-grained
+analysis than with the `UserScore` pipeline — we're now tracking scores for
+each hour rather than over the course of a whole day. However, our batch
+processing is high-latency, in that we don't get results from plays at the
+beginning of the batch's time period until the complete batch is processed.
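+
+A minimal sketch of the hour-long fixed windowing described above, assuming a
+`PCollection<GameActionInfo>` named `gameEvents` (names are illustrative; the
+classes used are the SDK's `WithTimestamps`, `Window`, and `FixedWindows`, plus
+Joda-Time's `Instant` and `Duration`):
+
+```java
+// Assign event-time timestamps from the parsed events, then apply one-hour fixed windows.
+PCollection<GameActionInfo> windowedEvents = gameEvents
+    .apply(WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
+    .apply(Window.<GameActionInfo>into(FixedWindows.of(Duration.standardHours(1))));
+// Per-team sums can then be computed per window, e.g. with ExtractAndSumScore("team").
+```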
+
+### LeaderBoard
+
+The third pipeline in the series is `LeaderBoard`. This pipeline processes an
+unbounded stream of 'game events' from a PubSub topic. The calculation of the
+team scores uses fixed windowing based on event time (the time of the game play
+event), not processing time (the time that an event is processed by the
+pipeline). The pipeline calculates the sum of scores per team, for each window.
+By default, the team scores are calculated using one-hour windows.
+
+In contrast, to demonstrate another windowing option, the user scores are
+calculated using a global window, which periodically (every ten minutes) emits
+cumulative user score sums.
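+
+A sketch of that user-score triggering, roughly the pattern `LeaderBoard` uses
+(durations and collection names here are illustrative):
+
+```java
+// Keep all user events in a single global window, but re-fire every ten minutes,
+// accumulating panes so each firing emits the cumulative per-user sums so far.
+PCollection<KV<String, Integer>> userScores = gameEvents
+    .apply(Window.<GameActionInfo>into(new GlobalWindows())
+        .triggering(Repeatedly.forever(
+            AfterProcessingTime.pastFirstElementInPane()
+                .plusDelayOf(Duration.standardMinutes(10))))
+        .accumulatingFiredPanes()
+        .withAllowedLateness(Duration.standardDays(1)))  // illustrative bound
+    .apply("ExtractUserScore", new ExtractAndSumScore("user"));
+```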
+
+In contrast to the previous pipelines in the series, which used static, finite
+input data, here we're using an unbounded data source, which lets us provide
+_speculative_ results, and allows handling of late data, at much lower latency.
+For example, we could use the early/speculative results to keep a 'leaderboard'
+updated in near real time. Our handling of late data lets us generate correct
+results, e.g. for 'team prizes'. We're now outputting window results as they're
+calculated, giving us much lower latency than with the previous batch examples.
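+
+For the team scores, the corresponding pattern is hour-long fixed windows with
+early (speculative) firings and late firings for late-arriving data. A sketch,
+with illustrative durations:
+
+```java
+// Fixed one-hour windows; emit speculative results early and updates as late data arrives.
+PCollection<KV<String, Integer>> teamScores = gameEvents
+    .apply(Window.<GameActionInfo>into(FixedWindows.of(Duration.standardHours(1)))
+        .triggering(AfterWatermark.pastEndOfWindow()
+            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
+                .plusDelayOf(Duration.standardMinutes(5)))
+            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
+                .plusDelayOf(Duration.standardMinutes(10))))
+        .withAllowedLateness(Duration.standardHours(2))  // illustrative bound
+        .accumulatingFiredPanes())
+    .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
+```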
+
+### GameStats
+
+The fourth pipeline in the series is `GameStats`. This pipeline builds
+on the `LeaderBoard` functionality — supporting output of speculative and late
+data — and adds some "business intelligence" analysis: abuse detection. The
+pipeline derives the mean user score sum for a window, and uses that
+information to identify likely spammers/robots. (The injector is designed so
+that the "robots" have a higher click rate than the "real" users.) The robot
+users are then filtered out when calculating the team scores.
+
+Additionally, user sessions are tracked: that is, we find bursts of user
+activity using session windows. Then, the mean session duration information is
+recorded in the context of subsequent fixed windowing. (This could be used to
+tell us which games are giving us greater user retention.)
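+
+A sketch of the session windowing, assuming per-user event data keyed by user
+(the gap duration and names are illustrative):
+
+```java
+// Group each user's events into sessions: windows separated by a gap of inactivity.
+PCollection<KV<String, Integer>> sessionedEvents = userEvents
+    .apply(Window.<KV<String, Integer>>into(
+        Sessions.withGapDuration(Duration.standardMinutes(5))));
+// Session lengths can then be computed per session window and averaged over
+// subsequent fixed windows, e.g. via Mean.<Integer>globally().withoutDefaults().
+```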
+
+### Running the PubSub Injector
+
+The `LeaderBoard` and `GameStats` example pipelines read unbounded data
+from a PubSub topic.
+
+Use the `injector.Injector` program to generate this data and publish to a
+PubSub topic. See the `Injector` javadocs for more information on how to run the
+injector. Set up the injector before you start one of these pipelines. Then,
+when you start the pipeline, pass as an argument the name of that PubSub topic.
+See the pipeline javadocs for the details.
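+
+For example, as the `Injector` javadocs describe, the injector takes arguments of
+the form `Injector project-name (topic-name|none) (filename|none)`; to publish to
+a PubSub topic you would run `Injector <project-name> <topic-name> none`.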
+
+## Viewing the results in BigQuery
+
+All of the pipelines write their results to BigQuery.  `UserScore` and
+`HourlyTeamScore` each write one table, and `LeaderBoard` and
+`GameStats` each write two. The pipelines have default table names that
+you can override when you start the pipeline, for example if tables with those
+names already exist.
+
+Depending on the windowing intervals defined in a given pipeline, you may have
+to wait for a while (more than an hour) before you start to see results written
+to the BigQuery tables.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java
new file mode 100644
index 0000000..de06ce3
--- /dev/null
+++ b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.complete.game.utils.WriteToBigQuery;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.AvroCoder;
+import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.apache.avro.reflect.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class is the first in a series of four pipelines that tell a story in a 'gaming' domain.
+ * Concepts: batch processing; reading input from Google Cloud Storage and writing output to
+ * BigQuery; using standalone DoFns; use of the sum by key transform; examples of
+ * Java 8 lambda syntax.
+ *
+ * <p> In this gaming scenario, many users play, as members of different teams, over the course of a
+ * day, and their actions are logged for processing.  Some of the logged game events may be late-
+ * arriving, if users play on mobile devices and go transiently offline for a period.
+ *
+ * <p> This pipeline does batch processing of data collected from gaming events. It calculates the
+ * sum of scores per user, over an entire batch of gaming data (collected, say, for each day). The
+ * batch processing will not include any late data that arrives after the day's cutoff point.
+ *
+ * <p> To execute this pipeline using the Dataflow service and static example input data, specify
+ * the pipeline configuration like this:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ *   --dataset=YOUR-DATASET
+ * }
+ * </pre>
+ * where the BigQuery dataset you specify must already exist.
+ *
+ * <p> Optionally include the --input argument to specify a batch input file.
+ * See the --input default value for example batch data file, or use {@link injector.Injector} to
+ * generate your own batch data.
+  */
+public class UserScore {
+
+  /**
+   * Class to hold info about a game event.
+   */
+  @DefaultCoder(AvroCoder.class)
+  static class GameActionInfo {
+    @Nullable String user;
+    @Nullable String team;
+    @Nullable Integer score;
+    @Nullable Long timestamp;
+
+    public GameActionInfo() {}
+
+    public GameActionInfo(String user, String team, Integer score, Long timestamp) {
+      this.user = user;
+      this.team = team;
+      this.score = score;
+      this.timestamp = timestamp;
+    }
+
+    public String getUser() {
+      return this.user;
+    }
+    public String getTeam() {
+      return this.team;
+    }
+    public Integer getScore() {
+      return this.score;
+    }
+    public String getKey(String keyname) {
+      if (keyname.equals("team")) {
+        return this.team;
+      } else {  // return username as default
+        return this.user;
+      }
+    }
+    public Long getTimestamp() {
+      return this.timestamp;
+    }
+  }
+
+
+  /**
+   * Parses the raw game event info into GameActionInfo objects. Each event line has the following
+   * format: username,teamname,score,timestamp_in_ms,readable_time
+   * e.g.:
+   * user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
+   * The human-readable time string is not used here.
+   */
+  static class ParseEventFn extends DoFn<String, GameActionInfo> {
+
+    // Log and count parse errors.
+    private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class);
+    private final Aggregator<Long, Long> numParseErrors =
+        createAggregator("ParseErrors", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) {
+      String[] components = c.element().split(",");
+      try {
+        String user = components[0].trim();
+        String team = components[1].trim();
+        Integer score = Integer.parseInt(components[2].trim());
+        Long timestamp = Long.parseLong(components[3].trim());
+        GameActionInfo gInfo = new GameActionInfo(user, team, score, timestamp);
+        c.output(gInfo);
+      } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
+        numParseErrors.addValue(1L);
+        LOG.info("Parse error on " + c.element() + ", " + e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * A transform to extract key/score information from GameActionInfo, and sum the scores. The
+   * constructor arg determines whether 'team' or 'user' info is extracted.
+   */
+  // [START DocInclude_USExtractXform]
+  public static class ExtractAndSumScore
+      extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
+
+    private final String field;
+
+    ExtractAndSumScore(String field) {
+      this.field = field;
+    }
+
+    @Override
+    public PCollection<KV<String, Integer>> apply(
+        PCollection<GameActionInfo> gameInfo) {
+
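+      // Java 8 lambdas do not carry generic type information, so withOutputType (below)
+      // supplies the output TypeDescriptor that the SDK needs to infer a coder.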
+      return gameInfo
+        .apply(MapElements
+            .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
+            .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
+        .apply(Sum.<String>integersPerKey());
+    }
+  }
+  // [END DocInclude_USExtractXform]
+
+
+  /**
+   * Options supported by {@link UserScore}.
+   */
+  public static interface Options extends PipelineOptions {
+
+    @Description("Path to the data file(s) containing game data.")
+    // The default maps to two large Google Cloud Storage files (each ~12GB) holding roughly two
+    // subsequent days' worth of data.
+    @Default.String("gs://dataflow-samples/game/gaming_data*.csv")
+    String getInput();
+    void setInput(String value);
+
+    @Description("BigQuery Dataset to write tables to. Must already exist.")
+    @Validation.Required
+    String getDataset();
+    void setDataset(String value);
+
+    @Description("The BigQuery table name. Should not already exist.")
+    @Default.String("user_score")
+    String getTableName();
+    void setTableName(String value);
+  }
+
+  /**
+   * Create a map of information that describes how to write pipeline output to BigQuery. This map
+   * is passed to the {@link WriteToBigQuery} constructor to write user score sums.
+   */
+  protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
+    configureBigQueryWrite() {
+    Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
+        new HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>();
+    tableConfigure.put("user",
+        new WriteToBigQuery.FieldInfo<KV<String, Integer>>("STRING", c -> c.element().getKey()));
+    tableConfigure.put("total_score",
+        new WriteToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER", c -> c.element().getValue()));
+    return tableConfigure;
+  }
+
+
+  /**
+   * Run a batch pipeline.
+   */
+ // [START DocInclude_USMain]
+  public static void main(String[] args) throws Exception {
+    // Begin constructing a pipeline configured by commandline flags.
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline pipeline = Pipeline.create(options);
+
+    // Read events from a text file and parse them.
+    pipeline.apply(TextIO.Read.from(options.getInput()))
+      .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
+      // Extract and sum username/score pairs from the event data.
+      .apply("ExtractUserScore", new ExtractAndSumScore("user"))
+      .apply("WriteUserScoreSums",
+          new WriteToBigQuery<KV<String, Integer>>(options.getTableName(),
+                                                   configureBigQueryWrite()));
+
+    // Run the batch pipeline.
+    pipeline.run();
+  }
+  // [END DocInclude_USMain]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
----------------------------------------------------------------------
diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
new file mode 100644
index 0000000..1691c54
--- /dev/null
+++ b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
@@ -0,0 +1,415 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game.injector;
+
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.model.PublishRequest;
+import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.common.collect.ImmutableMap;
+
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.BufferedOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.TimeZone;
+
+
+/**
+ * This is a generator that simulates usage data from a mobile game, and either publishes the data
+ * to a pubsub topic or writes it to a file.
+ *
+ * <p> The general model used by the generator is the following. There is a set of teams with team
+ * members. Each member is scoring points for their team. After some period, a team will dissolve
+ * and a new one will be created in its place. There is also a set of 'Robots', or spammer users.
+ * They hop from team to team. The robots are set to have a higher 'click rate' (generate more
+ * events) than the regular team members.
+ *
+ * <p> Each generated line of data has the following form:
+ * username,teamname,score,timestamp_in_ms,readable_time
+ * e.g.:
+ * user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
+ *
+ * <p> The Injector writes either to a PubSub topic, or a file. It will use the PubSub topic if
+ * specified. It takes the following arguments:
+ * {@code Injector project-name (topic-name|none) (filename|none)}.
+ *
+ * <p> To run the Injector in the mode where it publishes to PubSub, you will need to authenticate
+ * locally using project-based service account credentials to avoid running over PubSub
+ * quota.
+ * See https://developers.google.com/identity/protocols/application-default-credentials
+ * for more information on using service account credentials. Set the GOOGLE_APPLICATION_CREDENTIALS
+ * environment variable to point to your downloaded service account credentials before starting the
+ * program, e.g.:
+ * {@code export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your/credentials-key.json}.
+ * If you do not do this, then your injector will only run for a few minutes on your
+ * 'user account' credentials before you will start to see quota error messages like:
+ * "Request throttled due to user QPS limit being reached", and see this exception:
+ * "com.google.api.client.googleapis.json.GoogleJsonResponseException: 429 Too Many Requests".
+ * Once you've set up your credentials, run the Injector like this:
+ * <pre>{@code
+ * Injector <project-name> <topic-name> none
+ * }
+ * </pre>
+ * The pubsub topic will be created if it does not exist.
+ *
+ * <p> To run the injector in write-to-file-mode, set the topic name to "none" and specify the
+ * filename:
+ * <pre>{@code
+ * Injector <project-name> none <filename>
+ * }
+ * </pre>
+ */
+class Injector {
+  private static Pubsub pubsub;
+  private static Random random = new Random();
+  private static String topic;
+  private static String project;
+  private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";
+
+  // QPS ranges from 800 to 1000.
+  private static final int MIN_QPS = 800;
+  private static final int QPS_RANGE = 200;
+  // How long to sleep, in ms, between creation of the threads that make API requests to PubSub.
+  private static final int THREAD_SLEEP_MS = 500;
+
+  // Lists used to generate random team names.
+  private static final ArrayList<String> COLORS =
+      new ArrayList<String>(Arrays.asList(
+         "Magenta", "AliceBlue", "Almond", "Amaranth", "Amber",
+         "Amethyst", "AndroidGreen", "AntiqueBrass", "Fuchsia", "Ruby", "AppleGreen",
+         "Apricot", "Aqua", "ArmyGreen", "Asparagus", "Auburn", "Azure", "Banana",
+         "Beige", "Bisque", "BarnRed", "BattleshipGrey"));
+
+  private static final ArrayList<String> ANIMALS =
+      new ArrayList<String>(Arrays.asList(
+         "Echidna", "Koala", "Wombat", "Marmot", "Quokka", "Kangaroo", "Dingo", "Numbat", "Emu",
+         "Wallaby", "CaneToad", "Bilby", "Possum", "Cassowary", "Kookaburra", "Platypus",
+         "Bandicoot", "Cockatoo", "Antechinus"));
+
+  // The list of live teams.
+  private static ArrayList<TeamInfo> liveTeams = new ArrayList<TeamInfo>();
+
+  private static DateTimeFormatter fmt =
+    DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
+        .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
+
+
+  // The total number of robots in the system.
+  private static final int NUM_ROBOTS = 20;
+  // Determines the chance that a team will have a robot team member.
+  private static final int ROBOT_PROBABILITY = 3;
+  private static final int NUM_LIVE_TEAMS = 15;
+  private static final int BASE_MEMBERS_PER_TEAM = 5;
+  private static final int MEMBERS_PER_TEAM = 15;
+  private static final int MAX_SCORE = 20;
+  private static final int LATE_DATA_RATE = 5 * 60 * 2;       // Every 10 minutes
+  private static final int BASE_DELAY_IN_MILLIS = 5 * 60 * 1000;  // 5-10 minute delay
+  private static final int FUZZY_DELAY_IN_MILLIS = 5 * 60 * 1000;
+
+  // The minimum time a 'team' can live.
+  private static final int BASE_TEAM_EXPIRATION_TIME_IN_MINS = 20;
+  private static final int TEAM_EXPIRATION_TIME_IN_MINS = 20;
+
+
+  /**
+   * A class for holding team info: the name of the team, when it started,
+   * and the current team members. Teams may but need not include one robot team member.
+   */
+  private static class TeamInfo {
+    String teamName;
+    long startTimeInMillis;
+    int expirationPeriod;
+    // The team may or may not include one robot; this field is non-null if it does.
+    String robot;
+    int numMembers;
+
+    private TeamInfo(String teamName, long startTimeInMillis, String robot) {
+      this.teamName = teamName;
+      this.startTimeInMillis = startTimeInMillis;
+      // How long until this team is dissolved.
+      this.expirationPeriod = random.nextInt(TEAM_EXPIRATION_TIME_IN_MINS) +
+        BASE_TEAM_EXPIRATION_TIME_IN_MINS;
+      this.robot = robot;
+      // Determine the number of team members.
+      numMembers = random.nextInt(MEMBERS_PER_TEAM) + BASE_MEMBERS_PER_TEAM;
+    }
+
+    String getTeamName() {
+      return teamName;
+    }
+    String getRobot() {
+      return robot;
+    }
+
+    long getStartTimeInMillis() {
+      return startTimeInMillis;
+    }
+    long getEndTimeInMillis() {
+      return startTimeInMillis + (expirationPeriod * 60 * 1000);
+    }
+    String getRandomUser() {
+      int userNum = random.nextInt(numMembers);
+      return "user" + userNum + "_" + teamName;
+    }
+
+    int numMembers() {
+      return numMembers;
+    }
+
+    @Override
+    public String toString() {
+      return "(" + teamName + ", num members: " + numMembers() + ", starting at: "
+        + startTimeInMillis + ", expires in: " + expirationPeriod + ", robot: " + robot + ")";
+    }
+  }
+
+  /** Utility to grab a random element from a list of Strings. */
+  private static String randomElement(ArrayList<String> list) {
+    int index = random.nextInt(list.size());
+    return list.get(index);
+  }
+
+  /**
+   * Get and return a random team. If the selected team is too old w.r.t its expiration, remove
+   * it, replacing it with a new team.
+   */
+  private static TeamInfo randomTeam(ArrayList<TeamInfo> list) {
+    int index = random.nextInt(list.size());
+    TeamInfo team = list.get(index);
+    // If the selected team is expired, remove it and return a new team.
+    long currTime = System.currentTimeMillis();
+    if ((team.getEndTimeInMillis() < currTime) || team.numMembers() == 0) {
+      System.out.println("\nteam " + team + " is too old; replacing.");
+      System.out.println("start time: " + team.getStartTimeInMillis() +
+        ", end time: " + team.getEndTimeInMillis() +
+        ", current time:" + currTime);
+      removeTeam(index);
+      // Add a new team in its stead.
+      return (addLiveTeam());
+    } else {
+      return team;
+    }
+  }
+
+  /**
+   * Create and add a team. Possibly add a robot to the team.
+   */
+  private static synchronized TeamInfo addLiveTeam() {
+    String teamName = randomElement(COLORS) + randomElement(ANIMALS);
+    String robot = null;
+    // Decide if we want to add a robot to the team.
+    if (random.nextInt(ROBOT_PROBABILITY) == 0) {
+      robot = "Robot-" + random.nextInt(NUM_ROBOTS);
+    }
+    // Create the new team.
+    TeamInfo newTeam = new TeamInfo(teamName, System.currentTimeMillis(), robot);
+    liveTeams.add(newTeam);
+    System.out.println("[+" + newTeam + "]");
+    return newTeam;
+  }
+
+  /**
+   * Remove a specific team.
+   */
+  private static synchronized void removeTeam(int teamIndex) {
+    TeamInfo removedTeam = liveTeams.remove(teamIndex);
+    System.out.println("[-" + removedTeam + "]");
+  }
+
+  /** Generate a user gaming event. */
+  private static String generateEvent(Long currTime, int delayInMillis) {
+    TeamInfo team = randomTeam(liveTeams);
+    String teamName = team.getTeamName();
+    String user;
+    final int parseErrorRate = 900000;
+
+    String robot = team.getRobot();
+    // If the team has an associated robot team member...
+    if (robot != null) {
+      // Then use that robot for the message with some probability.
+      // Set this probability to higher than that used to select any of the 'regular' team
+      // members, so that if there is a robot on the team, it has a higher click rate.
+      if (random.nextInt(team.numMembers() / 2) == 0) {
+        user = robot;
+      } else {
+        user = team.getRandomUser();
+      }
+    } else { // No robot.
+      user = team.getRandomUser();
+    }
+    String event = user + "," + teamName + "," + random.nextInt(MAX_SCORE);
+    // Randomly introduce occasional parse errors. You can see a custom counter tracking the number
+    // of such errors in the Dataflow Monitoring UI, as the example pipeline runs.
+    if (random.nextInt(parseErrorRate) == 0) {
+      System.out.println("Introducing a parse error.");
+      event = "THIS LINE REPRESENTS CORRUPT DATA AND WILL CAUSE A PARSE ERROR";
+    }
+    return addTimeInfoToEvent(event, currTime, delayInMillis);
+  }
+
+  /**
+   * Add time info to a generated gaming event.
+   */
+  private static String addTimeInfoToEvent(String message, Long currTime, int delayInMillis) {
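+    // Subtract the injected delay to get the event time, then truncate to a whole second
+    // (integer division by 1000 discards the millisecond remainder before scaling back up).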
+    String eventTimeString =
+        Long.toString((currTime - delayInMillis) / 1000 * 1000);
+    // Add a (redundant) 'human-readable' date string to make the data semantics more clear.
+    String dateString = fmt.print(currTime);
+    message = message + "," + eventTimeString + "," + dateString;
+    return message;
+  }
+
+  /**
+   * Publish 'numMessages' arbitrary events from live users with the provided delay, to a
+   * PubSub topic.
+   */
+  public static void publishData(int numMessages, int delayInMillis)
+      throws IOException {
+    List<PubsubMessage> pubsubMessages = new ArrayList<>();
+
+    for (int i = 0; i < Math.max(1, numMessages); i++) {
+      Long currTime = System.currentTimeMillis();
+      String message = generateEvent(currTime, delayInMillis);
+      PubsubMessage pubsubMessage = new PubsubMessage()
+              .encodeData(message.getBytes("UTF-8"));
+      pubsubMessage.setAttributes(
+          ImmutableMap.of(TIMESTAMP_ATTRIBUTE,
+              Long.toString((currTime - delayInMillis) / 1000 * 1000)));
+      if (delayInMillis != 0) {
+        System.out.println(pubsubMessage.getAttributes());
+        System.out.println("late data for: " + message);
+      }
+      pubsubMessages.add(pubsubMessage);
+    }
+
+    PublishRequest publishRequest = new PublishRequest();
+    publishRequest.setMessages(pubsubMessages);
+    pubsub.projects().topics().publish(topic, publishRequest).execute();
+  }
+
+  /**
+   * Publish generated events to a file.
+   */
+  public static void publishDataToFile(String fileName, int numMessages, int delayInMillis)
+      throws IOException {
+    PrintWriter out = new PrintWriter(new OutputStreamWriter(
+        new BufferedOutputStream(new FileOutputStream(fileName, true)), "UTF-8"));
+
+    try {
+      for (int i = 0; i < Math.max(1, numMessages); i++) {
+        Long currTime = System.currentTimeMillis();
+        String message = generateEvent(currTime, delayInMillis);
+        out.println(message);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if (out != null) {
+        out.flush();
+        out.close();
+      }
+    }
+  }
+
+
+  public static void main(String[] args) throws IOException, InterruptedException {
+    if (args.length < 3) {
+      System.out.println("Usage: Injector project-name (topic-name|none) (filename|none)");
+      System.exit(1);
+    }
+    boolean writeToFile = false;
+    boolean writeToPubsub = true;
+    project = args[0];
+    String topicName = args[1];
+    String fileName = args[2];
+    // The Injector writes either to a PubSub topic, or a file. It will use the PubSub topic if
+    // specified; otherwise, it will try to write to a file.
+    if (topicName.equalsIgnoreCase("none")) {
+      writeToFile = true;
+      writeToPubsub = false;
+    }
+    if (writeToPubsub) {
+      // Create the PubSub client.
+      pubsub = InjectorUtils.getClient();
+      // Create the PubSub topic as necessary.
+      topic = InjectorUtils.getFullyQualifiedTopicName(project, topicName);
+      InjectorUtils.createTopic(pubsub, topic);
+      System.out.println("Injecting to topic: " + topic);
+    } else {
+      if (fileName.equalsIgnoreCase("none")) {
+        System.out.println("Filename not specified.");
+        System.exit(1);
+      }
+      System.out.println("Writing to file: " + fileName);
+    }
+    System.out.println("Starting Injector");
+
+    // Start off with some random live teams.
+    while (liveTeams.size() < NUM_LIVE_TEAMS) {
+      addLiveTeam();
+    }
+
+    // Publish messages at a rate determined by the QPS and Thread sleep settings.
+    for (int i = 0; true; i++) {
+      if (Thread.activeCount() > 10) {
+        System.err.println("I'm falling behind!");
+      }
+
+      // Decide if this should be a batch of late data.
+      final int numMessages;
+      final int delayInMillis;
+      if (i % LATE_DATA_RATE == 0) {
+        // Insert delayed data for one user (one message only)
+        delayInMillis = BASE_DELAY_IN_MILLIS + random.nextInt(FUZZY_DELAY_IN_MILLIS);
+        numMessages = 1;
+        System.out.println("DELAY(" + delayInMillis + ", " + numMessages + ")");
+      } else {
+        System.out.print(".");
+        delayInMillis = 0;
+        numMessages = MIN_QPS + random.nextInt(QPS_RANGE);
+      }
+
+      if (writeToFile) { // Won't use threading for the file write.
+        publishDataToFile(fileName, numMessages, delayInMillis);
+      } else { // Write to PubSub.
+        // Start a thread to inject some data.
+        new Thread(){
+          @Override
+          public void run() {
+            try {
+              publishData(numMessages, delayInMillis);
+            } catch (IOException e) {
+              System.err.println(e);
+            }
+          }
+        }.start();
+      }
+
+      // Wait before creating another injector thread.
+      Thread.sleep(THREAD_SLEEP_MS);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java
----------------------------------------------------------------------
diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java
new file mode 100644
index 0000000..55982df
--- /dev/null
+++ b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game.injector;
+
+
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.googleapis.util.Utils;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.PubsubScopes;
+import com.google.api.services.pubsub.model.Topic;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+
+class InjectorUtils {
+
+  private static final String APP_NAME = "injector";
+
+  /**
+   * Builds a new Pubsub client and returns it.
+   */
+  public static Pubsub getClient(final HttpTransport httpTransport,
+                                 final JsonFactory jsonFactory)
+           throws IOException {
+      Preconditions.checkNotNull(httpTransport);
+      Preconditions.checkNotNull(jsonFactory);
+      GoogleCredential credential =
+          GoogleCredential.getApplicationDefault(httpTransport, jsonFactory);
+      if (credential.createScopedRequired()) {
+          credential = credential.createScoped(PubsubScopes.all());
+      }
+      if (credential.getClientAuthentication() != null) {
+        System.out.println("\n***Warning! You are not using service account credentials to "
+          + "authenticate.\nYou need to use service account credentials for this example,"
+          + "\nsince user-level credentials do not have enough pubsub quota,\nand so you will run "
+          + "out of PubSub quota very quickly.\nSee "
+          + "https://developers.google.com/identity/protocols/application-default-credentials.");
+        System.exit(1);
+      }
+      HttpRequestInitializer initializer =
+          new RetryHttpInitializerWrapper(credential);
+      return new Pubsub.Builder(httpTransport, jsonFactory, initializer)
+              .setApplicationName(APP_NAME)
+              .build();
+  }
+
+  /**
+   * Builds a new Pubsub client with default HttpTransport and
+   * JsonFactory and returns it.
+   */
+  public static Pubsub getClient() throws IOException {
+      return getClient(Utils.getDefaultTransport(),
+                       Utils.getDefaultJsonFactory());
+  }
+
+
+  /**
+   * Returns the fully qualified topic name for Pub/Sub.
+   */
+  public static String getFullyQualifiedTopicName(
+          final String project, final String topic) {
+      return String.format("projects/%s/topics/%s", project, topic);
+  }
+
+  /**
+   * Create a topic if it doesn't exist.
+   */
+  public static void createTopic(Pubsub client, String fullTopicName)
+      throws IOException {
+    try {
+        client.projects().topics().get(fullTopicName).execute();
+    } catch (GoogleJsonResponseException e) {
+      if (e.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
+        Topic topic = client.projects().topics()
+                .create(fullTopicName, new Topic())
+                .execute();
+        System.out.printf("Topic %s was created.\n", topic.getName());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java
----------------------------------------------------------------------
diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java
new file mode 100644
index 0000000..1437534
--- /dev/null
+++ b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game.injector;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.http.HttpBackOffIOExceptionHandler;
+import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
+import com.google.api.client.util.ExponentialBackOff;
+import com.google.api.client.util.Sleeper;
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.logging.Logger;
+
+/**
+ * RetryHttpInitializerWrapper will automatically retry upon RPC
+ * failures, preserving the auto-refresh behavior of the Google
+ * Credentials.
+ */
+public class RetryHttpInitializerWrapper implements HttpRequestInitializer {
+
+    /**
+     * A private logger.
+     */
+    private static final Logger LOG =
+            Logger.getLogger(RetryHttpInitializerWrapper.class.getName());
+
+    /**
+     * One minute in milliseconds.
+     */
+    private static final int ONE_MINUTE_MILLIS = 60000;
+
+    /**
+     * Intercepts the request for filling in the "Authorization"
+     * header field, as well as recovering from certain unsuccessful
+     * error codes wherein the Credential must refresh its token for a
+     * retry.
+     */
+    private final Credential wrappedCredential;
+
+    /**
+     * A sleeper; you can replace it with a mock in your test.
+     */
+    private final Sleeper sleeper;
+
+    /**
+     * A constructor.
+     *
+     * @param wrappedCredential Credential which will be wrapped and
+     * used for providing auth header.
+     */
+    public RetryHttpInitializerWrapper(final Credential wrappedCredential) {
+        this(wrappedCredential, Sleeper.DEFAULT);
+    }
+
+    /**
+     * A protected constructor only for testing.
+     *
+     * @param wrappedCredential Credential which will be wrapped and
+     * used for providing auth header.
+     * @param sleeper Sleeper for easy testing.
+     */
+    RetryHttpInitializerWrapper(
+            final Credential wrappedCredential, final Sleeper sleeper) {
+        this.wrappedCredential = Preconditions.checkNotNull(wrappedCredential);
+        this.sleeper = sleeper;
+    }
+
+    /**
+     * Initializes the given request.
+     */
+    @Override
+    public final void initialize(final HttpRequest request) {
+        request.setReadTimeout(2 * ONE_MINUTE_MILLIS); // 2-minute read timeout
+        final HttpUnsuccessfulResponseHandler backoffHandler =
+                new HttpBackOffUnsuccessfulResponseHandler(
+                        new ExponentialBackOff())
+                        .setSleeper(sleeper);
+        request.setInterceptor(wrappedCredential);
+        request.setUnsuccessfulResponseHandler(
+                new HttpUnsuccessfulResponseHandler() {
+                    @Override
+                    public boolean handleResponse(
+                            final HttpRequest request,
+                            final HttpResponse response,
+                            final boolean supportsRetry) throws IOException {
+                        if (wrappedCredential.handleResponse(
+                                request, response, supportsRetry)) {
+                            // If credential decides it can handle it,
+                            // the return code or message indicated
+                            // something specific to authentication,
+                            // and no backoff is desired.
+                            return true;
+                        } else if (backoffHandler.handleResponse(
+                                request, response, supportsRetry)) {
+                            // Otherwise, we defer to the judgement of
+                            // our internal backoff handler.
+                            LOG.info("Retrying "
+                                    + request.getUrl().toString());
+                            return true;
+                        } else {
+                            return false;
+                        }
+                    }
+                });
+        request.setIOExceptionHandler(
+                new HttpBackOffIOExceptionHandler(new ExponentialBackOff())
+                        .setSleeper(sleeper));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java
----------------------------------------------------------------------
diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java
new file mode 100644
index 0000000..2cf719a
--- /dev/null
+++ b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game.utils;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.examples.complete.game.UserScore;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
+import com.google.cloud.dataflow.sdk.options.GcpOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PDone;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Generate, format, and write BigQuery table row information. Use provided information about
+ * the field names and types, as well as lambda functions that describe how to generate their
+ * values.
+ */
+public class WriteToBigQuery<T>
+    extends PTransform<PCollection<T>, PDone> {
+
+  protected String tableName;
+  protected Map<String, FieldInfo<T>> fieldInfo;
+
+  public WriteToBigQuery() {
+  }
+
+  public WriteToBigQuery(String tableName,
+      Map<String, FieldInfo<T>> fieldInfo) {
+    this.tableName = tableName;
+    this.fieldInfo = fieldInfo;
+  }
+
+  /** Define a class to hold information about output table field definitions. */
+  public static class FieldInfo<T> implements Serializable {
+    // The BigQuery 'type' of the field
+    private String fieldType;
+    // A lambda function to generate the field value
+    private SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fieldFn;
+
+    public FieldInfo(String fieldType,
+        SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fieldFn) {
+      this.fieldType = fieldType;
+      this.fieldFn = fieldFn;
+    }
+
+    String getFieldType() {
+      return this.fieldType;
+    }
+
+    SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> getFieldFn() {
+      return this.fieldFn;
+    }
+  }
+  /** Convert each key/score pair into a BigQuery TableRow as specified by fieldFn. */
+  protected class BuildRowFn extends DoFn<T, TableRow> {
+
+    @Override
+    public void processElement(ProcessContext c) {
+
+      TableRow row = new TableRow();
+      for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
+        String key = entry.getKey();
+        FieldInfo<T> fcnInfo = entry.getValue();
+        SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fcn =
+            fcnInfo.getFieldFn();
+        row.set(key, fcn.apply(c));
+      }
+      c.output(row);
+    }
+  }
+
+  /** Build the output table schema. */
+  protected TableSchema getSchema() {
+    List<TableFieldSchema> fields = new ArrayList<>();
+    for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
+      String key = entry.getKey();
+      FieldInfo<T> fcnInfo = entry.getValue();
+      String bqType = fcnInfo.getFieldType();
+      fields.add(new TableFieldSchema().setName(key).setType(bqType));
+    }
+    return new TableSchema().setFields(fields);
+  }
+
+  @Override
+  public PDone apply(PCollection<T> teamAndScore) {
+    return teamAndScore
+      .apply(ParDo.named("ConvertToRow").of(new BuildRowFn()))
+      .apply(BigQueryIO.Write
+                .to(getTable(teamAndScore.getPipeline(),
+                    tableName))
+                .withSchema(getSchema())
+                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+                .withWriteDisposition(WriteDisposition.WRITE_APPEND));
+  }
+
+  /** Utility to construct an output table reference. */
+  static TableReference getTable(Pipeline pipeline, String tableName) {
+    PipelineOptions options = pipeline.getOptions();
+    TableReference table = new TableReference();
+    table.setDatasetId(options.as(UserScore.Options.class).getDataset());
+    table.setProjectId(options.as(GcpOptions.class).getProject());
+    table.setTableId(tableName);
+    return table;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java
----------------------------------------------------------------------
diff --git a/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java
new file mode 100644
index 0000000..8433021
--- /dev/null
+++ b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game.utils;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PDone;
+
+import java.util.Map;
+
+/**
+ * Generate, format, and write BigQuery table row information. This class subclasses
+ * {@link WriteToBigQuery} and requires windowing, so it may be used for writes that need
+ * access to the context's window information.
+ */
+public class WriteWindowedToBigQuery<T>
+    extends WriteToBigQuery<T> {
+
+  public WriteWindowedToBigQuery(String tableName,
+      Map<String, FieldInfo<T>> fieldInfo) {
+    super(tableName, fieldInfo);
+  }
+
+  /** Convert each key/score pair into a BigQuery TableRow. */
+  protected class BuildRowFn extends DoFn<T, TableRow>
+      implements RequiresWindowAccess {
+
+    @Override
+    public void processElement(ProcessContext c) {
+
+      TableRow row = new TableRow();
+      for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
+        String key = entry.getKey();
+        FieldInfo<T> fcnInfo = entry.getValue();
+        SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fcn =
+            fcnInfo.getFieldFn();
+        row.set(key, fcn.apply(c));
+      }
+      c.output(row);
+    }
+  }
+
+  @Override
+  public PDone apply(PCollection<T> teamAndScore) {
+    return teamAndScore
+      .apply(ParDo.named("ConvertToRow").of(new BuildRowFn()))
+      .apply(BigQueryIO.Write
+                .to(getTable(teamAndScore.getPipeline(),
+                    tableName))
+                .withSchema(getSchema())
+                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+                .withWriteDisposition(WriteDisposition.WRITE_APPEND));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/test/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java
----------------------------------------------------------------------
diff --git a/java8examples/src/test/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java b/java8examples/src/test/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java
new file mode 100644
index 0000000..fcae41c
--- /dev/null
+++ b/java8examples/src/test/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.GcsOptions;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Filter;
+import com.google.cloud.dataflow.sdk.transforms.FlatMapElements;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.util.GcsUtil;
+import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+import com.google.common.collect.ImmutableList;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * To keep {@link MinimalWordCountJava8} simple, it is not factored or testable. This test
+ * file should be maintained with a copy of its code for a basic smoke test.
+ */
+@RunWith(JUnit4.class)
+public class MinimalWordCountJava8Test implements Serializable {
+
+  /**
+   * A basic smoke test that ensures there is no crash at pipeline construction time.
+   */
+  @Test
+  public void testMinimalWordCountJava8() throws Exception {
+    Pipeline p = TestPipeline.create();
+    p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil());
+
+    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
+     .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
+         .withOutputType(new TypeDescriptor<String>() {}))
+     .apply(Filter.byPredicate((String word) -> !word.isEmpty()))
+     .apply(Count.<String>perElement())
+     .apply(MapElements
+         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
+         .withOutputType(new TypeDescriptor<String>() {}))
+     .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
+  }
+
+  private GcsUtil buildMockGcsUtil() throws IOException {
+    GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);
+
+    // Any request to open gets a new bogus channel
+    Mockito
+        .when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
+        .then(new Answer<SeekableByteChannel>() {
+          @Override
+          public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
+            return FileChannel.open(
+                Files.createTempFile("channel-", ".tmp"),
+                StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
+          }
+        });
+
+    // Any request for expansion returns a list containing the original GcsPath
+    // This is required to pass validation that occurs in TextIO during apply()
+    Mockito
+        .when(mockGcsUtil.expand(Mockito.any(GcsPath.class)))
+        .then(new Answer<List<GcsPath>>() {
+          @Override
+          public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
+            return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
+          }
+        });
+
+    return mockGcsUtil;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java
----------------------------------------------------------------------
diff --git a/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java
new file mode 100644
index 0000000..f77d146
--- /dev/null
+++ b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.complete.game.GameStats.CalculateSpammyUsers;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests of GameStats.
+ * Because the pipeline was designed for easy readability and explanations, it lacks good
+ * modularity for testing. See our testing documentation for better ideas:
+ * https://cloud.google.com/dataflow/pipelines/testing-your-pipeline.
+ */
+@RunWith(JUnit4.class)
+public class GameStatsTest implements Serializable {
+
+  // User scores
+  static final List<KV<String, Integer>> USER_SCORES = Arrays.asList(
+    KV.of("Robot-2", 66), KV.of("Robot-1", 116), KV.of("user7_AndroidGreenKookaburra", 23),
+    KV.of("user7_AndroidGreenKookaburra", 1),
+    KV.of("user19_BisqueBilby", 14), KV.of("user13_ApricotQuokka", 15),
+    KV.of("user18_BananaEmu", 25), KV.of("user6_AmberEchidna", 8),
+    KV.of("user2_AmberQuokka", 6), KV.of("user0_MagentaKangaroo", 4),
+    KV.of("user0_MagentaKangaroo", 3), KV.of("user2_AmberCockatoo", 13),
+    KV.of("user7_AlmondWallaby", 15), KV.of("user6_AmberNumbat", 11),
+    KV.of("user6_AmberQuokka", 4));
+
+  // The expected list of 'spammers'.
+  static final List<KV<String, Integer>> SPAMMERS = Arrays.asList(
+      KV.of("Robot-2", 66), KV.of("Robot-1", 116));
+
+  /** Test the calculation of 'spammy users'. */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testCalculateSpammyUsers() throws Exception {
+    Pipeline p = TestPipeline.create();
+
+    PCollection<KV<String, Integer>> input = p.apply(Create.of(USER_SCORES));
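+    // The two Robot-* users have totals far above the other players', so they are the
+    // expected 'spammers'.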
+    PCollection<KV<String, Integer>> output = input.apply(new CalculateSpammyUsers());
+
+    // Check the set of spammers.
+    DataflowAssert.that(output).containsInAnyOrder(SPAMMERS);
+
+    p.run();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java
----------------------------------------------------------------------
diff --git a/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java
new file mode 100644
index 0000000..f77a5d4
--- /dev/null
+++ b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.complete.game.UserScore.GameActionInfo;
+import com.google.cloud.dataflow.examples.complete.game.UserScore.ParseEventFn;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.Filter;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests of HourlyTeamScore.
+ * Because the pipeline was designed for easy readability and explanations, it lacks good
+ * modularity for testing. See our testing documentation for better ideas:
+ * https://cloud.google.com/dataflow/pipelines/testing-your-pipeline.
+ */
+@RunWith(JUnit4.class)
+public class HourlyTeamScoreTest implements Serializable {
+
+  static final String[] GAME_EVENTS_ARRAY = new String[] {
+    "user0_MagentaKangaroo,MagentaKangaroo,3,1447955630000,2015-11-19 09:53:53.444",
+    "user13_ApricotQuokka,ApricotQuokka,15,1447955630000,2015-11-19 09:53:53.444",
+    "user6_AmberNumbat,AmberNumbat,11,1447955630000,2015-11-19 09:53:53.444",
+    "user7_AlmondWallaby,AlmondWallaby,15,1447955630000,2015-11-19 09:53:53.444",
+    "user7_AndroidGreenKookaburra,AndroidGreenKookaburra,12,1447955630000,2015-11-19 09:53:53.444",
+    "user7_AndroidGreenKookaburra,AndroidGreenKookaburra,11,1447955630000,2015-11-19 09:53:53.444",
+    "user19_BisqueBilby,BisqueBilby,6,1447955630000,2015-11-19 09:53:53.444",
+    "user19_BisqueBilby,BisqueBilby,8,1447955630000,2015-11-19 09:53:53.444",
+    // time gap...
+    "user0_AndroidGreenEchidna,AndroidGreenEchidna,0,1447965690000,2015-11-19 12:41:31.053",
+    "user0_MagentaKangaroo,MagentaKangaroo,4,1447965690000,2015-11-19 12:41:31.053",
+    "user2_AmberCockatoo,AmberCockatoo,13,1447965690000,2015-11-19 12:41:31.053",
+    "user18_BananaEmu,BananaEmu,7,1447965690000,2015-11-19 12:41:31.053",
+    "user3_BananaEmu,BananaEmu,17,1447965690000,2015-11-19 12:41:31.053",
+    "user18_BananaEmu,BananaEmu,1,1447965690000,2015-11-19 12:41:31.053",
+    "user18_ApricotCaneToad,ApricotCaneToad,14,1447965690000,2015-11-19 12:41:31.053"
+  };
+
+
+  static final List<String> GAME_EVENTS = Arrays.asList(GAME_EVENTS_ARRAY);
+
+
+  // Used to check the filtering.
+  static final KV[] FILTERED_EVENTS = new KV[] {
+      KV.of("user0_AndroidGreenEchidna", 0), KV.of("user0_MagentaKangaroo", 4),
+      KV.of("user2_AmberCockatoo", 13),
+      KV.of("user18_BananaEmu", 7), KV.of("user3_BananaEmu", 17),
+      KV.of("user18_BananaEmu", 1), KV.of("user18_ApricotCaneToad", 14)
+    };
+
+
+  /** Test the filtering. */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUserScoresFilter() throws Exception {
+    Pipeline p = TestPipeline.create();
+
+    final Instant startMinTimestamp = new Instant(1447965680000L);
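+    // Only the events after the 'time gap' (timestamp 1447965690000) are newer than
+    // startMinTimestamp, so only they should survive the filter below.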
+
+    PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));
+
+    PCollection<KV<String, Integer>> output = input
+      .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
+
+      .apply("FilterStartTime", Filter.byPredicate(
+          (GameActionInfo gInfo)
+              -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
+      // Use MapElements to extract the (user, score) fields from the parsed results.
+      .apply(MapElements
+          .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
+          .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
+
+    DataflowAssert.that(output).containsInAnyOrder(FILTERED_EVENTS);
+
+    p.run();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java
----------------------------------------------------------------------
diff --git a/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java
new file mode 100644
index 0000000..641e2c3
--- /dev/null
+++ b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.complete.game.UserScore.ExtractAndSumScore;
+import com.google.cloud.dataflow.examples.complete.game.UserScore.GameActionInfo;
+import com.google.cloud.dataflow.examples.complete.game.UserScore.ParseEventFn;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFnTester;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests of UserScore.
+ */
+@RunWith(JUnit4.class)
+public class UserScoreTest implements Serializable {
+
+  static final String[] GAME_EVENTS_ARRAY = new String[] {
+    "user0_MagentaKangaroo,MagentaKangaroo,3,1447955630000,2015-11-19 09:53:53.444",
+    "user13_ApricotQuokka,ApricotQuokka,15,1447955630000,2015-11-19 09:53:53.444",
+    "user6_AmberNumbat,AmberNumbat,11,1447955630000,2015-11-19 09:53:53.444",
+    "user7_AlmondWallaby,AlmondWallaby,15,1447955630000,2015-11-19 09:53:53.444",
+    "user7_AndroidGreenKookaburra,AndroidGreenKookaburra,12,1447955630000,2015-11-19 09:53:53.444",
+    "user6_AliceBlueDingo,AliceBlueDingo,4,xxxxxxx,2015-11-19 09:53:53.444",
+    "user7_AndroidGreenKookaburra,AndroidGreenKookaburra,11,1447955630000,2015-11-19 09:53:53.444",
+    "THIS IS A PARSE ERROR,2015-11-19 09:53:53.444",
+    "user19_BisqueBilby,BisqueBilby,6,1447955630000,2015-11-19 09:53:53.444",
+    "user19_BisqueBilby,BisqueBilby,8,1447955630000,2015-11-19 09:53:53.444"
+  };
+
+  static final String[] GAME_EVENTS_ARRAY2 = new String[] {
+    "user6_AliceBlueDingo,AliceBlueDingo,4,xxxxxxx,2015-11-19 09:53:53.444",
+    "THIS IS A PARSE ERROR,2015-11-19 09:53:53.444",
+    "user13_BisqueBilby,BisqueBilby,xxx,1447955630000,2015-11-19 09:53:53.444"
+  };
+
+  static final List<String> GAME_EVENTS = Arrays.asList(GAME_EVENTS_ARRAY);
+  static final List<String> GAME_EVENTS2 = Arrays.asList(GAME_EVENTS_ARRAY2);
+
+  static final List<KV<String, Integer>> USER_SUMS = Arrays.asList(
+      KV.of("user0_MagentaKangaroo", 3), KV.of("user13_ApricotQuokka", 15),
+      KV.of("user6_AmberNumbat", 11), KV.of("user7_AlmondWallaby", 15),
+      KV.of("user7_AndroidGreenKookaburra", 23),
+      KV.of("user19_BisqueBilby", 14));
+
+  static final List<KV<String, Integer>> TEAM_SUMS = Arrays.asList(
+      KV.of("MagentaKangaroo", 3), KV.of("ApricotQuokka", 15),
+      KV.of("AmberNumbat", 11), KV.of("AlmondWallaby", 15),
+      KV.of("AndroidGreenKookaburra", 23),
+      KV.of("BisqueBilby", 14));
+
+  /** Test the ParseEventFn DoFn. */
+  @Test
+  public void testParseEventFn() {
+    DoFnTester<String, GameActionInfo> parseEventFn =
+        DoFnTester.of(new ParseEventFn());
+
+    List<GameActionInfo> results = parseEventFn.processBatch(GAME_EVENTS_ARRAY);
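+    // Two of the ten events are malformed (a bad timestamp and an unparseable line),
+    // so only eight records should parse successfully.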
+    Assert.assertEquals(8, results.size());
+    Assert.assertEquals("user0_MagentaKangaroo", results.get(0).getUser());
+    Assert.assertEquals("MagentaKangaroo", results.get(0).getTeam());
+    Assert.assertEquals(Integer.valueOf(3), results.get(0).getScore());
+  }
+
+  /** Tests ExtractAndSumScore("user"). */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUserScoreSums() throws Exception {
+    Pipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));
+
+    PCollection<KV<String, Integer>> output = input
+      .apply(ParDo.of(new ParseEventFn()))
+      // Extract and sum username/score pairs from the event data.
+      .apply("ExtractUserScore", new ExtractAndSumScore("user"));
+
+    // Check the user score sums.
+    DataflowAssert.that(output).containsInAnyOrder(USER_SUMS);
+
+    p.run();
+  }
+
+  /** Tests ExtractAndSumScore("team"). */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testTeamScoreSums() throws Exception {
+    Pipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of()));
+
+    PCollection<KV<String, Integer>> output = input
+      .apply(ParDo.of(new ParseEventFn()))
+      // Extract and sum teamname/score pairs from the event data.
+      .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
+
+    // Check the team score sums.
+    DataflowAssert.that(output).containsInAnyOrder(TEAM_SUMS);
+
+    p.run();
+  }
+
+  /** Test that bad input data is dropped appropriately. */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUserScoresBadInput() throws Exception {
+    Pipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of(GAME_EVENTS2).withCoder(StringUtf8Coder.of()));
+
+    PCollection<KV<String, Integer>> extract = input
+      .apply(ParDo.of(new ParseEventFn()))
+      .apply(
+          MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
+          .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
+
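+    // Every record in GAME_EVENTS2 is malformed, so ParseEventFn should drop them all,
+    // leaving an empty output collection.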
+    DataflowAssert.that(extract).empty();
+
+    p.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8a44ea9..1cd3a15 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,6 +109,15 @@
       </modules>
     </profile>
     <profile>
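+      <!-- Build the Java 8 examples module only when a JDK 1.8 or newer is in use. -->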
+      <id>java8-examples</id>
+      <activation>
+        <jdk>[1.8,)</jdk>
+      </activation>
+      <modules>
+        <module>java8examples</module>
+      </modules>
+    </profile>
+    <profile>
       <id>doclint-java8-disable</id>
       <activation>
         <jdk>[1.8,)</jdk>