Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:48:19 UTC
[55/67] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicOptions.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicOptions.java b/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicOptions.java
deleted file mode 100644
index 4bedf31..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicOptions.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.common;
-
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-
-/**
- * Options that can be used to configure the Pub/Sub topic in Dataflow examples.
- */
-public interface ExamplePubsubTopicOptions extends DataflowPipelineOptions {
- @Description("Pub/Sub topic")
- @Default.InstanceFactory(PubsubTopicFactory.class)
- String getPubsubTopic();
- void setPubsubTopic(String topic);
-
- /**
- * Returns a default Pub/Sub topic based on the project and the job names.
- */
- static class PubsubTopicFactory implements DefaultValueFactory<String> {
- @Override
- public String create(PipelineOptions options) {
- DataflowPipelineOptions dataflowPipelineOptions =
- options.as(DataflowPipelineOptions.class);
- return "projects/" + dataflowPipelineOptions.getProject()
- + "/topics/" + dataflowPipelineOptions.getJobName();
- }
- }
-}
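The interface removed above relies on @Default.InstanceFactory: when --pubsubTopic is not supplied, PubsubTopicFactory derives a topic path from the project and job name on first access. A minimal sketch of the same pattern applied to a hypothetical option (the option name, factory name, and derived value are illustrative and not part of this commit):

    import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.Default;
    import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
    import com.google.cloud.dataflow.sdk.options.Description;
    import com.google.cloud.dataflow.sdk.options.PipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    public class DefaultTopicDemo {

      /** Hypothetical options interface reusing the factory pattern shown above. */
      public interface DemoOptions extends DataflowPipelineOptions {
        @Description("Pub/Sub subscription, defaults to a name derived from the job name")
        @Default.InstanceFactory(SubscriptionFactory.class)
        String getSubscription();
        void setSubscription(String value);
      }

      /** Invoked only when the user does not pass --subscription on the command line. */
      static class SubscriptionFactory implements DefaultValueFactory<String> {
        @Override
        public String create(PipelineOptions options) {
          DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
          return "projects/" + dataflowOptions.getProject()
              + "/subscriptions/" + dataflowOptions.getJobName();
        }
      }

      public static void main(String[] args) {
        DemoOptions options = PipelineOptionsFactory.fromArgs(args).as(DemoOptions.class);
        // The default is computed lazily, on first access of the getter.
        System.out.println(options.getSubscription());
      }
    }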
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/common/PubsubFileInjector.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/common/PubsubFileInjector.java b/examples/src/main/java/com/google/cloud/dataflow/examples/common/PubsubFileInjector.java
deleted file mode 100644
index 4a82ae6..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/common/PubsubFileInjector.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.common;
-
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.IntraBundleParallelization;
-import com.google.cloud.dataflow.sdk.util.Transport;
-import com.google.common.collect.ImmutableMap;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-/**
- * A batch Dataflow pipeline for injecting a set of GCS files into
- * a PubSub topic line by line. Empty lines are skipped.
- *
- * <p>This is useful for testing streaming
- * pipelines. Note that since batch pipelines might retry chunks, this
- * does _not_ guarantee exactly-once injection of file data. Some lines may
- * be published multiple times.
- * </p>
- */
-public class PubsubFileInjector {
-
- /**
- * An incomplete {@code PubsubFileInjector} transform with unbound output topic.
- */
- public static class Unbound {
- private final String timestampLabelKey;
-
- Unbound() {
- this.timestampLabelKey = null;
- }
-
- Unbound(String timestampLabelKey) {
- this.timestampLabelKey = timestampLabelKey;
- }
-
- Unbound withTimestampLabelKey(String timestampLabelKey) {
- return new Unbound(timestampLabelKey);
- }
-
- public Bound publish(String outputTopic) {
- return new Bound(outputTopic, timestampLabelKey);
- }
- }
-
- /** A DoFn that publishes non-empty lines to Google Cloud PubSub. */
- public static class Bound extends DoFn<String, Void> {
- private final String outputTopic;
- private final String timestampLabelKey;
- public transient Pubsub pubsub;
-
- public Bound(String outputTopic, String timestampLabelKey) {
- this.outputTopic = outputTopic;
- this.timestampLabelKey = timestampLabelKey;
- }
-
- @Override
- public void startBundle(Context context) {
- this.pubsub =
- Transport.newPubsubClient(context.getPipelineOptions().as(DataflowPipelineOptions.class))
- .build();
- }
-
- @Override
- public void processElement(ProcessContext c) throws IOException {
- if (c.element().isEmpty()) {
- return;
- }
- PubsubMessage pubsubMessage = new PubsubMessage();
- pubsubMessage.encodeData(c.element().getBytes());
- if (timestampLabelKey != null) {
- pubsubMessage.setAttributes(
- ImmutableMap.of(timestampLabelKey, Long.toString(c.timestamp().getMillis())));
- }
- PublishRequest publishRequest = new PublishRequest();
- publishRequest.setMessages(Arrays.asList(pubsubMessage));
- this.pubsub.projects().topics().publish(outputTopic, publishRequest).execute();
- }
- }
-
- /**
- * Creates a {@code PubsubFileInjector} transform with the given timestamp label key.
- */
- public static Unbound withTimestampLabelKey(String timestampLabelKey) {
- return new Unbound(timestampLabelKey);
- }
-
- /**
- * Creates a {@code PubsubFileInjector} transform that publishes to the given output topic.
- */
- public static Bound publish(String outputTopic) {
- return new Unbound().publish(outputTopic);
- }
-
- /**
- * Command line parameter options.
- */
- private interface PubsubFileInjectorOptions extends PipelineOptions {
- @Description("GCS location of files.")
- @Validation.Required
- String getInput();
- void setInput(String value);
-
- @Description("Topic to publish on.")
- @Validation.Required
- String getOutputTopic();
- void setOutputTopic(String value);
- }
-
- /**
- * Sets up and starts streaming pipeline.
- */
- public static void main(String[] args) {
- PubsubFileInjectorOptions options = PipelineOptionsFactory.fromArgs(args)
- .withValidation()
- .as(PubsubFileInjectorOptions.class);
-
- Pipeline pipeline = Pipeline.create(options);
-
- pipeline
- .apply(TextIO.Read.from(options.getInput()))
- .apply(IntraBundleParallelization.of(PubsubFileInjector.publish(options.getOutputTopic()))
- .withMaxParallelism(20));
-
- pipeline.run();
- }
-}
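The deleted injector exposes a small builder (Unbound to Bound) and is wrapped in IntraBundleParallelization so that only a bounded number of Pub/Sub publish calls run concurrently per bundle. A sketch of how another batch pipeline could wire it up, assuming the class above is still on the classpath; the file pattern, topic, and attribute key are placeholders:

    import com.google.cloud.dataflow.examples.common.PubsubFileInjector;
    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.io.TextIO;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.transforms.IntraBundleParallelization;

    public class InjectorWiringDemo {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(TextIO.Read.from("gs://my-bucket/input-*.txt"))        // placeholder input files
         .apply(IntraBundleParallelization
             .of(PubsubFileInjector
                 .withTimestampLabelKey("ts")                          // optional timestamp attribute key
                 .publish("projects/my-project/topics/my-topic"))      // placeholder topic
             .withMaxParallelism(20));                                 // bound concurrent publishes
        p.run();
      }
    }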
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java
deleted file mode 100644
index f897338..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java
+++ /dev/null
@@ -1,516 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.complete;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.api.services.datastore.DatastoreV1.Entity;
-import com.google.api.services.datastore.DatastoreV1.Key;
-import com.google.api.services.datastore.DatastoreV1.Value;
-import com.google.api.services.datastore.client.DatastoreHelper;
-import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
-import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
-import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicOptions;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.coders.AvroCoder;
-import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.DatastoreIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Filter;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Partition;
-import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.transforms.Top;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-
-import org.joda.time.Duration;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * An example that computes the most popular hash tags
- * for every prefix, which can be used for auto-completion.
- *
- * <p>Concepts: Using the same pipeline in both streaming and batch, combiners,
- * composite transforms.
- *
- * <p>To execute this pipeline using the Dataflow service in batch mode,
- * specify pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- * --runner=DataflowPipelineRunner
- * --inputFile=gs://path/to/input*.txt
- * }</pre>
- *
- * <p>To execute this pipeline using the Dataflow service in streaming mode,
- * specify pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- * --runner=DataflowPipelineRunner
- * --inputFile=gs://YOUR_INPUT_DIRECTORY/*.txt
- * --streaming
- * }</pre>
- *
- * <p>This will update the datastore every 5 seconds based on the last
- * 30 minutes of data received.
- */
-public class AutoComplete {
-
- /**
- * A PTransform that takes as input a list of tokens and returns
- * the most common tokens per prefix.
- */
- public static class ComputeTopCompletions
- extends PTransform<PCollection<String>, PCollection<KV<String, List<CompletionCandidate>>>> {
- private final int candidatesPerPrefix;
- private final boolean recursive;
-
- protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) {
- this.candidatesPerPrefix = candidatesPerPrefix;
- this.recursive = recursive;
- }
-
- public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) {
- return new ComputeTopCompletions(candidatesPerPrefix, recursive);
- }
-
- @Override
- public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) {
- PCollection<CompletionCandidate> candidates = input
- // First count how often each token appears.
- .apply(new Count.PerElement<String>())
-
- // Map the KV outputs of Count into our own CompletionCandidate class.
- .apply(ParDo.named("CreateCompletionCandidates").of(
- new DoFn<KV<String, Long>, CompletionCandidate>() {
- @Override
- public void processElement(ProcessContext c) {
- c.output(new CompletionCandidate(c.element().getKey(), c.element().getValue()));
- }
- }));
-
- // Compute the top via either a flat or recursive algorithm.
- if (recursive) {
- return candidates
- .apply(new ComputeTopRecursive(candidatesPerPrefix, 1))
- .apply(Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
- } else {
- return candidates
- .apply(new ComputeTopFlat(candidatesPerPrefix, 1));
- }
- }
- }
-
- /**
- * Lower latency, but more expensive.
- */
- private static class ComputeTopFlat
- extends PTransform<PCollection<CompletionCandidate>,
- PCollection<KV<String, List<CompletionCandidate>>>> {
- private final int candidatesPerPrefix;
- private final int minPrefix;
-
- public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) {
- this.candidatesPerPrefix = candidatesPerPrefix;
- this.minPrefix = minPrefix;
- }
-
- @Override
- public PCollection<KV<String, List<CompletionCandidate>>> apply(
- PCollection<CompletionCandidate> input) {
- return input
- // For each completion candidate, map it to all prefixes.
- .apply(ParDo.of(new AllPrefixes(minPrefix)))
-
- // Find and return the top candidates for each prefix.
- .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix)
- .withHotKeyFanout(new HotKeyFanout()));
- }
-
- private static class HotKeyFanout implements SerializableFunction<String, Integer> {
- @Override
- public Integer apply(String input) {
- return (int) Math.pow(4, 5 - input.length());
- }
- }
- }
-
- /**
- * Cheaper but higher latency.
- *
- * <p>Returns two PCollections, the first is top prefixes of size greater
- * than minPrefix, and the second is top prefixes of size exactly
- * minPrefix.
- */
- private static class ComputeTopRecursive
- extends PTransform<PCollection<CompletionCandidate>,
- PCollectionList<KV<String, List<CompletionCandidate>>>> {
- private final int candidatesPerPrefix;
- private final int minPrefix;
-
- public ComputeTopRecursive(int candidatesPerPrefix, int minPrefix) {
- this.candidatesPerPrefix = candidatesPerPrefix;
- this.minPrefix = minPrefix;
- }
-
- private class KeySizePartitionFn implements PartitionFn<KV<String, List<CompletionCandidate>>> {
- @Override
- public int partitionFor(KV<String, List<CompletionCandidate>> elem, int numPartitions) {
- return elem.getKey().length() > minPrefix ? 0 : 1;
- }
- }
-
- private static class FlattenTops
- extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
- @Override
- public void processElement(ProcessContext c) {
- for (CompletionCandidate cc : c.element().getValue()) {
- c.output(cc);
- }
- }
- }
-
- @Override
- public PCollectionList<KV<String, List<CompletionCandidate>>> apply(
- PCollection<CompletionCandidate> input) {
- if (minPrefix > 10) {
- // Base case, partitioning to return the output in the expected format.
- return input
- .apply(new ComputeTopFlat(candidatesPerPrefix, minPrefix))
- .apply(Partition.of(2, new KeySizePartitionFn()));
- } else {
- // If a candidate is in the top N for prefix a...b, it must also be in the top
- // N for a...bX for every X, which is typically a much smaller set to consider.
- // First, compute the top candidate for prefixes of size at least minPrefix + 1.
- PCollectionList<KV<String, List<CompletionCandidate>>> larger = input
- .apply(new ComputeTopRecursive(candidatesPerPrefix, minPrefix + 1));
- // Consider the top candidates for each prefix of length minPrefix + 1...
- PCollection<KV<String, List<CompletionCandidate>>> small =
- PCollectionList
- .of(larger.get(1).apply(ParDo.of(new FlattenTops())))
- // ...together with those (previously excluded) candidates of length
- // exactly minPrefix...
- .and(input.apply(Filter.byPredicate(
- new SerializableFunction<CompletionCandidate, Boolean>() {
- @Override
- public Boolean apply(CompletionCandidate c) {
- return c.getValue().length() == minPrefix;
- }
- })))
- .apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections())
- // ...set the key to be the minPrefix-length prefix...
- .apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix)))
- // ...and (re)apply the Top operator to all of them together.
- .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix));
-
- PCollection<KV<String, List<CompletionCandidate>>> flattenLarger = larger
- .apply("FlattenLarge", Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
-
- return PCollectionList.of(flattenLarger).and(small);
- }
- }
- }
-
- /**
- * A DoFn that keys each candidate by all its prefixes.
- */
- private static class AllPrefixes
- extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
- private final int minPrefix;
- private final int maxPrefix;
- public AllPrefixes(int minPrefix) {
- this(minPrefix, Integer.MAX_VALUE);
- }
- public AllPrefixes(int minPrefix, int maxPrefix) {
- this.minPrefix = minPrefix;
- this.maxPrefix = maxPrefix;
- }
- @Override
- public void processElement(ProcessContext c) {
- String word = c.element().value;
- for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
- c.output(KV.of(word.substring(0, i), c.element()));
- }
- }
- }
-
- /**
- * Class used to store tag-count pairs.
- */
- @DefaultCoder(AvroCoder.class)
- static class CompletionCandidate implements Comparable<CompletionCandidate> {
- private long count;
- private String value;
-
- public CompletionCandidate(String value, long count) {
- this.value = value;
- this.count = count;
- }
-
- public long getCount() {
- return count;
- }
-
- public String getValue() {
- return value;
- }
-
- // Empty constructor required for Avro decoding.
- public CompletionCandidate() {}
-
- @Override
- public int compareTo(CompletionCandidate o) {
- if (this.count < o.count) {
- return -1;
- } else if (this.count == o.count) {
- return this.value.compareTo(o.value);
- } else {
- return 1;
- }
- }
-
- @Override
- public boolean equals(Object other) {
- if (other instanceof CompletionCandidate) {
- CompletionCandidate that = (CompletionCandidate) other;
- return this.count == that.count && this.value.equals(that.value);
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return Long.valueOf(count).hashCode() ^ value.hashCode();
- }
-
- @Override
- public String toString() {
- return "CompletionCandidate[" + value + ", " + count + "]";
- }
- }
-
- /**
- * Takes as input a set of strings, and emits each #hashtag found therein.
- */
- static class ExtractHashtags extends DoFn<String, String> {
- @Override
- public void processElement(ProcessContext c) {
- Matcher m = Pattern.compile("#\\S+").matcher(c.element());
- while (m.find()) {
- c.output(m.group().substring(1));
- }
- }
- }
-
- static class FormatForBigquery extends DoFn<KV<String, List<CompletionCandidate>>, TableRow> {
- @Override
- public void processElement(ProcessContext c) {
- List<TableRow> completions = new ArrayList<>();
- for (CompletionCandidate cc : c.element().getValue()) {
- completions.add(new TableRow()
- .set("count", cc.getCount())
- .set("tag", cc.getValue()));
- }
- TableRow row = new TableRow()
- .set("prefix", c.element().getKey())
- .set("tags", completions);
- c.output(row);
- }
-
- /**
- * Defines the BigQuery schema used for the output.
- */
- static TableSchema getSchema() {
- List<TableFieldSchema> tagFields = new ArrayList<>();
- tagFields.add(new TableFieldSchema().setName("count").setType("INTEGER"));
- tagFields.add(new TableFieldSchema().setName("tag").setType("STRING"));
- List<TableFieldSchema> fields = new ArrayList<>();
- fields.add(new TableFieldSchema().setName("prefix").setType("STRING"));
- fields.add(new TableFieldSchema()
- .setName("tags").setType("RECORD").setMode("REPEATED").setFields(tagFields));
- return new TableSchema().setFields(fields);
- }
- }
-
- /**
- * Takes as input the top candidates per prefix, and emits an entity
- * suitable for writing to Datastore.
- */
- static class FormatForDatastore extends DoFn<KV<String, List<CompletionCandidate>>, Entity> {
- private String kind;
-
- public FormatForDatastore(String kind) {
- this.kind = kind;
- }
-
- @Override
- public void processElement(ProcessContext c) {
- Entity.Builder entityBuilder = Entity.newBuilder();
- Key key = DatastoreHelper.makeKey(kind, c.element().getKey()).build();
-
- entityBuilder.setKey(key);
- List<Value> candidates = new ArrayList<>();
- for (CompletionCandidate tag : c.element().getValue()) {
- Entity.Builder tagEntity = Entity.newBuilder();
- tagEntity.addProperty(
- DatastoreHelper.makeProperty("tag", DatastoreHelper.makeValue(tag.value)));
- tagEntity.addProperty(
- DatastoreHelper.makeProperty("count", DatastoreHelper.makeValue(tag.count)));
- candidates.add(DatastoreHelper.makeValue(tagEntity).setIndexed(false).build());
- }
- entityBuilder.addProperty(
- DatastoreHelper.makeProperty("candidates", DatastoreHelper.makeValue(candidates)));
- c.output(entityBuilder.build());
- }
- }
-
- /**
- * Options supported by this class.
- *
- * <p>Inherits standard Dataflow configuration options.
- */
- private static interface Options extends ExamplePubsubTopicOptions, ExampleBigQueryTableOptions {
- @Description("Input text file")
- String getInputFile();
- void setInputFile(String value);
-
- @Description("Whether to use the recursive algorithm")
- @Default.Boolean(true)
- Boolean getRecursive();
- void setRecursive(Boolean value);
-
- @Description("Dataset entity kind")
- @Default.String("autocomplete-demo")
- String getKind();
- void setKind(String value);
-
- @Description("Whether output to BigQuery")
- @Default.Boolean(true)
- Boolean getOutputToBigQuery();
- void setOutputToBigQuery(Boolean value);
-
- @Description("Whether output to Datastore")
- @Default.Boolean(false)
- Boolean getOutputToDatastore();
- void setOutputToDatastore(Boolean value);
-
- @Description("Datastore output dataset ID, defaults to project ID")
- String getOutputDataset();
- void setOutputDataset(String value);
- }
-
- public static void main(String[] args) throws IOException {
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-
- if (options.isStreaming()) {
- // In order to cancel the pipelines automatically,
- // {@literal DataflowPipelineRunner} is forced to be used.
- options.setRunner(DataflowPipelineRunner.class);
- }
-
- options.setBigQuerySchema(FormatForBigquery.getSchema());
- DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
-
- // We support running the same pipeline in either
- // batch or windowed streaming mode.
- PTransform<? super PBegin, PCollection<String>> readSource;
- WindowFn<Object, ?> windowFn;
- if (options.isStreaming()) {
- Preconditions.checkArgument(
- !options.getOutputToDatastore(), "DatastoreIO is not supported in streaming.");
- dataflowUtils.setupPubsub();
-
- readSource = PubsubIO.Read.topic(options.getPubsubTopic());
- windowFn = SlidingWindows.of(Duration.standardMinutes(30)).every(Duration.standardSeconds(5));
- } else {
- readSource = TextIO.Read.from(options.getInputFile());
- windowFn = new GlobalWindows();
- }
-
- // Create the pipeline.
- Pipeline p = Pipeline.create(options);
- PCollection<KV<String, List<CompletionCandidate>>> toWrite = p
- .apply(readSource)
- .apply(ParDo.of(new ExtractHashtags()))
- .apply(Window.<String>into(windowFn))
- .apply(ComputeTopCompletions.top(10, options.getRecursive()));
-
- if (options.getOutputToDatastore()) {
- toWrite
- .apply(ParDo.named("FormatForDatastore").of(new FormatForDatastore(options.getKind())))
- .apply(DatastoreIO.writeTo(MoreObjects.firstNonNull(
- options.getOutputDataset(), options.getProject())));
- }
- if (options.getOutputToBigQuery()) {
- dataflowUtils.setupBigQueryTable();
-
- TableReference tableRef = new TableReference();
- tableRef.setProjectId(options.getProject());
- tableRef.setDatasetId(options.getBigQueryDataset());
- tableRef.setTableId(options.getBigQueryTable());
-
- toWrite
- .apply(ParDo.of(new FormatForBigquery()))
- .apply(BigQueryIO.Write
- .to(tableRef)
- .withSchema(FormatForBigquery.getSchema())
- .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
- .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
- }
-
- // Run the pipeline.
- PipelineResult result = p.run();
-
- if (options.isStreaming() && !options.getInputFile().isEmpty()) {
- // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
- dataflowUtils.runInjectorPipeline(options.getInputFile(), options.getPubsubTopic());
- }
-
- // dataflowUtils will try to cancel the pipeline and the injector before the program exits.
- dataflowUtils.waitToFinish(result);
- }
-}
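At the heart of the example removed above, AllPrefixes re-keys each candidate under every prefix of length at least minPrefix, and Top.largestPerKey then keeps the heaviest candidates per prefix (the recursive variant reuses longer-prefix results to prune work). A small, SDK-free sketch of that expansion and per-prefix selection using plain collections; the sample tags and counts are made up:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class PrefixTopDemo {
      /** Re-keys each (tag, count) under every prefix of length >= minPrefix, keeps the top n per prefix. */
      static Map<String, List<Map.Entry<String, Long>>> topPerPrefix(
          Map<String, Long> tagCounts, int minPrefix, int n) {
        Map<String, List<Map.Entry<String, Long>>> byPrefix = new HashMap<>();
        for (Map.Entry<String, Long> tag : tagCounts.entrySet()) {
          String word = tag.getKey();
          for (int i = minPrefix; i <= word.length(); i++) {
            byPrefix.computeIfAbsent(word.substring(0, i), k -> new ArrayList<>()).add(tag);
          }
        }
        // Keep only the n heaviest candidates for each prefix (what Top.largestPerKey does per key).
        for (List<Map.Entry<String, Long>> candidates : byPrefix.values()) {
          candidates.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
          if (candidates.size() > n) {
            candidates.subList(n, candidates.size()).clear();
          }
        }
        return byPrefix;
      }

      public static void main(String[] args) {
        Map<String, Long> counts = new HashMap<>();   // made-up tag counts
        counts.put("beam", 7L);
        counts.put("bean", 3L);
        counts.put("bear", 5L);
        // For prefix "bea" with n = 2 this keeps beam (7) and bear (5).
        System.out.println(topPerPrefix(counts, 1, 2));
      }
    }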
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/complete/README.md
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/README.md b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/README.md
deleted file mode 100644
index 5fba154..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/README.md
+++ /dev/null
@@ -1,44 +0,0 @@
-
-# "Complete" Examples
-
-This directory contains end-to-end example pipelines that perform complex data processing tasks. They include:
-
-<ul>
- <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java">AutoComplete</a>
- — An example that computes the most popular hash tags for every
- prefix, which can be used for auto-completion. Demonstrates how to use the
- same pipeline in both streaming and batch, combiners, and composite
- transforms.</li>
- <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java">StreamingWordExtract</a>
- — A streaming pipeline example that inputs lines of text from a Cloud
- Pub/Sub topic, splits each line into individual words, capitalizes those
- words, and writes the output to a BigQuery table.
- </li>
- <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java">TfIdf</a>
- — An example that computes a basic TF-IDF search table for a directory or
- Cloud Storage prefix. Demonstrates joining data, side inputs, and logging.
- </li>
- <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java">TopWikipediaSessions</a>
- — An example that reads Wikipedia edit data from Cloud Storage and
- computes the user with the longest string of edits separated by no more than
- an hour within each month. Demonstrates using Cloud Dataflow
- <code>Windowing</code> to perform time-based aggregations of data.
- </li>
- <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java">TrafficMaxLaneFlow</a>
- — A streaming Cloud Dataflow example using BigQuery output in the
- <code>traffic sensor</code> domain. Demonstrates the Cloud Dataflow streaming
- runner, sliding windows, Cloud Pub/Sub topic ingestion, the use of the
- <code>AvroCoder</code> to encode a custom class, and custom
- <code>Combine</code> transforms.
- </li>
- <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java">TrafficRoutes</a>
- — A streaming Cloud Dataflow example using BigQuery output in the
- <code>traffic sensor</code> domain. Demonstrates the Cloud Dataflow streaming
- runner, <code>GroupByKey</code>, keyed state, sliding windows, and Cloud
- Pub/Sub topic ingestion.
- </li>
- </ul>
-
-See the [documentation](https://cloud.google.com/dataflow/getting-started) and the [Examples
-README](../../../../../../../../../README.md) for
-information about how to run these examples.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java
deleted file mode 100644
index 99c5249..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.complete;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
-import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
-import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicOptions;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-/**
- * A streaming Dataflow Example using BigQuery output.
- *
- * <p>This pipeline example reads lines of text from a PubSub topic, splits each line
- * into individual words, capitalizes those words, and writes the output to
- * a BigQuery table.
- *
- * <p>By default, the example will run a separate pipeline to inject the data from the default
- * {@literal --inputFile} to the Pub/Sub {@literal --pubsubTopic}, making it available for
- * the streaming pipeline to process. You may override the default {@literal --inputFile} with a
- * file of your choosing. You may also set {@literal --inputFile} to an empty string, which will
- * disable the automatic Pub/Sub injection, and allow you to use a separate tool to control the input
- * to this example.
- *
- * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
- * from the example common package (there are no defaults for a general Dataflow pipeline).
- * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
- * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
- * the example will try to create them.
- *
- * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
- * and then exit.
- */
-public class StreamingWordExtract {
-
- /** A DoFn that tokenizes lines of text into individual words. */
- static class ExtractWords extends DoFn<String, String> {
- @Override
- public void processElement(ProcessContext c) {
- String[] words = c.element().split("[^a-zA-Z']+");
- for (String word : words) {
- if (!word.isEmpty()) {
- c.output(word);
- }
- }
- }
- }
-
- /** A DoFn that uppercases a word. */
- static class Uppercase extends DoFn<String, String> {
- @Override
- public void processElement(ProcessContext c) {
- c.output(c.element().toUpperCase());
- }
- }
-
- /**
- * Converts strings into BigQuery rows.
- */
- static class StringToRowConverter extends DoFn<String, TableRow> {
- /**
- * In this example, put the whole string into a single BigQuery field.
- */
- @Override
- public void processElement(ProcessContext c) {
- c.output(new TableRow().set("string_field", c.element()));
- }
-
- static TableSchema getSchema() {
- return new TableSchema().setFields(new ArrayList<TableFieldSchema>() {
- // Compose the list of TableFieldSchema from tableSchema.
- {
- add(new TableFieldSchema().setName("string_field").setType("STRING"));
- }
- });
- }
- }
-
- /**
- * Options supported by {@link StreamingWordExtract}.
- *
- * <p>Inherits standard configuration options.
- */
- private interface StreamingWordExtractOptions
- extends ExamplePubsubTopicOptions, ExampleBigQueryTableOptions {
- @Description("Input file to inject to Pub/Sub topic")
- @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
- String getInputFile();
- void setInputFile(String value);
- }
-
- /**
- * Sets up and starts streaming pipeline.
- *
- * @throws IOException if there is a problem setting up resources
- */
- public static void main(String[] args) throws IOException {
- StreamingWordExtractOptions options = PipelineOptionsFactory.fromArgs(args)
- .withValidation()
- .as(StreamingWordExtractOptions.class);
- options.setStreaming(true);
- // In order to cancel the pipelines automatically,
- // {@literal DataflowPipelineRunner} is forced to be used.
- options.setRunner(DataflowPipelineRunner.class);
-
- options.setBigQuerySchema(StringToRowConverter.getSchema());
- DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
- dataflowUtils.setup();
-
- Pipeline pipeline = Pipeline.create(options);
-
- String tableSpec = new StringBuilder()
- .append(options.getProject()).append(":")
- .append(options.getBigQueryDataset()).append(".")
- .append(options.getBigQueryTable())
- .toString();
- pipeline
- .apply(PubsubIO.Read.topic(options.getPubsubTopic()))
- .apply(ParDo.of(new ExtractWords()))
- .apply(ParDo.of(new Uppercase()))
- .apply(ParDo.of(new StringToRowConverter()))
- .apply(BigQueryIO.Write.to(tableSpec)
- .withSchema(StringToRowConverter.getSchema()));
-
- PipelineResult result = pipeline.run();
-
- if (!options.getInputFile().isEmpty()) {
- // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
- dataflowUtils.runInjectorPipeline(options.getInputFile(), options.getPubsubTopic());
- }
-
- // dataflowUtils will try to cancel the pipeline and the injector before the program exits.
- dataflowUtils.waitToFinish(result);
- }
-}
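The per-element logic of the three DoFns removed above (split on the [^a-zA-Z']+ pattern, uppercase each word, wrap it in a single-field TableRow) can be traced outside the pipeline. A minimal sketch on one sample line; the line itself is illustrative and the snippet assumes the same BigQuery API client used by the example is on the classpath:

    import com.google.api.services.bigquery.model.TableRow;

    import java.util.ArrayList;
    import java.util.List;

    public class WordExtractDemo {
      public static void main(String[] args) {
        String line = "To be, or not to be";                  // sample input line (illustrative)
        List<TableRow> rows = new ArrayList<>();
        for (String word : line.split("[^a-zA-Z']+")) {        // same split pattern as ExtractWords
          if (!word.isEmpty()) {                               // ExtractWords drops empty tokens
            rows.add(new TableRow().set("string_field", word.toUpperCase()));
          }
        }
        // rows now holds {string_field=TO}, {string_field=BE}, {string_field=OR}, ...
        System.out.println(rows);
      }
    }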
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java
deleted file mode 100644
index 65ac753..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java
+++ /dev/null
@@ -1,431 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.complete;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.GcsOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.Keys;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
-import com.google.cloud.dataflow.sdk.transforms.Values;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.transforms.WithKeys;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
-import com.google.cloud.dataflow.sdk.util.GcsUtil;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PDone;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * An example that computes a basic TF-IDF search table for a directory or GCS prefix.
- *
- * <p>Concepts: joining data; side inputs; logging
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * }</pre>
- * and a local output file or output prefix on GCS:
- * <pre>{@code
- * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
- * }</pre>
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- * --runner=BlockingDataflowPipelineRunner
- * and an output prefix on GCS:
- * --output=gs://YOUR_OUTPUT_PREFIX
- * }</pre>
- *
- * <p>The default input is {@code gs://dataflow-samples/shakespeare/} and can be overridden with
- * {@code --input}.
- */
-public class TfIdf {
- /**
- * Options supported by {@link TfIdf}.
- *
- * <p>Inherits standard configuration options.
- */
- private static interface Options extends PipelineOptions {
- @Description("Path to the directory or GCS prefix containing files to read from")
- @Default.String("gs://dataflow-samples/shakespeare/")
- String getInput();
- void setInput(String value);
-
- @Description("Prefix of output URI to write to")
- @Validation.Required
- String getOutput();
- void setOutput(String value);
- }
-
- /**
- * Lists documents contained beneath the {@code options.input} prefix/directory.
- */
- public static Set<URI> listInputDocuments(Options options)
- throws URISyntaxException, IOException {
- URI baseUri = new URI(options.getInput());
-
- // List all documents in the directory or GCS prefix.
- URI absoluteUri;
- if (baseUri.getScheme() != null) {
- absoluteUri = baseUri;
- } else {
- absoluteUri = new URI(
- "file",
- baseUri.getAuthority(),
- baseUri.getPath(),
- baseUri.getQuery(),
- baseUri.getFragment());
- }
-
- Set<URI> uris = new HashSet<>();
- if (absoluteUri.getScheme().equals("file")) {
- File directory = new File(absoluteUri);
- for (String entry : directory.list()) {
- File path = new File(directory, entry);
- uris.add(path.toURI());
- }
- } else if (absoluteUri.getScheme().equals("gs")) {
- GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil();
- URI gcsUriGlob = new URI(
- absoluteUri.getScheme(),
- absoluteUri.getAuthority(),
- absoluteUri.getPath() + "*",
- absoluteUri.getQuery(),
- absoluteUri.getFragment());
- for (GcsPath entry : gcsUtil.expand(GcsPath.fromUri(gcsUriGlob))) {
- uris.add(entry.toUri());
- }
- }
-
- return uris;
- }
-
- /**
- * Reads the documents at the provided uris and returns all lines
- * from the documents tagged with which document they are from.
- */
- public static class ReadDocuments
- extends PTransform<PInput, PCollection<KV<URI, String>>> {
- private Iterable<URI> uris;
-
- public ReadDocuments(Iterable<URI> uris) {
- this.uris = uris;
- }
-
- @Override
- public Coder<?> getDefaultOutputCoder() {
- return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of());
- }
-
- @Override
- public PCollection<KV<URI, String>> apply(PInput input) {
- Pipeline pipeline = input.getPipeline();
-
- // Create one TextIO.Read transform for each document
- // and add its output to a PCollectionList
- PCollectionList<KV<URI, String>> urisToLines =
- PCollectionList.empty(pipeline);
-
- // TextIO.Read supports:
- // - file: URIs and paths locally
- // - gs: URIs on the service
- for (final URI uri : uris) {
- String uriString;
- if (uri.getScheme().equals("file")) {
- uriString = new File(uri).getPath();
- } else {
- uriString = uri.toString();
- }
-
- PCollection<KV<URI, String>> oneUriToLines = pipeline
- .apply(TextIO.Read.from(uriString)
- .named("TextIO.Read(" + uriString + ")"))
- .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
-
- urisToLines = urisToLines.and(oneUriToLines);
- }
-
- return urisToLines.apply(Flatten.<KV<URI, String>>pCollections());
- }
- }
-
- /**
- * A transform containing a basic TF-IDF pipeline. The input consists of KV objects
- * where the key is the document's URI and the value is a piece
- * of the document's content. The output is a mapping from terms to
- * scores for each document URI.
- */
- public static class ComputeTfIdf
- extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> {
- public ComputeTfIdf() { }
-
- @Override
- public PCollection<KV<String, KV<URI, Double>>> apply(
- PCollection<KV<URI, String>> uriToContent) {
-
- // Compute the total number of documents, and
- // prepare this singleton PCollectionView for
- // use as a side input.
- final PCollectionView<Long> totalDocuments =
- uriToContent
- .apply("GetURIs", Keys.<URI>create())
- .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
- .apply(Count.<URI>globally())
- .apply(View.<Long>asSingleton());
-
- // Create a collection of pairs mapping a URI to each
- // of the words in the document associated with that URI.
- PCollection<KV<URI, String>> uriToWords = uriToContent
- .apply(ParDo.named("SplitWords").of(
- new DoFn<KV<URI, String>, KV<URI, String>>() {
- @Override
- public void processElement(ProcessContext c) {
- URI uri = c.element().getKey();
- String line = c.element().getValue();
- for (String word : line.split("\\W+")) {
- // Log INFO messages when the word “love” is found.
- if (word.toLowerCase().equals("love")) {
- LOG.info("Found {}", word.toLowerCase());
- }
-
- if (!word.isEmpty()) {
- c.output(KV.of(uri, word.toLowerCase()));
- }
- }
- }
- }));
-
- // Compute a mapping from each word to the total
- // number of documents in which it appears.
- PCollection<KV<String, Long>> wordToDocCount = uriToWords
- .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create())
- .apply(Values.<String>create())
- .apply("CountDocs", Count.<String>perElement());
-
- // Compute a mapping from each URI to the total
- // number of words in the document associated with that URI.
- PCollection<KV<URI, Long>> uriToWordTotal = uriToWords
- .apply("GetURIs2", Keys.<URI>create())
- .apply("CountWords", Count.<URI>perElement());
-
- // Count, for each (URI, word) pair, the number of
- // occurrences of that word in the document associated
- // with the URI.
- PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords
- .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement());
-
- // Adjust the above collection, a mapping from
- // (URI, word) pairs to counts, into an isomorphic mapping
- // from URI to (word, count) pairs, to prepare for a join
- // by the URI key.
- PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
- .apply(ParDo.named("ShiftKeys").of(
- new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
- @Override
- public void processElement(ProcessContext c) {
- URI uri = c.element().getKey().getKey();
- String word = c.element().getKey().getValue();
- Long occurrences = c.element().getValue();
- c.output(KV.of(uri, KV.of(word, occurrences)));
- }
- }));
-
- // Prepare to join the mapping of URI to (word, count) pairs with
- // the mapping of URI to total word counts, by associating
- // each of the input PCollection<KV<URI, ...>> with
- // a tuple tag. Each input must have the same key type, URI
- // in this case. The type parameter of the tuple tag matches
- // the types of the values for each collection.
- final TupleTag<Long> wordTotalsTag = new TupleTag<Long>();
- final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<KV<String, Long>>();
- KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple
- .of(wordTotalsTag, uriToWordTotal)
- .and(wordCountsTag, uriToWordAndCount);
-
- // Perform a CoGroupByKey (a sort of pre-join) on the prepared
- // inputs. This yields a mapping from URI to a CoGbkResult
- // (CoGroupByKey Result). The CoGbkResult is a mapping
- // from the above tuple tags to the values in each input
- // associated with a particular URI. In this case, each
- // KV<URI, CoGbkResult> groups a URI with the total number of
- // words in that document as well as all the (word, count)
- // pairs for particular words.
- PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput
- .apply("CoGroupByUri", CoGroupByKey.<URI>create());
-
- // Compute a mapping from each word to a (URI, term frequency)
- // pair for each URI. A word's term frequency for a document
- // is simply the number of times that word occurs in the document
- // divided by the total number of words in the document.
- PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
- .apply(ParDo.named("ComputeTermFrequencies").of(
- new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
- @Override
- public void processElement(ProcessContext c) {
- URI uri = c.element().getKey();
- Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
-
- for (KV<String, Long> wordAndCount
- : c.element().getValue().getAll(wordCountsTag)) {
- String word = wordAndCount.getKey();
- Long wordCount = wordAndCount.getValue();
- Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue();
- c.output(KV.of(word, KV.of(uri, termFrequency)));
- }
- }
- }));
-
- // Compute a mapping from each word to its document frequency.
- // A word's document frequency in a corpus is the number of
- // documents in which the word appears divided by the total
- // number of documents in the corpus. Note how the total number of
- // documents is passed as a side input; the same value is
- // presented to each invocation of the DoFn.
- PCollection<KV<String, Double>> wordToDf = wordToDocCount
- .apply(ParDo
- .named("ComputeDocFrequencies")
- .withSideInputs(totalDocuments)
- .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
- @Override
- public void processElement(ProcessContext c) {
- String word = c.element().getKey();
- Long documentCount = c.element().getValue();
- Long documentTotal = c.sideInput(totalDocuments);
- Double documentFrequency = documentCount.doubleValue()
- / documentTotal.doubleValue();
-
- c.output(KV.of(word, documentFrequency));
- }
- }));
-
- // Join the term frequency and document frequency
- // collections, each keyed on the word.
- final TupleTag<KV<URI, Double>> tfTag = new TupleTag<KV<URI, Double>>();
- final TupleTag<Double> dfTag = new TupleTag<Double>();
- PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple
- .of(tfTag, wordToUriAndTf)
- .and(dfTag, wordToDf)
- .apply(CoGroupByKey.<String>create());
-
- // Compute a mapping from each word to a (URI, TF-IDF) score
- // for each URI. There are a variety of definitions of TF-IDF
- // ("term frequency - inverse document frequency") score;
- // here we use a basic version that is the term frequency
- // divided by the log of the document frequency.
- PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = wordToUriAndTfAndDf
- .apply(ParDo.named("ComputeTfIdf").of(
- new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
- @Override
- public void processElement(ProcessContext c) {
- String word = c.element().getKey();
- Double df = c.element().getValue().getOnly(dfTag);
-
- for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) {
- URI uri = uriAndTf.getKey();
- Double tf = uriAndTf.getValue();
- Double tfIdf = tf * Math.log(1 / df);
- c.output(KV.of(word, KV.of(uri, tfIdf)));
- }
- }
- }));
-
- return wordToUriAndTfIdf;
- }
-
- // Instantiate Logger.
- // It is suggested that the user specify the class name of the containing class
- // (in this case ComputeTfIdf).
- private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class);
- }
-
- /**
- * A {@link PTransform} to write, in CSV format, a mapping from term and URI
- * to score.
- */
- public static class WriteTfIdf
- extends PTransform<PCollection<KV<String, KV<URI, Double>>>, PDone> {
- private String output;
-
- public WriteTfIdf(String output) {
- this.output = output;
- }
-
- @Override
- public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
- return wordToUriAndTfIdf
- .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, Double>>, String>() {
- @Override
- public void processElement(ProcessContext c) {
- c.output(String.format("%s,\t%s,\t%f",
- c.element().getKey(),
- c.element().getValue().getKey(),
- c.element().getValue().getValue()));
- }
- }))
- .apply(TextIO.Write
- .to(output)
- .withSuffix(".csv"));
- }
- }
-
- public static void main(String[] args) throws Exception {
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
- Pipeline pipeline = Pipeline.create(options);
- pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
-
- pipeline
- .apply(new ReadDocuments(listInputDocuments(options)))
- .apply(new ComputeTfIdf())
- .apply(new WriteTfIdf(options.getOutput()));
-
- pipeline.run();
- }
-}
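The scoring at the end of the deleted ComputeTfIdf is plain arithmetic: term frequency is the term's occurrences divided by the words in the document, document frequency is the number of documents containing the term divided by the total number of documents, and the emitted score is tf * log(1/df). A tiny sketch with made-up counts to make the numbers concrete:

    public class TfIdfScoreDemo {
      public static void main(String[] args) {
        long wordCountInDoc = 3;    // occurrences of the term in one document (made-up)
        long wordsInDoc = 120;      // total words in that document (made-up)
        long docsWithWord = 2;      // documents containing the term (made-up)
        long totalDocs = 10;        // documents in the corpus (made-up)

        double tf = (double) wordCountInDoc / wordsInDoc;    // term frequency: 0.025
        double df = (double) docsWithWord / totalDocs;       // document frequency: 0.2
        double tfIdf = tf * Math.log(1 / df);                // same formula as ComputeTfIdf: 0.025 * ln(5) ~ 0.0402

        System.out.printf("tf=%.4f df=%.2f tfIdf=%.4f%n", tf, df, tfIdf);
      }
    }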
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java
deleted file mode 100644
index c57a5f2..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.complete;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SerializableComparator;
-import com.google.cloud.dataflow.sdk.transforms.Top;
-import com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.util.List;
-
-/**
- * An example that reads Wikipedia edit data from Cloud Storage and computes the user with
- * the longest string of edits separated by no more than an hour within each month.
- *
- * <p>Concepts: Using Windowing to perform time-based aggregations of data.
- *
- * <p>It is not recommended to execute this pipeline locally, given the size of the default input
- * data.
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- * --runner=BlockingDataflowPipelineRunner
- * }
- * </pre>
- * and an output prefix on GCS:
- * <pre>{@code
- * --output=gs://YOUR_OUTPUT_PREFIX
- * }</pre>
- *
- * <p>The default input is {@code gs://dataflow-samples/wikipedia_edits/*.json} and can be
- * overridden with {@code --input}.
- *
- * <p>The input for this example is large enough that it's a good place to enable (experimental)
- * autoscaling:
- * <pre>{@code
- * --autoscalingAlgorithm=BASIC
- * --maxNumWorkers=20
- * }
- * </pre>
- * This will automatically scale the number of workers up over time until the job completes.
- */
-public class TopWikipediaSessions {
- private static final String EXPORTED_WIKI_TABLE = "gs://dataflow-samples/wikipedia_edits/*.json";
-
- /**
- * Extracts user and timestamp from a TableRow representing a Wikipedia edit.
- */
- static class ExtractUserAndTimestamp extends DoFn<TableRow, String> {
- @Override
- public void processElement(ProcessContext c) {
- TableRow row = c.element();
- int timestamp = (Integer) row.get("timestamp");
- String userName = (String) row.get("contributor_username");
- if (userName != null) {
- // Sets the implicit timestamp field to be used in windowing.
- c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
- }
- }
- }
-
- /**
- * Computes the number of edits in each user session. A session is defined as
- * a string of edits where each is separated from the next by less than an hour.
- */
- static class ComputeSessions
- extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
- @Override
- public PCollection<KV<String, Long>> apply(PCollection<String> actions) {
- return actions
- .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardHours(1))))
-
- .apply(Count.<String>perElement());
- }
- }
-
- /**
- * Computes the longest session ending in each month.
- */
- private static class TopPerMonth
- extends PTransform<PCollection<KV<String, Long>>, PCollection<List<KV<String, Long>>>> {
- @Override
- public PCollection<List<KV<String, Long>>> apply(PCollection<KV<String, Long>> sessions) {
- return sessions
- .apply(Window.<KV<String, Long>>into(CalendarWindows.months(1)))
-
- .apply(Top.of(1, new SerializableComparator<KV<String, Long>>() {
- @Override
- public int compare(KV<String, Long> o1, KV<String, Long> o2) {
- return Long.compare(o1.getValue(), o2.getValue());
- }
- }).withoutDefaults());
- }
- }
-
- static class SessionsToStringsDoFn extends DoFn<KV<String, Long>, KV<String, Long>>
- implements RequiresWindowAccess {
-
- @Override
- public void processElement(ProcessContext c) {
- c.output(KV.of(
- c.element().getKey() + " : " + c.window(), c.element().getValue()));
- }
- }
-
- static class FormatOutputDoFn extends DoFn<List<KV<String, Long>>, String>
- implements RequiresWindowAccess {
- @Override
- public void processElement(ProcessContext c) {
- for (KV<String, Long> item : c.element()) {
- String session = item.getKey();
- long count = item.getValue();
- c.output(session + " : " + count + " : " + ((IntervalWindow) c.window()).start());
- }
- }
- }
-
- static class ComputeTopSessions extends PTransform<PCollection<TableRow>, PCollection<String>> {
-
- private final double samplingThreshold;
-
- public ComputeTopSessions(double samplingThreshold) {
- this.samplingThreshold = samplingThreshold;
- }
-
- @Override
- public PCollection<String> apply(PCollection<TableRow> input) {
- return input
- .apply(ParDo.of(new ExtractUserAndTimestamp()))
-
- .apply(ParDo.named("SampleUsers").of(
- new DoFn<String, String>() {
- @Override
- public void processElement(ProcessContext c) {
- if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * samplingThreshold) {
- c.output(c.element());
- }
- }
- }))
-
- .apply(new ComputeSessions())
-
- .apply(ParDo.named("SessionsToStrings").of(new SessionsToStringsDoFn()))
- .apply(new TopPerMonth())
- .apply(ParDo.named("FormatOutput").of(new FormatOutputDoFn()));
- }
- }
-
- /**
- * Options supported by this class.
- *
- * <p>Inherits standard Dataflow configuration options.
- */
- private interface Options extends PipelineOptions {
- @Description(
- "Input specified as a GCS path containing a BigQuery table exported as json")
- @Default.String(EXPORTED_WIKI_TABLE)
- String getInput();
- void setInput(String value);
-
- @Description("File to output results to")
- @Validation.Required
- String getOutput();
- void setOutput(String value);
- }
-
- public static void main(String[] args) {
- Options options = PipelineOptionsFactory.fromArgs(args)
- .withValidation()
- .as(Options.class);
- DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
-
- Pipeline p = Pipeline.create(dataflowOptions);
-
- double samplingThreshold = 0.1;
-
- p.apply(TextIO.Read
- .from(options.getInput())
- .withCoder(TableRowJsonCoder.of()))
- .apply(new ComputeTopSessions(samplingThreshold))
- .apply(TextIO.Write.named("Write").withoutSharding().to(options.getOutput()));
-
- p.run();
- }
-}
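
For context, below is a minimal, self-contained sketch of the session-windowing pattern that ComputeSessions and TopPerMonth implement above; the class name and the hard-coded timestamped edits are illustrative assumptions, not part of this commit. Edits are grouped per user into sessions separated by less than an hour, counted, and the largest session in each calendar month is kept:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.SerializableComparator;
import com.google.cloud.dataflow.sdk.transforms.Top;
import com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.TimestampedValue;

import org.joda.time.Duration;
import org.joda.time.Instant;

import java.util.List;

public class SessionWindowingSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Timestamped usernames stand in for the extracted Wikipedia edits.
    PCollection<String> edits = p.apply(Create.timestamped(
        TimestampedValue.of("alice", new Instant(0L)),
        TimestampedValue.of("alice", new Instant(30 * 60 * 1000L)),    // same session
        TimestampedValue.of("alice", new Instant(5 * 3600 * 1000L)))); // new session

    // One session per user = edits separated from each other by less than an hour.
    PCollection<KV<String, Long>> sessionCounts = edits
        .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardHours(1))))
        .apply(Count.<String>perElement());

    // Re-window into calendar months and keep the largest session in each month.
    PCollection<List<KV<String, Long>>> topPerMonth = sessionCounts
        .apply(Window.<KV<String, Long>>into(CalendarWindows.months(1)))
        .apply(Top.of(1, new SerializableComparator<KV<String, Long>>() {
          @Override
          public int compare(KV<String, Long> a, KV<String, Long> b) {
            return Long.compare(a.getValue(), b.getValue());
          }
        }).withoutDefaults());

    p.run();
  }
}
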
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java
deleted file mode 100644
index 2d54252..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java
+++ /dev/null
@@ -1,425 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.complete;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
-import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
-import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
-import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicAndSubscriptionOptions;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.coders.AvroCoder;
-import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Strings;
-
-import org.apache.avro.reflect.Nullable;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A Dataflow Example that runs in both batch and streaming modes with traffic sensor data.
- * You can configure the running mode by setting {@literal --streaming} to true or false.
- *
- * <p>Concepts: The batch and streaming runners, sliding windows, Google Cloud Pub/Sub
- * topic injection, use of the AvroCoder to encode a custom class, and custom Combine transforms.
- *
- * <p>This example analyzes traffic sensor data using SlidingWindows. For each window,
- * it finds the lane that had the highest flow recorded, for each sensor station. It writes
- * those max values along with auxiliary info to a BigQuery table.
- *
- * <p>In batch mode, the pipeline reads traffic sensor data from {@literal --inputFile}.
- *
- * <p>In streaming mode, the pipeline reads the data from a Pub/Sub topic.
- * By default, the example runs a separate pipeline that injects the data from the default
- * {@literal --inputFile} into the Pub/Sub {@literal --pubsubTopic}, making it available for
- * the streaming pipeline to process. You may override the default {@literal --inputFile} with a
- * file of your choosing. You may also set {@literal --inputFile} to an empty string, which
- * disables the automatic Pub/Sub injection and allows you to use a separate tool to control the
- * input to this example. Example code that publishes traffic sensor data to a Pub/Sub topic
- * is provided in
- * <a href="https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-python/tree/master/gce-cmdline-publisher">this repository</a>.
- *
- * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
- * from the example common package (there are no defaults for a general Dataflow pipeline).
- * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
- * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
- * the example will try to create them.
- *
- * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
- * and then exit.
- */
-public class TrafficMaxLaneFlow {
-
- private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
- private static final Integer VALID_INPUTS = 4999;
-
- static final int WINDOW_DURATION = 60; // Default sliding window duration in minutes
- static final int WINDOW_SLIDE_EVERY = 5; // Default window 'slide every' setting in minutes
-
- /**
- * This class holds information about each lane in a station reading, along with some general
- * information from the reading.
- */
- @DefaultCoder(AvroCoder.class)
- static class LaneInfo {
- @Nullable String stationId;
- @Nullable String lane;
- @Nullable String direction;
- @Nullable String freeway;
- @Nullable String recordedTimestamp;
- @Nullable Integer laneFlow;
- @Nullable Integer totalFlow;
- @Nullable Double laneAO;
- @Nullable Double laneAS;
-
- public LaneInfo() {}
-
- public LaneInfo(String stationId, String lane, String direction, String freeway,
- String timestamp, Integer laneFlow, Double laneAO,
- Double laneAS, Integer totalFlow) {
- this.stationId = stationId;
- this.lane = lane;
- this.direction = direction;
- this.freeway = freeway;
- this.recordedTimestamp = timestamp;
- this.laneFlow = laneFlow;
- this.laneAO = laneAO;
- this.laneAS = laneAS;
- this.totalFlow = totalFlow;
- }
-
- public String getStationId() {
- return this.stationId;
- }
- public String getLane() {
- return this.lane;
- }
- public String getDirection() {
- return this.direction;
- }
- public String getFreeway() {
- return this.freeway;
- }
- public String getRecordedTimestamp() {
- return this.recordedTimestamp;
- }
- public Integer getLaneFlow() {
- return this.laneFlow;
- }
- public Double getLaneAO() {
- return this.laneAO;
- }
- public Double getLaneAS() {
- return this.laneAS;
- }
- public Integer getTotalFlow() {
- return this.totalFlow;
- }
- }
-
- /**
- * Extract the timestamp field from the input string, and use it as the element timestamp.
- */
- static class ExtractTimestamps extends DoFn<String, String> {
- private static final DateTimeFormatter dateTimeFormat =
- DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");
-
- @Override
- public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
- String[] items = c.element().split(",");
- if (items.length > 0) {
- try {
- String timestamp = items[0];
- c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp)));
- } catch (IllegalArgumentException e) {
- // Skip the invalid input.
- }
- }
- }
- }
-
- /**
- * Extract flow information for each of the 8 lanes in a reading, and output as separate tuples.
- * This will let us determine which lane has the max flow for that station over the span of the
- * window, and output not only the max flow from that calculation, but other associated
- * information. The number of lanes for which data is present depends upon which freeway the data
- * point comes from.
- */
- static class ExtractFlowInfoFn extends DoFn<String, KV<String, LaneInfo>> {
-
- @Override
- public void processElement(ProcessContext c) {
- String[] items = c.element().split(",");
- if (items.length < 48) {
- // Skip the invalid input.
- return;
- }
- // extract the sensor information for the lanes from the input string fields.
- String timestamp = items[0];
- String stationId = items[1];
- String freeway = items[2];
- String direction = items[3];
- Integer totalFlow = tryIntParse(items[7]);
- for (int i = 1; i <= 8; ++i) {
- Integer laneFlow = tryIntParse(items[6 + 5 * i]);
- Double laneAvgOccupancy = tryDoubleParse(items[7 + 5 * i]);
- Double laneAvgSpeed = tryDoubleParse(items[8 + 5 * i]);
- if (laneFlow == null || laneAvgOccupancy == null || laneAvgSpeed == null) {
- return;
- }
- LaneInfo laneInfo = new LaneInfo(stationId, "lane" + i, direction, freeway, timestamp,
- laneFlow, laneAvgOccupancy, laneAvgSpeed, totalFlow);
- c.output(KV.of(stationId, laneInfo));
- }
- }
- }
-
- /**
- * A custom 'combine function' used with the Combine.perKey transform. Used to find the max lane
- * flow over all the data points in the Window. Extracts the lane flow from each LaneInfo and
- * determines whether it's the max seen so far. We're using a custom combiner instead of the Max
- * transform because we want to retain the additional information we've associated with the flow
- * value.
- */
- public static class MaxFlow implements SerializableFunction<Iterable<LaneInfo>, LaneInfo> {
- @Override
- public LaneInfo apply(Iterable<LaneInfo> input) {
- Integer max = 0;
- LaneInfo maxInfo = new LaneInfo();
- for (LaneInfo item : input) {
- Integer flow = item.getLaneFlow();
- if (flow != null && (flow >= max)) {
- max = flow;
- maxInfo = item;
- }
- }
- return maxInfo;
- }
- }
-
- /**
- * Format the results of the Max Lane flow calculation to a TableRow, to save to BigQuery.
- * Add the timestamp from the window context.
- */
- static class FormatMaxesFn extends DoFn<KV<String, LaneInfo>, TableRow> {
- @Override
- public void processElement(ProcessContext c) {
-
- LaneInfo laneInfo = c.element().getValue();
- TableRow row = new TableRow()
- .set("station_id", c.element().getKey())
- .set("direction", laneInfo.getDirection())
- .set("freeway", laneInfo.getFreeway())
- .set("lane_max_flow", laneInfo.getLaneFlow())
- .set("lane", laneInfo.getLane())
- .set("avg_occ", laneInfo.getLaneAO())
- .set("avg_speed", laneInfo.getLaneAS())
- .set("total_flow", laneInfo.getTotalFlow())
- .set("recorded_timestamp", laneInfo.getRecordedTimestamp())
- .set("window_timestamp", c.timestamp().toString());
- c.output(row);
- }
-
- /** Defines the BigQuery schema used for the output. */
- static TableSchema getSchema() {
- List<TableFieldSchema> fields = new ArrayList<>();
- fields.add(new TableFieldSchema().setName("station_id").setType("STRING"));
- fields.add(new TableFieldSchema().setName("direction").setType("STRING"));
- fields.add(new TableFieldSchema().setName("freeway").setType("STRING"));
- fields.add(new TableFieldSchema().setName("lane_max_flow").setType("INTEGER"));
- fields.add(new TableFieldSchema().setName("lane").setType("STRING"));
- fields.add(new TableFieldSchema().setName("avg_occ").setType("FLOAT"));
- fields.add(new TableFieldSchema().setName("avg_speed").setType("FLOAT"));
- fields.add(new TableFieldSchema().setName("total_flow").setType("INTEGER"));
- fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP"));
- fields.add(new TableFieldSchema().setName("recorded_timestamp").setType("STRING"));
- TableSchema schema = new TableSchema().setFields(fields);
- return schema;
- }
- }
-
- /**
- * This PTransform extracts lane info, calculates the max lane flow found for a given station (for
- * the current Window) using a custom 'combiner', and formats the results for BigQuery.
- */
- static class MaxLaneFlow
- extends PTransform<PCollection<KV<String, LaneInfo>>, PCollection<TableRow>> {
- @Override
- public PCollection<TableRow> apply(PCollection<KV<String, LaneInfo>> flowInfo) {
- // stationId, LaneInfo => stationId + max lane flow info
- PCollection<KV<String, LaneInfo>> flowMaxes =
- flowInfo.apply(Combine.<String, LaneInfo>perKey(
- new MaxFlow()));
-
- // <stationId, max lane flow info>... => row...
- PCollection<TableRow> results = flowMaxes.apply(
- ParDo.of(new FormatMaxesFn()));
-
- return results;
- }
- }
-
- static class ReadFileAndExtractTimestamps extends PTransform<PBegin, PCollection<String>> {
- private final String inputFile;
-
- public ReadFileAndExtractTimestamps(String inputFile) {
- this.inputFile = inputFile;
- }
-
- @Override
- public PCollection<String> apply(PBegin begin) {
- return begin
- .apply(TextIO.Read.from(inputFile))
- .apply(ParDo.of(new ExtractTimestamps()));
- }
- }
-
- /**
- * Options supported by {@link TrafficMaxLaneFlow}.
- *
- * <p>Inherits standard configuration options.
- */
- private interface TrafficMaxLaneFlowOptions extends DataflowExampleOptions,
- ExamplePubsubTopicAndSubscriptionOptions, ExampleBigQueryTableOptions {
- @Description("Input file to inject to Pub/Sub topic")
- @Default.String("gs://dataflow-samples/traffic_sensor/"
- + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
- String getInputFile();
- void setInputFile(String value);
-
- @Description("Numeric value of sliding window duration, in minutes")
- @Default.Integer(WINDOW_DURATION)
- Integer getWindowDuration();
- void setWindowDuration(Integer value);
-
- @Description("Numeric value of window 'slide every' setting, in minutes")
- @Default.Integer(WINDOW_SLIDE_EVERY)
- Integer getWindowSlideEvery();
- void setWindowSlideEvery(Integer value);
-
- @Description("Whether to run the pipeline with unbounded input")
- @Default.Boolean(false)
- boolean isUnbounded();
- void setUnbounded(boolean value);
- }
-
- /**
- * Sets up and starts a streaming pipeline.
- *
- * @throws IOException if there is a problem setting up resources
- */
- public static void main(String[] args) throws IOException {
- TrafficMaxLaneFlowOptions options = PipelineOptionsFactory.fromArgs(args)
- .withValidation()
- .as(TrafficMaxLaneFlowOptions.class);
- options.setBigQuerySchema(FormatMaxesFn.getSchema());
- // Using DataflowExampleUtils to set up required resources.
- DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options, options.isUnbounded());
-
- Pipeline pipeline = Pipeline.create(options);
- TableReference tableRef = new TableReference();
- tableRef.setProjectId(options.getProject());
- tableRef.setDatasetId(options.getBigQueryDataset());
- tableRef.setTableId(options.getBigQueryTable());
-
- PCollection<String> input;
- if (options.isUnbounded()) {
- // Read an unbounded stream of data from Pub/Sub.
- input = pipeline.apply(PubsubIO.Read
- .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
- .subscription(options.getPubsubSubscription()));
- } else {
- // Read a bounded number of records from Pub/Sub.
- input = pipeline.apply(PubsubIO.Read
- .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
- .subscription(options.getPubsubSubscription()).maxNumRecords(VALID_INPUTS));
-
- // To read bounded TextIO files, use:
- // input = pipeline.apply(new ReadFileAndExtractTimestamps(options.getInputFile()));
- }
- input
- // row... => <stationId, LaneInfo> ...
- .apply(ParDo.of(new ExtractFlowInfoFn()))
- // map the incoming data stream into sliding windows. The default window duration values
- // work well if you're running the accompanying Pub/Sub generator script with the
- // --replay flag, which simulates pauses in the sensor data publication. You may want to
- // adjust them otherwise.
- .apply(Window.<KV<String, LaneInfo>>into(SlidingWindows.of(
- Duration.standardMinutes(options.getWindowDuration())).
- every(Duration.standardMinutes(options.getWindowSlideEvery()))))
- .apply(new MaxLaneFlow())
- .apply(BigQueryIO.Write.to(tableRef)
- .withSchema(FormatMaxesFn.getSchema()));
-
- // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
- if (!Strings.isNullOrEmpty(options.getInputFile())
- && !Strings.isNullOrEmpty(options.getPubsubTopic())) {
- dataflowUtils.runInjectorPipeline(
- new ReadFileAndExtractTimestamps(options.getInputFile()),
- options.getPubsubTopic(),
- PUBSUB_TIMESTAMP_LABEL_KEY);
- }
-
- // Run the pipeline.
- PipelineResult result = pipeline.run();
-
- // dataflowUtils will try to cancel the pipeline and the injector before the program exits.
- dataflowUtils.waitToFinish(result);
- }
-
- private static Integer tryIntParse(String number) {
- try {
- return Integer.parseInt(number);
- } catch (NumberFormatException e) {
- return null;
- }
- }
-
- private static Double tryDoubleParse(String number) {
- try {
- return Double.parseDouble(number);
- } catch (NumberFormatException e) {
- return null;
- }
- }
-}
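
Similarly, the following is a minimal sketch of the sliding-window plus custom-combiner pattern that the pipeline above builds around MaxFlow; the class name and the sample <stationId, flow> readings are illustrative assumptions, not part of this commit. Combine.perKey with a SerializableFunction keeps, for each station and window, the maximum flow observed:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.TimestampedValue;

import org.joda.time.Duration;
import org.joda.time.Instant;

public class SlidingWindowMaxSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Timestamped <stationId, flow> readings stand in for the parsed sensor data.
    PCollection<KV<String, Integer>> readings = p.apply(Create.timestamped(
        TimestampedValue.of(KV.of("station-1", 120), new Instant(0L)),
        TimestampedValue.of(KV.of("station-1", 340), new Instant(4 * 60 * 1000L)),
        TimestampedValue.of(KV.of("station-2", 95), new Instant(6 * 60 * 1000L))));

    PCollection<KV<String, Integer>> maxPerStation = readings
        // 60-minute windows emitted every 5 minutes, matching the example's defaults.
        .apply(Window.<KV<String, Integer>>into(SlidingWindows
            .of(Duration.standardMinutes(60))
            .every(Duration.standardMinutes(5))))
        // Keep the maximum flow per station in each window.
        .apply(Combine.<String, Integer>perKey(
            new SerializableFunction<Iterable<Integer>, Integer>() {
              @Override
              public Integer apply(Iterable<Integer> flows) {
                int max = Integer.MIN_VALUE;
                for (Integer flow : flows) {
                  if (flow != null && flow > max) {
                    max = flow;
                  }
                }
                return max;
              }
            }));

    p.run();
  }
}

The real example keeps the whole LaneInfo record rather than just the flow value, which is why it uses a custom combiner instead of the built-in Max transform.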