Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:48:27 UTC
[40/74] [partial] incubator-beam git commit: Rename
com/google/cloud/dataflow->org/apache/beam
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
deleted file mode 100644
index 18ff0a3..0000000
--- a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.complete.game.injector;
-
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
-import java.io.BufferedOutputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-import java.util.TimeZone;
-
-
-/**
- * This is a generator that simulates usage data from a mobile game, and either publishes the data
- * to a pubsub topic or writes it to a file.
- *
- * <p> The general model used by the generator is the following. There is a set of teams with team
- * members. Each member is scoring points for their team. After some period, a team will dissolve
- * and a new one will be created in its place. There is also a set of 'Robots', or spammer users.
- * They hop from team to team. The robots are set to have a higher 'click rate' (generate more
- * events) than the regular team members.
- *
- * <p> Each generated line of data has the following form:
- * username,teamname,score,timestamp_in_ms,readable_time
- * e.g.:
- * user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
- *
- * <p> The Injector writes either to a PubSub topic, or a file. It will use the PubSub topic if
- * specified. It takes the following arguments:
- * {@code Injector project-name (topic-name|none) (filename|none)}.
- *
- * <p> To run the Injector in the mode where it publishes to PubSub, you will need to authenticate
- * locally using project-based service account credentials to avoid running over PubSub
- * quota.
- * See https://developers.google.com/identity/protocols/application-default-credentials
- * for more information on using service account credentials. Set the GOOGLE_APPLICATION_CREDENTIALS
- * environment variable to point to your downloaded service account credentials before starting the
- * program, e.g.:
- * {@code export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your/credentials-key.json}.
- * If you do not do this, then your injector will only run for a few minutes on your
- * 'user account' credentials before you will start to see quota error messages like:
- * "Request throttled due to user QPS limit being reached", and see this exception:
- * ".com.google.api.client.googleapis.json.GoogleJsonResponseException: 429 Too Many Requests".
- * Once you've set up your credentials, run the Injector like this":
- * <pre>{@code
- * Injector <project-name> <topic-name> none
- * }
- * </pre>
- * The pubsub topic will be created if it does not exist.
- *
- * <p> To run the injector in write-to-file-mode, set the topic name to "none" and specify the
- * filename:
- * <pre>{@code
- * Injector <project-name> none <filename>
- * }
- * </pre>
- */
-class Injector {
- private static Pubsub pubsub;
- private static Random random = new Random();
- private static String topic;
- private static String project;
- private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";
-
- // QPS ranges from 800 to 1000.
- private static final int MIN_QPS = 800;
- private static final int QPS_RANGE = 200;
- // How long to sleep, in ms, between creation of the threads that make API requests to PubSub.
- private static final int THREAD_SLEEP_MS = 500;
-
- // Lists used to generate random team names.
- private static final ArrayList<String> COLORS =
- new ArrayList<String>(Arrays.asList(
- "Magenta", "AliceBlue", "Almond", "Amaranth", "Amber",
- "Amethyst", "AndroidGreen", "AntiqueBrass", "Fuchsia", "Ruby", "AppleGreen",
- "Apricot", "Aqua", "ArmyGreen", "Asparagus", "Auburn", "Azure", "Banana",
- "Beige", "Bisque", "BarnRed", "BattleshipGrey"));
-
- private static final ArrayList<String> ANIMALS =
- new ArrayList<String>(Arrays.asList(
- "Echidna", "Koala", "Wombat", "Marmot", "Quokka", "Kangaroo", "Dingo", "Numbat", "Emu",
- "Wallaby", "CaneToad", "Bilby", "Possum", "Cassowary", "Kookaburra", "Platypus",
- "Bandicoot", "Cockatoo", "Antechinus"));
-
- // The list of live teams.
- private static ArrayList<TeamInfo> liveTeams = new ArrayList<TeamInfo>();
-
- private static DateTimeFormatter fmt =
- DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
- .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
-
-
- // The total number of robots in the system.
- private static final int NUM_ROBOTS = 20;
- // Determines the chance that a team will have a robot team member.
- private static final int ROBOT_PROBABILITY = 3;
- private static final int NUM_LIVE_TEAMS = 15;
- private static final int BASE_MEMBERS_PER_TEAM = 5;
- private static final int MEMBERS_PER_TEAM = 15;
- private static final int MAX_SCORE = 20;
- private static final int LATE_DATA_RATE = 5 * 60 * 2; // Every 10 minutes
- private static final int BASE_DELAY_IN_MILLIS = 5 * 60 * 1000; // 5-10 minute delay
- private static final int FUZZY_DELAY_IN_MILLIS = 5 * 60 * 1000;
-
- // The minimum time a 'team' can live.
- private static final int BASE_TEAM_EXPIRATION_TIME_IN_MINS = 20;
- private static final int TEAM_EXPIRATION_TIME_IN_MINS = 20;
-
-
- /**
- * A class for holding team info: the name of the team, when it started,
- * and the current team members. Teams may but need not include one robot team member.
- */
- private static class TeamInfo {
- String teamName;
- long startTimeInMillis;
- int expirationPeriod;
- // The team might but need not include 1 robot. Will be non-null if so.
- String robot;
- int numMembers;
-
- private TeamInfo(String teamName, long startTimeInMillis, String robot) {
- this.teamName = teamName;
- this.startTimeInMillis = startTimeInMillis;
- // How long until this team is dissolved.
- this.expirationPeriod = random.nextInt(TEAM_EXPIRATION_TIME_IN_MINS) +
- BASE_TEAM_EXPIRATION_TIME_IN_MINS;
- this.robot = robot;
- // Determine the number of team members.
- numMembers = random.nextInt(MEMBERS_PER_TEAM) + BASE_MEMBERS_PER_TEAM;
- }
-
- String getTeamName() {
- return teamName;
- }
- String getRobot() {
- return robot;
- }
-
- long getStartTimeInMillis() {
- return startTimeInMillis;
- }
- long getEndTimeInMillis() {
- return startTimeInMillis + (expirationPeriod * 60 * 1000);
- }
- String getRandomUser() {
- int userNum = random.nextInt(numMembers);
- return "user" + userNum + "_" + teamName;
- }
-
- int numMembers() {
- return numMembers;
- }
-
- @Override
- public String toString() {
- return "(" + teamName + ", num members: " + numMembers() + ", starting at: "
- + startTimeInMillis + ", expires in: " + expirationPeriod + ", robot: " + robot + ")";
- }
- }
-
- /** Utility to grab a random element from an array of Strings. */
- private static String randomElement(ArrayList<String> list) {
- int index = random.nextInt(list.size());
- return list.get(index);
- }
-
- /**
- * Get and return a random team. If the selected team is too old w.r.t its expiration, remove
- * it, replacing it with a new team.
- */
- private static TeamInfo randomTeam(ArrayList<TeamInfo> list) {
- int index = random.nextInt(list.size());
- TeamInfo team = list.get(index);
- // If the selected team is expired, remove it and return a new team.
- long currTime = System.currentTimeMillis();
- if ((team.getEndTimeInMillis() < currTime) || team.numMembers() == 0) {
- System.out.println("\nteam " + team + " is too old; replacing.");
- System.out.println("start time: " + team.getStartTimeInMillis() +
- ", end time: " + team.getEndTimeInMillis() +
- ", current time:" + currTime);
- removeTeam(index);
- // Add a new team in its stead.
- return (addLiveTeam());
- } else {
- return team;
- }
- }
-
- /**
- * Create and add a team. Possibly add a robot to the team.
- */
- private static synchronized TeamInfo addLiveTeam() {
- String teamName = randomElement(COLORS) + randomElement(ANIMALS);
- String robot = null;
- // Decide if we want to add a robot to the team.
- if (random.nextInt(ROBOT_PROBABILITY) == 0) {
- robot = "Robot-" + random.nextInt(NUM_ROBOTS);
- }
- // Create the new team.
- TeamInfo newTeam = new TeamInfo(teamName, System.currentTimeMillis(), robot);
- liveTeams.add(newTeam);
- System.out.println("[+" + newTeam + "]");
- return newTeam;
- }
-
- /**
- * Remove a specific team.
- */
- private static synchronized void removeTeam(int teamIndex) {
- TeamInfo removedTeam = liveTeams.remove(teamIndex);
- System.out.println("[-" + removedTeam + "]");
- }
-
- /** Generate a user gaming event. */
- private static String generateEvent(Long currTime, int delayInMillis) {
- TeamInfo team = randomTeam(liveTeams);
- String teamName = team.getTeamName();
- String user;
- final int parseErrorRate = 900000;
-
- String robot = team.getRobot();
- // If the team has an associated robot team member...
- if (robot != null) {
- // Then use that robot for the message with some probability.
- // Set this probability to higher than that used to select any of the 'regular' team
- // members, so that if there is a robot on the team, it has a higher click rate.
- if (random.nextInt(team.numMembers() / 2) == 0) {
- user = robot;
- } else {
- user = team.getRandomUser();
- }
- } else { // No robot.
- user = team.getRandomUser();
- }
- String event = user + "," + teamName + "," + random.nextInt(MAX_SCORE);
- // Randomly introduce occasional parse errors. You can see a custom counter tracking the number
- // of such errors in the Dataflow Monitoring UI, as the example pipeline runs.
- if (random.nextInt(parseErrorRate) == 0) {
- System.out.println("Introducing a parse error.");
- event = "THIS LINE REPRESENTS CORRUPT DATA AND WILL CAUSE A PARSE ERROR";
- }
- return addTimeInfoToEvent(event, currTime, delayInMillis);
- }
-
- /**
- * Add time info to a generated gaming event.
- */
- private static String addTimeInfoToEvent(String message, Long currTime, int delayInMillis) {
- String eventTimeString =
- Long.toString((currTime - delayInMillis) / 1000 * 1000);
- // Add a (redundant) 'human-readable' date string to make the data semantics more clear.
- String dateString = fmt.print(currTime);
- message = message + "," + eventTimeString + "," + dateString;
- return message;
- }
-
- /**
- * Publish 'numMessages' arbitrary events from live users with the provided delay, to a
- * PubSub topic.
- */
- public static void publishData(int numMessages, int delayInMillis)
- throws IOException {
- List<PubsubMessage> pubsubMessages = new ArrayList<>();
-
- for (int i = 0; i < Math.max(1, numMessages); i++) {
- Long currTime = System.currentTimeMillis();
- String message = generateEvent(currTime, delayInMillis);
- PubsubMessage pubsubMessage = new PubsubMessage()
- .encodeData(message.getBytes("UTF-8"));
- pubsubMessage.setAttributes(
- ImmutableMap.of(TIMESTAMP_ATTRIBUTE,
- Long.toString((currTime - delayInMillis) / 1000 * 1000)));
- if (delayInMillis != 0) {
- System.out.println(pubsubMessage.getAttributes());
- System.out.println("late data for: " + message);
- }
- pubsubMessages.add(pubsubMessage);
- }
-
- PublishRequest publishRequest = new PublishRequest();
- publishRequest.setMessages(pubsubMessages);
- pubsub.projects().topics().publish(topic, publishRequest).execute();
- }
-
- /**
- * Publish generated events to a file.
- */
- public static void publishDataToFile(String fileName, int numMessages, int delayInMillis)
- throws IOException {
- PrintWriter out = new PrintWriter(new OutputStreamWriter(
- new BufferedOutputStream(new FileOutputStream(fileName, true)), "UTF-8"));
-
- try {
- for (int i = 0; i < Math.max(1, numMessages); i++) {
- Long currTime = System.currentTimeMillis();
- String message = generateEvent(currTime, delayInMillis);
- out.println(message);
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (out != null) {
- out.flush();
- out.close();
- }
- }
- }
-
-
- public static void main(String[] args) throws IOException, InterruptedException {
- if (args.length < 3) {
- System.out.println("Usage: Injector project-name (topic-name|none) (filename|none)");
- System.exit(1);
- }
- boolean writeToFile = false;
- boolean writeToPubsub = true;
- project = args[0];
- String topicName = args[1];
- String fileName = args[2];
- // The Injector writes either to a PubSub topic, or a file. It will use the PubSub topic if
- // specified; otherwise, it will try to write to a file.
- if (topicName.equalsIgnoreCase("none")) {
- writeToFile = true;
- writeToPubsub = false;
- }
- if (writeToPubsub) {
- // Create the PubSub client.
- pubsub = InjectorUtils.getClient();
- // Create the PubSub topic as necessary.
- topic = InjectorUtils.getFullyQualifiedTopicName(project, topicName);
- InjectorUtils.createTopic(pubsub, topic);
- System.out.println("Injecting to topic: " + topic);
- } else {
- if (fileName.equalsIgnoreCase("none")) {
- System.out.println("Filename not specified.");
- System.exit(1);
- }
- System.out.println("Writing to file: " + fileName);
- }
- System.out.println("Starting Injector");
-
- // Start off with some random live teams.
- while (liveTeams.size() < NUM_LIVE_TEAMS) {
- addLiveTeam();
- }
-
- // Publish messages at a rate determined by the QPS and Thread sleep settings.
- for (int i = 0; true; i++) {
- if (Thread.activeCount() > 10) {
- System.err.println("I'm falling behind!");
- }
-
- // Decide if this should be a batch of late data.
- final int numMessages;
- final int delayInMillis;
- if (i % LATE_DATA_RATE == 0) {
- // Insert delayed data for one user (one message only)
- delayInMillis = BASE_DELAY_IN_MILLIS + random.nextInt(FUZZY_DELAY_IN_MILLIS);
- numMessages = 1;
- System.out.println("DELAY(" + delayInMillis + ", " + numMessages + ")");
- } else {
- System.out.print(".");
- delayInMillis = 0;
- numMessages = MIN_QPS + random.nextInt(QPS_RANGE);
- }
-
- if (writeToFile) { // Won't use threading for the file write.
- publishDataToFile(fileName, numMessages, delayInMillis);
- } else { // Write to PubSub.
- // Start a thread to inject some data.
- new Thread(){
- @Override
- public void run() {
- try {
- publishData(numMessages, delayInMillis);
- } catch (IOException e) {
- System.err.println(e);
- }
- }
- }.start();
- }
-
- // Wait before creating another injector thread.
- Thread.sleep(THREAD_SLEEP_MS);
- }
- }
-}
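
For context, the streaming game pipelines further down in this commit consume these injected messages using the same attribute name. A minimal sketch of such a read, following the PubsubIO usage shown in GameStats below (the project/topic values and the options variable are placeholders):

  Pipeline pipeline = Pipeline.create(options);  // options: a streaming PipelineOptions instance
  PCollection<String> events = pipeline
      .apply(PubsubIO.Read
          .timestampLabel("timestamp_ms")  // matches the Injector's TIMESTAMP_ATTRIBUTE
          .topic("projects/YOUR-PROJECT/topics/YOUR-TOPIC"));
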
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java
deleted file mode 100644
index db58650..0000000
--- a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.complete.game.injector;
-
-
-import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.googleapis.util.Utils;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.http.HttpStatusCodes;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.PubsubScopes;
-import com.google.api.services.pubsub.model.Topic;
-
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-
-class InjectorUtils {
-
- private static final String APP_NAME = "injector";
-
- /**
- * Builds a new Pubsub client and returns it.
- */
- public static Pubsub getClient(final HttpTransport httpTransport,
- final JsonFactory jsonFactory)
- throws IOException {
- Preconditions.checkNotNull(httpTransport);
- Preconditions.checkNotNull(jsonFactory);
- GoogleCredential credential =
- GoogleCredential.getApplicationDefault(httpTransport, jsonFactory);
- if (credential.createScopedRequired()) {
- credential = credential.createScoped(PubsubScopes.all());
- }
- if (credential.getClientAuthentication() != null) {
- System.out.println("\n***Warning! You are not using service account credentials to "
- + "authenticate.\nYou need to use service account credentials for this example,"
- + "\nsince user-level credentials do not have enough pubsub quota,\nand so you will run "
- + "out of PubSub quota very quickly.\nSee "
- + "https://developers.google.com/identity/protocols/application-default-credentials.");
- System.exit(1);
- }
- HttpRequestInitializer initializer =
- new RetryHttpInitializerWrapper(credential);
- return new Pubsub.Builder(httpTransport, jsonFactory, initializer)
- .setApplicationName(APP_NAME)
- .build();
- }
-
- /**
- * Builds a new Pubsub client with default HttpTransport and
- * JsonFactory and returns it.
- */
- public static Pubsub getClient() throws IOException {
- return getClient(Utils.getDefaultTransport(),
- Utils.getDefaultJsonFactory());
- }
-
-
- /**
- * Returns the fully qualified topic name for Pub/Sub.
- */
- public static String getFullyQualifiedTopicName(
- final String project, final String topic) {
- return String.format("projects/%s/topics/%s", project, topic);
- }
-
- /**
- * Create a topic if it doesn't exist.
- */
- public static void createTopic(Pubsub client, String fullTopicName)
- throws IOException {
- try {
- client.projects().topics().get(fullTopicName).execute();
- } catch (GoogleJsonResponseException e) {
- if (e.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
- Topic topic = client.projects().topics()
- .create(fullTopicName, new Topic())
- .execute();
- System.out.printf("Topic %s was created.\n", topic.getName());
- }
- }
- }
-}
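
Taken together, the Injector's main() uses these helpers roughly as follows (a condensed sketch; "my-project" and "my-topic" are placeholders):

  Pubsub pubsub = InjectorUtils.getClient();
  String topic = InjectorUtils.getFullyQualifiedTopicName("my-project", "my-topic");
  // Creates the topic only if it does not already exist.
  InjectorUtils.createTopic(pubsub, topic);
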
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java
deleted file mode 100644
index 9d58837..0000000
--- a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.complete.game.injector;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.http.HttpBackOffIOExceptionHandler;
-import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
-import com.google.api.client.http.HttpRequest;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.http.HttpResponse;
-import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
-import com.google.api.client.util.ExponentialBackOff;
-import com.google.api.client.util.Sleeper;
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-import java.util.logging.Logger;
-
-/**
- * RetryHttpInitializerWrapper will automatically retry upon RPC
- * failures, preserving the auto-refresh behavior of the Google
- * Credentials.
- */
-public class RetryHttpInitializerWrapper implements HttpRequestInitializer {
-
- /**
- * A private logger.
- */
- private static final Logger LOG =
- Logger.getLogger(RetryHttpInitializerWrapper.class.getName());
-
- /**
- * One minute in milliseconds.
- */
- private static final int ONEMINITUES = 60000;
-
- /**
- * Intercepts the request for filling in the "Authorization"
- * header field, as well as recovering from certain unsuccessful
- * error codes wherein the Credential must refresh its token for a
- * retry.
- */
- private final Credential wrappedCredential;
-
- /**
- * A sleeper; you can replace it with a mock in your test.
- */
- private final Sleeper sleeper;
-
- /**
- * A constructor.
- *
- * @param wrappedCredential Credential which will be wrapped and
- * used for providing auth header.
- */
- public RetryHttpInitializerWrapper(final Credential wrappedCredential) {
- this(wrappedCredential, Sleeper.DEFAULT);
- }
-
- /**
- * A protected constructor only for testing.
- *
- * @param wrappedCredential Credential which will be wrapped and
- * used for providing auth header.
- * @param sleeper Sleeper for easy testing.
- */
- RetryHttpInitializerWrapper(
- final Credential wrappedCredential, final Sleeper sleeper) {
- this.wrappedCredential = Preconditions.checkNotNull(wrappedCredential);
- this.sleeper = sleeper;
- }
-
- /**
- * Initializes the given request.
- */
- @Override
- public final void initialize(final HttpRequest request) {
- request.setReadTimeout(2 * ONEMINITUES); // 2 minutes read timeout
- final HttpUnsuccessfulResponseHandler backoffHandler =
- new HttpBackOffUnsuccessfulResponseHandler(
- new ExponentialBackOff())
- .setSleeper(sleeper);
- request.setInterceptor(wrappedCredential);
- request.setUnsuccessfulResponseHandler(
- new HttpUnsuccessfulResponseHandler() {
- @Override
- public boolean handleResponse(
- final HttpRequest request,
- final HttpResponse response,
- final boolean supportsRetry) throws IOException {
- if (wrappedCredential.handleResponse(
- request, response, supportsRetry)) {
- // If credential decides it can handle it,
- // the return code or message indicated
- // something specific to authentication,
- // and no backoff is desired.
- return true;
- } else if (backoffHandler.handleResponse(
- request, response, supportsRetry)) {
- // Otherwise, we defer to the judgement of
- // our internal backoff handler.
- LOG.info("Retrying "
- + request.getUrl().toString());
- return true;
- } else {
- return false;
- }
- }
- });
- request.setIOExceptionHandler(
- new HttpBackOffIOExceptionHandler(new ExponentialBackOff())
- .setSleeper(sleeper));
- }
-}
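
The wrapper plugs into the google-api-client service builder as its HttpRequestInitializer; a condensed sketch of how InjectorUtils.getClient() above wires it up:

  GoogleCredential credential =
      GoogleCredential.getApplicationDefault(httpTransport, jsonFactory);
  if (credential.createScopedRequired()) {
    credential = credential.createScoped(PubsubScopes.all());
  }
  HttpRequestInitializer initializer = new RetryHttpInitializerWrapper(credential);
  Pubsub pubsub = new Pubsub.Builder(httpTransport, jsonFactory, initializer)
      .setApplicationName("injector")
      .build();
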
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java
deleted file mode 100644
index 4bcfb72..0000000
--- a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java
+++ /dev/null
@@ -1,135 +0,0 @@
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.complete.game.utils;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.examples.complete.game.UserScore;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
-import com.google.cloud.dataflow.sdk.options.GcpOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PDone;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Generate, format, and write BigQuery table row information. Use provided information about
- * the field names and types, as well as lambda functions that describe how to generate their
- * values.
- */
-public class WriteToBigQuery<T>
- extends PTransform<PCollection<T>, PDone> {
-
- protected String tableName;
- protected Map<String, FieldInfo<T>> fieldInfo;
-
- public WriteToBigQuery() {
- }
-
- public WriteToBigQuery(String tableName,
- Map<String, FieldInfo<T>> fieldInfo) {
- this.tableName = tableName;
- this.fieldInfo = fieldInfo;
- }
-
- /** Define a class to hold information about output table field definitions. */
- public static class FieldInfo<T> implements Serializable {
- // The BigQuery 'type' of the field
- private String fieldType;
- // A lambda function to generate the field value
- private SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fieldFn;
-
- public FieldInfo(String fieldType,
- SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fieldFn) {
- this.fieldType = fieldType;
- this.fieldFn = fieldFn;
- }
-
- String getFieldType() {
- return this.fieldType;
- }
-
- SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> getFieldFn() {
- return this.fieldFn;
- }
- }
- /** Convert each key/score pair into a BigQuery TableRow as specified by fieldFn. */
- protected class BuildRowFn extends DoFn<T, TableRow> {
-
- @Override
- public void processElement(ProcessContext c) {
-
- TableRow row = new TableRow();
- for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
- String key = entry.getKey();
- FieldInfo<T> fcnInfo = entry.getValue();
- SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fcn =
- fcnInfo.getFieldFn();
- row.set(key, fcn.apply(c));
- }
- c.output(row);
- }
- }
-
- /** Build the output table schema. */
- protected TableSchema getSchema() {
- List<TableFieldSchema> fields = new ArrayList<>();
- for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
- String key = entry.getKey();
- FieldInfo<T> fcnInfo = entry.getValue();
- String bqType = fcnInfo.getFieldType();
- fields.add(new TableFieldSchema().setName(key).setType(bqType));
- }
- return new TableSchema().setFields(fields);
- }
-
- @Override
- public PDone apply(PCollection<T> teamAndScore) {
- return teamAndScore
- .apply(ParDo.named("ConvertToRow").of(new BuildRowFn()))
- .apply(BigQueryIO.Write
- .to(getTable(teamAndScore.getPipeline(),
- tableName))
- .withSchema(getSchema())
- .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
- .withWriteDisposition(WriteDisposition.WRITE_APPEND));
- }
-
- /** Utility to construct an output table reference. */
- static TableReference getTable(Pipeline pipeline, String tableName) {
- PipelineOptions options = pipeline.getOptions();
- TableReference table = new TableReference();
- table.setDatasetId(options.as(UserScore.Options.class).getDataset());
- table.setProjectId(options.as(GcpOptions.class).getProject());
- table.setTableId(tableName);
- return table;
- }
-}
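
Callers configure this transform with a map from output field names to FieldInfo entries. A minimal sketch for writing per-user score sums, assuming a PCollection<KV<String, Integer>> named userScores (the table and field names are illustrative, following the pattern the game pipelines below use):

  Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfig =
      new HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>();
  tableConfig.put("user",
      new WriteToBigQuery.FieldInfo<KV<String, Integer>>("STRING", c -> c.element().getKey()));
  tableConfig.put("total_score",
      new WriteToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER", c -> c.element().getValue()));
  userScores.apply("WriteUserScoreSums",
      new WriteToBigQuery<KV<String, Integer>>("user_score", tableConfig));
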
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java
deleted file mode 100644
index 41257ca..0000000
--- a/examples/java8/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ /dev/null
@@ -1,77 +0,0 @@
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.complete.game.utils;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PDone;
-
-import java.util.Map;
-
-/**
- * Generate, format, and write BigQuery table row information. Subclasses {@link WriteToBigQuery}
- * to require windowing, so this subclass may be used for writes that need access to the
- * context's window information.
- */
-public class WriteWindowedToBigQuery<T>
- extends WriteToBigQuery<T> {
-
- public WriteWindowedToBigQuery(String tableName,
- Map<String, FieldInfo<T>> fieldInfo) {
- super(tableName, fieldInfo);
- }
-
- /** Convert each key/score pair into a BigQuery TableRow. */
- protected class BuildRowFn extends DoFn<T, TableRow>
- implements RequiresWindowAccess {
-
- @Override
- public void processElement(ProcessContext c) {
-
- TableRow row = new TableRow();
- for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
- String key = entry.getKey();
- FieldInfo<T> fcnInfo = entry.getValue();
- SerializableFunction<DoFn<T, TableRow>.ProcessContext, Object> fcn =
- fcnInfo.getFieldFn();
- row.set(key, fcn.apply(c));
- }
- c.output(row);
- }
- }
-
- @Override
- public PDone apply(PCollection<T> teamAndScore) {
- return teamAndScore
- .apply(ParDo.named("ConvertToRow").of(new BuildRowFn()))
- .apply(BigQueryIO.Write
- .to(getTable(teamAndScore.getPipeline(),
- tableName))
- .withSchema(getSchema())
- .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
- .withWriteDisposition(WriteDisposition.WRITE_APPEND));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
new file mode 100644
index 0000000..8705ed3
--- /dev/null
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Filter;
+import com.google.cloud.dataflow.sdk.transforms.FlatMapElements;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import java.util.Arrays;
+
+/**
+ * An example that counts words in Shakespeare, using Java 8 language features.
+ *
+ * <p>See {@link MinimalWordCount} for a comprehensive explanation.
+ */
+public class MinimalWordCountJava8 {
+
+ public static void main(String[] args) {
+ DataflowPipelineOptions options = PipelineOptionsFactory.create()
+ .as(DataflowPipelineOptions.class);
+
+ options.setRunner(BlockingDataflowPipelineRunner.class);
+
+ // CHANGE 1 of 3: Your project ID is required in order to run your pipeline on the Google Cloud.
+ options.setProject("SET_YOUR_PROJECT_ID_HERE");
+
+ // CHANGE 2 of 3: Your Google Cloud Storage path is required for staging local files.
+ options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");
+
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
+ .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
+ .withOutputType(new TypeDescriptor<String>() {}))
+ .apply(Filter.byPredicate((String word) -> !word.isEmpty()))
+ .apply(Count.<String>perElement())
+ .apply(MapElements
+ .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
+ .withOutputType(new TypeDescriptor<String>() {}))
+
+ // CHANGE 3 of 3: A Google Cloud Storage path is required for writing the output results.
+ .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
+
+ p.run();
+ }
+}
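
For comparison with the Java 7 examples, the FlatMapElements and Filter steps above correspond roughly to a single hand-written DoFn (a sketch; it also folds in the empty-word filter):

  .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
    @Override
    public void processElement(ProcessContext c) {
      // Split the line into words and emit each non-empty word.
      for (String word : c.element().split("[^a-zA-Z']+")) {
        if (!word.isEmpty()) {
          c.output(word);
        }
      }
    }
  }))
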
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
new file mode 100644
index 0000000..89832b2
--- /dev/null
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
+import com.google.cloud.dataflow.examples.complete.game.utils.WriteWindowedToBigQuery;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.transforms.Mean;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.transforms.Values;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.joda.time.DateTimeZone;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+/**
+ * This class is the fourth in a series of four pipelines that tell a story in a 'gaming'
+ * domain, following {@link UserScore}, {@link HourlyTeamScore}, and {@link LeaderBoard}.
+ * New concepts: session windows and finding session duration; use of both
+ * singleton and non-singleton side inputs.
+ *
+ * <p> This pipeline builds on the {@link LeaderBoard} functionality, and adds some "business
+ * intelligence" analysis: abuse detection and usage patterns. The pipeline derives the Mean user
+ * score sum for a window, and uses that information to identify likely spammers/robots. (The robots
+ * have a higher click rate than the human users). The 'robot' users are then filtered out when
+ * calculating the team scores.
+ *
+ * <p> Additionally, user sessions are tracked: that is, we find bursts of user activity using
+ * session windows. Then, the mean session duration information is recorded in the context of
+ * subsequent fixed windowing. (This could be used to tell us what games are giving us greater
+ * user retention).
+ *
+ * <p> Run {@code com.google.cloud.dataflow.examples.complete.game.injector.Injector} to generate
+ * pubsub data for this pipeline. The {@code Injector} documentation provides more detail.
+ *
+ * <p> To execute this pipeline using the Dataflow service, specify the pipeline configuration
+ * like this:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ * --runner=BlockingDataflowPipelineRunner
+ * --dataset=YOUR-DATASET
+ * --topic=projects/YOUR-PROJECT/topics/YOUR-TOPIC
+ * }
+ * </pre>
+ * where the BigQuery dataset you specify must already exist. The PubSub topic you specify should
+ * be the same topic to which the Injector is publishing.
+ */
+public class GameStats extends LeaderBoard {
+
+ private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";
+
+ private static DateTimeFormatter fmt =
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
+ .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
+
+ /**
+ * Filter out all but those users with a high clickrate, which we will consider 'spammy' users.
+ * We do this by finding the mean total score per user, then using that information as a side
+ * input to filter out all but those user scores that are > (mean * SCORE_WEIGHT).
+ */
+ // [START DocInclude_AbuseDetect]
+ public static class CalculateSpammyUsers
+ extends PTransform<PCollection<KV<String, Integer>>, PCollection<KV<String, Integer>>> {
+ private static final Logger LOG = LoggerFactory.getLogger(CalculateSpammyUsers.class);
+ private static final double SCORE_WEIGHT = 2.5;
+
+ @Override
+ public PCollection<KV<String, Integer>> apply(PCollection<KV<String, Integer>> userScores) {
+
+ // Get the sum of scores for each user.
+ PCollection<KV<String, Integer>> sumScores = userScores
+ .apply("UserSum", Sum.<String>integersPerKey());
+
+ // Extract the score from each element, and use it to find the global mean.
+ final PCollectionView<Double> globalMeanScore = sumScores.apply(Values.<Integer>create())
+ .apply(Mean.<Integer>globally().asSingletonView());
+
+ // Filter the user sums using the global mean.
+ PCollection<KV<String, Integer>> filtered = sumScores
+ .apply(ParDo
+ .named("ProcessAndFilter")
+ // use the derived mean total score as a side input
+ .withSideInputs(globalMeanScore)
+ .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
+ private final Aggregator<Long, Long> numSpammerUsers =
+ createAggregator("SpammerUsers", new Sum.SumLongFn());
+ @Override
+ public void processElement(ProcessContext c) {
+ Integer score = c.element().getValue();
+ Double gmc = c.sideInput(globalMeanScore);
+ if (score > (gmc * SCORE_WEIGHT)) {
+ LOG.info("user " + c.element().getKey() + " spammer score " + score
+ + " with mean " + gmc);
+ numSpammerUsers.addValue(1L);
+ c.output(c.element());
+ }
+ }
+ }));
+ return filtered;
+ }
+ }
+ // [END DocInclude_AbuseDetect]
+
+ /**
+ * Calculate and output an element's session duration.
+ */
+ private static class UserSessionInfoFn extends DoFn<KV<String, Integer>, Integer>
+ implements RequiresWindowAccess {
+
+ @Override
+ public void processElement(ProcessContext c) {
+ IntervalWindow w = (IntervalWindow) c.window();
+ int duration = new Duration(
+ w.start(), w.end()).toPeriod().toStandardMinutes().getMinutes();
+ c.output(duration);
+ }
+ }
+
+
+ /**
+ * Options supported by {@link GameStats}.
+ */
+ static interface Options extends LeaderBoard.Options {
+ @Description("Numeric value of fixed window duration for user analysis, in minutes")
+ @Default.Integer(60)
+ Integer getFixedWindowDuration();
+ void setFixedWindowDuration(Integer value);
+
+ @Description("Numeric value of gap between user sessions, in minutes")
+ @Default.Integer(5)
+ Integer getSessionGap();
+ void setSessionGap(Integer value);
+
+ @Description("Numeric value of fixed window for finding mean of user session duration, "
+ + "in minutes")
+ @Default.Integer(30)
+ Integer getUserActivityWindowDuration();
+ void setUserActivityWindowDuration(Integer value);
+
+ @Description("Prefix used for the BigQuery table names")
+ @Default.String("game_stats")
+ String getTablePrefix();
+ void setTablePrefix(String value);
+ }
+
+
+ /**
+ * Create a map of information that describes how to write pipeline output to BigQuery. This map
+ * is used to write information about team score sums.
+ */
+ protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
+ configureWindowedWrite() {
+ Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
+ new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>();
+ tableConfigure.put("team",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+ c -> c.element().getKey()));
+ tableConfigure.put("total_score",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
+ c -> c.element().getValue()));
+ tableConfigure.put("window_start",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+ c -> { IntervalWindow w = (IntervalWindow) c.window();
+ return fmt.print(w.start()); }));
+ tableConfigure.put("processing_time",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+ "STRING", c -> fmt.print(Instant.now())));
+ return tableConfigure;
+ }
+
+ /**
+ * Create a map of information that describes how to write pipeline output to BigQuery. This map
+ * is used to write information about mean user session time.
+ */
+ protected static Map<String, WriteWindowedToBigQuery.FieldInfo<Double>>
+ configureSessionWindowWrite() {
+
+ Map<String, WriteWindowedToBigQuery.FieldInfo<Double>> tableConfigure =
+ new HashMap<String, WriteWindowedToBigQuery.FieldInfo<Double>>();
+ tableConfigure.put("window_start",
+ new WriteWindowedToBigQuery.FieldInfo<Double>("STRING",
+ c -> { IntervalWindow w = (IntervalWindow) c.window();
+ return fmt.print(w.start()); }));
+ tableConfigure.put("mean_duration",
+ new WriteWindowedToBigQuery.FieldInfo<Double>("FLOAT", c -> c.element()));
+ return tableConfigure;
+ }
+
+
+
+ public static void main(String[] args) throws Exception {
+
+ Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+ // Enforce that this pipeline is always run in streaming mode.
+ options.setStreaming(true);
+ // Allow the pipeline to be cancelled automatically.
+ options.setRunner(DataflowPipelineRunner.class);
+ DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+ Pipeline pipeline = Pipeline.create(options);
+
+ // Read Events from Pub/Sub using custom timestamps
+ PCollection<GameActionInfo> rawEvents = pipeline
+ .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
+ .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()));
+
+ // Extract username/score pairs from the event stream
+ PCollection<KV<String, Integer>> userEvents =
+ rawEvents.apply("ExtractUserScore",
+ MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
+ .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
+
+ // Calculate the total score per user over fixed windows, and
+ // cumulative updates for late data.
+ final PCollectionView<Map<String, Integer>> spammersView = userEvents
+ .apply(Window.named("FixedWindowsUser")
+ .<KV<String, Integer>>into(FixedWindows.of(
+ Duration.standardMinutes(options.getFixedWindowDuration())))
+ )
+
+ // Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate.
+ // These might be robots/spammers.
+ .apply("CalculateSpammyUsers", new CalculateSpammyUsers())
+ // Derive a view from the collection of spammer users. It will be used as a side input
+ // in calculating the team score sums, below.
+ .apply("CreateSpammersView", View.<String, Integer>asMap());
+
+ // [START DocInclude_FilterAndCalc]
+ // Calculate the total score per team over fixed windows,
+ // and emit cumulative updates for late data. Uses the side input derived above-- the set of
+ // suspected robots-- to filter out scores from those users from the sum.
+ // Write the results to BigQuery.
+ rawEvents
+ .apply(Window.named("WindowIntoFixedWindows")
+ .<GameActionInfo>into(FixedWindows.of(
+ Duration.standardMinutes(options.getFixedWindowDuration())))
+ )
+ // Filter out the detected spammer users, using the side input derived above.
+ .apply(ParDo.named("FilterOutSpammers")
+ .withSideInputs(spammersView)
+ .of(new DoFn<GameActionInfo, GameActionInfo>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ // If the user is not in the spammers Map, output the data element.
+ if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
+ c.output(c.element());
+ }
+ }
+ }))
+ // Extract and sum teamname/score pairs from the event data.
+ .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
+ // [END DocInclude_FilterAndCalc]
+ // Write the result to BigQuery
+ .apply("WriteTeamSums",
+ new WriteWindowedToBigQuery<KV<String, Integer>>(
+ options.getTablePrefix() + "_team", configureWindowedWrite()));
+
+
+ // [START DocInclude_SessionCalc]
+ // Detect user sessions-- that is, a burst of activity separated by a gap from further
+ // activity. Find and record the mean session lengths.
+ // This information could help the game designers track the changing user engagement
+ // as their set of games changes.
+ userEvents
+ .apply(Window.named("WindowIntoSessions")
+ .<KV<String, Integer>>into(
+ Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
+ .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
+ // For this use, we care only about the existence of the session, not any particular
+ // information aggregated over it, so the following is an efficient way to do that.
+ .apply(Combine.perKey(x -> 0))
+ // Get the duration per session.
+ .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
+ // [END DocInclude_SessionCalc]
+ // [START DocInclude_Rewindow]
+ // Re-window to process groups of session sums according to when the sessions complete.
+ .apply(Window.named("WindowToExtractSessionMean")
+ .<Integer>into(
+ FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
+ // Find the mean session duration in each window.
+ .apply(Mean.<Integer>globally().withoutDefaults())
+ // Write this info to a BigQuery table.
+ .apply("WriteAvgSessionLength",
+ new WriteWindowedToBigQuery<Double>(
+ options.getTablePrefix() + "_sessions", configureSessionWindowWrite()));
+ // [END DocInclude_Rewindow]
+
+
+ // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
+ // command line.
+ PipelineResult result = pipeline.run();
+ dataflowUtils.waitToFinish(result);
+ }
+}
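
From configureWindowedWrite() above, the resulting "<tablePrefix>_team" table carries the following fields, as assembled by WriteToBigQuery.getSchema():

  // team             STRING   (c.element().getKey())
  // total_score      INTEGER  (c.element().getValue())
  // window_start     STRING   (fmt.print(w.start()) of the enclosing IntervalWindow)
  // processing_time  STRING   (fmt.print(Instant.now()) at write time)
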
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
new file mode 100644
index 0000000..58944c5
--- /dev/null
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.complete.game.utils.WriteWindowedToBigQuery;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.Filter;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.WithTimestamps;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+
+import org.joda.time.DateTimeZone;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+/**
+ * This class is the second in a series of four pipelines that tell a story in a 'gaming'
+ * domain, following {@link UserScore}. In addition to the concepts introduced in {@link UserScore},
+ * new concepts include: windowing and element timestamps; use of {@code Filter.byPredicate()}.
+ *
+ * <p> This pipeline processes data collected from gaming events in batch, building on {@link
+ * UserScore} but using fixed windows. It calculates the sum of scores per team, for each window,
+ * optionally allowing specification of two timestamps before and after which data is filtered out.
+ * This allows a model where late data collected after the intended analysis window can be included,
+ * and any late-arriving data prior to the beginning of the analysis window can be removed as well.
+ * By using windowing and adding element timestamps, we can do finer-grained analysis than with the
+ * {@link UserScore} pipeline. However, our batch processing is high-latency, in that we don't get
+ * results from plays at the beginning of the batch's time period until the batch is processed.
+ *
+ * <p> To execute this pipeline using the Dataflow service, specify the pipeline configuration
+ * like this:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ * --runner=BlockingDataflowPipelineRunner
+ * --dataset=YOUR-DATASET
+ * }
+ * </pre>
+ * where the BigQuery dataset you specify must already exist.
+ *
+ * <p> Optionally include {@code --input} to specify the batch input file path.
+ * To indicate a time after which the data should be filtered out, include the
+ * {@code --stopMin} arg. E.g., {@code --stopMin=2015-10-18-23-59} indicates that any data
+ * timestamped after 23:59 PST on 2015-10-18 should not be included in the analysis.
+ * To indicate a time before which data should be filtered out, include the {@code --startMin} arg.
+ * If you're using the default input specified in {@link UserScore},
+ * "gs://dataflow-samples/game/gaming_data*.csv", then
+ * {@code --startMin=2015-11-16-16-10 --stopMin=2015-11-17-16-10} are good values.
+ */
+public class HourlyTeamScore extends UserScore {
+
+ private static DateTimeFormatter fmt =
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
+ .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
+ private static DateTimeFormatter minFmt =
+ DateTimeFormat.forPattern("yyyy-MM-dd-HH-mm")
+ .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
+
+
+ /**
+ * Options supported by {@link HourlyTeamScore}.
+ */
+ static interface Options extends UserScore.Options {
+
+ @Description("Numeric value of fixed window duration, in minutes")
+ @Default.Integer(60)
+ Integer getWindowDuration();
+ void setWindowDuration(Integer value);
+
+ @Description("String representation of the first minute after which to generate results,"
+ + "in the format: yyyy-MM-dd-HH-mm . This time should be in PST."
+ + "Any input data timestamped prior to that minute won't be included in the sums.")
+ @Default.String("1970-01-01-00-00")
+ String getStartMin();
+ void setStartMin(String value);
+
+ @Description("String representation of the first minute for which to not generate results,"
+ + "in the format: yyyy-MM-dd-HH-mm . This time should be in PST."
+ + "Any input data timestamped after that minute won't be included in the sums.")
+ @Default.String("2100-01-01-00-00")
+ String getStopMin();
+ void setStopMin(String value);
+
+ @Description("The BigQuery table name. Should not already exist.")
+ @Default.String("hourly_team_score")
+ String getTableName();
+ void setTableName(String value);
+ }
+
+ /**
+ * Create a map of information that describes how to write pipeline output to BigQuery. This map
+ * is passed to the {@link WriteWindowedToBigQuery} constructor to write team score sums and
+ * includes information about window start time.
+ */
+ protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
+ configureWindowedTableWrite() {
+ Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfig =
+ new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>();
+ tableConfig.put("team",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+ c -> c.element().getKey()));
+ tableConfig.put("total_score",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
+ c -> c.element().getValue()));
+ tableConfig.put("window_start",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+ c -> { IntervalWindow w = (IntervalWindow) c.window();
+ return fmt.print(w.start()); }));
+ return tableConfig;
+ }
+
+
+ /**
+ * Run a batch pipeline to do windowed analysis of the data.
+ */
+ // [START DocInclude_HTSMain]
+ public static void main(String[] args) throws Exception {
+ // Begin constructing a pipeline configured by commandline flags.
+ Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+ Pipeline pipeline = Pipeline.create(options);
+
+ final Instant stopMinTimestamp = new Instant(minFmt.parseMillis(options.getStopMin()));
+ final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));
+
+ // Read 'gaming' events from a text file.
+ pipeline.apply(TextIO.Read.from(options.getInput()))
+ // Parse the incoming data.
+ .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
+
+ // Filter out data before and after the given times so that it is not included
+ // in the calculations. As we collect data in batches (say, by day), the batch for the day
+ // that we want to analyze could potentially include some late-arriving data from the previous
+ // day. If so, we want to weed it out. Similarly, if we include data from the following day
+ // (to scoop up late-arriving events from the day we're analyzing), we need to weed out events
+ // that fall after the time period we want to analyze.
+ // [START DocInclude_HTSFilters]
+ .apply("FilterStartTime", Filter.byPredicate(
+ (GameActionInfo gInfo)
+ -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
+ .apply("FilterEndTime", Filter.byPredicate(
+ (GameActionInfo gInfo)
+ -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))
+ // [END DocInclude_HTSFilters]
+
+ // [START DocInclude_HTSAddTsAndWindow]
+ // Add an element timestamp based on the event log, and apply fixed windowing.
+ .apply("AddEventTimestamps",
+ WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
+ .apply(Window.named("FixedWindowsTeam")
+ .<GameActionInfo>into(FixedWindows.of(
+ Duration.standardMinutes(options.getWindowDuration()))))
+ // [END DocInclude_HTSAddTsAndWindow]
+
+ // Extract and sum teamname/score pairs from the event data.
+ .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
+ .apply("WriteTeamScoreSums",
+ new WriteWindowedToBigQuery<KV<String, Integer>>(options.getTableName(),
+ configureWindowedTableWrite()));
+
+
+ pipeline.run();
+ }
+ // [END DocInclude_HTSMain]
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
new file mode 100644
index 0000000..cedd696
--- /dev/null
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.complete.game;
+
+import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
+import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
+import com.google.cloud.dataflow.examples.complete.game.utils.WriteToBigQuery;
+import com.google.cloud.dataflow.examples.complete.game.utils.WriteWindowedToBigQuery;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterProcessingTime;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Repeatedly;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.DateTimeZone;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+/**
+ * This class is the third in a series of four pipelines that tell a story in a 'gaming' domain,
+ * following {@link UserScore} and {@link HourlyTeamScore}. Concepts include: processing unbounded
+ * data using fixed windows; use of custom timestamps and event-time processing; generation of
+ * early/speculative results; using .accumulatingFiredPanes() to do cumulative processing of late-
+ * arriving data.
+ *
+ * <p> This pipeline processes an unbounded stream of 'game events'. The calculation of the team
+ * scores uses fixed windowing based on event time (the time of the game play event), not
+ * processing time (the time that an event is processed by the pipeline). The pipeline calculates
+ * the sum of scores per team, for each window. By default, the team scores are calculated using
+ * one-hour windows.
+ *
+ * <p> In contrast-- to demo another windowing option-- the user scores are calculated using a
+ * global window, which periodically (every ten minutes) emits cumulative user score sums.
+ *
+ * <p> In contrast to the previous pipelines in the series, which used static, finite input data,
+ * here we're using an unbounded data source, which lets us provide speculative results, and allows
+ * handling of late data, at much lower latency. We can use the early/speculative results to keep a
+ * 'leaderboard' updated in near-realtime. Our handling of late data lets us generate correct
+ * results, e.g. for 'team prizes'. We're now outputting window results as they're
+ * calculated, giving us much lower latency than with the previous batch examples.
+ *
+ * <p> Run {@link injector.Injector} to generate pubsub data for this pipeline. The Injector
+ * documentation provides more detail on how to do this.
+ *
+ * <p> To execute this pipeline using the Dataflow service, specify the pipeline configuration
+ * like this:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ * --runner=BlockingDataflowPipelineRunner
+ * --dataset=YOUR-DATASET
+ * --topic=projects/YOUR-PROJECT/topics/YOUR-TOPIC
+ * }
+ * </pre>
+ * where the BigQuery dataset you specify must already exist.
+ * The PubSub topic you specify should be the same topic to which the Injector is publishing.
+ */
+public class LeaderBoard extends HourlyTeamScore {
+
+ private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";
+
+ private static DateTimeFormatter fmt =
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
+ .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
+ static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
+ static final Duration TEN_MINUTES = Duration.standardMinutes(10);
+
+
+ /**
+ * Options supported by {@link LeaderBoard}.
+ */
+ static interface Options extends HourlyTeamScore.Options, DataflowExampleOptions {
+
+ @Description("Pub/Sub topic to read from")
+ @Validation.Required
+ String getTopic();
+ void setTopic(String value);
+
+ @Description("Numeric value of fixed window duration for team analysis, in minutes")
+ @Default.Integer(60)
+ Integer getTeamWindowDuration();
+ void setTeamWindowDuration(Integer value);
+
+ @Description("Numeric value of allowed data lateness, in minutes")
+ @Default.Integer(120)
+ Integer getAllowedLateness();
+ void setAllowedLateness(Integer value);
+
+ @Description("Prefix used for the BigQuery table names")
+ @Default.String("leaderboard")
+ String getTableName();
+ void setTableName(String value);
+ }
+
+ /**
+ * Create a map of information that describes how to write pipeline output to BigQuery. This map
+ * is used to write team score sums and includes event timing information.
+ */
+ protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
+ configureWindowedTableWrite() {
+
+ Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
+ new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>();
+ tableConfigure.put("team",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+ c -> c.element().getKey()));
+ tableConfigure.put("total_score",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
+ c -> c.element().getValue()));
+ tableConfigure.put("window_start",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
+ c -> { IntervalWindow w = (IntervalWindow) c.window();
+ return fmt.print(w.start()); }));
+ tableConfigure.put("processing_time",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+ "STRING", c -> fmt.print(Instant.now())));
+ tableConfigure.put("timing",
+ new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
+ "STRING", c -> c.pane().getTiming().toString()));
+ return tableConfigure;
+ }
+
+ /**
+ * Create a map of information that describes how to write pipeline output to BigQuery. This map
+ * is used to write user score sums.
+ */
+ protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
+ configureGlobalWindowBigQueryWrite() {
+
+ Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
+ configureBigQueryWrite();
+ tableConfigure.put("processing_time",
+ new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
+ "STRING", c -> fmt.print(Instant.now())));
+ return tableConfigure;
+ }
+
+
+ public static void main(String[] args) throws Exception {
+
+ Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+ // Enforce that this pipeline is always run in streaming mode.
+ options.setStreaming(true);
+ // For example purposes, allow the pipeline to be easily cancelled instead of running
+ // continuously.
+ options.setRunner(DataflowPipelineRunner.class);
+ DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+ Pipeline pipeline = Pipeline.create(options);
+
+ // Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub
+ // data elements, and parse the data.
+ PCollection<GameActionInfo> gameEvents = pipeline
+ .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
+ .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()));
+
+ // [START DocInclude_WindowAndTrigger]
+ // Extract team/score pairs from the event stream, using hour-long windows by default.
+ gameEvents
+ .apply(Window.named("LeaderboardTeamFixedWindows")
+ .<GameActionInfo>into(FixedWindows.of(
+ Duration.standardMinutes(options.getTeamWindowDuration())))
+ // We will get early (speculative) results as well as cumulative
+ // processing of late data.
+ .triggering(
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(FIVE_MINUTES))
+ .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(TEN_MINUTES)))
+ .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness()))
+ .accumulatingFiredPanes())
+ // Extract and sum teamname/score pairs from the event data.
+ .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
+ // Write the results to BigQuery.
+ .apply("WriteTeamScoreSums",
+ new WriteWindowedToBigQuery<KV<String, Integer>>(
+ options.getTableName() + "_team", configureWindowedTableWrite()));
+ // [END DocInclude_WindowAndTrigger]
+
+ // [START DocInclude_ProcTimeTrigger]
+ // Extract user/score pairs from the event stream using processing time, via global windowing.
+ // Get periodic updates on all users' running scores.
+ gameEvents
+ .apply(Window.named("LeaderboardUserGlobalWindow")
+ .<GameActionInfo>into(new GlobalWindows())
+ // Get periodic results every ten minutes.
+ .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(TEN_MINUTES)))
+ .accumulatingFiredPanes()
+ .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness())))
+ // Extract and sum username/score pairs from the event data.
+ .apply("ExtractUserScore", new ExtractAndSumScore("user"))
+ // Write the results to BigQuery.
+ .apply("WriteUserScoreSums",
+ new WriteToBigQuery<KV<String, Integer>>(
+ options.getTableName() + "_user", configureGlobalWindowBigQueryWrite()));
+ // [END DocInclude_ProcTimeTrigger]
+
+ // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
+ // command line.
+ PipelineResult result = pipeline.run();
+ dataflowUtils.waitToFinish(result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java8/src/main/java/org/apache/beam/examples/complete/game/README.md
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/README.md b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/README.md
new file mode 100644
index 0000000..79b55ce
--- /dev/null
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/README.md
@@ -0,0 +1,113 @@
+
+# 'Gaming' examples
+
+
+This directory holds a series of example Dataflow pipelines in a simple 'mobile
+gaming' domain. They all require Java 8. Each pipeline successively introduces
+new concepts, and gives some examples of using Java 8 syntax in constructing
+Dataflow pipelines. Other than usage of Java 8 lambda expressions, the concepts
+that are used apply equally well in Java 7.
+
+In the gaming scenario, many users play, as members of different teams, over
+the course of a day, and their actions are logged for processing. Some of the
+logged game events may be late-arriving, if users play on mobile devices and go
+transiently offline for a period.
+
+The scenario includes not only "regular" users, but "robot users", which have a
+higher click rate than the regular users, and may move from team to team.
+
+The first two pipelines in the series use pre-generated batch data samples. The
+last two pipelines read from a [PubSub](https://cloud.google.com/pubsub/)
+topic. For these examples, you will also need to run the
+`injector.Injector` program, which generates and publishes the gaming data to
+PubSub. The javadocs for each pipeline have more detailed information on how to
+run that pipeline.
+
+All of these pipelines write their results to BigQuery table(s).
+
+
+## The pipelines in the 'gaming' series
+
+### UserScore
+
+The first pipeline in the series is `UserScore`. This pipeline does batch
+processing of data collected from gaming events. It calculates the sum of
+scores per user, over an entire batch of gaming data (collected, say, for each
+day). The batch processing will not include any late data that arrives after
+the day's cutoff point.
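+
+For orientation, a minimal sketch of that core computation is shown below. It
+assumes the parsed `GameActionInfo` event (defined in `UserScore.java`, not
+shown here) exposes `getUser()` and `getScore()` accessors; the method and
+variable names are illustrative, not the exact code used by the pipeline:
+
+```java
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+/** Sketch: sum scores per user over the whole (bounded) batch of events. */
+static PCollection<KV<String, Integer>> extractUserScores(
+    PCollection<GameActionInfo> userEvents) {
+  return userEvents
+      // Emit a (user, score) pair for every parsed game event.
+      .apply("ExtractUserScore",
+          ParDo.of(new DoFn<GameActionInfo, KV<String, Integer>>() {
+            @Override
+            public void processElement(ProcessContext c) {
+              c.output(KV.of(c.element().getUser(), c.element().getScore()));
+            }
+          }))
+      // Sum all scores for each user across the entire batch.
+      .apply("SumUserScores", Sum.<String>integersPerKey());
+}
+```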
+
+### HourlyTeamScore
+
+The next pipeline in the series is `HourlyTeamScore`. This pipeline also
+processes data collected from gaming events in batch. It builds on `UserScore`,
+but uses [fixed windows](https://cloud.google.com/dataflow/model/windowing), by
+default an hour in duration. It calculates the sum of scores per team, for each
+window, optionally allowing specification of two timestamps before and after
+which data is filtered out. This allows a model where late data collected after
+the intended analysis window can be included in the analysis, and any late-
+arriving data prior to the beginning of the analysis window can be removed as
+well.
+
+By using windowing and adding element timestamps, we can do finer-grained
+analysis than with the `UserScore` pipeline — we're now tracking scores for
+each hour rather than over the course of a whole day. However, our batch
+processing is high-latency, in that we don't get results from plays at the
+beginning of the batch's time period until the complete batch is processed.
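+
+The sketch below shows what `HourlyTeamScore` adds on top of `UserScore`:
+filtering by the start/stop timestamps, stamping each element with its event
+time, and applying hour-long fixed windows. It mirrors `HourlyTeamScore.java`
+above; the method and variable names are illustrative:
+
+```java
+import com.google.cloud.dataflow.sdk.transforms.Filter;
+import com.google.cloud.dataflow.sdk.transforms.WithTimestamps;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** Sketch: drop out-of-range events, stamp event times, and window hourly. */
+static PCollection<GameActionInfo> filterAndWindow(
+    PCollection<GameActionInfo> events,
+    Instant startMinTimestamp, Instant stopMinTimestamp) {
+  return events
+      // Keep only events inside the requested analysis interval.
+      .apply("FilterStartTime", Filter.byPredicate(
+          (GameActionInfo g) -> g.getTimestamp() > startMinTimestamp.getMillis()))
+      .apply("FilterEndTime", Filter.byPredicate(
+          (GameActionInfo g) -> g.getTimestamp() < stopMinTimestamp.getMillis()))
+      // Use the event's own log time as the element timestamp.
+      .apply("AddEventTimestamps",
+          WithTimestamps.of((GameActionInfo g) -> new Instant(g.getTimestamp())))
+      // Hour-long fixed windows, matching the pipeline's default window duration.
+      .apply(Window.<GameActionInfo>into(FixedWindows.of(Duration.standardHours(1))));
+}
+```
+
+Filtering happens before windowing so that out-of-range events never contribute
+to any window's sums.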
+
+### LeaderBoard
+
+The third pipeline in the series is `LeaderBoard`. This pipeline processes an
+unbounded stream of 'game events' from a PubSub topic. The calculation of the
+team scores uses fixed windowing based on event time (the time of the game play
+event), not processing time (the time that an event is processed by the
+pipeline). The pipeline calculates the sum of scores per team, for each window.
+By default, the team scores are calculated using one-hour windows.
+
+In contrast — to demo another windowing option — the user scores are calculated
+using a global window, which periodically (every ten minutes) emits cumulative
+user score sums.
+
+In contrast to the previous pipelines in the series, which used static, finite
+input data, here we're using an unbounded data source, which lets us provide
+_speculative_ results, and allows handling of late data, at much lower latency.
+E.g., we could use the early/speculative results to keep a 'leaderboard'
+updated in near-realtime. Our handling of late data lets us generate correct
+results, e.g. for 'team prizes'. We're now outputting window results as they're
+calculated, giving us much lower latency than with the previous batch examples.
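+
+The window and trigger configuration is the heart of that behavior. A minimal
+sketch, mirroring `LeaderBoard.java` (the durations shown are just the example
+defaults; the method and variable names are illustrative):
+
+```java
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterProcessingTime;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+/** Sketch: hourly team windows with early (speculative) and late firings. */
+static PCollection<GameActionInfo> windowForLeaderboard(
+    PCollection<GameActionInfo> gameEvents) {
+  return gameEvents.apply(
+      Window.<GameActionInfo>into(FixedWindows.of(Duration.standardHours(1)))
+          .triggering(AfterWatermark.pastEndOfWindow()
+              // Speculative results while the window is still open...
+              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
+                  .plusDelayOf(Duration.standardMinutes(5)))
+              // ...and updated results for data arriving after the watermark.
+              .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
+                  .plusDelayOf(Duration.standardMinutes(10))))
+          .withAllowedLateness(Duration.standardHours(2))
+          // Each firing reports the cumulative sum for the window so far.
+          .accumulatingFiredPanes());
+}
+```
+
+Because panes accumulate, each firing for a window supersedes the previous one,
+which is what lets a downstream table present a continually corrected score.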
+
+### GameStats
+
+The fourth pipeline in the series is `GameStats`. This pipeline builds
+on the `LeaderBoard` functionality — supporting output of speculative and late
+data — and adds some "business intelligence" analysis: abuse detection. The
+pipeline derives the mean user score sum for a window, and uses that
+information to identify likely spammers/robots. (The injector is designed so
+that the "robots" have a higher click rate than the "real" users.) The robot
+users are then filtered out when calculating the team scores.
+
+Additionally, user sessions are tracked: that is, we find bursts of user
+activity using session windows. Then, the mean session duration information is
+recorded in the context of subsequent fixed windowing. (This could be used to
+tell us what games are giving us greater user retention).
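+
+A minimal sketch of the session-analysis half of that idea is shown below. It
+does not reproduce the full `GameStats.java`; the gap and window durations are
+just examples, and `userEvents` / `sessionDurations` are illustrative names
+(the per-session durations are assumed to be computed upstream, as in the real
+pipeline):
+
+```java
+import com.google.cloud.dataflow.sdk.transforms.Mean;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+/** Sketch: group each user's activity into gap-based session windows. */
+static PCollection<KV<String, Integer>> sessionize(
+    PCollection<KV<String, Integer>> userEvents) {
+  // A session ends after a period of inactivity for that user.
+  return userEvents.apply(Window.<KV<String, Integer>>into(
+      Sessions.withGapDuration(Duration.standardMinutes(5))));
+}
+
+/** Sketch: average per-session durations (in minutes) over fixed windows. */
+static PCollection<Double> meanSessionLength(PCollection<Integer> sessionDurations) {
+  return sessionDurations
+      // Re-window the per-session durations into fixed windows.
+      .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(30))))
+      // Mean session duration in each window; withoutDefaults() skips empty windows.
+      .apply(Mean.<Integer>globally().withoutDefaults());
+}
+```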
+
+### Running the PubSub Injector
+
+The `LeaderBoard` and `GameStats` example pipelines read unbounded data
+from a PubSub topic.
+
+Use the `injector.Injector` program to generate this data and publish to a
+PubSub topic. See the `Injector` javadocs for more information on how to run the
+injector. Set up the injector before you start one of these pipelines. Then,
+when you start the pipeline, pass as an argument the name of that PubSub topic.
+See the pipeline javadocs for the details.
+
+## Viewing the results in BigQuery
+
+All of the pipelines write their results to BigQuery. `UserScore` and
+`HourlyTeamScore` each write one table, and `LeaderBoard` and
+`GameStats` each write two. The pipelines have default table names that
+you can override when you start up the pipeline if those tables already exist.
+
+Depending on the windowing intervals defined in a given pipeline, you may have
+to wait for a while (more than an hour) before you start to see results written
+to the BigQuery tables.