Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:48:31 UTC
[44/74] [partial] incubator-beam git commit: Rename
com/google/cloud/dataflow->org/apache/beam
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
new file mode 100644
index 0000000..c61873c
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.complete;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
+import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
+import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicOptions;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ * A streaming Dataflow Example using BigQuery output.
+ *
+ * <p>This pipeline example reads lines of text from a PubSub topic, splits each line
+ * into individual words, capitalizes those words, and writes the output to
+ * a BigQuery table.
+ *
+ * <p>By default, the example will run a separate pipeline to inject the data from the default
+ * {@literal --inputFile} to the Pub/Sub {@literal --pubsubTopic}, making it available for the
+ * streaming pipeline to process. You may override the default {@literal --inputFile} with a
+ * file of your choosing. You may also set {@literal --inputFile} to an empty string, which
+ * disables the automatic Pub/Sub injection and allows you to use a separate tool to control
+ * the input to this example.
+ *
+ * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
+ * from the example common package (there are no defaults for a general Dataflow pipeline).
+ * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
+ * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
+ * the example will try to create them.
+ *
+ * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
+ * and then exit.
+ */
+public class StreamingWordExtract {
+
+ /** A DoFn that tokenizes lines of text into individual words. */
+ static class ExtractWords extends DoFn<String, String> {
+ @Override
+ public void processElement(ProcessContext c) {
+ String[] words = c.element().split("[^a-zA-Z']+");
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ c.output(word);
+ }
+ }
+ }
+ }
+
+ /** A DoFn that uppercases a word. */
+ static class Uppercase extends DoFn<String, String> {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element().toUpperCase());
+ }
+ }
+
+ /**
+ * Converts strings into BigQuery rows.
+ */
+ static class StringToRowConverter extends DoFn<String, TableRow> {
+ /**
+ * In this example, put the whole string into a single BigQuery field.
+ */
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(new TableRow().set("string_field", c.element()));
+ }
+
+ static TableSchema getSchema() {
+ return new TableSchema().setFields(new ArrayList<TableFieldSchema>() {
+ // Compose the list of TableFieldSchema for the table schema.
+ {
+ add(new TableFieldSchema().setName("string_field").setType("STRING"));
+ }
+ });
+ }
+ }
+
+ /**
+ * Options supported by {@link StreamingWordExtract}.
+ *
+ * <p>Inherits standard configuration options.
+ */
+ private interface StreamingWordExtractOptions
+ extends ExamplePubsubTopicOptions, ExampleBigQueryTableOptions {
+ @Description("Input file to inject to Pub/Sub topic")
+ @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
+ String getInputFile();
+ void setInputFile(String value);
+ }
+
+ /**
+ * Sets up and starts streaming pipeline.
+ *
+ * @throws IOException if there is a problem setting up resources
+ */
+ public static void main(String[] args) throws IOException {
+ StreamingWordExtractOptions options = PipelineOptionsFactory.fromArgs(args)
+ .withValidation()
+ .as(StreamingWordExtractOptions.class);
+ options.setStreaming(true);
+ // The DataflowPipelineRunner is forced here so that the pipelines
+ // can be cancelled automatically.
+ options.setRunner(DataflowPipelineRunner.class);
+
+ options.setBigQuerySchema(StringToRowConverter.getSchema());
+ DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+ dataflowUtils.setup();
+
+ Pipeline pipeline = Pipeline.create(options);
+
+ String tableSpec = new StringBuilder()
+ .append(options.getProject()).append(":")
+ .append(options.getBigQueryDataset()).append(".")
+ .append(options.getBigQueryTable())
+ .toString();
+ pipeline
+ .apply(PubsubIO.Read.topic(options.getPubsubTopic()))
+ .apply(ParDo.of(new ExtractWords()))
+ .apply(ParDo.of(new Uppercase()))
+ .apply(ParDo.of(new StringToRowConverter()))
+ .apply(BigQueryIO.Write.to(tableSpec)
+ .withSchema(StringToRowConverter.getSchema()));
+
+ PipelineResult result = pipeline.run();
+
+ if (!options.getInputFile().isEmpty()) {
+ // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
+ dataflowUtils.runInjectorPipeline(options.getInputFile(), options.getPubsubTopic());
+ }
+
+ // dataflowUtils will try to cancel the pipeline and the injector before the program exits.
+ dataflowUtils.waitToFinish(result);
+ }
+}
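For reference, the per-element logic of the three DoFns above can be exercised without the SDK. The following is a minimal, plain-Java sketch (the sample line and the class name are made up for illustration) of what ExtractWords, Uppercase, and StringToRowConverter do to a single Pub/Sub line:

public class WordExtractSketch {
  public static void main(String[] args) {
    String line = "To be, or not to be";            // hypothetical Pub/Sub line
    for (String word : line.split("[^a-zA-Z']+")) {  // same regex as ExtractWords
      if (!word.isEmpty()) {
        String upper = word.toUpperCase();           // what Uppercase emits
        // StringToRowConverter would wrap each word in a TableRow
        // with a single "string_field" column.
        System.out.println("{\"string_field\": \"" + upper + "\"}");
      }
    }
  }
}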
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
new file mode 100644
index 0000000..79ce823
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.complete;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.GcsOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.transforms.Keys;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
+import com.google.cloud.dataflow.sdk.transforms.Values;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.WithKeys;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
+import com.google.cloud.dataflow.sdk.util.GcsUtil;
+import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.PDone;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * An example that computes a basic TF-IDF search table for a directory or GCS prefix.
+ *
+ * <p>Concepts: joining data; side inputs; logging
+ *
+ * <p>To execute this pipeline locally, specify general pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * }</pre>
+ * and a local output file or output prefix on GCS:
+ * <pre>{@code
+ * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
+ * }</pre>
+ *
+ * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ * --runner=BlockingDataflowPipelineRunner
+ * }</pre>
+ * and an output prefix on GCS:
+ * <pre>{@code
+ * --output=gs://YOUR_OUTPUT_PREFIX
+ * }</pre>
+ *
+ * <p>The default input is {@code gs://dataflow-samples/shakespeare/} and can be overridden with
+ * {@code --input}.
+ */
+public class TfIdf {
+ /**
+ * Options supported by {@link TfIdf}.
+ *
+ * <p>Inherits standard configuration options.
+ */
+ private static interface Options extends PipelineOptions {
+ @Description("Path to the directory or GCS prefix containing files to read from")
+ @Default.String("gs://dataflow-samples/shakespeare/")
+ String getInput();
+ void setInput(String value);
+
+ @Description("Prefix of output URI to write to")
+ @Validation.Required
+ String getOutput();
+ void setOutput(String value);
+ }
+
+ /**
+ * Lists documents contained beneath the {@code options.input} prefix/directory.
+ */
+ public static Set<URI> listInputDocuments(Options options)
+ throws URISyntaxException, IOException {
+ URI baseUri = new URI(options.getInput());
+
+ // List all documents in the directory or GCS prefix.
+ URI absoluteUri;
+ if (baseUri.getScheme() != null) {
+ absoluteUri = baseUri;
+ } else {
+ absoluteUri = new URI(
+ "file",
+ baseUri.getAuthority(),
+ baseUri.getPath(),
+ baseUri.getQuery(),
+ baseUri.getFragment());
+ }
+
+ Set<URI> uris = new HashSet<>();
+ if (absoluteUri.getScheme().equals("file")) {
+ File directory = new File(absoluteUri);
+ for (String entry : directory.list()) {
+ File path = new File(directory, entry);
+ uris.add(path.toURI());
+ }
+ } else if (absoluteUri.getScheme().equals("gs")) {
+ GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil();
+ URI gcsUriGlob = new URI(
+ absoluteUri.getScheme(),
+ absoluteUri.getAuthority(),
+ absoluteUri.getPath() + "*",
+ absoluteUri.getQuery(),
+ absoluteUri.getFragment());
+ for (GcsPath entry : gcsUtil.expand(GcsPath.fromUri(gcsUriGlob))) {
+ uris.add(entry.toUri());
+ }
+ }
+
+ return uris;
+ }
+
+ /**
+ * Reads the documents at the provided URIs and returns all lines
+ * from the documents, tagged with the URI of the document they came from.
+ */
+ public static class ReadDocuments
+ extends PTransform<PInput, PCollection<KV<URI, String>>> {
+ private Iterable<URI> uris;
+
+ public ReadDocuments(Iterable<URI> uris) {
+ this.uris = uris;
+ }
+
+ @Override
+ public Coder<?> getDefaultOutputCoder() {
+ return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of());
+ }
+
+ @Override
+ public PCollection<KV<URI, String>> apply(PInput input) {
+ Pipeline pipeline = input.getPipeline();
+
+ // Create one TextIO.Read transform for each document
+ // and add its output to a PCollectionList
+ PCollectionList<KV<URI, String>> urisToLines =
+ PCollectionList.empty(pipeline);
+
+ // TextIO.Read supports:
+ // - file: URIs and paths locally
+ // - gs: URIs on the service
+ for (final URI uri : uris) {
+ String uriString;
+ if (uri.getScheme().equals("file")) {
+ uriString = new File(uri).getPath();
+ } else {
+ uriString = uri.toString();
+ }
+
+ PCollection<KV<URI, String>> oneUriToLines = pipeline
+ .apply(TextIO.Read.from(uriString)
+ .named("TextIO.Read(" + uriString + ")"))
+ .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
+
+ urisToLines = urisToLines.and(oneUriToLines);
+ }
+
+ return urisToLines.apply(Flatten.<KV<URI, String>>pCollections());
+ }
+ }
+
+ /**
+ * A transform containing a basic TF-IDF pipeline. The input consists of KV objects
+ * where the key is the document's URI and the value is a piece
+ * of the document's content. The output is a mapping from terms to
+ * scores for each document URI.
+ */
+ public static class ComputeTfIdf
+ extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> {
+ public ComputeTfIdf() { }
+
+ @Override
+ public PCollection<KV<String, KV<URI, Double>>> apply(
+ PCollection<KV<URI, String>> uriToContent) {
+
+ // Compute the total number of documents, and
+ // prepare this singleton PCollectionView for
+ // use as a side input.
+ final PCollectionView<Long> totalDocuments =
+ uriToContent
+ .apply("GetURIs", Keys.<URI>create())
+ .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
+ .apply(Count.<URI>globally())
+ .apply(View.<Long>asSingleton());
+
+ // Create a collection of pairs mapping a URI to each
+ // of the words in the document associated with that URI.
+ PCollection<KV<URI, String>> uriToWords = uriToContent
+ .apply(ParDo.named("SplitWords").of(
+ new DoFn<KV<URI, String>, KV<URI, String>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ URI uri = c.element().getKey();
+ String line = c.element().getValue();
+ for (String word : line.split("\\W+")) {
+ // Log INFO messages when the word “love” is found.
+ if (word.toLowerCase().equals("love")) {
+ LOG.info("Found {}", word.toLowerCase());
+ }
+
+ if (!word.isEmpty()) {
+ c.output(KV.of(uri, word.toLowerCase()));
+ }
+ }
+ }
+ }));
+
+ // Compute a mapping from each word to the total
+ // number of documents in which it appears.
+ PCollection<KV<String, Long>> wordToDocCount = uriToWords
+ .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create())
+ .apply(Values.<String>create())
+ .apply("CountDocs", Count.<String>perElement());
+
+ // Compute a mapping from each URI to the total
+ // number of words in the document associated with that URI.
+ PCollection<KV<URI, Long>> uriToWordTotal = uriToWords
+ .apply("GetURIs2", Keys.<URI>create())
+ .apply("CountWords", Count.<URI>perElement());
+
+ // Count, for each (URI, word) pair, the number of
+ // occurrences of that word in the document associated
+ // with the URI.
+ PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords
+ .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement());
+
+ // Rekey the above collection: turn the mapping from
+ // (URI, word) pairs to counts into an isomorphic mapping
+ // from URI to (word, count) pairs, to prepare for a join
+ // by the URI key.
+ PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
+ .apply(ParDo.named("ShiftKeys").of(
+ new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ URI uri = c.element().getKey().getKey();
+ String word = c.element().getKey().getValue();
+ Long occurrences = c.element().getValue();
+ c.output(KV.of(uri, KV.of(word, occurrences)));
+ }
+ }));
+
+ // Prepare to join the mapping of URI to (word, count) pairs with
+ // the mapping of URI to total word counts, by associating
+ // each of the input PCollection<KV<URI, ...>> with
+ // a tuple tag. Each input must have the same key type, URI
+ // in this case. The type parameter of the tuple tag matches
+ // the types of the values for each collection.
+ final TupleTag<Long> wordTotalsTag = new TupleTag<Long>();
+ final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<KV<String, Long>>();
+ KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple
+ .of(wordTotalsTag, uriToWordTotal)
+ .and(wordCountsTag, uriToWordAndCount);
+
+ // Perform a CoGroupByKey (a sort of pre-join) on the prepared
+ // inputs. This yields a mapping from URI to a CoGbkResult
+ // (CoGroupByKey Result). The CoGbkResult is a mapping
+ // from the above tuple tags to the values in each input
+ // associated with a particular URI. In this case, each
+ // KV<URI, CoGbkResult> groups a URI with the total number of
+ // words in that document as well as all the (word, count)
+ // pairs for particular words.
+ PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput
+ .apply("CoGroupByUri", CoGroupByKey.<URI>create());
+
+ // Compute a mapping from each word to a (URI, term frequency)
+ // pair for each URI. A word's term frequency for a document
+ // is simply the number of times that word occurs in the document
+ // divided by the total number of words in the document.
+ PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
+ .apply(ParDo.named("ComputeTermFrequencies").of(
+ new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ URI uri = c.element().getKey();
+ Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
+
+ for (KV<String, Long> wordAndCount
+ : c.element().getValue().getAll(wordCountsTag)) {
+ String word = wordAndCount.getKey();
+ Long wordCount = wordAndCount.getValue();
+ Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue();
+ c.output(KV.of(word, KV.of(uri, termFrequency)));
+ }
+ }
+ }));
+
+ // Compute a mapping from each word to its document frequency.
+ // A word's document frequency in a corpus is the number of
+ // documents in which the word appears divided by the total
+ // number of documents in the corpus. Note how the total number of
+ // documents is passed as a side input; the same value is
+ // presented to each invocation of the DoFn.
+ PCollection<KV<String, Double>> wordToDf = wordToDocCount
+ .apply(ParDo
+ .named("ComputeDocFrequencies")
+ .withSideInputs(totalDocuments)
+ .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ String word = c.element().getKey();
+ Long documentCount = c.element().getValue();
+ Long documentTotal = c.sideInput(totalDocuments);
+ Double documentFrequency = documentCount.doubleValue()
+ / documentTotal.doubleValue();
+
+ c.output(KV.of(word, documentFrequency));
+ }
+ }));
+
+ // Join the term frequency and document frequency
+ // collections, each keyed on the word.
+ final TupleTag<KV<URI, Double>> tfTag = new TupleTag<KV<URI, Double>>();
+ final TupleTag<Double> dfTag = new TupleTag<Double>();
+ PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple
+ .of(tfTag, wordToUriAndTf)
+ .and(dfTag, wordToDf)
+ .apply(CoGroupByKey.<String>create());
+
+ // Compute a mapping from each word to a (URI, TF-IDF) score
+ // for each URI. There are a variety of definitions of TF-IDF
+ // ("term frequency - inverse document frequency") score;
+ // here we use a basic version that is the term frequency
+ // multiplied by the log of the inverse document frequency.
+ PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = wordToUriAndTfAndDf
+ .apply(ParDo.named("ComputeTfIdf").of(
+ new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ String word = c.element().getKey();
+ Double df = c.element().getValue().getOnly(dfTag);
+
+ for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) {
+ URI uri = uriAndTf.getKey();
+ Double tf = uriAndTf.getValue();
+ Double tfIdf = tf * Math.log(1 / df);
+ c.output(KV.of(word, KV.of(uri, tfIdf)));
+ }
+ }
+ }));
+
+ return wordToUriAndTfIdf;
+ }
+
+ // Instantiate the Logger.
+ // It is suggested that loggers be named after the containing class
+ // (in this case ComputeTfIdf).
+ private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class);
+ }
+
+ /**
+ * A {@link PTransform} to write, in CSV format, a mapping from term and URI
+ * to score.
+ */
+ public static class WriteTfIdf
+ extends PTransform<PCollection<KV<String, KV<URI, Double>>>, PDone> {
+ private String output;
+
+ public WriteTfIdf(String output) {
+ this.output = output;
+ }
+
+ @Override
+ public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
+ return wordToUriAndTfIdf
+ .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, Double>>, String>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(String.format("%s,\t%s,\t%f",
+ c.element().getKey(),
+ c.element().getValue().getKey(),
+ c.element().getValue().getValue()));
+ }
+ }))
+ .apply(TextIO.Write
+ .to(output)
+ .withSuffix(".csv"));
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+ Pipeline pipeline = Pipeline.create(options);
+ pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+
+ pipeline
+ .apply(new ReadDocuments(listInputDocuments(options)))
+ .apply(new ComputeTfIdf())
+ .apply(new WriteTfIdf(options.getOutput()));
+
+ pipeline.run();
+ }
+}
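The score computed by ComputeTfIdf above is the term frequency multiplied by the log of the inverse document frequency. A worked-numbers sketch of that arithmetic, independent of the SDK (all counts and the class name are hypothetical):

public class TfIdfMathSketch {
  public static void main(String[] args) {
    long wordCountInDoc = 3;     // hypothetical: occurrences of the word in one document
    long wordTotalInDoc = 120;   // hypothetical: total words in that document
    long docsWithWord = 2;       // hypothetical: documents containing the word
    long totalDocuments = 10;    // hypothetical: documents in the corpus

    double tf = (double) wordCountInDoc / wordTotalInDoc;  // term frequency = 0.025
    double df = (double) docsWithWord / totalDocuments;    // document frequency = 0.2
    double tfIdf = tf * Math.log(1 / df);                  // same formula as ComputeTfIdf
    System.out.printf("tf=%.4f df=%.2f tfIdf=%.4f%n", tf, df, tfIdf);
  }
}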
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
new file mode 100644
index 0000000..f1d8d1a
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.complete;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.SerializableComparator;
+import com.google.cloud.dataflow.sdk.transforms.Top;
+import com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.util.List;
+
+/**
+ * An example that reads Wikipedia edit data from Cloud Storage and computes the user with
+ * the longest string of edits separated by no more than an hour within each month.
+ *
+ * <p>Concepts: Using Windowing to perform time-based aggregations of data.
+ *
+ * <p>It is not recommended to execute this pipeline locally, given the size of the default input
+ * data.
+ *
+ * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ * --runner=BlockingDataflowPipelineRunner
+ * }
+ * </pre>
+ * and an output prefix on GCS:
+ * <pre>{@code
+ * --output=gs://YOUR_OUTPUT_PREFIX
+ * }</pre>
+ *
+ * <p>The default input is {@code gs://dataflow-samples/wikipedia_edits/*.json} and can be
+ * overridden with {@code --input}.
+ *
+ * <p>The input for this example is large enough that it's a good place to enable (experimental)
+ * autoscaling:
+ * <pre>{@code
+ * --autoscalingAlgorithm=BASIC
+ * --maxNumWorkers=20
+ * }
+ * </pre>
+ * This will automatically scale the number of workers up over time until the job completes.
+ */
+public class TopWikipediaSessions {
+ private static final String EXPORTED_WIKI_TABLE = "gs://dataflow-samples/wikipedia_edits/*.json";
+
+ /**
+ * Extracts user and timestamp from a TableRow representing a Wikipedia edit.
+ */
+ static class ExtractUserAndTimestamp extends DoFn<TableRow, String> {
+ @Override
+ public void processElement(ProcessContext c) {
+ TableRow row = c.element();
+ int timestamp = (Integer) row.get("timestamp");
+ String userName = (String) row.get("contributor_username");
+ if (userName != null) {
+ // Sets the implicit timestamp field to be used in windowing.
+ c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
+ }
+ }
+ }
+
+ /**
+ * Computes the number of edits in each user session. A session is defined as
+ * a string of edits where each is separated from the next by less than an hour.
+ */
+ static class ComputeSessions
+ extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
+ @Override
+ public PCollection<KV<String, Long>> apply(PCollection<String> actions) {
+ return actions
+ .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardHours(1))))
+
+ .apply(Count.<String>perElement());
+ }
+ }
+
+ /**
+ * Computes the longest session ending in each month.
+ */
+ private static class TopPerMonth
+ extends PTransform<PCollection<KV<String, Long>>, PCollection<List<KV<String, Long>>>> {
+ @Override
+ public PCollection<List<KV<String, Long>>> apply(PCollection<KV<String, Long>> sessions) {
+ return sessions
+ .apply(Window.<KV<String, Long>>into(CalendarWindows.months(1)))
+
+ .apply(Top.of(1, new SerializableComparator<KV<String, Long>>() {
+ @Override
+ public int compare(KV<String, Long> o1, KV<String, Long> o2) {
+ return Long.compare(o1.getValue(), o2.getValue());
+ }
+ }).withoutDefaults());
+ }
+ }
+
+ static class SessionsToStringsDoFn extends DoFn<KV<String, Long>, KV<String, Long>>
+ implements RequiresWindowAccess {
+
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(KV.of(
+ c.element().getKey() + " : " + c.window(), c.element().getValue()));
+ }
+ }
+
+ static class FormatOutputDoFn extends DoFn<List<KV<String, Long>>, String>
+ implements RequiresWindowAccess {
+ @Override
+ public void processElement(ProcessContext c) {
+ for (KV<String, Long> item : c.element()) {
+ String session = item.getKey();
+ long count = item.getValue();
+ c.output(session + " : " + count + " : " + ((IntervalWindow) c.window()).start());
+ }
+ }
+ }
+
+ static class ComputeTopSessions extends PTransform<PCollection<TableRow>, PCollection<String>> {
+
+ private final double samplingThreshold;
+
+ public ComputeTopSessions(double samplingThreshold) {
+ this.samplingThreshold = samplingThreshold;
+ }
+
+ @Override
+ public PCollection<String> apply(PCollection<TableRow> input) {
+ return input
+ .apply(ParDo.of(new ExtractUserAndTimestamp()))
+
+ .apply(ParDo.named("SampleUsers").of(
+ new DoFn<String, String>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * samplingThreshold) {
+ c.output(c.element());
+ }
+ }
+ }))
+
+ .apply(new ComputeSessions())
+
+ .apply(ParDo.named("SessionsToStrings").of(new SessionsToStringsDoFn()))
+ .apply(new TopPerMonth())
+ .apply(ParDo.named("FormatOutput").of(new FormatOutputDoFn()));
+ }
+ }
+
+ /**
+ * Options supported by this class.
+ *
+ * <p>Inherits standard Dataflow configuration options.
+ */
+ private static interface Options extends PipelineOptions {
+ @Description(
+ "Input specified as a GCS path containing a BigQuery table exported as json")
+ @Default.String(EXPORTED_WIKI_TABLE)
+ String getInput();
+ void setInput(String value);
+
+ @Description("File to output results to")
+ @Validation.Required
+ String getOutput();
+ void setOutput(String value);
+ }
+
+ public static void main(String[] args) {
+ Options options = PipelineOptionsFactory.fromArgs(args)
+ .withValidation()
+ .as(Options.class);
+ DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+
+ Pipeline p = Pipeline.create(dataflowOptions);
+
+ double samplingThreshold = 0.1;
+
+ p.apply(TextIO.Read
+ .from(options.getInput())
+ .withCoder(TableRowJsonCoder.of()))
+ .apply(new ComputeTopSessions(samplingThreshold))
+ .apply(TextIO.Write.named("Write").withoutSharding().to(options.getOutput()));
+
+ p.run();
+ }
+}
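The session definition used by ComputeSessions above (edits merge into one session while gaps stay under an hour) can be illustrated without the SDK. Below is a plain-Java sketch with hypothetical edit timestamps; the exact boundary behavior of Sessions.withGapDuration may differ slightly:

import java.util.Arrays;
import java.util.List;

public class SessionGapSketch {
  public static void main(String[] args) {
    final long hourMs = 60 * 60 * 1000L;
    // Edit timestamps (milliseconds) for one user; values are hypothetical.
    List<Long> edits = Arrays.asList(
        0L, 10 * 60_000L, 50 * 60_000L, 4 * hourMs, 4 * hourMs + 30 * 60_000L);
    int session = 1;
    long count = 1;
    for (int i = 1; i < edits.size(); i++) {
      if (edits.get(i) - edits.get(i - 1) >= hourMs) {
        // A gap of an hour or more closes the current session.
        System.out.println("session " + session + " : " + count + " edits");
        session++;
        count = 1;
      } else {
        count++;
      }
    }
    System.out.println("session " + session + " : " + count + " edits");
  }
}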
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
new file mode 100644
index 0000000..2c3c857
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.complete;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
+import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
+import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
+import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicAndSubscriptionOptions;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.coders.AvroCoder;
+import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Strings;
+
+import org.apache.avro.reflect.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A Dataflow Example that runs in both batch and streaming modes with traffic sensor data.
+ * You can configure the running mode by setting {@literal --streaming} to true or false.
+ *
+ * <p>Concepts: The batch and streaming runners, sliding windows, Google Cloud Pub/Sub
+ * topic injection, use of the AvroCoder to encode a custom class, and custom Combine transforms.
+ *
+ * <p>This example analyzes traffic sensor data using SlidingWindows. For each window,
+ * it finds the lane that had the highest flow recorded, for each sensor station. It writes
+ * those max values along with auxiliary info to a BigQuery table.
+ *
+ * <p>In batch mode, the pipeline reads traffic sensor data from {@literal --inputFile}.
+ *
+ * <p>In streaming mode, the pipeline reads the data from a Pub/Sub topic.
+ * By default, the example will run a separate pipeline to inject the data from the default
+ * {@literal --inputFile} to the Pub/Sub {@literal --pubsubTopic}, making it available for the
+ * streaming pipeline to process. You may override the default {@literal --inputFile} with a
+ * file of your choosing. You may also set {@literal --inputFile} to an empty string, which
+ * disables the automatic Pub/Sub injection and allows you to use a separate tool to control
+ * the input to this example. Example code that publishes traffic sensor data to a Pub/Sub
+ * topic is provided at
+ * <a href="https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-python/tree/master/gce-cmdline-publisher"></a>.
+ *
+ * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
+ * from the example common package (there are no defaults for a general Dataflow pipeline).
+ * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
+ * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
+ * the example will try to create them.
+ *
+ * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
+ * and then exit.
+ */
+public class TrafficMaxLaneFlow {
+
+ private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
+ private static final Integer VALID_INPUTS = 4999;
+
+ static final int WINDOW_DURATION = 60; // Default sliding window duration in minutes
+ static final int WINDOW_SLIDE_EVERY = 5; // Default window 'slide every' setting in minutes
+
+ /**
+ * This class holds information about each lane in a station reading, along with some general
+ * information from the reading.
+ */
+ @DefaultCoder(AvroCoder.class)
+ static class LaneInfo {
+ @Nullable String stationId;
+ @Nullable String lane;
+ @Nullable String direction;
+ @Nullable String freeway;
+ @Nullable String recordedTimestamp;
+ @Nullable Integer laneFlow;
+ @Nullable Integer totalFlow;
+ @Nullable Double laneAO;
+ @Nullable Double laneAS;
+
+ public LaneInfo() {}
+
+ public LaneInfo(String stationId, String lane, String direction, String freeway,
+ String timestamp, Integer laneFlow, Double laneAO,
+ Double laneAS, Integer totalFlow) {
+ this.stationId = stationId;
+ this.lane = lane;
+ this.direction = direction;
+ this.freeway = freeway;
+ this.recordedTimestamp = timestamp;
+ this.laneFlow = laneFlow;
+ this.laneAO = laneAO;
+ this.laneAS = laneAS;
+ this.totalFlow = totalFlow;
+ }
+
+ public String getStationId() {
+ return this.stationId;
+ }
+ public String getLane() {
+ return this.lane;
+ }
+ public String getDirection() {
+ return this.direction;
+ }
+ public String getFreeway() {
+ return this.freeway;
+ }
+ public String getRecordedTimestamp() {
+ return this.recordedTimestamp;
+ }
+ public Integer getLaneFlow() {
+ return this.laneFlow;
+ }
+ public Double getLaneAO() {
+ return this.laneAO;
+ }
+ public Double getLaneAS() {
+ return this.laneAS;
+ }
+ public Integer getTotalFlow() {
+ return this.totalFlow;
+ }
+ }
+
+ /**
+ * Extract the timestamp field from the input string, and use it as the element timestamp.
+ */
+ static class ExtractTimestamps extends DoFn<String, String> {
+ private static final DateTimeFormatter dateTimeFormat =
+ DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");
+
+ @Override
+ public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
+ String[] items = c.element().split(",");
+ if (items.length > 0) {
+ try {
+ String timestamp = items[0];
+ c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp)));
+ } catch (IllegalArgumentException e) {
+ // Skip the invalid input.
+ }
+ }
+ }
+ }
+
+ /**
+ * Extract flow information for each of the 8 lanes in a reading, and output as separate tuples.
+ * This will let us determine which lane has the max flow for that station over the span of the
+ * window, and output not only the max flow from that calculation, but other associated
+ * information. The number of lanes for which data is present depends upon which freeway the data
+ * point comes from.
+ */
+ static class ExtractFlowInfoFn extends DoFn<String, KV<String, LaneInfo>> {
+
+ @Override
+ public void processElement(ProcessContext c) {
+ String[] items = c.element().split(",");
+ if (items.length < 48) {
+ // Skip the invalid input.
+ return;
+ }
+ // extract the sensor information for the lanes from the input string fields.
+ String timestamp = items[0];
+ String stationId = items[1];
+ String freeway = items[2];
+ String direction = items[3];
+ Integer totalFlow = tryIntParse(items[7]);
+ for (int i = 1; i <= 8; ++i) {
+ Integer laneFlow = tryIntParse(items[6 + 5 * i]);
+ Double laneAvgOccupancy = tryDoubleParse(items[7 + 5 * i]);
+ Double laneAvgSpeed = tryDoubleParse(items[8 + 5 * i]);
+ if (laneFlow == null || laneAvgOccupancy == null || laneAvgSpeed == null) {
+ return;
+ }
+ LaneInfo laneInfo = new LaneInfo(stationId, "lane" + i, direction, freeway, timestamp,
+ laneFlow, laneAvgOccupancy, laneAvgSpeed, totalFlow);
+ c.output(KV.of(stationId, laneInfo));
+ }
+ }
+ }
+
+ /**
+ * A custom 'combine function' used with the Combine.perKey transform. Used to find the max lane
+ * flow over all the data points in the Window. Extracts the lane flow from each element and
+ * determines whether it's the max seen so far. We're using a custom combiner instead of the Max
+ * transform because we want to retain the additional information we've associated with the flow
+ * value.
+ */
+ public static class MaxFlow implements SerializableFunction<Iterable<LaneInfo>, LaneInfo> {
+ @Override
+ public LaneInfo apply(Iterable<LaneInfo> input) {
+ Integer max = 0;
+ LaneInfo maxInfo = new LaneInfo();
+ for (LaneInfo item : input) {
+ Integer flow = item.getLaneFlow();
+ if (flow != null && (flow >= max)) {
+ max = flow;
+ maxInfo = item;
+ }
+ }
+ return maxInfo;
+ }
+ }
+
+ /**
+ * Format the results of the Max Lane flow calculation to a TableRow, to save to BigQuery.
+ * Add the timestamp from the window context.
+ */
+ static class FormatMaxesFn extends DoFn<KV<String, LaneInfo>, TableRow> {
+ @Override
+ public void processElement(ProcessContext c) {
+
+ LaneInfo laneInfo = c.element().getValue();
+ TableRow row = new TableRow()
+ .set("station_id", c.element().getKey())
+ .set("direction", laneInfo.getDirection())
+ .set("freeway", laneInfo.getFreeway())
+ .set("lane_max_flow", laneInfo.getLaneFlow())
+ .set("lane", laneInfo.getLane())
+ .set("avg_occ", laneInfo.getLaneAO())
+ .set("avg_speed", laneInfo.getLaneAS())
+ .set("total_flow", laneInfo.getTotalFlow())
+ .set("recorded_timestamp", laneInfo.getRecordedTimestamp())
+ .set("window_timestamp", c.timestamp().toString());
+ c.output(row);
+ }
+
+ /** Defines the BigQuery schema used for the output. */
+ static TableSchema getSchema() {
+ List<TableFieldSchema> fields = new ArrayList<>();
+ fields.add(new TableFieldSchema().setName("station_id").setType("STRING"));
+ fields.add(new TableFieldSchema().setName("direction").setType("STRING"));
+ fields.add(new TableFieldSchema().setName("freeway").setType("STRING"));
+ fields.add(new TableFieldSchema().setName("lane_max_flow").setType("INTEGER"));
+ fields.add(new TableFieldSchema().setName("lane").setType("STRING"));
+ fields.add(new TableFieldSchema().setName("avg_occ").setType("FLOAT"));
+ fields.add(new TableFieldSchema().setName("avg_speed").setType("FLOAT"));
+ fields.add(new TableFieldSchema().setName("total_flow").setType("INTEGER"));
+ fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP"));
+ fields.add(new TableFieldSchema().setName("recorded_timestamp").setType("STRING"));
+ TableSchema schema = new TableSchema().setFields(fields);
+ return schema;
+ }
+ }
+
+ /**
+ * This PTransform extracts lane info, calculates the max lane flow found for a given station (for
+ * the current Window) using a custom 'combiner', and formats the results for BigQuery.
+ */
+ static class MaxLaneFlow
+ extends PTransform<PCollection<KV<String, LaneInfo>>, PCollection<TableRow>> {
+ @Override
+ public PCollection<TableRow> apply(PCollection<KV<String, LaneInfo>> flowInfo) {
+ // stationId, LaneInfo => stationId + max lane flow info
+ PCollection<KV<String, LaneInfo>> flowMaxes =
+ flowInfo.apply(Combine.<String, LaneInfo>perKey(
+ new MaxFlow()));
+
+ // <stationId, max lane flow info>... => row...
+ PCollection<TableRow> results = flowMaxes.apply(
+ ParDo.of(new FormatMaxesFn()));
+
+ return results;
+ }
+ }
+
+ static class ReadFileAndExtractTimestamps extends PTransform<PBegin, PCollection<String>> {
+ private final String inputFile;
+
+ public ReadFileAndExtractTimestamps(String inputFile) {
+ this.inputFile = inputFile;
+ }
+
+ @Override
+ public PCollection<String> apply(PBegin begin) {
+ return begin
+ .apply(TextIO.Read.from(inputFile))
+ .apply(ParDo.of(new ExtractTimestamps()));
+ }
+ }
+
+ /**
+ * Options supported by {@link TrafficMaxLaneFlow}.
+ *
+ * <p>Inherits standard configuration options.
+ */
+ private interface TrafficMaxLaneFlowOptions extends DataflowExampleOptions,
+ ExamplePubsubTopicAndSubscriptionOptions, ExampleBigQueryTableOptions {
+ @Description("Input file to inject to Pub/Sub topic")
+ @Default.String("gs://dataflow-samples/traffic_sensor/"
+ + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
+ String getInputFile();
+ void setInputFile(String value);
+
+ @Description("Numeric value of sliding window duration, in minutes")
+ @Default.Integer(WINDOW_DURATION)
+ Integer getWindowDuration();
+ void setWindowDuration(Integer value);
+
+ @Description("Numeric value of window 'slide every' setting, in minutes")
+ @Default.Integer(WINDOW_SLIDE_EVERY)
+ Integer getWindowSlideEvery();
+ void setWindowSlideEvery(Integer value);
+
+ @Description("Whether to run the pipeline with unbounded input")
+ @Default.Boolean(false)
+ boolean isUnbounded();
+ void setUnbounded(boolean value);
+ }
+
+ /**
+ * Sets up and starts streaming pipeline.
+ *
+ * @throws IOException if there is a problem setting up resources
+ */
+ public static void main(String[] args) throws IOException {
+ TrafficMaxLaneFlowOptions options = PipelineOptionsFactory.fromArgs(args)
+ .withValidation()
+ .as(TrafficMaxLaneFlowOptions.class);
+ options.setBigQuerySchema(FormatMaxesFn.getSchema());
+ // Using DataflowExampleUtils to set up required resources.
+ DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options, options.isUnbounded());
+
+ Pipeline pipeline = Pipeline.create(options);
+ TableReference tableRef = new TableReference();
+ tableRef.setProjectId(options.getProject());
+ tableRef.setDatasetId(options.getBigQueryDataset());
+ tableRef.setTableId(options.getBigQueryTable());
+
+ PCollection<String> input;
+ if (options.isUnbounded()) {
+ // Read unbounded PubSubIO.
+ input = pipeline.apply(PubsubIO.Read
+ .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
+ .subscription(options.getPubsubSubscription()));
+ } else {
+ // Read bounded PubSubIO.
+ input = pipeline.apply(PubsubIO.Read
+ .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
+ .subscription(options.getPubsubSubscription()).maxNumRecords(VALID_INPUTS));
+
+ // To read bounded TextIO files, use:
+ // input = pipeline.apply(new ReadFileAndExtractTimestamps(options.getInputFile()));
+ }
+ input
+ // row... => <stationId, LaneInfo> ...
+ .apply(ParDo.of(new ExtractFlowInfoFn()))
+ // map the incoming data stream into sliding windows. The default window duration values
+ // work well if you're running the accompanying Pub/Sub generator script with the
+ // --replay flag, which simulates pauses in the sensor data publication. You may want to
+ // adjust them otherwise.
+ .apply(Window.<KV<String, LaneInfo>>into(SlidingWindows.of(
+ Duration.standardMinutes(options.getWindowDuration())).
+ every(Duration.standardMinutes(options.getWindowSlideEvery()))))
+ .apply(new MaxLaneFlow())
+ .apply(BigQueryIO.Write.to(tableRef)
+ .withSchema(FormatMaxesFn.getSchema()));
+
+ // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
+ if (!Strings.isNullOrEmpty(options.getInputFile())
+ && !Strings.isNullOrEmpty(options.getPubsubTopic())) {
+ dataflowUtils.runInjectorPipeline(
+ new ReadFileAndExtractTimestamps(options.getInputFile()),
+ options.getPubsubTopic(),
+ PUBSUB_TIMESTAMP_LABEL_KEY);
+ }
+
+ // Run the pipeline.
+ PipelineResult result = pipeline.run();
+
+ // dataflowUtils will try to cancel the pipeline and the injector before the program exits.
+ dataflowUtils.waitToFinish(result);
+ }
+
+ private static Integer tryIntParse(String number) {
+ try {
+ return Integer.parseInt(number);
+ } catch (NumberFormatException e) {
+ return null;
+ }
+ }
+
+ private static Double tryDoubleParse(String number) {
+ try {
+ return Double.parseDouble(number);
+ } catch (NumberFormatException e) {
+ return null;
+ }
+ }
+}
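The MaxFlow combiner above keeps the whole LaneInfo that carries the largest flow, rather than just the maximum value. A self-contained, plain-Java sketch of that selection logic (the Reading class and its values are hypothetical stand-ins for LaneInfo):

import java.util.Arrays;
import java.util.List;

public class MaxFlowSketch {
  static class Reading {                          // hypothetical stand-in for LaneInfo
    final String lane;
    final Integer flow;
    Reading(String lane, Integer flow) {
      this.lane = lane;
      this.flow = flow;
    }
  }

  public static void main(String[] args) {
    List<Reading> readings = Arrays.asList(
        new Reading("lane1", 35), new Reading("lane2", 48), new Reading("lane3", 41));
    int max = 0;
    Reading maxInfo = null;
    for (Reading r : readings) {
      if (r.flow != null && r.flow >= max) {      // same comparison as MaxFlow.apply
        max = r.flow;
        maxInfo = r;
      }
    }
    System.out.println("max flow: " + maxInfo.lane + " = " + maxInfo.flow);
  }
}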
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
new file mode 100644
index 0000000..b1c72e6
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.complete;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
+import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
+import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
+import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicAndSubscriptionOptions;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.coders.AvroCoder;
+import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
+import org.apache.avro.reflect.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A Dataflow Example that runs in both batch and streaming modes with traffic sensor data.
+ * You can configure the running mode by setting {@literal --streaming} to true or false.
+ *
+ * <p>Concepts: The batch and streaming runners, GroupByKey, sliding windows, and
+ * Google Cloud Pub/Sub topic injection.
+ *
+ * <p>This example analyzes traffic sensor data using SlidingWindows. For each window,
+ * it calculates the average speed over the window for some small set of predefined 'routes',
+ * and looks for 'slowdowns' in those routes. It writes its results to a BigQuery table.
+ *
+ * <p>In batch mode, the pipeline reads traffic sensor data from {@literal --inputFile}.
+ *
+ * <p>In streaming mode, the pipeline reads the data from a Pub/Sub topic.
+ * By default, the example will run a separate pipeline to inject the data from the default
+ * {@literal --inputFile} to the Pub/Sub {@literal --pubsubTopic}, making it available for the
+ * streaming pipeline to process. You may override the default {@literal --inputFile} with a
+ * file of your choosing. You may also set {@literal --inputFile} to an empty string, which
+ * disables the automatic Pub/Sub injection and allows you to use a separate tool to control
+ * the input to this example. Example code that publishes traffic sensor data to a Pub/Sub
+ * topic is provided at
+ * <a href="https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-python/tree/master/gce-cmdline-publisher"></a>.
+ *
+ * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
+ * from the example common package (there are no defaults for a general Dataflow pipeline).
+ * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
+ * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
+ * the example will try to create them.
+ *
+ * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
+ * and then exit.
+ */
+
+public class TrafficRoutes {
+
+ private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
+ private static final Integer VALID_INPUTS = 4999;
+
+ // Instantiate some small predefined San Diego routes to analyze
+ static Map<String, String> sdStations = buildStationInfo();
+ static final int WINDOW_DURATION = 3; // Default sliding window duration in minutes
+ static final int WINDOW_SLIDE_EVERY = 1; // Default window 'slide every' setting in minutes
+
+ /**
+ * This class holds information about a station reading's average speed.
+ */
+ @DefaultCoder(AvroCoder.class)
+ static class StationSpeed implements Comparable<StationSpeed> {
+ @Nullable String stationId;
+ @Nullable Double avgSpeed;
+ @Nullable Long timestamp;
+
+ public StationSpeed() {}
+
+ public StationSpeed(String stationId, Double avgSpeed, Long timestamp) {
+ this.stationId = stationId;
+ this.avgSpeed = avgSpeed;
+ this.timestamp = timestamp;
+ }
+
+ public String getStationId() {
+ return this.stationId;
+ }
+ public Double getAvgSpeed() {
+ return this.avgSpeed;
+ }
+
+ @Override
+ public int compareTo(StationSpeed other) {
+ return Long.compare(this.timestamp, other.timestamp);
+ }
+ }
+
+ /**
+ * This class holds information about a route's speed/slowdown.
+ */
+ @DefaultCoder(AvroCoder.class)
+ static class RouteInfo {
+ @Nullable String route;
+ @Nullable Double avgSpeed;
+ @Nullable Boolean slowdownEvent;
+
+
+ public RouteInfo() {}
+
+ public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent) {
+ this.route = route;
+ this.avgSpeed = avgSpeed;
+ this.slowdownEvent = slowdownEvent;
+ }
+
+ public String getRoute() {
+ return this.route;
+ }
+ public Double getAvgSpeed() {
+ return this.avgSpeed;
+ }
+ public Boolean getSlowdownEvent() {
+ return this.slowdownEvent;
+ }
+ }
+
+ /**
+ * Extract the timestamp field from the input string, and use it as the element timestamp.
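+ * For example, a (hypothetical) input line beginning with {@code "01/01/2010 00:00:00,..."} is
+ * output with that instant as its element timestamp; lines whose first field does not parse as a
+ * timestamp are silently dropped.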
+ */
+ static class ExtractTimestamps extends DoFn<String, String> {
+ private static final DateTimeFormatter dateTimeFormat =
+ DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");
+
+ @Override
+ public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
+ String[] items = c.element().split(",");
+ String timestamp = tryParseTimestamp(items);
+ if (timestamp != null) {
+ try {
+ c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp)));
+ } catch (IllegalArgumentException e) {
+ // Skip the invalid input.
+ }
+ }
+ }
+ }
+
+ /**
+ * Filter out readings for the stations along predefined 'routes', and output
+ * (station, speed info) keyed on route.
+ */
+ static class ExtractStationSpeedFn extends DoFn<String, KV<String, StationSpeed>> {
+
+ @Override
+ public void processElement(ProcessContext c) {
+ String[] items = c.element().split(",");
+ String stationType = tryParseStationType(items);
+ // For this analysis, use only 'main line' station types
+ if (stationType != null && stationType.equals("ML")) {
+ Double avgSpeed = tryParseAvgSpeed(items);
+ String stationId = tryParseStationId(items);
+ // For this simple example, filter out everything but some hardwired routes.
+ if (avgSpeed != null && stationId != null && sdStations.containsKey(stationId)) {
+ StationSpeed stationSpeed =
+ new StationSpeed(stationId, avgSpeed, c.timestamp().getMillis());
+ // The tuple key is the 'route' name stored in the 'sdStations' hash.
+ KV<String, StationSpeed> outputValue = KV.of(sdStations.get(stationId), stationSpeed);
+ c.output(outputValue);
+ }
+ }
+ }
+ }
+
+ /**
+ * For a given route, track average speed for the window. Calculate whether
+ * traffic is currently slowing down, via a predefined threshold. If a supermajority of the speed
+ * readings in this sliding window are lower than the previous reading, we call this a 'slowdown'.
+ * Note: these calculations are for example purposes only, and are unrealistic and oversimplified.
+ */
+ static class GatherStats
+ extends DoFn<KV<String, Iterable<StationSpeed>>, KV<String, RouteInfo>> {
+ @Override
+ public void processElement(ProcessContext c) throws IOException {
+ String route = c.element().getKey();
+ double speedSum = 0.0;
+ int speedCount = 0;
+ int speedups = 0;
+ int slowdowns = 0;
+ List<StationSpeed> infoList = Lists.newArrayList(c.element().getValue());
+ // StationSpeeds sort by embedded timestamp.
+ Collections.sort(infoList);
+ Map<String, Double> prevSpeeds = new HashMap<>();
+ // For all stations in the route, sum (non-null) speeds. Keep a count of the non-null speeds.
+ for (StationSpeed item : infoList) {
+ Double speed = item.getAvgSpeed();
+ if (speed != null) {
+ speedSum += speed;
+ speedCount++;
+ Double lastSpeed = prevSpeeds.get(item.getStationId());
+ if (lastSpeed != null) {
+ if (lastSpeed < speed) {
+ speedups += 1;
+ } else {
+ slowdowns += 1;
+ }
+ }
+ prevSpeeds.put(item.getStationId(), speed);
+ }
+ }
+ if (speedCount == 0) {
+ // No average to compute.
+ return;
+ }
+ double speedAvg = speedSum / speedCount;
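+ // Flag a 'slowdown' when slowdown comparisons outnumber speedups at least two to one;
+ // e.g. 3 slowdowns and 1 speedup (3 >= 2 * 1) is flagged, but 2 and 2 is not.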
+ boolean slowdownEvent = slowdowns >= 2 * speedups;
+ RouteInfo routeInfo = new RouteInfo(route, speedAvg, slowdownEvent);
+ c.output(KV.of(route, routeInfo));
+ }
+ }
+
+ /**
+ * Format the results of the slowdown calculations to a TableRow, to save to BigQuery.
+ */
+ static class FormatStatsFn extends DoFn<KV<String, RouteInfo>, TableRow> {
+ @Override
+ public void processElement(ProcessContext c) {
+ RouteInfo routeInfo = c.element().getValue();
+ TableRow row = new TableRow()
+ .set("avg_speed", routeInfo.getAvgSpeed())
+ .set("slowdown_event", routeInfo.getSlowdownEvent())
+ .set("route", c.element().getKey())
+ .set("window_timestamp", c.timestamp().toString());
+ c.output(row);
+ }
+
+ /**
+ * Defines the BigQuery schema used for the output.
+ */
+ static TableSchema getSchema() {
+ List<TableFieldSchema> fields = new ArrayList<>();
+ fields.add(new TableFieldSchema().setName("route").setType("STRING"));
+ fields.add(new TableFieldSchema().setName("avg_speed").setType("FLOAT"));
+ fields.add(new TableFieldSchema().setName("slowdown_event").setType("BOOLEAN"));
+ fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP"));
+ TableSchema schema = new TableSchema().setFields(fields);
+ return schema;
+ }
+ }
+
+ /**
+ * This PTransform extracts speed info from traffic station readings.
+ * It groups the readings by 'route' and analyzes traffic slowdown for that route.
+ * Lastly, it formats the results for BigQuery.
+ */
+ static class TrackSpeed extends
+ PTransform<PCollection<KV<String, StationSpeed>>, PCollection<TableRow>> {
+ @Override
+ public PCollection<TableRow> apply(PCollection<KV<String, StationSpeed>> stationSpeed) {
+ // Apply a GroupByKey transform to collect a list of all station
+ // readings for a given route.
+ PCollection<KV<String, Iterable<StationSpeed>>> timeGroup = stationSpeed.apply(
+ GroupByKey.<String, StationSpeed>create());
+
+ // Analyze 'slowdown' over the route readings.
+ PCollection<KV<String, RouteInfo>> stats = timeGroup.apply(ParDo.of(new GatherStats()));
+
+ // Format the results for writing to BigQuery
+ PCollection<TableRow> results = stats.apply(
+ ParDo.of(new FormatStatsFn()));
+
+ return results;
+ }
+ }
+
+ static class ReadFileAndExtractTimestamps extends PTransform<PBegin, PCollection<String>> {
+ private final String inputFile;
+
+ public ReadFileAndExtractTimestamps(String inputFile) {
+ this.inputFile = inputFile;
+ }
+
+ @Override
+ public PCollection<String> apply(PBegin begin) {
+ return begin
+ .apply(TextIO.Read.from(inputFile))
+ .apply(ParDo.of(new ExtractTimestamps()));
+ }
+ }
+
+ /**
+ * Options supported by {@link TrafficRoutes}.
+ *
+ * <p>Inherits standard configuration options.
+ */
+ private interface TrafficRoutesOptions extends DataflowExampleOptions,
+ ExamplePubsubTopicAndSubscriptionOptions, ExampleBigQueryTableOptions {
+ @Description("Input file to inject to Pub/Sub topic")
+ @Default.String("gs://dataflow-samples/traffic_sensor/"
+ + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
+ String getInputFile();
+ void setInputFile(String value);
+
+ @Description("Numeric value of sliding window duration, in minutes")
+ @Default.Integer(WINDOW_DURATION)
+ Integer getWindowDuration();
+ void setWindowDuration(Integer value);
+
+ @Description("Numeric value of window 'slide every' setting, in minutes")
+ @Default.Integer(WINDOW_SLIDE_EVERY)
+ Integer getWindowSlideEvery();
+ void setWindowSlideEvery(Integer value);
+
+ @Description("Whether to run the pipeline with unbounded input")
+ @Default.Boolean(false)
+ boolean isUnbounded();
+ void setUnbounded(boolean value);
+ }
+
+ /**
+ * Sets up and starts streaming pipeline.
+ *
+ * @throws IOException if there is a problem setting up resources
+ */
+ public static void main(String[] args) throws IOException {
+ TrafficRoutesOptions options = PipelineOptionsFactory.fromArgs(args)
+ .withValidation()
+ .as(TrafficRoutesOptions.class);
+
+ options.setBigQuerySchema(FormatStatsFn.getSchema());
+ // Using DataflowExampleUtils to set up required resources.
+ DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options, options.isUnbounded());
+
+ Pipeline pipeline = Pipeline.create(options);
+ TableReference tableRef = new TableReference();
+ tableRef.setProjectId(options.getProject());
+ tableRef.setDatasetId(options.getBigQueryDataset());
+ tableRef.setTableId(options.getBigQueryTable());
+
+ PCollection<String> input;
+ if (options.isUnbounded()) {
+ // Read from Pub/Sub as an unbounded source.
+ input = pipeline.apply(PubsubIO.Read
+ .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
+ .subscription(options.getPubsubSubscription()));
+ } else {
+ // Read from Pub/Sub as a bounded source, capped at VALID_INPUTS records.
+ input = pipeline.apply(PubsubIO.Read
+ .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
+ .subscription(options.getPubsubSubscription()).maxNumRecords(VALID_INPUTS));
+
+ // To read bounded TextIO files, use:
+ // input = pipeline.apply(TextIO.Read.from(options.getInputFile()))
+ // .apply(ParDo.of(new ExtractTimestamps()));
+ }
+ input
+ // row... => <station route, station speed> ...
+ .apply(ParDo.of(new ExtractStationSpeedFn()))
+ // Map the incoming data stream into sliding windows.
+ // The default window duration values work well if you're running the accompanying Pub/Sub
+ // generator script without the --replay flag, so that there are no simulated pauses in
+ // the sensor data publication. You may want to adjust the values otherwise.
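+ // With the defaults (3-minute windows sliding every 1 minute), each element is assigned
+ // to three overlapping windows.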
+ .apply(Window.<KV<String, StationSpeed>>into(SlidingWindows.of(
+ Duration.standardMinutes(options.getWindowDuration())).
+ every(Duration.standardMinutes(options.getWindowSlideEvery()))))
+ .apply(new TrackSpeed())
+ .apply(BigQueryIO.Write.to(tableRef)
+ .withSchema(FormatStatsFn.getSchema()));
+
+ // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
+ if (!Strings.isNullOrEmpty(options.getInputFile())
+ && !Strings.isNullOrEmpty(options.getPubsubTopic())) {
+ dataflowUtils.runInjectorPipeline(
+ new ReadFileAndExtractTimestamps(options.getInputFile()),
+ options.getPubsubTopic(),
+ PUBSUB_TIMESTAMP_LABEL_KEY);
+ }
+
+ // Run the pipeline.
+ PipelineResult result = pipeline.run();
+
+ // dataflowUtils will try to cancel the pipeline and the injector before the program exits.
+ dataflowUtils.waitToFinish(result);
+ }
+
+ private static Double tryParseAvgSpeed(String[] inputItems) {
+ try {
+ return Double.parseDouble(tryParseString(inputItems, 9));
+ } catch (NumberFormatException e) {
+ return null;
+ } catch (NullPointerException e) {
+ return null;
+ }
+ }
+
+ private static String tryParseStationType(String[] inputItems) {
+ return tryParseString(inputItems, 4);
+ }
+
+ private static String tryParseStationId(String[] inputItems) {
+ return tryParseString(inputItems, 1);
+ }
+
+ private static String tryParseTimestamp(String[] inputItems) {
+ return tryParseString(inputItems, 0);
+ }
+
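+ // Returns the field at the given index, or null if the line has too few fields.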
+ private static String tryParseString(String[] inputItems, int index) {
+ return inputItems.length > index ? inputItems[index] : null;
+ }
+
+ /**
+ * Define some small hard-wired San Diego 'routes' to track based on sensor station ID.
+ */
+ private static Map<String, String> buildStationInfo() {
+ Map<String, String> stations = new Hashtable<String, String>();
+ stations.put("1108413", "SDRoute1"); // from freeway 805 S
+ stations.put("1108699", "SDRoute2"); // from freeway 78 E
+ stations.put("1108702", "SDRoute2");
+ return stations;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
new file mode 100644
index 0000000..e5fd015
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An example that reads the public samples of weather data from BigQuery, counts the number of
+ * tornadoes that occur in each month, and writes the results to BigQuery.
+ *
+ * <p>Concepts: Reading/writing BigQuery; counting a PCollection; user-defined PTransforms
+ *
+ * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
+ * table.
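+ * One way to do this (assuming the BigQuery command-line tool is installed) is
+ * {@code bq mk YOUR_DATASET}.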
+ *
+ * <p>To execute this pipeline locally, specify general pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * }
+ * </pre>
+ * and the BigQuery table for the output, with the form
+ * <pre>{@code
+ * --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ * }</pre>
+ *
+ * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <pre>{@code
+ * --project=YOUR_PROJECT_ID
+ * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ * --runner=BlockingDataflowPipelineRunner
+ * }
+ * </pre>
+ * and the BigQuery table for the output:
+ * <pre>{@code
+ * --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ * }</pre>
+ *
+ * <p>The BigQuery input table defaults to {@code clouddataflow-readonly:samples.weather_stations}
+ * and can be overridden with {@code --input}.
+ */
+public class BigQueryTornadoes {
+ // Default to using a 1000 row subset of the public weather station table publicdata:samples.gsod.
+ private static final String WEATHER_SAMPLES_TABLE =
+ "clouddataflow-readonly:samples.weather_stations";
+
+ /**
+ * Examines each row in the input table. If a tornado was recorded
+ * in that sample, the month in which it occurred is output.
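+ * For example, a (hypothetical) row with {@code tornado=true} and {@code month="3"} produces the
+ * output {@code 3}; rows where {@code tornado} is false produce nothing.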
+ */
+ static class ExtractTornadoesFn extends DoFn<TableRow, Integer> {
+ @Override
+ public void processElement(ProcessContext c) {
+ TableRow row = c.element();
+ if ((Boolean) row.get("tornado")) {
+ c.output(Integer.parseInt((String) row.get("month")));
+ }
+ }
+ }
+
+ /**
+ * Prepares the data for writing to BigQuery by building a TableRow object containing an
+ * integer representation of month and the number of tornadoes that occurred in each month.
+ */
+ static class FormatCountsFn extends DoFn<KV<Integer, Long>, TableRow> {
+ @Override
+ public void processElement(ProcessContext c) {
+ TableRow row = new TableRow()
+ .set("month", c.element().getKey())
+ .set("tornado_count", c.element().getValue());
+ c.output(row);
+ }
+ }
+
+ /**
+ * Takes rows from a table and generates a table of counts.
+ *
+ * <p>The input schema is described by
+ * https://developers.google.com/bigquery/docs/dataset-gsod .
+ * The output contains the total number of tornadoes found in each month in
+ * the following schema:
+ * <ul>
+ * <li>month: integer</li>
+ * <li>tornado_count: integer</li>
+ * </ul>
+ */
+ static class CountTornadoes
+ extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
+ @Override
+ public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+
+ // row... => month...
+ PCollection<Integer> tornadoes = rows.apply(
+ ParDo.of(new ExtractTornadoesFn()));
+
+ // month... => <month,count>...
+ PCollection<KV<Integer, Long>> tornadoCounts =
+ tornadoes.apply(Count.<Integer>perElement());
+
+ // <month,count>... => row...
+ PCollection<TableRow> results = tornadoCounts.apply(
+ ParDo.of(new FormatCountsFn()));
+
+ return results;
+ }
+ }
+
+ /**
+ * Options supported by {@link BigQueryTornadoes}.
+ *
+ * <p>Inherits standard configuration options.
+ */
+ private static interface Options extends PipelineOptions {
+ @Description("Table to read from, specified as "
+ + "<project_id>:<dataset_id>.<table_id>")
+ @Default.String(WEATHER_SAMPLES_TABLE)
+ String getInput();
+ void setInput(String value);
+
+ @Description("BigQuery table to write to, specified as "
+ + "<project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
+ @Validation.Required
+ String getOutput();
+ void setOutput(String value);
+ }
+
+ public static void main(String[] args) {
+ Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
+ Pipeline p = Pipeline.create(options);
+
+ // Build the table schema for the output table.
+ List<TableFieldSchema> fields = new ArrayList<>();
+ fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
+ fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
+ TableSchema schema = new TableSchema().setFields(fields);
+
+ p.apply(BigQueryIO.Read.from(options.getInput()))
+ .apply(new CountTornadoes())
+ .apply(BigQueryIO.Write
+ .to(options.getOutput())
+ .withSchema(schema)
+ .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
+
+ p.run();
+ }
+}