You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:48:21 UTC
[57/67] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/JoinExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/JoinExamples.java
new file mode 100644
index 0000000..745c5d6
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/JoinExamples.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+
+/**
+ * This example shows how to do a join on two collections.
+ * It uses a sample of the GDELT 'world event' data (http://goo.gl/OB6oin), joining the event
+ * 'action' country code against a table that maps country codes to country names.
+ *
+ * <p>Concepts: Join operation; multiple input sources.
+ *
+ * <p>To execute this pipeline locally, specify general pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * }
+ * </pre>
+ * and a local output file or output prefix on GCS:
+ * <pre>{@code
+ * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
+ * }</pre>
+ *
+ * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ * --runner=BlockingDataflowPipelineRunner
+ * }
+ * </pre>
+ * and an output prefix on GCS:
+ * <pre>{@code
+ * --output=gs://YOUR_OUTPUT_PREFIX
+ * }</pre>
+ */
+public class JoinExamples {
+
+  // A 1000-row sample of the GDELT data here: gdelt-bq:full.events.
+  private static final String GDELT_EVENTS_TABLE =
+      "clouddataflow-readonly:samples.gdelt_sample";
+  // A table that maps country codes to country names.
+  private static final String COUNTRY_CODES =
+      "gdelt-bq:full.crosswalk_geocountrycodetohuman";
+
+  /**
+   * Join two collections, using country code as the key.
+   *
+   * <p>Every event is emitted once, combined with the name of the country whose code matches
+   * the event's country code. An event whose code has no entry in {@code countryCodes} is
+   * emitted with the country name {@code "none"}.
+   *
+   * @param eventsTable GDELT event rows, keyed by their {@code ActionGeo_CountryCode} field
+   * @param countryCodes rows mapping {@code FIPSCC} country codes to {@code HumanName} names
+   * @return one line per event, of the form
+   *     {@code "Country code: <code>, Country name: <name>, Event info: <info>"}
+   */
+  static PCollection<String> joinEvents(PCollection<TableRow> eventsTable,
+      PCollection<TableRow> countryCodes) throws Exception {
+
+    final TupleTag<String> eventInfoTag = new TupleTag<String>();
+    final TupleTag<String> countryInfoTag = new TupleTag<String>();
+
+    // transform both input collections to tuple collections, where the keys are country
+    // codes in both cases.
+    PCollection<KV<String, String>> eventInfo = eventsTable.apply(
+        ParDo.of(new ExtractEventDataFn()));
+    PCollection<KV<String, String>> countryInfo = countryCodes.apply(
+        ParDo.of(new ExtractCountryInfoFn()));
+
+    // country code 'key' -> CGBKR (<event info>, <country name>)
+    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
+        .of(eventInfoTag, eventInfo)
+        .and(countryInfoTag, countryInfo)
+        .apply(CoGroupByKey.<String>create());
+
+    // Process the CoGbkResult elements generated by the CoGroupByKey transform.
+    // country code 'key' -> string of <event info>, <country name>
+    PCollection<KV<String, String>> finalResultCollection =
+        kvpCollection.apply(ParDo.named("Process").of(
+            new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+              @Override
+              public void processElement(ProcessContext c) {
+                KV<String, CoGbkResult> e = c.element();
+                String countryCode = e.getKey();
+                // The single-argument getOnly(tag) fails when the key has no value for the
+                // tag, i.e. when an event's country code is missing from the country table.
+                // Fall back to "none" instead of failing the whole pipeline.
+                String countryName = e.getValue().getOnly(countryInfoTag, "none");
+                for (String eventDetails : e.getValue().getAll(eventInfoTag)) {
+                  // Generate a string that combines information from both collection values
+                  c.output(KV.of(countryCode, "Country name: " + countryName
+                      + ", Event info: " + eventDetails));
+                }
+              }
+            }));
+
+    // Prefix each joined record with its country code to produce the final output lines.
+    PCollection<String> formattedResults = finalResultCollection
+        .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            String outputstring = "Country code: " + c.element().getKey()
+                + ", " + c.element().getValue();
+            c.output(outputstring);
+          }
+        }));
+    return formattedResults;
+  }
+
+  /**
+   * Examines each row (event) in the input table. Output a KV with the key the country
+   * code of the event, and the value a string encoding event information.
+   */
+  static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      String countryCode = (String) row.get("ActionGeo_CountryCode");
+      String sqlDate = (String) row.get("SQLDATE");
+      String actor1Name = (String) row.get("Actor1Name");
+      String sourceUrl = (String) row.get("SOURCEURL");
+      String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl;
+      c.output(KV.of(countryCode, eventInfo));
+    }
+  }
+
+
+  /**
+   * Examines each row (country info) in the input table. Output a KV with the key the country
+   * code, and the value the country name.
+   */
+  static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      String countryCode = (String) row.get("FIPSCC");
+      String countryName = (String) row.get("HumanName");
+      c.output(KV.of(countryCode, countryName));
+    }
+  }
+
+
+  /**
+   * Options supported by {@link JoinExamples}.
+   *
+   * <p>Inherits standard configuration options.
+   */
+  private static interface Options extends PipelineOptions {
+    @Description("Path of the file to write to")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  public static void main(String[] args) throws Exception {
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+    // the following two 'applys' create multiple inputs to our pipeline, one for each
+    // of our two input sources.
+    PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE));
+    PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES));
+    PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
+    formattedResults.apply(TextIO.Write.to(options.getOutput()));
+    p.run();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamples.java
new file mode 100644
index 0000000..1c26d0f
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamples.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Max;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An example that reads the public samples of weather data from BigQuery, and finds
+ * the maximum mean temperature ('mean_temp') recorded for each month.
+ *
+ * <p>Concepts: The 'Max' statistical combination function, and how to find the max per
+ * key group.
+ *
+ * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
+ * table.
+ *
+ * <p>To execute this pipeline locally, specify general pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * }
+ * </pre>
+ * and the BigQuery table for the output, with the form
+ * <pre>{@code
+ * --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ * }</pre>
+ *
+ * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ * --runner=BlockingDataflowPipelineRunner
+ * }
+ * </pre>
+ * and the BigQuery table for the output:
+ * <pre>{@code
+ * --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ * }</pre>
+ *
+ * <p>The BigQuery input table defaults to {@code clouddataflow-readonly:samples.weather_stations}
+ * and can be overridden with {@code --input}.
+ */
+public class MaxPerKeyExamples {
+  // Default to using a 1000 row subset of the public weather station table publicdata:samples.gsod.
+  private static final String WEATHER_SAMPLES_TABLE =
+      "clouddataflow-readonly:samples.weather_stations";
+
+  /**
+   * Examines each row (weather reading) in the input table. Output the month of the reading,
+   * and the mean_temp.
+   *
+   * <p>Both fields are parsed from their {@code toString()} form, so this works whether the
+   * row stores them as strings or as boxed numbers.
+   */
+  static class ExtractTempFn extends DoFn<TableRow, KV<Integer, Double>> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      // Use toString() rather than a (String) cast, matching the mean_temp handling below,
+      // so a numeric 'month' value does not cause a ClassCastException.
+      Integer month = Integer.parseInt(row.get("month").toString());
+      Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
+      c.output(KV.of(month, meanTemp));
+    }
+  }
+
+  /**
+   * Format the results to a TableRow, to save to BigQuery.
+   *
+   * <p>The output row has the columns {@code month} and {@code max_mean_temp}, matching the
+   * schema built in {@link #main}.
+   */
+  static class FormatMaxesFn extends DoFn<KV<Integer, Double>, TableRow> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = new TableRow()
+          .set("month", c.element().getKey())
+          .set("max_mean_temp", c.element().getValue());
+      c.output(row);
+    }
+  }
+
+  /**
+   * Reads rows from a weather data table, and finds the max mean_temp for each
+   * month via the 'Max' statistical combination function.
+   */
+  static class MaxMeanTemp
+      extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
+    @Override
+    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+
+      // row... => <month, mean_temp> ...
+      PCollection<KV<Integer, Double>> temps = rows.apply(
+          ParDo.of(new ExtractTempFn()));
+
+      // month, mean_temp... => <month, max mean temp>...
+      PCollection<KV<Integer, Double>> tempMaxes =
+          temps.apply(Max.<Integer>doublesPerKey());
+
+      // <month, max>... => row...
+      PCollection<TableRow> results = tempMaxes.apply(
+          ParDo.of(new FormatMaxesFn()));
+
+      return results;
+    }
+  }
+
+  /**
+   * Options supported by {@link MaxPerKeyExamples}.
+   *
+   * <p>Inherits standard configuration options.
+   */
+  private static interface Options extends PipelineOptions {
+    @Description("Table to read from, specified as "
+        + "<project_id>:<dataset_id>.<table_id>")
+    @Default.String(WEATHER_SAMPLES_TABLE)
+    String getInput();
+    void setInput(String value);
+
+    @Description("Table to write to, specified as "
+        + "<project_id>:<dataset_id>.<table_id>")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+
+    // Build the table schema for the output table.
+    List<TableFieldSchema> fields = new ArrayList<>();
+    fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
+    fields.add(new TableFieldSchema().setName("max_mean_temp").setType("FLOAT"));
+    TableSchema schema = new TableSchema().setFields(fields);
+
+    p.apply(BigQueryIO.Read.from(options.getInput()))
+        .apply(new MaxMeanTemp())
+        .apply(BigQueryIO.Write
+            .to(options.getOutput())
+            .withSchema(schema)
+            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
+
+    p.run();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/README.md
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/README.md b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/README.md
new file mode 100644
index 0000000..99f3080
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/README.md
@@ -0,0 +1,55 @@
+
+# "Cookbook" Examples
+
+This directory holds simple "cookbook" examples, which show how to define
+commonly-used data analysis patterns that you would likely incorporate into a
+larger Dataflow pipeline. They include:
+
+ <ul>
+ <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoes.java">BigQueryTornadoes</a>
+ — An example that reads the public samples of weather data from Google
+ BigQuery, counts the number of tornadoes that occur in each month, and
+ writes the results to BigQuery. Demonstrates reading/writing BigQuery,
+ counting a <code>PCollection</code>, and user-defined <code>PTransforms</code>.</li>
+ <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamples.java">CombinePerKeyExamples</a>
+ — An example that reads the public "Shakespeare" data, and for
+ each word in the dataset that exceeds a given length, generates a string
+ containing the list of play names in which that word appears.
+ Demonstrates the <code>Combine.perKey</code>
+ transform, which lets you combine the values in a key-grouped
+ <code>PCollection</code>.
+ </li>
+ <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DatastoreWordCount.java">DatastoreWordCount</a>
+ — An example that shows you how to read from Google Cloud Datastore.</li>
+ <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java">DeDupExample</a>
+ — An example that uses Shakespeare's plays as plain text files, and
+ removes duplicate lines across all the files. Demonstrates the
+ <code>RemoveDuplicates</code>, <code>TextIO.Read</code>,
+ and <code>TextIO.Write</code> transforms, and how to wire transforms together.
+ </li>
+ <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java">FilterExamples</a>
+ — An example that shows different approaches to filtering, including
+ selection and projection. It also shows how to dynamically set parameters
+ by defining and using new pipeline options, and how to use a value derived
+ by a pipeline. Demonstrates the <code>Mean</code> transform,
+ <code>Options</code> configuration, and using pipeline-derived data as a side
+ input.
+ </li>
+ <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/JoinExamples.java">JoinExamples</a>
+ — An example that shows how to join two collections. It uses a
+ sample of the <a href="http://goo.gl/OB6oin">GDELT "world event"
+ data</a>, joining the event <code>action</code> country code against a table
+ that maps country codes to country names. Demonstrates the <code>Join</code>
+ operation, and using multiple input sources.
+ </li>
+ <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamples.java">MaxPerKeyExamples</a>
+ — An example that reads the public samples of weather data from BigQuery,
+ and finds the maximum mean temperature (<code>mean_temp</code>) for each month.
+ Demonstrates the <code>Max</code> statistical combination transform, and how to
+ find the max-per-key group.
+ </li>
+ </ul>
+
+See the [documentation](https://cloud.google.com/dataflow/getting-started) and the [Examples
+README](../../../../../../../../../README.md) for
+information about how to run these examples.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/TriggerExample.java
new file mode 100644
index 0000000..ce5e08e
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/TriggerExample.java
@@ -0,0 +1,564 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
+import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
+import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
+import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicOptions;
+import com.google.cloud.dataflow.examples.common.PubsubFileInjector;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.IntraBundleParallelization;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterEach;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterProcessingTime;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Repeatedly;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This example illustrates the basic concepts behind triggering. It shows how to use different
+ * trigger definitions to produce partial (speculative) results before all the data is processed and
+ * to control when updated results are produced for late data. The example performs a streaming
+ * analysis of the data coming in from PubSub and writes the results to BigQuery. It divides the
+ * data into {@link Window windows} to be processed, and demonstrates using various kinds of
+ * {@link com.google.cloud.dataflow.sdk.transforms.windowing.Trigger triggers} to control when the
+ * results for each window are emitted.
+ *
+ * <p> This example uses a portion of real traffic data from San Diego freeways. It contains
+ * readings from sensor stations set up along each freeway. Each sensor reading includes a
+ * calculation of the 'total flow' across all lanes in that freeway direction.
+ *
+ * <p> Concepts:
+ * <pre>
+ * 1. The default triggering behavior
+ * 2. Late data with the default trigger
+ * 3. How to get speculative estimates
+ * 4. Combining late data and speculative estimates
+ * </pre>
+ *
+ * <p> Before running this example, it will be useful to familiarize yourself with Dataflow triggers
+ * and understand the concept of 'late data',
+ * See: <a href="https://cloud.google.com/dataflow/model/triggers">
+ * https://cloud.google.com/dataflow/model/triggers </a> and
+ * <a href="https://cloud.google.com/dataflow/model/windowing#Advanced">
+ * https://cloud.google.com/dataflow/model/windowing#Advanced </a>
+ *
+ * <p> The example pipeline reads data from a Pub/Sub topic. By default, running the example will
+ * also run an auxiliary pipeline to inject data from the default {@code --input} file to the
+ * {@code --pubsubTopic}. The auxiliary pipeline puts a timestamp on the injected data so that the
+ * example pipeline can operate on <i>event time</i> (rather than arrival time). The auxiliary
+ * pipeline also randomly simulates late data, by setting the timestamps of some of the data
+ * elements to be in the past. You may override the default {@code --input} with the file of your
+ * choosing or set {@code --input=""} which will disable the automatic Pub/Sub injection, and allow
+ * you to use a separate tool to publish to the given topic.
+ *
+ * <p> The example is configured to use the default Pub/Sub topic and the default BigQuery table
+ * from the example common package (there are no defaults for a general Dataflow pipeline).
+ * You can override them by using the {@code --pubsubTopic}, {@code --bigQueryDataset}, and
+ * {@code --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
+ * the example will try to create them.
+ *
+ * <p> The pipeline outputs its results to a BigQuery table.
+ * Here are some queries you can use to see interesting results:
+ * Replace {@code <enter_table_name>} in the query below with the name of the BigQuery table.
+ * Replace {@code <enter_window_interval>} in the query below with the window interval.
+ *
+ * <p> To see the results of the default trigger
+ * (note: when you start up your pipeline, you'll initially see results from 'late' data; wait
+ * until the window duration has passed and the first pane of non-late data has been emitted to
+ * see more interesting results):
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "default" ORDER BY window DESC}
+ *
+ * <p> To see the late data, i.e. the data dropped by the default trigger,
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "withAllowedLateness" and
+ * (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processing_time}
+ *
+ * <p>To see the difference between accumulation mode and discarding mode,
+ * {@code SELECT * FROM <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND
+ * (trigger_type = "withAllowedLateness" or trigger_type = "sequential") and freeway = "5" ORDER BY
+ * window DESC, processing_time}
+ *
+ * <p> To see speculative results every minute,
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "speculative" and freeway = "5"
+ * ORDER BY window DESC, processing_time}
+ *
+ * <p> To see speculative results every five minutes after the end of the window
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "sequential" and timing != "EARLY"
+ * and freeway = "5" ORDER BY window DESC, processing_time}
+ *
+ * <p> To see the first and the last pane for a freeway in a window for all the trigger types,
+ * {@code SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window}
+ *
+ * <p> To reduce the number of results for each query we can add additional where clauses.
+ * For example, to see the results of the default trigger,
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "default" AND freeway = "5" AND
+ * window = "<enter_window_interval>"}
+ *
+ * <p> The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
+ * and then exit.
+ */
+
+public class TriggerExample {
+  /** Length of each fixed window, in minutes. */
+  public static final int WINDOW_DURATION = 30;
+
+  // Trigger delays. Reducing ONE_MINUTE or FIVE_MINUTES yields an earlier approximation of the
+  // results.
+
+  /** Processing-time delay, used only for firings before the end of the window. */
+  public static final Duration ONE_MINUTE = Duration.standardMinutes(1L);
+
+  /** Processing-time delay, used only for firings after the end of the window. */
+  public static final Duration FIVE_MINUTES = Duration.standardMinutes(5L);
+
+  /** Amount of lateness allowed for the data elements. */
+  public static final Duration ONE_DAY = Duration.standardHours(24L);
+
+ /**
+ * This transform demonstrates using triggers to control when data is produced for each window
+ * Consider an example to understand the results generated by each type of trigger.
+ * The example uses "freeway" as the key. Event time is the timestamp associated with the data
+ * element and processing time is the time when the data element gets processed in the pipeline.
+ * For freeway 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window.
+ * Key (freeway) | Value (total_flow) | event time | processing time
+ * 5 | 50 | 10:00:03 | 10:00:47
+ * 5 | 30 | 10:01:00 | 10:01:03
+ * 5 | 30 | 10:02:00 | 11:07:00
+ * 5 | 20 | 10:04:10 | 10:05:15
+ * 5 | 60 | 10:05:00 | 11:03:00
+ * 5 | 20 | 10:05:01 | 11:07:30
+ * 5 | 60 | 10:15:00 | 10:27:15
+ * 5 | 40 | 10:26:40 | 10:26:43
+ * 5 | 60 | 10:27:20 | 10:27:25
+ * 5 | 60 | 10:29:00 | 11:11:00
+ *
+ * <p> Dataflow tracks a watermark which records up to what point in event time the data is
+ * complete. For the purposes of the example, we'll assume the watermark is approximately 15m
+ * behind the current processing time. In practice, the actual value would vary over time based
+ * on the system's knowledge of the current PubSub delay and contents of the backlog (data
+ * that has not yet been processed).
+ *
+ * <p> If the watermark is 15m behind, then the window [10:00:00, 10:30:00) (in event time) would
+ * close at 10:44:59, when the watermark passes 10:30:00.
+ */
+ static class CalculateTotalFlow
+ extends PTransform <PCollection<KV<String, Integer>>, PCollectionList<TableRow>> {
+    // Length of each fixed window, in minutes.
+    private int windowDuration;
+
+    /**
+     * Creates the transform.
+     *
+     * @param windowMinutes length of each fixed window, in minutes
+     */
+    CalculateTotalFlow(int windowMinutes) {
+      windowDuration = windowMinutes;
+    }
+
+ @Override
+ public PCollectionList<TableRow> apply(PCollection<KV<String, Integer>> flowInfo) {
+
+ // Concept #1: The default triggering behavior
+ // By default Dataflow uses a trigger which fires when the watermark has passed the end of the
+ // window. This would be written {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
+
+ // The system also defaults to dropping late data -- data which arrives after the watermark
+ // has passed the event timestamp of the arriving element. This means that the default trigger
+ // will only fire once.
+
+ // Each pane produced by the default trigger with no allowed lateness will be the first and
+ // last pane in the window, and will be ON_TIME.
+
+ // The results for the example above with the default trigger and zero allowed lateness
+ // would be:
+ // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+ // 5 | 260 | 6 | true | true | ON_TIME
+
+ // At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a
+ // result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered
+ // late, and dropped.
+
+ PCollection<TableRow> defaultTriggerResults = flowInfo
+ .apply("Default", Window
+ // The default window duration values work well if you're running the default input
+ // file. You may want to adjust the window duration otherwise.
+ .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
+ // The default trigger first emits output when the system's watermark passes the end
+ // of the window.
+ .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
+ // Late data is dropped
+ .withAllowedLateness(Duration.ZERO)
+ // Discard elements after emitting each pane.
+ // With no allowed lateness and the specified trigger there will only be a single
+ // pane, so this doesn't have a noticeable effect. See concept 2 for more details.
+ .discardingFiredPanes())
+ .apply(new TotalFlow("default"));
+
+ // Concept #2: Late data with the default trigger
+ // This uses the same trigger as concept #1, but allows data that is up to ONE_DAY late. This
+ // leads to each window staying open for ONE_DAY after the watermark has passed the end of the
+ // window. Any late data will result in an additional pane being fired for that same window.
+
+ // The first pane produced will be ON_TIME and the remaining panes will be LATE.
+ // To definitely get the last pane when the window closes, use
+ // .withAllowedLateness(ONE_DAY, ClosingBehavior.FIRE_ALWAYS).
+
+ // The results for the example above with the default trigger and ONE_DAY allowed lateness
+ // would be:
+ // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+ // 5 | 260 | 6 | true | false | ON_TIME
+ // 5 | 60 | 1 | false | false | LATE
+ // 5 | 30 | 1 | false | false | LATE
+ // 5 | 20 | 1 | false | false | LATE
+ // 5 | 60 | 1 | false | false | LATE
+ PCollection<TableRow> withAllowedLatenessResults = flowInfo
+ .apply("WithLateData", Window
+ .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
+ // Late data is emitted as it arrives
+ .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
+ // Once the output is produced, the pane is dropped and we start preparing the next
+ // pane for the window
+ .discardingFiredPanes()
+ // Late data is handled up to one day
+ .withAllowedLateness(ONE_DAY))
+ .apply(new TotalFlow("withAllowedLateness"));
+
+ // Concept #3: How to get speculative estimates
+ // We can specify a trigger that fires independent of the watermark, for instance after
+ // ONE_MINUTE of processing time. This allows us to produce speculative estimates before
+ // all the data is available. Since we don't have any triggers that depend on the watermark
+ // we don't get an ON_TIME firing. Instead, all panes are either EARLY or LATE.
+
+ // We also use accumulatingFiredPanes to build up the results across each pane firing.
+
+ // The results for the example above for this trigger would be:
+ // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+ // 5 | 80 | 2 | true | false | EARLY
+ // 5 | 100 | 3 | false | false | EARLY
+ // 5 | 260 | 6 | false | false | EARLY
+ // 5 | 320 | 7 | false | false | LATE
+ // 5 | 370 | 9 | false | false | LATE
+ // 5 | 430 | 10 | false | false | LATE
+ PCollection<TableRow> speculativeResults = flowInfo
+ .apply("Speculative" , Window
+ .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
+ // Trigger fires every minute.
+ .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
+ // Speculative every ONE_MINUTE
+ .plusDelayOf(ONE_MINUTE)))
+ // After emitting each pane, it will continue accumulating the elements so that each
+ // approximation includes all of the previous data in addition to the newly arrived
+ // data.
+ .accumulatingFiredPanes()
+ .withAllowedLateness(ONE_DAY))
+ .apply(new TotalFlow("speculative"));
+
+ // Concept #4: Combining late data and speculative estimates
+ // We can put the previous concepts together to get EARLY estimates, an ON_TIME result,
+ // and LATE updates based on late data.
+
+ // Each time a triggering condition is satisfied it advances to the next trigger.
+ // If there are new elements this trigger emits a window under following condition:
+ // > Early approximations every minute till the end of the window.
+ // > An on-time firing when the watermark has passed the end of the window
+ // > Every five minutes of late data.
+
+ // Every pane produced will either be EARLY, ON_TIME or LATE.
+
+ // The results for the example above for this trigger would be:
+ // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+ // 5 | 80 | 2 | true | false | EARLY
+ // 5 | 100 | 3 | false | false | EARLY
+ // 5 | 260 | 6 | false | false | EARLY
+ // [First pane fired after the end of the window]
+ // 5 | 320 | 7 | false | false | ON_TIME
+ // 5 | 430 | 10 | false | false | LATE
+
+ // For more possibilities of how to build advanced triggers, see {@link Trigger}.
+ PCollection<TableRow> sequentialResults = flowInfo
+ .apply("Sequential", Window
+ .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
+ .triggering(AfterEach.inOrder(
+ Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
+ // Speculative every ONE_MINUTE
+ .plusDelayOf(ONE_MINUTE)).orFinally(AfterWatermark.pastEndOfWindow()),
+ Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
+ // Late data every FIVE_MINUTES
+ .plusDelayOf(FIVE_MINUTES))))
+ .accumulatingFiredPanes()
+ // For up to ONE_DAY
+ .withAllowedLateness(ONE_DAY))
+ .apply(new TotalFlow("sequential"));
+
+ // Adds the results generated by each trigger type to a PCollectionList.
+ PCollectionList<TableRow> resultsList = PCollectionList.of(defaultTriggerResults)
+ .and(withAllowedLatenessResults)
+ .and(speculativeResults)
+ .and(sequentialResults);
+
+ return resultsList;
+ }
+ }
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+ // The remaining parts of the pipeline are needed to produce the output for each
+ // concept above. Not directly relevant to understanding the trigger examples.
+
+ /**
+ * Calculate total flow and number of records for each freeway and format the results to TableRow
+ * objects, to save to BigQuery.
+ */
+ static class TotalFlow extends
+ PTransform <PCollection<KV<String, Integer>>, PCollection<TableRow>> {
+ private String triggerType;
+
+ public TotalFlow(String triggerType) {
+ this.triggerType = triggerType;
+ }
+
+ @Override
+ public PCollection<TableRow> apply(PCollection<KV<String, Integer>> flowInfo) {
+ PCollection<KV<String, Iterable<Integer>>> flowPerFreeway = flowInfo
+ .apply(GroupByKey.<String, Integer>create());
+
+ PCollection<KV<String, String>> results = flowPerFreeway.apply(ParDo.of(
+ new DoFn <KV<String, Iterable<Integer>>, KV<String, String>>() {
+
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ Iterable<Integer> flows = c.element().getValue();
+ Integer sum = 0;
+ Long numberOfRecords = 0L;
+ for (Integer value : flows) {
+ sum += value;
+ numberOfRecords++;
+ }
+ c.output(KV.of(c.element().getKey(), sum + "," + numberOfRecords));
+ }
+ }));
+ PCollection<TableRow> output = results.apply(ParDo.of(new FormatTotalFlow(triggerType)));
+ return output;
+ }
+ }
+
+ /**
+ * Format the results of the Total flow calculation to a TableRow, to save to BigQuery.
+ * Adds the triggerType, pane information, processing time and the window timestamp.
+ * */
+ static class FormatTotalFlow extends DoFn<KV<String, String>, TableRow>
+ implements RequiresWindowAccess {
+ private String triggerType;
+
+ public FormatTotalFlow(String triggerType) {
+ this.triggerType = triggerType;
+ }
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ String[] values = c.element().getValue().split(",");
+ TableRow row = new TableRow()
+ .set("trigger_type", triggerType)
+ .set("freeway", c.element().getKey())
+ .set("total_flow", Integer.parseInt(values[0]))
+ .set("number_of_records", Long.parseLong(values[1]))
+ .set("window", c.window().toString())
+ .set("isFirst", c.pane().isFirst())
+ .set("isLast", c.pane().isLast())
+ .set("timing", c.pane().getTiming().toString())
+ .set("event_time", c.timestamp().toString())
+ .set("processing_time", Instant.now().toString());
+ c.output(row);
+ }
+ }
+
+ /**
+ * Extract the freeway and total flow in a reading.
+ * Freeway is used as key since we are calculating the total flow for each freeway.
+ */
+ static class ExtractFlowInfo extends DoFn<String, KV<String, Integer>> {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ String[] laneInfo = c.element().split(",");
+ if (laneInfo[0].equals("timestamp")) {
+ // Header row
+ return;
+ }
+ if (laneInfo.length < 48) {
+ //Skip the invalid input.
+ return;
+ }
+ String freeway = laneInfo[2];
+ Integer totalFlow = tryIntegerParse(laneInfo[7]);
+ // Ignore the records with total flow 0 to easily understand the working of triggers.
+ // Skip the records with total flow -1 since they are invalid input.
+ if (totalFlow == null || totalFlow <= 0) {
+ return;
+ }
+ c.output(KV.of(freeway, totalFlow));
+ }
+ }
+
+ /**
+ * Inherits standard configuration options.
+ */
+ public interface TrafficFlowOptions
+ extends ExamplePubsubTopicOptions, ExampleBigQueryTableOptions, DataflowExampleOptions {
+
+ @Description("Input file to inject to Pub/Sub topic")
+ @Default.String("gs://dataflow-samples/traffic_sensor/"
+ + "Freeways-5Minaa2010-01-01_to_2010-02-15.csv")
+ String getInput();
+ void setInput(String value);
+
+ @Description("Numeric value of window duration for fixed windows, in minutes")
+ @Default.Integer(WINDOW_DURATION)
+ Integer getWindowDuration();
+ void setWindowDuration(Integer value);
+ }
+
+ private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
+
+ public static void main(String[] args) throws Exception {
+ TrafficFlowOptions options = PipelineOptionsFactory.fromArgs(args)
+ .withValidation()
+ .as(TrafficFlowOptions.class);
+ options.setStreaming(true);
+
+ // In order to cancel the pipelines automatically,
+ // {@code DataflowPipelineRunner} is forced to be used.
+ options.setRunner(DataflowPipelineRunner.class);
+ options.setBigQuerySchema(getSchema());
+
+ DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+ dataflowUtils.setup();
+
+ Pipeline pipeline = Pipeline.create(options);
+
+ TableReference tableRef = getTableReference(options.getProject(),
+ options.getBigQueryDataset(), options.getBigQueryTable());
+
+ PCollectionList<TableRow> resultList = pipeline.apply(PubsubIO.Read.named("ReadPubsubInput")
+ .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
+ .topic(options.getPubsubTopic()))
+ .apply(ParDo.of(new ExtractFlowInfo()))
+ .apply(new CalculateTotalFlow(options.getWindowDuration()));
+
+ for (int i = 0; i < resultList.size(); i++){
+ resultList.get(i).apply(BigQueryIO.Write.to(tableRef).withSchema(getSchema()));
+ }
+
+ PipelineResult result = pipeline.run();
+ if (!options.getInput().isEmpty()){
+ //Inject the data into the pubsub topic
+ dataflowUtils.runInjectorPipeline(runInjector(options));
+ }
+ // dataflowUtils will try to cancel the pipeline and the injector before the program exits.
+ dataflowUtils.waitToFinish(result);
+ }
+
+ private static Pipeline runInjector(TrafficFlowOptions options){
+ DataflowPipelineOptions copiedOptions = options.cloneAs(DataflowPipelineOptions.class);
+ copiedOptions.setStreaming(false);
+ copiedOptions.setNumWorkers(options.as(DataflowExampleOptions.class).getInjectorNumWorkers());
+ copiedOptions.setJobName(options.getJobName() + "-injector");
+ Pipeline injectorPipeline = Pipeline.create(copiedOptions);
+ injectorPipeline
+ .apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
+ .apply(ParDo.named("InsertRandomDelays").of(new InsertDelays()))
+ .apply(IntraBundleParallelization.of(PubsubFileInjector
+ .withTimestampLabelKey(PUBSUB_TIMESTAMP_LABEL_KEY)
+ .publish(options.getPubsubTopic()))
+ .withMaxParallelism(20));
+
+ return injectorPipeline;
+ }
+
+ /**
+ * Add current time to each record.
+ * Also insert a delay at random to demo the triggers.
+ */
+ public static class InsertDelays extends DoFn<String, String> {
+ private static final double THRESHOLD = 0.001;
+ // MIN_DELAY and MAX_DELAY in minutes.
+ private static final int MIN_DELAY = 1;
+ private static final int MAX_DELAY = 100;
+
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ Instant timestamp = Instant.now();
+ if (Math.random() < THRESHOLD){
+ int range = MAX_DELAY - MIN_DELAY;
+ int delayInMinutes = (int) (Math.random() * range) + MIN_DELAY;
+ long delayInMillis = TimeUnit.MINUTES.toMillis(delayInMinutes);
+ timestamp = new Instant(timestamp.getMillis() - delayInMillis);
+ }
+ c.outputWithTimestamp(c.element(), timestamp);
+ }
+ }
+
+
+ /**Sets the table reference. **/
+ private static TableReference getTableReference(String project, String dataset, String table){
+ TableReference tableRef = new TableReference();
+ tableRef.setProjectId(project);
+ tableRef.setDatasetId(dataset);
+ tableRef.setTableId(table);
+ return tableRef;
+ }
+
+ /** Defines the BigQuery schema used for the output. */
+ private static TableSchema getSchema() {
+ List<TableFieldSchema> fields = new ArrayList<>();
+ fields.add(new TableFieldSchema().setName("trigger_type").setType("STRING"));
+ fields.add(new TableFieldSchema().setName("freeway").setType("STRING"));
+ fields.add(new TableFieldSchema().setName("total_flow").setType("INTEGER"));
+ fields.add(new TableFieldSchema().setName("number_of_records").setType("INTEGER"));
+ fields.add(new TableFieldSchema().setName("window").setType("STRING"));
+ fields.add(new TableFieldSchema().setName("isFirst").setType("BOOLEAN"));
+ fields.add(new TableFieldSchema().setName("isLast").setType("BOOLEAN"));
+ fields.add(new TableFieldSchema().setName("timing").setType("STRING"));
+ fields.add(new TableFieldSchema().setName("event_time").setType("TIMESTAMP"));
+ fields.add(new TableFieldSchema().setName("processing_time").setType("TIMESTAMP"));
+ TableSchema schema = new TableSchema().setFields(fields);
+ return schema;
+ }
+
+ private static Integer tryIntegerParse(String number) {
+ try {
+ return Integer.parseInt(number);
+ } catch (NumberFormatException e) {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/test/java/com/google/cloud/dataflow/examples/DebuggingWordCountTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/DebuggingWordCountTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/DebuggingWordCountTest.java
new file mode 100644
index 0000000..77d7bc8
--- /dev/null
+++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/DebuggingWordCountTest.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples;
+
+import com.google.common.io.Files;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Tests for {@link DebuggingWordCount}.
+ */
+@RunWith(JUnit4.class)
+public class DebuggingWordCountTest {
+ @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @Test
+ public void testDebuggingWordCount() throws Exception {
+ File file = tmpFolder.newFile();
+ Files.write("stomach secret Flourish message Flourish here Flourish", file,
+ StandardCharsets.UTF_8);
+ DebuggingWordCount.main(new String[]{"--inputFile=" + file.getAbsolutePath()});
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/test/java/com/google/cloud/dataflow/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/WordCountTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/WordCountTest.java
new file mode 100644
index 0000000..4542c48
--- /dev/null
+++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/WordCountTest.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples;
+
+import com.google.cloud.dataflow.examples.WordCount.CountWords;
+import com.google.cloud.dataflow.examples.WordCount.ExtractWordsFn;
+import com.google.cloud.dataflow.examples.WordCount.FormatAsTextFn;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFnTester;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests of WordCount.
+ */
+@RunWith(JUnit4.class)
+public class WordCountTest {
+
+ /** Example test that tests a specific DoFn. */
+ @Test
+ public void testExtractWordsFn() {
+ DoFnTester<String, String> extractWordsFn =
+ DoFnTester.of(new ExtractWordsFn());
+
+ Assert.assertThat(extractWordsFn.processBatch(" some input words "),
+ CoreMatchers.hasItems("some", "input", "words"));
+ Assert.assertThat(extractWordsFn.processBatch(" "),
+ CoreMatchers.<String>hasItems());
+ Assert.assertThat(extractWordsFn.processBatch(" some ", " input", " words"),
+ CoreMatchers.hasItems("some", "input", "words"));
+ }
+
+ static final String[] WORDS_ARRAY = new String[] {
+ "hi there", "hi", "hi sue bob",
+ "hi sue", "", "bob hi"};
+
+ static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
+
+ static final String[] COUNTS_ARRAY = new String[] {
+ "hi: 5", "there: 1", "sue: 2", "bob: 2"};
+
+ /** Example test that tests a PTransform by using an in-memory input and inspecting the output. */
+ @Test
+ @Category(RunnableOnService.class)
+ public void testCountWords() throws Exception {
+ Pipeline p = TestPipeline.create();
+
+ PCollection<String> input = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
+
+ PCollection<String> output = input.apply(new CountWords())
+ .apply(MapElements.via(new FormatAsTextFn()));
+
+ DataflowAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);
+ p.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/AutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/AutoCompleteTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/AutoCompleteTest.java
new file mode 100644
index 0000000..aec1557
--- /dev/null
+++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/AutoCompleteTest.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete;
+
+import com.google.cloud.dataflow.examples.complete.AutoComplete.CompletionCandidate;
+import com.google.cloud.dataflow.examples.complete.AutoComplete.ComputeTopCompletions;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Filter;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TimestampedValue;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Tests of AutoComplete.
+ */
+@RunWith(Parameterized.class)
+public class AutoCompleteTest implements Serializable {
+ private boolean recursive;
+
+ public AutoCompleteTest(Boolean recursive) {
+ this.recursive = recursive;
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> testRecursive() {
+ return Arrays.asList(new Object[][] {
+ { true },
+ { false }
+ });
+ }
+
+ @Test
+ public void testAutoComplete() {
+ List<String> words = Arrays.asList(
+ "apple",
+ "apple",
+ "apricot",
+ "banana",
+ "blackberry",
+ "blackberry",
+ "blackberry",
+ "blueberry",
+ "blueberry",
+ "cherry");
+
+ Pipeline p = TestPipeline.create();
+
+ PCollection<String> input = p.apply(Create.of(words));
+
+ PCollection<KV<String, List<CompletionCandidate>>> output =
+ input.apply(new ComputeTopCompletions(2, recursive))
+ .apply(Filter.byPredicate(
+ new SerializableFunction<KV<String, List<CompletionCandidate>>, Boolean>() {
+ @Override
+ public Boolean apply(KV<String, List<CompletionCandidate>> element) {
+ return element.getKey().length() <= 2;
+ }
+ }));
+
+ DataflowAssert.that(output).containsInAnyOrder(
+ KV.of("a", parseList("apple:2", "apricot:1")),
+ KV.of("ap", parseList("apple:2", "apricot:1")),
+ KV.of("b", parseList("blackberry:3", "blueberry:2")),
+ KV.of("ba", parseList("banana:1")),
+ KV.of("bl", parseList("blackberry:3", "blueberry:2")),
+ KV.of("c", parseList("cherry:1")),
+ KV.of("ch", parseList("cherry:1")));
+ p.run();
+ }
+
+ @Test
+ public void testTinyAutoComplete() {
+ List<String> words = Arrays.asList("x", "x", "x", "xy", "xy", "xyz");
+
+ Pipeline p = TestPipeline.create();
+
+ PCollection<String> input = p.apply(Create.of(words));
+
+ PCollection<KV<String, List<CompletionCandidate>>> output =
+ input.apply(new ComputeTopCompletions(2, recursive));
+
+ DataflowAssert.that(output).containsInAnyOrder(
+ KV.of("x", parseList("x:3", "xy:2")),
+ KV.of("xy", parseList("xy:2", "xyz:1")),
+ KV.of("xyz", parseList("xyz:1")));
+ p.run();
+ }
+
+ @Test
+ public void testWindowedAutoComplete() {
+ List<TimestampedValue<String>> words = Arrays.asList(
+ TimestampedValue.of("xA", new Instant(1)),
+ TimestampedValue.of("xA", new Instant(1)),
+ TimestampedValue.of("xB", new Instant(1)),
+ TimestampedValue.of("xB", new Instant(2)),
+ TimestampedValue.of("xB", new Instant(2)));
+
+ Pipeline p = TestPipeline.create();
+
+ PCollection<String> input = p
+ .apply(Create.of(words))
+ .apply(new ReifyTimestamps<String>());
+
+ PCollection<KV<String, List<CompletionCandidate>>> output =
+ input.apply(Window.<String>into(SlidingWindows.of(new Duration(2))))
+ .apply(new ComputeTopCompletions(2, recursive));
+
+ DataflowAssert.that(output).containsInAnyOrder(
+ // Window [0, 2)
+ KV.of("x", parseList("xA:2", "xB:1")),
+ KV.of("xA", parseList("xA:2")),
+ KV.of("xB", parseList("xB:1")),
+
+ // Window [1, 3)
+ KV.of("x", parseList("xB:3", "xA:2")),
+ KV.of("xA", parseList("xA:2")),
+ KV.of("xB", parseList("xB:3")),
+
+ // Window [2, 3)
+ KV.of("x", parseList("xB:2")),
+ KV.of("xB", parseList("xB:2")));
+ p.run();
+ }
+
+ private static List<CompletionCandidate> parseList(String... entries) {
+ List<CompletionCandidate> all = new ArrayList<>();
+ for (String s : entries) {
+ String[] countValue = s.split(":");
+ all.add(new CompletionCandidate(countValue[0], Integer.valueOf(countValue[1])));
+ }
+ return all;
+ }
+
+ private static class ReifyTimestamps<T>
+ extends PTransform<PCollection<TimestampedValue<T>>, PCollection<T>> {
+ @Override
+ public PCollection<T> apply(PCollection<TimestampedValue<T>> input) {
+ return input.apply(ParDo.of(new DoFn<TimestampedValue<T>, T>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp());
+ }
+ }));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TfIdfTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TfIdfTest.java
new file mode 100644
index 0000000..5ee136c
--- /dev/null
+++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TfIdfTest.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.Keys;
+import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.net.URI;
+import java.util.Arrays;
+
+/**
+ * Tests of {@link TfIdf}.
+ */
+@RunWith(JUnit4.class)
+public class TfIdfTest {
+
+ /** Test that the example runs. */
+ @Test
+ @Category(RunnableOnService.class)
+ public void testTfIdf() throws Exception {
+ Pipeline pipeline = TestPipeline.create();
+
+ pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+
+ PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline
+ .apply(Create.of(
+ KV.of(new URI("x"), "a b c d"),
+ KV.of(new URI("y"), "a b c"),
+ KV.of(new URI("z"), "a m n")))
+ .apply(new TfIdf.ComputeTfIdf());
+
+ PCollection<String> words = wordToUriAndTfIdf
+ .apply(Keys.<String>create())
+ .apply(RemoveDuplicates.<String>create());
+
+ DataflowAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d"));
+
+ pipeline.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessionsTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessionsTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessionsTest.java
new file mode 100644
index 0000000..ce9de51
--- /dev/null
+++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessionsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Arrays;
+
+/** Unit tests for {@link TopWikipediaSessions}. */
+@RunWith(JUnit4.class)
+public class TopWikipediaSessionsTest {
+ @Test
+ @Category(RunnableOnService.class)
+ public void testComputeTopUsers() {
+ Pipeline p = TestPipeline.create();
+
+ PCollection<String> output =
+ p.apply(Create.of(Arrays.asList(
+ new TableRow().set("timestamp", 0).set("contributor_username", "user1"),
+ new TableRow().set("timestamp", 1).set("contributor_username", "user1"),
+ new TableRow().set("timestamp", 2).set("contributor_username", "user1"),
+ new TableRow().set("timestamp", 0).set("contributor_username", "user2"),
+ new TableRow().set("timestamp", 1).set("contributor_username", "user2"),
+ new TableRow().set("timestamp", 3601).set("contributor_username", "user2"),
+ new TableRow().set("timestamp", 3602).set("contributor_username", "user2"),
+ new TableRow().set("timestamp", 35 * 24 * 3600).set("contributor_username", "user3"))))
+ .apply(new TopWikipediaSessions.ComputeTopSessions(1.0));
+
+ DataflowAssert.that(output).containsInAnyOrder(Arrays.asList(
+ "user1 : [1970-01-01T00:00:00.000Z..1970-01-01T01:00:02.000Z)"
+ + " : 3 : 1970-01-01T00:00:00.000Z",
+ "user3 : [1970-02-05T00:00:00.000Z..1970-02-05T01:00:00.000Z)"
+ + " : 1 : 1970-02-01T00:00:00.000Z"));
+
+ p.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoesTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoesTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoesTest.java
new file mode 100644
index 0000000..6dce4ed
--- /dev/null
+++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoesTest.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.examples.cookbook.BigQueryTornadoes.ExtractTornadoesFn;
+import com.google.cloud.dataflow.examples.cookbook.BigQueryTornadoes.FormatCountsFn;
+import com.google.cloud.dataflow.sdk.transforms.DoFnTester;
+import com.google.cloud.dataflow.sdk.values.KV;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.List;
+
+/**
+ * Test case for {@link BigQueryTornadoes}.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryTornadoesTest {
+
+ @Test
+ public void testExtractTornadoes() throws Exception {
+ TableRow row = new TableRow()
+ .set("month", "6")
+ .set("tornado", true);
+ DoFnTester<TableRow, Integer> extractWordsFn =
+ DoFnTester.of(new ExtractTornadoesFn());
+ Assert.assertThat(extractWordsFn.processBatch(row),
+ CoreMatchers.hasItems(6));
+ }
+
+ @Test
+ public void testNoTornadoes() throws Exception {
+ TableRow row = new TableRow()
+ .set("month", 6)
+ .set("tornado", false);
+ DoFnTester<TableRow, Integer> extractWordsFn =
+ DoFnTester.of(new ExtractTornadoesFn());
+ Assert.assertTrue(extractWordsFn.processBatch(row).isEmpty());
+ }
+
+ @Test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testFormatCounts() throws Exception {
+ DoFnTester<KV<Integer, Long>, TableRow> formatCountsFn =
+ DoFnTester.of(new FormatCountsFn());
+ KV empty[] = {};
+ List<TableRow> results = formatCountsFn.processBatch(empty);
+ Assert.assertTrue(results.size() == 0);
+ KV input[] = { KV.of(3, 0L),
+ KV.of(4, Long.MAX_VALUE),
+ KV.of(5, Long.MIN_VALUE) };
+ results = formatCountsFn.processBatch(input);
+ Assert.assertEquals(results.size(), 3);
+ Assert.assertEquals(results.get(0).get("month"), 3);
+ Assert.assertEquals(results.get(0).get("tornado_count"), 0L);
+ Assert.assertEquals(results.get(1).get("month"), 4);
+ Assert.assertEquals(results.get(1).get("tornado_count"), Long.MAX_VALUE);
+ Assert.assertEquals(results.get(2).get("month"), 5);
+ Assert.assertEquals(results.get(2).get("tornado_count"), Long.MIN_VALUE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamplesTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamplesTest.java
new file mode 100644
index 0000000..fe4823d
--- /dev/null
+++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamplesTest.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.examples.cookbook.CombinePerKeyExamples.ExtractLargeWordsFn;
+import com.google.cloud.dataflow.examples.cookbook.CombinePerKeyExamples.FormatShakespeareOutputFn;
+import com.google.cloud.dataflow.sdk.transforms.DoFnTester;
+import com.google.cloud.dataflow.sdk.values.KV;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.List;
+
+/** Unit tests for {@link CombinePerKeyExamples}. */
+@RunWith(JUnit4.class)
+public class CombinePerKeyExamplesTest {
+
+  // Input rows as (corpus, word). "snuffleupaguses" occurs in king_lear; the much longer
+  // "antidisestablishmentarianism" occurs in both king_lear and macbeth; "bob" and "hi" are
+  // short words.
+  private static final TableRow LEAR_SNUFFLEUPAGUSES = new TableRow()
+      .set("corpus", "king_lear").set("word", "snuffleupaguses");
+  private static final TableRow MACBETH_ANTIDIS = new TableRow()
+      .set("corpus", "macbeth").set("word", "antidisestablishmentarianism");
+  private static final TableRow LEAR_ANTIDIS = new TableRow()
+      .set("corpus", "king_lear").set("word", "antidisestablishmentarianism");
+  private static final TableRow MACBETH_BOB = new TableRow()
+      .set("corpus", "macbeth").set("word", "bob");
+  private static final TableRow LEAR_HI = new TableRow()
+      .set("corpus", "king_lear").set("word", "hi");
+
+  static final TableRow[] ROWS_ARRAY = {
+    LEAR_SNUFFLEUPAGUSES, MACBETH_ANTIDIS, LEAR_ANTIDIS, MACBETH_BOB, LEAR_HI
+  };
+
+  // (word, play) pairs expected from ExtractLargeWordsFn for the long words above.
+  private static final KV<String, String> SNUFFLEUPAGUSES_IN_LEAR =
+      KV.of("snuffleupaguses", "king_lear");
+  private static final KV<String, String> ANTIDIS_IN_MACBETH =
+      KV.of("antidisestablishmentarianism", "macbeth");
+  private static final KV<String, String> ANTIDIS_IN_LEAR =
+      KV.of("antidisestablishmentarianism", "king_lear");
+
+  // (word, comma-separated plays) pairs fed to FormatShakespeareOutputFn.
+  private static final KV<String, String> ANTIDIS_ALL_PLAYS =
+      KV.of("antidisestablishmentarianism", "king_lear,macbeth");
+  private static final KV<String, String> SNUFFLEUPAGUSES_ALL_PLAYS =
+      KV.of("snuffleupaguses", "king_lear");
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  static final KV<String, String>[] COMBINED_TUPLES_ARRAY = new KV[] {
+    ANTIDIS_ALL_PLAYS, SNUFFLEUPAGUSES_ALL_PLAYS
+  };
+
+  // Rows expected from FormatShakespeareOutputFn for the pairs above.
+  private static final TableRow SNUFFLEUPAGUSES_ROW = new TableRow()
+      .set("word", "snuffleupaguses").set("all_plays", "king_lear");
+  private static final TableRow ANTIDIS_ROW = new TableRow()
+      .set("word", "antidisestablishmentarianism").set("all_plays", "king_lear,macbeth");
+
+  /** Each long word in the input is paired with the play it came from. */
+  @Test
+  public void testExtractLargeWordsFn() {
+    DoFnTester<TableRow, KV<String, String>> tester = DoFnTester.of(new ExtractLargeWordsFn());
+    List<KV<String, String>> extracted = tester.processBatch(ROWS_ARRAY);
+    Assert.assertThat(extracted, CoreMatchers.hasItem(SNUFFLEUPAGUSES_IN_LEAR));
+    Assert.assertThat(extracted, CoreMatchers.hasItem(ANTIDIS_IN_MACBETH));
+    Assert.assertThat(extracted, CoreMatchers.hasItem(ANTIDIS_IN_LEAR));
+  }
+
+  /** Each (word, plays) pair becomes a row with "word" and "all_plays" fields. */
+  @Test
+  public void testFormatShakespeareOutputFn() {
+    DoFnTester<KV<String, String>, TableRow> tester =
+        DoFnTester.of(new FormatShakespeareOutputFn());
+    List<TableRow> formatted = tester.processBatch(COMBINED_TUPLES_ARRAY);
+    Assert.assertThat(formatted, CoreMatchers.hasItem(SNUFFLEUPAGUSES_ROW));
+    Assert.assertThat(formatted, CoreMatchers.hasItem(ANTIDIS_ROW));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/DeDupExampleTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/DeDupExampleTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/DeDupExampleTest.java
new file mode 100644
index 0000000..bce6b11
--- /dev/null
+++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/DeDupExampleTest.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Unit tests for {@link DeDupExample}. */
+@RunWith(JUnit4.class)
+public class DeDupExampleTest {
+
+  /**
+   * Adds {@code strings} to {@code pipeline} as a {@link PCollection} encoded with
+   * {@link StringUtf8Coder} and applies {@link RemoveDuplicates} to it.
+   */
+  private static PCollection<String> removeDuplicatesOf(Pipeline pipeline, List<String> strings) {
+    return pipeline
+        .apply(Create.of(strings).withCoder(StringUtf8Coder.of()))
+        .apply(RemoveDuplicates.<String>create());
+  }
+
+  /** Repeated keys collapse to one occurrence each. */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testRemoveDuplicates() {
+    Pipeline p = TestPipeline.create();
+    PCollection<String> unique =
+        removeDuplicatesOf(p, Arrays.asList("k1", "k5", "k5", "k2", "k1", "k2", "k3"));
+    DataflowAssert.that(unique).containsInAnyOrder("k1", "k5", "k2", "k3");
+    p.run();
+  }
+
+  /** An empty input stays empty. */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testRemoveDuplicatesEmpty() {
+    Pipeline p = TestPipeline.create();
+    PCollection<String> unique = removeDuplicatesOf(p, Arrays.<String>asList());
+    DataflowAssert.that(unique).empty();
+    p.run();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/FilterExamplesTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/FilterExamplesTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/FilterExamplesTest.java
new file mode 100644
index 0000000..6d822f9
--- /dev/null
+++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/FilterExamplesTest.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.examples.cookbook.FilterExamples.FilterSingleMonthDataFn;
+import com.google.cloud.dataflow.examples.cookbook.FilterExamples.ProjectionFn;
+import com.google.cloud.dataflow.sdk.transforms.DoFnTester;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Unit tests for {@link FilterExamples}. */
+@RunWith(JUnit4.class)
+public class FilterExamplesTest {
+
+  // Raw input rows: numeric fields are given as strings, plus a boolean tornado flag.
+  private static final TableRow row1 = new TableRow()
+      .set("month", "6").set("day", "21")
+      .set("year", "2014").set("mean_temp", "85.3")
+      .set("tornado", true);
+  private static final TableRow row2 = new TableRow()
+      .set("month", "7").set("day", "20")
+      .set("year", "2014").set("mean_temp", "75.4")
+      .set("tornado", false);
+  private static final TableRow row3 = new TableRow()
+      .set("month", "6").set("day", "18")
+      .set("year", "2014").set("mean_temp", "45.3")
+      .set("tornado", true);
+  static final TableRow[] ROWS_ARRAY = new TableRow[] {
+    row1, row2, row3
+  };
+  static final List<TableRow> ROWS = Arrays.asList(ROWS_ARRAY);
+
+  // Expected ProjectionFn outputs for row1..row3: numeric fields as numbers, no tornado flag.
+  private static final TableRow outRow1 = new TableRow()
+      .set("year", 2014).set("month", 6)
+      .set("day", 21).set("mean_temp", 85.3);
+  private static final TableRow outRow2 = new TableRow()
+      .set("year", 2014).set("month", 7)
+      .set("day", 20).set("mean_temp", 75.4);
+  private static final TableRow outRow3 = new TableRow()
+      .set("year", 2014).set("month", 6)
+      .set("day", 18).set("mean_temp", 45.3);
+  private static final TableRow[] PROJROWS_ARRAY = new TableRow[] {
+    outRow1, outRow2, outRow3
+  };
+
+  /** Every input row is projected to its year, month, day and mean temperature. */
+  @Test
+  public void testProjectionFn() {
+    DoFnTester<TableRow, TableRow> projectionFn =
+        DoFnTester.of(new ProjectionFn());
+    List<TableRow> results = projectionFn.processBatch(ROWS_ARRAY);
+    Assert.assertThat(results, CoreMatchers.hasItem(outRow1));
+    Assert.assertThat(results, CoreMatchers.hasItem(outRow2));
+    Assert.assertThat(results, CoreMatchers.hasItem(outRow3));
+  }
+
+  /** Filtering for month 7 keeps the single July row and drops both June rows. */
+  @Test
+  public void testFilterSingleMonthDataFn() {
+    DoFnTester<TableRow, TableRow> filterSingleMonthDataFn =
+        DoFnTester.of(new FilterSingleMonthDataFn(7));
+    List<TableRow> results = filterSingleMonthDataFn.processBatch(PROJROWS_ARRAY);
+    Assert.assertThat(results, CoreMatchers.hasItem(outRow2));
+    // Without these checks a filter that passed every row through would still pass the test.
+    Assert.assertThat(results, CoreMatchers.not(CoreMatchers.hasItem(outRow1)));
+    Assert.assertThat(results, CoreMatchers.not(CoreMatchers.hasItem(outRow3)));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/JoinExamplesTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/JoinExamplesTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/JoinExamplesTest.java
new file mode 100644
index 0000000..db3ae34
--- /dev/null
+++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/JoinExamplesTest.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.examples.cookbook.JoinExamples.ExtractCountryInfoFn;
+import com.google.cloud.dataflow.examples.cookbook.JoinExamples.ExtractEventDataFn;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFnTester;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Unit tests for {@link JoinExamples}. */
+@RunWith(JUnit4.class)
+public class JoinExamplesTest {
+
+  // Event rows: two events in country VM and one in BE.
+  private static final TableRow row1 = new TableRow()
+      .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212")
+      .set("Actor1Name", "BANGKOK").set("SOURCEURL", "http://cnn.com");
+  private static final TableRow row2 = new TableRow()
+      .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212")
+      .set("Actor1Name", "LAOS").set("SOURCEURL", "http://www.chicagotribune.com");
+  private static final TableRow row3 = new TableRow()
+      .set("ActionGeo_CountryCode", "BE").set("SQLDATE", "20141213")
+      .set("Actor1Name", "AFGHANISTAN").set("SOURCEURL", "http://cnn.com");
+  // Note: EVENTS is the array form and EVENT_ARRAY the List form of the same rows.
+  static final TableRow[] EVENTS = new TableRow[] {
+    row1, row2, row3
+  };
+  static final List<TableRow> EVENT_ARRAY = Arrays.asList(EVENTS);
+
+  // Expected ExtractEventDataFn outputs for row1, row2 and row3: country code -> event summary.
+  private static final KV<String, String> kv0 = KV.of("VM",
+      "Date: 20141212, Actor1: BANGKOK, url: http://cnn.com");
+  private static final KV<String, String> kv1 = KV.of("VM",
+      "Date: 20141212, Actor1: LAOS, url: http://www.chicagotribune.com");
+  private static final KV<String, String> kv2 = KV.of("BE",
+      "Date: 20141213, Actor1: AFGHANISTAN, url: http://cnn.com");
+  // Expected ExtractCountryInfoFn outputs: country code -> human-readable country name.
+  private static final KV<String, String> kv3 = KV.of("BE", "Belgium");
+  private static final KV<String, String> kv4 = KV.of("VM", "Vietnam");
+
+  // Country-code rows; as with the events, CCS is the array and CC_ARRAY the List.
+  private static final TableRow cc1 = new TableRow()
+      .set("FIPSCC", "VM").set("HumanName", "Vietnam");
+  private static final TableRow cc2 = new TableRow()
+      .set("FIPSCC", "BE").set("HumanName", "Belgium");
+  static final TableRow[] CCS = new TableRow[] {
+    cc1, cc2
+  };
+  static final List<TableRow> CC_ARRAY = Arrays.asList(CCS);
+
+  // Expected output of the full join: one line per event, annotated with its country name.
+  static final String[] JOINED_EVENTS = new String[] {
+    "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: LAOS, "
+        + "url: http://www.chicagotribune.com",
+    "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: BANGKOK, "
+        + "url: http://cnn.com",
+    "Country code: BE, Country name: Belgium, Event info: Date: 20141213, Actor1: AFGHANISTAN, "
+        + "url: http://cnn.com"
+  };
+
+  /** Every event row is keyed by its country code and summarized as date, actor and URL. */
+  @Test
+  public void testExtractEventDataFn() {
+    DoFnTester<TableRow, KV<String, String>> extractEventDataFn =
+        DoFnTester.of(new ExtractEventDataFn());
+    List<KV<String, String>> results = extractEventDataFn.processBatch(EVENTS);
+    Assert.assertEquals(EVENTS.length, results.size());
+    Assert.assertThat(results, CoreMatchers.hasItem(kv0));
+    Assert.assertThat(results, CoreMatchers.hasItem(kv1));
+    Assert.assertThat(results, CoreMatchers.hasItem(kv2));
+  }
+
+  /** Every country row is mapped from its FIPS code to its human-readable name. */
+  @Test
+  public void testExtractCountryInfoFn() {
+    DoFnTester<TableRow, KV<String, String>> extractCountryInfoFn =
+        DoFnTester.of(new ExtractCountryInfoFn());
+    List<KV<String, String>> results = extractCountryInfoFn.processBatch(CCS);
+    Assert.assertEquals(CCS.length, results.size());
+    Assert.assertThat(results, CoreMatchers.hasItem(kv3));
+    Assert.assertThat(results, CoreMatchers.hasItem(kv4));
+  }
+
+  /** Joining the events with the country table annotates each event with its country name. */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testJoin() throws Exception {
+    Pipeline p = TestPipeline.create();
+    PCollection<TableRow> input1 = p.apply("CreateEvent", Create.of(EVENT_ARRAY));
+    PCollection<TableRow> input2 = p.apply("CreateCC", Create.of(CC_ARRAY));
+
+    PCollection<String> output = JoinExamples.joinEvents(input1, input2);
+    DataflowAssert.that(output).containsInAnyOrder(JOINED_EVENTS);
+    p.run();
+  }
+}