Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:48:19 UTC

[55/67] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicOptions.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicOptions.java b/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicOptions.java
deleted file mode 100644
index 4bedf31..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/common/ExamplePubsubTopicOptions.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.common;
-
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-
-/**
- * Options that can be used to configure the Pub/Sub topic in Dataflow examples.
- */
-public interface ExamplePubsubTopicOptions extends DataflowPipelineOptions {
-  @Description("Pub/Sub topic")
-  @Default.InstanceFactory(PubsubTopicFactory.class)
-  String getPubsubTopic();
-  void setPubsubTopic(String topic);
-
-  /**
-   * Returns a default Pub/Sub topic based on the project and job names.
-   */
-  static class PubsubTopicFactory implements DefaultValueFactory<String> {
-    @Override
-    public String create(PipelineOptions options) {
-      DataflowPipelineOptions dataflowPipelineOptions =
-          options.as(DataflowPipelineOptions.class);
-      return "projects/" + dataflowPipelineOptions.getProject()
-          + "/topics/" + dataflowPipelineOptions.getJobName();
-    }
-  }
-}
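
The PubsubTopicFactory removed above derives the default topic name from the
project ID and job name. A minimal standalone sketch of that naming scheme
(plain Java, no SDK dependency; the sample values are placeholders, not taken
from the commit):

public class DefaultTopicNameSketch {
  // Mirrors the string built by PubsubTopicFactory above:
  // "projects/" + project + "/topics/" + jobName
  static String defaultTopic(String project, String jobName) {
    return "projects/" + project + "/topics/" + jobName;
  }

  public static void main(String[] args) {
    // Placeholder values; in the example they come from DataflowPipelineOptions.
    System.out.println(defaultTopic("my-project", "my-job"));
    // Prints: projects/my-project/topics/my-job
  }
}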

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/common/PubsubFileInjector.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/common/PubsubFileInjector.java b/examples/src/main/java/com/google/cloud/dataflow/examples/common/PubsubFileInjector.java
deleted file mode 100644
index 4a82ae6..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/common/PubsubFileInjector.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.common;
-
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.IntraBundleParallelization;
-import com.google.cloud.dataflow.sdk.util.Transport;
-import com.google.common.collect.ImmutableMap;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-/**
- * A batch Dataflow pipeline for injecting a set of GCS files into
- * a PubSub topic line by line. Empty lines are skipped.
- *
- * <p>This is useful for testing streaming
- * pipelines. Note that since batch pipelines might retry chunks, this
- * does _not_ guarantee exactly-once injection of file data. Some lines may
- * be published multiple times.
- * </p>
- */
-public class PubsubFileInjector {
-
-  /**
-   * An incomplete {@code PubsubFileInjector} transform with an unbound output topic.
-   */
-  public static class Unbound {
-    private final String timestampLabelKey;
-
-    Unbound() {
-      this.timestampLabelKey = null;
-    }
-
-    Unbound(String timestampLabelKey) {
-      this.timestampLabelKey = timestampLabelKey;
-    }
-
-    Unbound withTimestampLabelKey(String timestampLabelKey) {
-      return new Unbound(timestampLabelKey);
-    }
-
-    public Bound publish(String outputTopic) {
-      return new Bound(outputTopic, timestampLabelKey);
-    }
-  }
-
-  /** A DoFn that publishes non-empty lines to Google Cloud PubSub. */
-  public static class Bound extends DoFn<String, Void> {
-    private final String outputTopic;
-    private final String timestampLabelKey;
-    public transient Pubsub pubsub;
-
-    public Bound(String outputTopic, String timestampLabelKey) {
-      this.outputTopic = outputTopic;
-      this.timestampLabelKey = timestampLabelKey;
-    }
-
-    @Override
-    public void startBundle(Context context) {
-      this.pubsub =
-          Transport.newPubsubClient(context.getPipelineOptions().as(DataflowPipelineOptions.class))
-              .build();
-    }
-
-    @Override
-    public void processElement(ProcessContext c) throws IOException {
-      if (c.element().isEmpty()) {
-        return;
-      }
-      PubsubMessage pubsubMessage = new PubsubMessage();
-      pubsubMessage.encodeData(c.element().getBytes());
-      if (timestampLabelKey != null) {
-        pubsubMessage.setAttributes(
-            ImmutableMap.of(timestampLabelKey, Long.toString(c.timestamp().getMillis())));
-      }
-      PublishRequest publishRequest = new PublishRequest();
-      publishRequest.setMessages(Arrays.asList(pubsubMessage));
-      this.pubsub.projects().topics().publish(outputTopic, publishRequest).execute();
-    }
-  }
-
-  /**
-   * Creates a {@code PubsubFileInjector} transform with the given timestamp label key.
-   */
-  public static Unbound withTimestampLabelKey(String timestampLabelKey) {
-    return new Unbound(timestampLabelKey);
-  }
-
-  /**
-   * Creates a {@code PubsubFileInjector} transform that publishes to the given output topic.
-   */
-  public static Bound publish(String outputTopic) {
-    return new Unbound().publish(outputTopic);
-  }
-
-  /**
-   * Command line parameter options.
-   */
-  private interface PubsubFileInjectorOptions extends PipelineOptions {
-    @Description("GCS location of files.")
-    @Validation.Required
-    String getInput();
-    void setInput(String value);
-
-    @Description("Topic to publish on.")
-    @Validation.Required
-    String getOutputTopic();
-    void setOutputTopic(String value);
-  }
-
-  /**
-   * Sets up and starts the batch injector pipeline.
-   */
-  public static void main(String[] args) {
-    PubsubFileInjectorOptions options = PipelineOptionsFactory.fromArgs(args)
-        .withValidation()
-        .as(PubsubFileInjectorOptions.class);
-
-    Pipeline pipeline = Pipeline.create(options);
-
-    pipeline
-        .apply(TextIO.Read.from(options.getInput()))
-        .apply(IntraBundleParallelization.of(PubsubFileInjector.publish(options.getOutputTopic()))
-            .withMaxParallelism(20));
-
-    pipeline.run();
-  }
-}
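
The injector above can be driven through its main() as shown, or composed into
another pipeline via publish(...) or withTimestampLabelKey(...).publish(...).
A short sketch of the timestamped variant, assuming the pre-reorganization
package shown in this diff; the input path, topic, and attribute key are
placeholders, not values from the commit:

import com.google.cloud.dataflow.examples.common.PubsubFileInjector;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.IntraBundleParallelization;

public class InjectorUsageSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(TextIO.Read.from("gs://my-bucket/input*.txt"))       // placeholder input
     .apply(IntraBundleParallelization
         .of(PubsubFileInjector
             .withTimestampLabelKey("timestamp_ms")              // assumed attribute key
             .publish("projects/my-project/topics/my-topic"))    // placeholder topic
         .withMaxParallelism(20));                               // same limit as main() above

    p.run();
  }
}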

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java
deleted file mode 100644
index f897338..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java
+++ /dev/null
@@ -1,516 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.complete;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.api.services.datastore.DatastoreV1.Entity;
-import com.google.api.services.datastore.DatastoreV1.Key;
-import com.google.api.services.datastore.DatastoreV1.Value;
-import com.google.api.services.datastore.client.DatastoreHelper;
-import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
-import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
-import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicOptions;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.coders.AvroCoder;
-import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.DatastoreIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Filter;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Partition;
-import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.transforms.Top;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-
-import org.joda.time.Duration;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * An example that computes the most popular hash tags
- * for every prefix, which can be used for auto-completion.
- *
- * <p>Concepts: Using the same pipeline in both streaming and batch, combiners,
- *              composite transforms.
- *
- * <p>To execute this pipeline using the Dataflow service in batch mode,
- * specify pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=DataflowPipelineRunner
- *   --inputFile=gs://path/to/input*.txt
- * }</pre>
- *
- * <p>To execute this pipeline using the Dataflow service in streaming mode,
- * specify pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=DataflowPipelineRunner
- *   --inputFile=gs://YOUR_INPUT_DIRECTORY/*.txt
- *   --streaming
- * }</pre>
- *
- * <p>This will update the datastore every 5 seconds based on the last
- * 30 minutes of data received.
- */
-public class AutoComplete {
-
-  /**
-   * A PTransform that takes as input a list of tokens and returns
-   * the most common tokens per prefix.
-   */
-  public static class ComputeTopCompletions
-      extends PTransform<PCollection<String>, PCollection<KV<String, List<CompletionCandidate>>>> {
-    private final int candidatesPerPrefix;
-    private final boolean recursive;
-
-    protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) {
-      this.candidatesPerPrefix = candidatesPerPrefix;
-      this.recursive = recursive;
-    }
-
-    public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) {
-      return new ComputeTopCompletions(candidatesPerPrefix, recursive);
-    }
-
-    @Override
-    public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) {
-      PCollection<CompletionCandidate> candidates = input
-        // First count how often each token appears.
-        .apply(new Count.PerElement<String>())
-
-        // Map the KV outputs of Count into our own CompletionCandidate class.
-        .apply(ParDo.named("CreateCompletionCandidates").of(
-            new DoFn<KV<String, Long>, CompletionCandidate>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                c.output(new CompletionCandidate(c.element().getKey(), c.element().getValue()));
-              }
-            }));
-
-      // Compute the top via either a flat or recursive algorithm.
-      if (recursive) {
-        return candidates
-          .apply(new ComputeTopRecursive(candidatesPerPrefix, 1))
-          .apply(Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
-      } else {
-        return candidates
-          .apply(new ComputeTopFlat(candidatesPerPrefix, 1));
-      }
-    }
-  }
-
-  /**
-   * Lower latency, but more expensive.
-   */
-  private static class ComputeTopFlat
-      extends PTransform<PCollection<CompletionCandidate>,
-                         PCollection<KV<String, List<CompletionCandidate>>>> {
-    private final int candidatesPerPrefix;
-    private final int minPrefix;
-
-    public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) {
-      this.candidatesPerPrefix = candidatesPerPrefix;
-      this.minPrefix = minPrefix;
-    }
-
-    @Override
-    public PCollection<KV<String, List<CompletionCandidate>>> apply(
-        PCollection<CompletionCandidate> input) {
-      return input
-        // For each completion candidate, map it to all prefixes.
-        .apply(ParDo.of(new AllPrefixes(minPrefix)))
-
-        // Find and return the top candidates for each prefix.
-        .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix)
-               .withHotKeyFanout(new HotKeyFanout()));
-    }
-
-    private static class HotKeyFanout implements SerializableFunction<String, Integer> {
-      @Override
-      public Integer apply(String input) {
-        return (int) Math.pow(4, 5 - input.length());
-      }
-    }
-  }
-
-  /**
-   * Cheaper but higher latency.
-   *
- * <p>Returns two PCollections: the first is the top prefixes of size greater
- * than minPrefix, and the second is the top prefixes of size exactly
-   * minPrefix.
-   */
-  private static class ComputeTopRecursive
-      extends PTransform<PCollection<CompletionCandidate>,
-                         PCollectionList<KV<String, List<CompletionCandidate>>>> {
-    private final int candidatesPerPrefix;
-    private final int minPrefix;
-
-    public ComputeTopRecursive(int candidatesPerPrefix, int minPrefix) {
-      this.candidatesPerPrefix = candidatesPerPrefix;
-      this.minPrefix = minPrefix;
-    }
-
-    private class KeySizePartitionFn implements PartitionFn<KV<String, List<CompletionCandidate>>> {
-      @Override
-      public int partitionFor(KV<String, List<CompletionCandidate>> elem, int numPartitions) {
-        return elem.getKey().length() > minPrefix ? 0 : 1;
-      }
-    }
-
-    private static class FlattenTops
-        extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
-      @Override
-      public void processElement(ProcessContext c) {
-        for (CompletionCandidate cc : c.element().getValue()) {
-          c.output(cc);
-        }
-      }
-    }
-
-    @Override
-    public PCollectionList<KV<String, List<CompletionCandidate>>> apply(
-          PCollection<CompletionCandidate> input) {
-        if (minPrefix > 10) {
-          // Base case, partitioning to return the output in the expected format.
-          return input
-            .apply(new ComputeTopFlat(candidatesPerPrefix, minPrefix))
-            .apply(Partition.of(2, new KeySizePartitionFn()));
-        } else {
-          // If a candidate is in the top N for prefix a...b, it must also be in the top
-          // N for a...bX for every X, which is typically a much smaller set to consider.
-          // First, compute the top candidates for prefixes of size at least minPrefix + 1.
-          PCollectionList<KV<String, List<CompletionCandidate>>> larger = input
-            .apply(new ComputeTopRecursive(candidatesPerPrefix, minPrefix + 1));
-          // Consider the top candidates for each prefix of length minPrefix + 1...
-          PCollection<KV<String, List<CompletionCandidate>>> small =
-            PCollectionList
-            .of(larger.get(1).apply(ParDo.of(new FlattenTops())))
-            // ...together with those (previously excluded) candidates of length
-            // exactly minPrefix...
-            .and(input.apply(Filter.byPredicate(
-                new SerializableFunction<CompletionCandidate, Boolean>() {
-                  @Override
-                  public Boolean apply(CompletionCandidate c) {
-                    return c.getValue().length() == minPrefix;
-                  }
-                })))
-            .apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections())
-            // ...set the key to be the minPrefix-length prefix...
-            .apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix)))
-            // ...and (re)apply the Top operator to all of them together.
-            .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix));
-
-          PCollection<KV<String, List<CompletionCandidate>>> flattenLarger = larger
-              .apply("FlattenLarge", Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
-
-          return PCollectionList.of(flattenLarger).and(small);
-        }
-    }
-  }
-
-  /**
-   * A DoFn that keys each candidate by all its prefixes.
-   */
-  private static class AllPrefixes
-      extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
-    private final int minPrefix;
-    private final int maxPrefix;
-    public AllPrefixes(int minPrefix) {
-      this(minPrefix, Integer.MAX_VALUE);
-    }
-    public AllPrefixes(int minPrefix, int maxPrefix) {
-      this.minPrefix = minPrefix;
-      this.maxPrefix = maxPrefix;
-    }
-    @Override
-    public void processElement(ProcessContext c) {
-      String word = c.element().value;
-      for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
-        c.output(KV.of(word.substring(0, i), c.element()));
-      }
-    }
-  }
-
-  /**
-   * Class used to store tag-count pairs.
-   */
-  @DefaultCoder(AvroCoder.class)
-  static class CompletionCandidate implements Comparable<CompletionCandidate> {
-    private long count;
-    private String value;
-
-    public CompletionCandidate(String value, long count) {
-      this.value = value;
-      this.count = count;
-    }
-
-    public long getCount() {
-      return count;
-    }
-
-    public String getValue() {
-      return value;
-    }
-
-    // Empty constructor required for Avro decoding.
-    public CompletionCandidate() {}
-
-    @Override
-    public int compareTo(CompletionCandidate o) {
-      if (this.count < o.count) {
-        return -1;
-      } else if (this.count == o.count) {
-        return this.value.compareTo(o.value);
-      } else {
-        return 1;
-      }
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (other instanceof CompletionCandidate) {
-        CompletionCandidate that = (CompletionCandidate) other;
-        return this.count == that.count && this.value.equals(that.value);
-      } else {
-        return false;
-      }
-    }
-
-    @Override
-    public int hashCode() {
-      return Long.valueOf(count).hashCode() ^ value.hashCode();
-    }
-
-    @Override
-    public String toString() {
-      return "CompletionCandidate[" + value + ", " + count + "]";
-    }
-  }
-
-  /**
-   * Takes as input a set of strings, and emits each #hashtag found therein.
-   */
-  static class ExtractHashtags extends DoFn<String, String> {
-    @Override
-    public void processElement(ProcessContext c) {
-      Matcher m = Pattern.compile("#\\S+").matcher(c.element());
-      while (m.find()) {
-        c.output(m.group().substring(1));
-      }
-    }
-  }
-
-  static class FormatForBigquery extends DoFn<KV<String, List<CompletionCandidate>>, TableRow> {
-    @Override
-    public void processElement(ProcessContext c) {
-      List<TableRow> completions = new ArrayList<>();
-      for (CompletionCandidate cc : c.element().getValue()) {
-        completions.add(new TableRow()
-          .set("count", cc.getCount())
-          .set("tag", cc.getValue()));
-      }
-      TableRow row = new TableRow()
-        .set("prefix", c.element().getKey())
-        .set("tags", completions);
-      c.output(row);
-    }
-
-    /**
-     * Defines the BigQuery schema used for the output.
-     */
-    static TableSchema getSchema() {
-      List<TableFieldSchema> tagFields = new ArrayList<>();
-      tagFields.add(new TableFieldSchema().setName("count").setType("INTEGER"));
-      tagFields.add(new TableFieldSchema().setName("tag").setType("STRING"));
-      List<TableFieldSchema> fields = new ArrayList<>();
-      fields.add(new TableFieldSchema().setName("prefix").setType("STRING"));
-      fields.add(new TableFieldSchema()
-          .setName("tags").setType("RECORD").setMode("REPEATED").setFields(tagFields));
-      return new TableSchema().setFields(fields);
-    }
-  }
-
-  /**
- * Takes as input the top candidates per prefix, and emits an entity
-   * suitable for writing to Datastore.
-   */
-  static class FormatForDatastore extends DoFn<KV<String, List<CompletionCandidate>>, Entity> {
-    private String kind;
-
-    public FormatForDatastore(String kind) {
-      this.kind = kind;
-    }
-
-    @Override
-    public void processElement(ProcessContext c) {
-      Entity.Builder entityBuilder = Entity.newBuilder();
-      Key key = DatastoreHelper.makeKey(kind, c.element().getKey()).build();
-
-      entityBuilder.setKey(key);
-      List<Value> candidates = new ArrayList<>();
-      for (CompletionCandidate tag : c.element().getValue()) {
-        Entity.Builder tagEntity = Entity.newBuilder();
-        tagEntity.addProperty(
-            DatastoreHelper.makeProperty("tag", DatastoreHelper.makeValue(tag.value)));
-        tagEntity.addProperty(
-            DatastoreHelper.makeProperty("count", DatastoreHelper.makeValue(tag.count)));
-        candidates.add(DatastoreHelper.makeValue(tagEntity).setIndexed(false).build());
-      }
-      entityBuilder.addProperty(
-          DatastoreHelper.makeProperty("candidates", DatastoreHelper.makeValue(candidates)));
-      c.output(entityBuilder.build());
-    }
-  }
-
-  /**
-   * Options supported by this class.
-   *
-   * <p>Inherits standard Dataflow configuration options.
-   */
-  private static interface Options extends ExamplePubsubTopicOptions, ExampleBigQueryTableOptions {
-    @Description("Input text file")
-    String getInputFile();
-    void setInputFile(String value);
-
-    @Description("Whether to use the recursive algorithm")
-    @Default.Boolean(true)
-    Boolean getRecursive();
-    void setRecursive(Boolean value);
-
-    @Description("Dataset entity kind")
-    @Default.String("autocomplete-demo")
-    String getKind();
-    void setKind(String value);
-
-    @Description("Whether to output to BigQuery")
-    @Default.Boolean(true)
-    Boolean getOutputToBigQuery();
-    void setOutputToBigQuery(Boolean value);
-
-    @Description("Whether to output to Datastore")
-    @Default.Boolean(false)
-    Boolean getOutputToDatastore();
-    void setOutputToDatastore(Boolean value);
-
-    @Description("Datastore output dataset ID, defaults to project ID")
-    String getOutputDataset();
-    void setOutputDataset(String value);
-  }
-
-  public static void main(String[] args) throws IOException {
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-
-    if (options.isStreaming()) {
-      // In order to cancel the pipelines automatically,
-      // {@literal DataflowPipelineRunner} is forced to be used.
-      options.setRunner(DataflowPipelineRunner.class);
-    }
-
-    options.setBigQuerySchema(FormatForBigquery.getSchema());
-    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
-
-    // We support running the same pipeline in either
-    // batch or windowed streaming mode.
-    PTransform<? super PBegin, PCollection<String>> readSource;
-    WindowFn<Object, ?> windowFn;
-    if (options.isStreaming()) {
-      Preconditions.checkArgument(
-          !options.getOutputToDatastore(), "DatastoreIO is not supported in streaming.");
-      dataflowUtils.setupPubsub();
-
-      readSource = PubsubIO.Read.topic(options.getPubsubTopic());
-      windowFn = SlidingWindows.of(Duration.standardMinutes(30)).every(Duration.standardSeconds(5));
-    } else {
-      readSource = TextIO.Read.from(options.getInputFile());
-      windowFn = new GlobalWindows();
-    }
-
-    // Create the pipeline.
-    Pipeline p = Pipeline.create(options);
-    PCollection<KV<String, List<CompletionCandidate>>> toWrite = p
-      .apply(readSource)
-      .apply(ParDo.of(new ExtractHashtags()))
-      .apply(Window.<String>into(windowFn))
-      .apply(ComputeTopCompletions.top(10, options.getRecursive()));
-
-    if (options.getOutputToDatastore()) {
-      toWrite
-      .apply(ParDo.named("FormatForDatastore").of(new FormatForDatastore(options.getKind())))
-      .apply(DatastoreIO.writeTo(MoreObjects.firstNonNull(
-          options.getOutputDataset(), options.getProject())));
-    }
-    if (options.getOutputToBigQuery()) {
-      dataflowUtils.setupBigQueryTable();
-
-      TableReference tableRef = new TableReference();
-      tableRef.setProjectId(options.getProject());
-      tableRef.setDatasetId(options.getBigQueryDataset());
-      tableRef.setTableId(options.getBigQueryTable());
-
-      toWrite
-        .apply(ParDo.of(new FormatForBigquery()))
-        .apply(BigQueryIO.Write
-               .to(tableRef)
-               .withSchema(FormatForBigquery.getSchema())
-               .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
-               .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
-    }
-
-    // Run the pipeline.
-    PipelineResult result = p.run();
-
-    if (options.isStreaming() && !options.getInputFile().isEmpty()) {
-      // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
-      dataflowUtils.runInjectorPipeline(options.getInputFile(), options.getPubsubTopic());
-    }
-
-    // dataflowUtils will try to cancel the pipeline and the injector before the program exits.
-    dataflowUtils.waitToFinish(result);
-  }
-}
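
Both the flat and the recursive variants above fan each candidate out to one
key per prefix via AllPrefixes. A standalone sketch of that expansion (plain
Java; the sample word is a placeholder):

import java.util.ArrayList;
import java.util.List;

public class AllPrefixesSketch {
  // Mirrors AllPrefixes.processElement: emit the substrings of length
  // minPrefix..min(word.length(), maxPrefix).
  static List<String> prefixes(String word, int minPrefix, int maxPrefix) {
    List<String> out = new ArrayList<>();
    for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
      out.add(word.substring(0, i));
    }
    return out;
  }

  public static void main(String[] args) {
    // "dataflow" with minPrefix=1 -> [d, da, dat, data, dataf, datafl, dataflo, dataflow]
    System.out.println(prefixes("dataflow", 1, Integer.MAX_VALUE));
  }
}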

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/complete/README.md
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/README.md b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/README.md
deleted file mode 100644
index 5fba154..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/README.md
+++ /dev/null
@@ -1,44 +0,0 @@
-
-# "Complete" Examples
-
-This directory contains end-to-end example pipelines that perform complex data processing tasks. They include:
-
-<ul>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java">AutoComplete</a>
-  &mdash; An example that computes the most popular hash tags for every
-  prefix, which can be used for auto-completion. Demonstrates how to use the
-  same pipeline in both streaming and batch, combiners, and composite
-  transforms.</li>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java">StreamingWordExtract</a>
-  &mdash; A streaming pipeline example that inputs lines of text from a Cloud
-  Pub/Sub topic, splits each line into individual words, capitalizes those
-  words, and writes the output to a BigQuery table.
-  </li>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java">TfIdf</a>
-  &mdash; An example that computes a basic TF-IDF search table for a directory or
-  Cloud Storage prefix. Demonstrates joining data, side inputs, and logging.
-  </li>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java">TopWikipediaSessions</a>
-  &mdash; An example that reads Wikipedia edit data from Cloud Storage and
-  computes the user with the longest string of edits separated by no more than
-  an hour within each month. Demonstrates using Cloud Dataflow
-  <code>Windowing</code> to perform time-based aggregations of data.
-  </li>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java">TrafficMaxLaneFlow</a>
-  &mdash; A streaming Cloud Dataflow example using BigQuery output in the
-  <code>traffic sensor</code> domain. Demonstrates the Cloud Dataflow streaming
-  runner, sliding windows, Cloud Pub/Sub topic ingestion, the use of the
-  <code>AvroCoder</code> to encode a custom class, and custom
-  <code>Combine</code> transforms.
-  </li>
-  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java">TrafficRoutes</a>
-  &mdash; A streaming Cloud Dataflow example using BigQuery output in the
-  <code>traffic sensor</code> domain. Demonstrates the Cloud Dataflow streaming
-  runner, <code>GroupByKey</code>, keyed state, sliding windows, and Cloud
-  Pub/Sub topic ingestion.
-  </li>
-  </ul>
-
-See the [documentation](https://cloud.google.com/dataflow/getting-started) and the [Examples
-README](../../../../../../../../../README.md) for
-information about how to run these examples.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java
deleted file mode 100644
index 99c5249..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/StreamingWordExtract.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.complete;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
-import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
-import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicOptions;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-/**
- * A streaming Dataflow Example using BigQuery output.
- *
- * <p>This pipeline example reads lines of text from a PubSub topic, splits each line
- * into individual words, capitalizes those words, and writes the output to
- * a BigQuery table.
- *
- * <p>By default, the example will run a separate pipeline to inject the data from the default
- * {@literal --inputFile} to the Pub/Sub {@literal --pubsubTopic}, making it available for
- * the streaming pipeline to process. You may override the default {@literal --inputFile} with the
- * file of your choosing. You may also set {@literal --inputFile} to an empty string, which will
- * disable the automatic Pub/Sub injection, and allow you to use a separate tool to control the input
- * to this example.
- *
- * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
- * from the example common package (there are no defaults for a general Dataflow pipeline).
- * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
- * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
- * the example will try to create them.
- *
- * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
- * and then exit.
- */
-public class StreamingWordExtract {
-
-  /** A DoFn that tokenizes lines of text into individual words. */
-  static class ExtractWords extends DoFn<String, String> {
-    @Override
-    public void processElement(ProcessContext c) {
-      String[] words = c.element().split("[^a-zA-Z']+");
-      for (String word : words) {
-        if (!word.isEmpty()) {
-          c.output(word);
-        }
-      }
-    }
-  }
-
-  /** A DoFn that uppercases a word. */
-  static class Uppercase extends DoFn<String, String> {
-    @Override
-    public void processElement(ProcessContext c) {
-      c.output(c.element().toUpperCase());
-    }
-  }
-
-  /**
-   * Converts strings into BigQuery rows.
-   */
-  static class StringToRowConverter extends DoFn<String, TableRow> {
-    /**
-     * In this example, put the whole string into a single BigQuery field.
-     */
-    @Override
-    public void processElement(ProcessContext c) {
-      c.output(new TableRow().set("string_field", c.element()));
-    }
-
-    static TableSchema getSchema() {
-      return new TableSchema().setFields(new ArrayList<TableFieldSchema>() {
-            // Compose the list of TableFieldSchema from tableSchema.
-            {
-              add(new TableFieldSchema().setName("string_field").setType("STRING"));
-            }
-      });
-    }
-  }
-
-  /**
-   * Options supported by {@link StreamingWordExtract}.
-   *
-   * <p>Inherits standard configuration options.
-   */
-  private interface StreamingWordExtractOptions
-      extends ExamplePubsubTopicOptions, ExampleBigQueryTableOptions {
-    @Description("Input file to inject to Pub/Sub topic")
-    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
-    String getInputFile();
-    void setInputFile(String value);
-  }
-
-  /**
-   * Sets up and starts the streaming pipeline.
-   *
-   * @throws IOException if there is a problem setting up resources
-   */
-  public static void main(String[] args) throws IOException {
-    StreamingWordExtractOptions options = PipelineOptionsFactory.fromArgs(args)
-        .withValidation()
-        .as(StreamingWordExtractOptions.class);
-    options.setStreaming(true);
-    // In order to cancel the pipelines automatically,
-    // {@literal DataflowPipelineRunner} is forced to be used.
-    options.setRunner(DataflowPipelineRunner.class);
-
-    options.setBigQuerySchema(StringToRowConverter.getSchema());
-    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
-    dataflowUtils.setup();
-
-    Pipeline pipeline = Pipeline.create(options);
-
-    String tableSpec = new StringBuilder()
-        .append(options.getProject()).append(":")
-        .append(options.getBigQueryDataset()).append(".")
-        .append(options.getBigQueryTable())
-        .toString();
-    pipeline
-        .apply(PubsubIO.Read.topic(options.getPubsubTopic()))
-        .apply(ParDo.of(new ExtractWords()))
-        .apply(ParDo.of(new Uppercase()))
-        .apply(ParDo.of(new StringToRowConverter()))
-        .apply(BigQueryIO.Write.to(tableSpec)
-            .withSchema(StringToRowConverter.getSchema()));
-
-    PipelineResult result = pipeline.run();
-
-    if (!options.getInputFile().isEmpty()) {
-      // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
-      dataflowUtils.runInjectorPipeline(options.getInputFile(), options.getPubsubTopic());
-    }
-
-    // dataflowUtils will try to cancel the pipeline and the injector before the program exits.
-    dataflowUtils.waitToFinish(result);
-  }
-}
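
The ExtractWords and Uppercase DoFns above are plain per-element functions, so
the same tokenization and casing can be sketched outside the SDK (the input
line is a placeholder):

public class WordExtractSketch {
  public static void main(String[] args) {
    String line = "To be, or not to be";  // placeholder input
    // Same split pattern as ExtractWords: break on anything that is not a
    // letter or an apostrophe, then drop empty tokens and uppercase the rest.
    for (String word : line.split("[^a-zA-Z']+")) {
      if (!word.isEmpty()) {
        System.out.println(word.toUpperCase());
      }
    }
  }
}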

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java
deleted file mode 100644
index 65ac753..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TfIdf.java
+++ /dev/null
@@ -1,431 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.complete;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.GcsOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.Keys;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
-import com.google.cloud.dataflow.sdk.transforms.Values;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.transforms.WithKeys;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
-import com.google.cloud.dataflow.sdk.util.GcsUtil;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PDone;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * An example that computes a basic TF-IDF search table for a directory or GCS prefix.
- *
- * <p>Concepts: joining data; side inputs; logging
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- * }</pre>
- * and a local output file or output prefix on GCS:
- * <pre>{@code
- *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
- * }</pre>
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- * }</pre>
- * and an output prefix on GCS:
- * <pre>{@code --output=gs://YOUR_OUTPUT_PREFIX }</pre>
- *
- * <p>The default input is {@code gs://dataflow-samples/shakespeare/} and can be overridden with
- * {@code --input}.
- */
-public class TfIdf {
-  /**
-   * Options supported by {@link TfIdf}.
-   *
-   * <p>Inherits standard configuration options.
-   */
-  private static interface Options extends PipelineOptions {
-    @Description("Path to the directory or GCS prefix containing files to read from")
-    @Default.String("gs://dataflow-samples/shakespeare/")
-    String getInput();
-    void setInput(String value);
-
-    @Description("Prefix of output URI to write to")
-    @Validation.Required
-    String getOutput();
-    void setOutput(String value);
-  }
-
-  /**
-   * Lists documents contained beneath the {@code options.input} prefix/directory.
-   */
-  public static Set<URI> listInputDocuments(Options options)
-      throws URISyntaxException, IOException {
-    URI baseUri = new URI(options.getInput());
-
-    // List all documents in the directory or GCS prefix.
-    URI absoluteUri;
-    if (baseUri.getScheme() != null) {
-      absoluteUri = baseUri;
-    } else {
-      absoluteUri = new URI(
-          "file",
-          baseUri.getAuthority(),
-          baseUri.getPath(),
-          baseUri.getQuery(),
-          baseUri.getFragment());
-    }
-
-    Set<URI> uris = new HashSet<>();
-    if (absoluteUri.getScheme().equals("file")) {
-      File directory = new File(absoluteUri);
-      for (String entry : directory.list()) {
-        File path = new File(directory, entry);
-        uris.add(path.toURI());
-      }
-    } else if (absoluteUri.getScheme().equals("gs")) {
-      GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil();
-      URI gcsUriGlob = new URI(
-          absoluteUri.getScheme(),
-          absoluteUri.getAuthority(),
-          absoluteUri.getPath() + "*",
-          absoluteUri.getQuery(),
-          absoluteUri.getFragment());
-      for (GcsPath entry : gcsUtil.expand(GcsPath.fromUri(gcsUriGlob))) {
-        uris.add(entry.toUri());
-      }
-    }
-
-    return uris;
-  }
-
-  /**
-   * Reads the documents at the provided URIs and returns all lines
-   * from the documents, each tagged with the document it came from.
-   */
-  public static class ReadDocuments
-      extends PTransform<PInput, PCollection<KV<URI, String>>> {
-    private Iterable<URI> uris;
-
-    public ReadDocuments(Iterable<URI> uris) {
-      this.uris = uris;
-    }
-
-    @Override
-    public Coder<?> getDefaultOutputCoder() {
-      return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of());
-    }
-
-    @Override
-    public PCollection<KV<URI, String>> apply(PInput input) {
-      Pipeline pipeline = input.getPipeline();
-
-      // Create one TextIO.Read transform for each document
-      // and add its output to a PCollectionList
-      PCollectionList<KV<URI, String>> urisToLines =
-          PCollectionList.empty(pipeline);
-
-      // TextIO.Read supports:
-      //  - file: URIs and paths locally
-      //  - gs: URIs on the service
-      for (final URI uri : uris) {
-        String uriString;
-        if (uri.getScheme().equals("file")) {
-          uriString = new File(uri).getPath();
-        } else {
-          uriString = uri.toString();
-        }
-
-        PCollection<KV<URI, String>> oneUriToLines = pipeline
-            .apply(TextIO.Read.from(uriString)
-                .named("TextIO.Read(" + uriString + ")"))
-            .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
-
-        urisToLines = urisToLines.and(oneUriToLines);
-      }
-
-      return urisToLines.apply(Flatten.<KV<URI, String>>pCollections());
-    }
-  }
-
-  /**
-   * A transform containing a basic TF-IDF pipeline. The input consists of KV objects
-   * where the key is the document's URI and the value is a piece
- * of the document's content. The output is a mapping from terms to
-   * scores for each document URI.
-   */
-  public static class ComputeTfIdf
-      extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> {
-    public ComputeTfIdf() { }
-
-    @Override
-    public PCollection<KV<String, KV<URI, Double>>> apply(
-      PCollection<KV<URI, String>> uriToContent) {
-
-      // Compute the total number of documents, and
-      // prepare this singleton PCollectionView for
-      // use as a side input.
-      final PCollectionView<Long> totalDocuments =
-          uriToContent
-          .apply("GetURIs", Keys.<URI>create())
-          .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
-          .apply(Count.<URI>globally())
-          .apply(View.<Long>asSingleton());
-
-      // Create a collection of pairs mapping a URI to each
-      // of the words in the document associated with that URI.
-      PCollection<KV<URI, String>> uriToWords = uriToContent
-          .apply(ParDo.named("SplitWords").of(
-              new DoFn<KV<URI, String>, KV<URI, String>>() {
-                @Override
-                public void processElement(ProcessContext c) {
-                  URI uri = c.element().getKey();
-                  String line = c.element().getValue();
-                  for (String word : line.split("\\W+")) {
-                    // Log INFO messages when the word “love” is found.
-                    if (word.toLowerCase().equals("love")) {
-                      LOG.info("Found {}", word.toLowerCase());
-                    }
-
-                    if (!word.isEmpty()) {
-                      c.output(KV.of(uri, word.toLowerCase()));
-                    }
-                  }
-                }
-              }));
-
-      // Compute a mapping from each word to the total
-      // number of documents in which it appears.
-      PCollection<KV<String, Long>> wordToDocCount = uriToWords
-          .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create())
-          .apply(Values.<String>create())
-          .apply("CountDocs", Count.<String>perElement());
-
-      // Compute a mapping from each URI to the total
-      // number of words in the document associated with that URI.
-      PCollection<KV<URI, Long>> uriToWordTotal = uriToWords
-          .apply("GetURIs2", Keys.<URI>create())
-          .apply("CountWords", Count.<URI>perElement());
-
-      // Count, for each (URI, word) pair, the number of
-      // occurrences of that word in the document associated
-      // with the URI.
-      PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords
-          .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement());
-
-      // Convert the above mapping from
-      // (URI, word) pairs to counts into an isomorphic mapping
-      // from URI to (word, count) pairs, to prepare for a join
-      // by the URI key.
-      PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
-          .apply(ParDo.named("ShiftKeys").of(
-              new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
-                @Override
-                public void processElement(ProcessContext c) {
-                  URI uri = c.element().getKey().getKey();
-                  String word = c.element().getKey().getValue();
-                  Long occurrences = c.element().getValue();
-                  c.output(KV.of(uri, KV.of(word, occurrences)));
-                }
-              }));
-
-      // Prepare to join the mapping of URI to (word, count) pairs with
-      // the mapping of URI to total word counts, by associating
-      // each of the input PCollection<KV<URI, ...>> with
-      // a tuple tag. Each input must have the same key type, URI
-      // in this case. The type parameter of the tuple tag matches
-      // the types of the values for each collection.
-      final TupleTag<Long> wordTotalsTag = new TupleTag<Long>();
-      final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<KV<String, Long>>();
-      KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple
-          .of(wordTotalsTag, uriToWordTotal)
-          .and(wordCountsTag, uriToWordAndCount);
-
-      // Perform a CoGroupByKey (a sort of pre-join) on the prepared
-      // inputs. This yields a mapping from URI to a CoGbkResult
-      // (CoGroupByKey Result). The CoGbkResult is a mapping
-      // from the above tuple tags to the values in each input
-      // associated with a particular URI. In this case, each
-      // KV<URI, CoGbkResult> groups a URI with the total number of
-      // words in that document as well as all the (word, count)
-      // pairs for particular words.
-      PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput
-          .apply("CoGroupByUri", CoGroupByKey.<URI>create());
-
-      // Compute a mapping from each word to a (URI, term frequency)
-      // pair for each URI. A word's term frequency for a document
-      // is simply the number of times that word occurs in the document
-      // divided by the total number of words in the document.
-      PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
-          .apply(ParDo.named("ComputeTermFrequencies").of(
-              new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
-                @Override
-                public void processElement(ProcessContext c) {
-                  URI uri = c.element().getKey();
-                  Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
-
-                  for (KV<String, Long> wordAndCount
-                           : c.element().getValue().getAll(wordCountsTag)) {
-                    String word = wordAndCount.getKey();
-                    Long wordCount = wordAndCount.getValue();
-                    Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue();
-                    c.output(KV.of(word, KV.of(uri, termFrequency)));
-                  }
-                }
-              }));
-
-      // Compute a mapping from each word to its document frequency.
-      // A word's document frequency in a corpus is the number of
-      // documents in which the word appears divided by the total
-      // number of documents in the corpus. Note how the total number of
-      // documents is passed as a side input; the same value is
-      // presented to each invocation of the DoFn.
-      PCollection<KV<String, Double>> wordToDf = wordToDocCount
-          .apply(ParDo
-              .named("ComputeDocFrequencies")
-              .withSideInputs(totalDocuments)
-              .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
-                @Override
-                public void processElement(ProcessContext c) {
-                  String word = c.element().getKey();
-                  Long documentCount = c.element().getValue();
-                  Long documentTotal = c.sideInput(totalDocuments);
-                  Double documentFrequency = documentCount.doubleValue()
-                      / documentTotal.doubleValue();
-
-                  c.output(KV.of(word, documentFrequency));
-                }
-              }));
-
-      // Join the term frequency and document frequency
-      // collections, each keyed on the word.
-      final TupleTag<KV<URI, Double>> tfTag = new TupleTag<KV<URI, Double>>();
-      final TupleTag<Double> dfTag = new TupleTag<Double>();
-      PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple
-          .of(tfTag, wordToUriAndTf)
-          .and(dfTag, wordToDf)
-          .apply(CoGroupByKey.<String>create());
-
-      // Compute a mapping from each word to a (URI, TF-IDF) score
-      // for each URI. There are a variety of definitions of TF-IDF
-      // ("term frequency - inverse document frequency") score;
-      // here we use a basic version that multiplies the term frequency
-      // by the log of the inverse document frequency.
-      PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = wordToUriAndTfAndDf
-          .apply(ParDo.named("ComputeTfIdf").of(
-              new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
-                @Override
-                public void processElement(ProcessContext c) {
-                  String word = c.element().getKey();
-                  Double df = c.element().getValue().getOnly(dfTag);
-
-                  for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) {
-                    URI uri = uriAndTf.getKey();
-                    Double tf = uriAndTf.getValue();
-                    Double tfIdf = tf * Math.log(1 / df);
-                    c.output(KV.of(word, KV.of(uri, tfIdf)));
-                  }
-                }
-              }));
-
-      return wordToUriAndTfIdf;
-    }
-
-    // Instantiate Logger.
-    // It is suggested that the user specify the class name of the containing class
-    // (in this case ComputeTfIdf).
-    private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class);
-  }
-
-  /**
-   * A {@link PTransform} to write, in CSV format, a mapping from term and URI
-   * to score.
-   */
-  public static class WriteTfIdf
-      extends PTransform<PCollection<KV<String, KV<URI, Double>>>, PDone> {
-    private String output;
-
-    public WriteTfIdf(String output) {
-      this.output = output;
-    }
-
-    @Override
-    public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
-      return wordToUriAndTfIdf
-          .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, Double>>, String>() {
-            @Override
-            public void processElement(ProcessContext c) {
-              c.output(String.format("%s,\t%s,\t%f",
-                  c.element().getKey(),
-                  c.element().getValue().getKey(),
-                  c.element().getValue().getValue()));
-            }
-          }))
-          .apply(TextIO.Write
-              .to(output)
-              .withSuffix(".csv"));
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-    Pipeline pipeline = Pipeline.create(options);
-    pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
-
-    pipeline
-        .apply(new ReadDocuments(listInputDocuments(options)))
-        .apply(new ComputeTfIdf())
-        .apply(new WriteTfIdf(options.getOutput()));
-
-    pipeline.run();
-  }
-}
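
The deleted ComputeTfIdf transform above scores each (word, document) pair as the term
frequency multiplied by the log of the inverse document frequency, i.e. tf * ln(1/df).
The arithmetic can be checked with the small standalone sketch below; it is plain Java
rather than Dataflow SDK code, and the class name TfIdfCheck and the sample counts are
illustrative only.

public class TfIdfCheck {
  // tf = wordCount / wordTotal; df = docsWithWord / totalDocs; score = tf * ln(1 / df).
  static double tfIdf(long wordCount, long wordTotal, long docsWithWord, long totalDocs) {
    double tf = (double) wordCount / wordTotal;
    double df = (double) docsWithWord / totalDocs;
    return tf * Math.log(1 / df);
  }

  public static void main(String[] args) {
    // A word appearing 3 times in a 100-word document, present in 2 of 10 documents:
    // 0.03 * ln(5), roughly 0.048.
    System.out.println(tfIdf(3, 100, 2, 10));
  }
}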

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java
deleted file mode 100644
index c57a5f2..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.complete;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SerializableComparator;
-import com.google.cloud.dataflow.sdk.transforms.Top;
-import com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.util.List;
-
-/**
- * An example that reads Wikipedia edit data from Cloud Storage and computes the user with
- * the longest string of edits separated by no more than an hour within each month.
- *
- * <p>Concepts: Using Windowing to perform time-based aggregations of data.
- *
- * <p>It is not recommended to execute this pipeline locally, given the size of the default input
- * data.
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- * }
- * </pre>
- * and an output prefix on GCS:
- * <pre>{@code
- *   --output=gs://YOUR_OUTPUT_PREFIX
- * }</pre>
- *
- * <p>The default input is {@code gs://dataflow-samples/wikipedia_edits/*.json} and can be
- * overridden with {@code --input}.
- *
- * <p>The input for this example is large enough that it's a good place to enable (experimental)
- * autoscaling:
- * <pre>{@code
- *   --autoscalingAlgorithm=BASIC
- *   --maxNumWorkers=20
- * }
- * </pre>
- * This will automatically scale the number of workers up over time until the job completes.
- */
-public class TopWikipediaSessions {
-  private static final String EXPORTED_WIKI_TABLE = "gs://dataflow-samples/wikipedia_edits/*.json";
-
-  /**
-   * Extracts user and timestamp from a TableRow representing a Wikipedia edit.
-   */
-  static class ExtractUserAndTimestamp extends DoFn<TableRow, String> {
-    @Override
-    public void processElement(ProcessContext c) {
-      TableRow row = c.element();
-      int timestamp = (Integer) row.get("timestamp");
-      String userName = (String) row.get("contributor_username");
-      if (userName != null) {
-        // Sets the implicit timestamp field to be used in windowing.
-        c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
-      }
-    }
-  }
-
-  /**
-   * Computes the number of edits in each user session.  A session is defined as
-   * a string of edits where each is separated from the next by less than an hour.
-   */
-  static class ComputeSessions
-      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
-    @Override
-    public PCollection<KV<String, Long>> apply(PCollection<String> actions) {
-      return actions
-          .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardHours(1))))
-
-          .apply(Count.<String>perElement());
-    }
-  }
-
-  /**
-   * Computes the longest session ending in each month.
-   */
-  private static class TopPerMonth
-      extends PTransform<PCollection<KV<String, Long>>, PCollection<List<KV<String, Long>>>> {
-    @Override
-    public PCollection<List<KV<String, Long>>> apply(PCollection<KV<String, Long>> sessions) {
-      return sessions
-          .apply(Window.<KV<String, Long>>into(CalendarWindows.months(1)))
-
-          .apply(Top.of(1, new SerializableComparator<KV<String, Long>>() {
-                    @Override
-                    public int compare(KV<String, Long> o1, KV<String, Long> o2) {
-                      return Long.compare(o1.getValue(), o2.getValue());
-                    }
-                  }).withoutDefaults());
-    }
-  }
-
-  static class SessionsToStringsDoFn extends DoFn<KV<String, Long>, KV<String, Long>>
-      implements RequiresWindowAccess {
-
-    @Override
-    public void processElement(ProcessContext c) {
-      c.output(KV.of(
-          c.element().getKey() + " : " + c.window(), c.element().getValue()));
-    }
-  }
-
-  static class FormatOutputDoFn extends DoFn<List<KV<String, Long>>, String>
-      implements RequiresWindowAccess {
-    @Override
-    public void processElement(ProcessContext c) {
-      for (KV<String, Long> item : c.element()) {
-        String session = item.getKey();
-        long count = item.getValue();
-        c.output(session + " : " + count + " : " + ((IntervalWindow) c.window()).start());
-      }
-    }
-  }
-
-  static class ComputeTopSessions extends PTransform<PCollection<TableRow>, PCollection<String>> {
-
-    private final double samplingThreshold;
-
-    public ComputeTopSessions(double samplingThreshold) {
-      this.samplingThreshold = samplingThreshold;
-    }
-
-    @Override
-    public PCollection<String> apply(PCollection<TableRow> input) {
-      return input
-          .apply(ParDo.of(new ExtractUserAndTimestamp()))
-
-          .apply(ParDo.named("SampleUsers").of(
-              new DoFn<String, String>() {
-                @Override
-                public void processElement(ProcessContext c) {
-                  if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * samplingThreshold) {
-                    c.output(c.element());
-                  }
-                }
-              }))
-
-          .apply(new ComputeSessions())
-
-          .apply(ParDo.named("SessionsToStrings").of(new SessionsToStringsDoFn()))
-          .apply(new TopPerMonth())
-          .apply(ParDo.named("FormatOutput").of(new FormatOutputDoFn()));
-    }
-  }
-
-  /**
-   * Options supported by this class.
-   *
-   * <p>Inherits standard Dataflow configuration options.
-   */
-  private static interface Options extends PipelineOptions {
-    @Description(
-      "Input specified as a GCS path containing a BigQuery table exported as json")
-    @Default.String(EXPORTED_WIKI_TABLE)
-    String getInput();
-    void setInput(String value);
-
-    @Description("File to output results to")
-    @Validation.Required
-    String getOutput();
-    void setOutput(String value);
-  }
-
-  public static void main(String[] args) {
-    Options options = PipelineOptionsFactory.fromArgs(args)
-        .withValidation()
-        .as(Options.class);
-    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
-
-    Pipeline p = Pipeline.create(dataflowOptions);
-
-    double samplingThreshold = 0.1;
-
-    p.apply(TextIO.Read
-        .from(options.getInput())
-        .withCoder(TableRowJsonCoder.of()))
-     .apply(new ComputeTopSessions(samplingThreshold))
-     .apply(TextIO.Write.named("Write").withoutSharding().to(options.getOutput()));
-
-    p.run();
-  }
-}
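
The ComputeSessions transform in the deleted TopWikipediaSessions pipeline above relies on
Sessions.withGapDuration(Duration.standardHours(1)): edits by the same user that are less
than an hour apart merge into one session. The standalone sketch below mirrors that grouping
rule in plain Java, outside the SDK; the class name SessionSizes and the sample timestamps
are illustrative, and the real pipeline additionally re-windows the per-session counts by
calendar month and keeps only the largest session per month via Top.of(1).

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SessionSizes {
  // Groups sorted timestamps into sessions: a new session starts whenever the gap
  // to the previous event is at least gapMillis, and each session's size is recorded.
  static List<Integer> sessionSizes(List<Long> timestamps, long gapMillis) {
    List<Integer> sizes = new ArrayList<>();
    if (timestamps.isEmpty()) {
      return sizes;
    }
    List<Long> sorted = new ArrayList<>(timestamps);
    Collections.sort(sorted);
    int current = 1;
    for (int i = 1; i < sorted.size(); i++) {
      if (sorted.get(i) - sorted.get(i - 1) < gapMillis) {
        current++;            // still within the gap of the previous edit
      } else {
        sizes.add(current);   // a gap of an hour or more closes the session
        current = 1;
      }
    }
    sizes.add(current);
    return sizes;
  }

  public static void main(String[] args) {
    long hour = 3_600_000L;
    // Three edits within an hour of each other, then one isolated edit: prints [3, 1].
    System.out.println(sessionSizes(
        Arrays.asList(0L, 10 * 60_000L, 50 * 60_000L, 5 * hour), hour));
  }
}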

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java
deleted file mode 100644
index 2d54252..0000000
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java
+++ /dev/null
@@ -1,425 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.complete;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
-import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
-import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
-import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicAndSubscriptionOptions;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.coders.AvroCoder;
-import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Strings;
-
-import org.apache.avro.reflect.Nullable;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A Dataflow Example that runs in both batch and streaming modes with traffic sensor data.
- * You can configure the running mode by setting {@literal --streaming} to true or false.
- *
- * <p>Concepts: The batch and streaming runners, sliding windows, Google Cloud Pub/Sub
- * topic injection, use of the AvroCoder to encode a custom class, and custom Combine transforms.
- *
- * <p>This example analyzes traffic sensor data using SlidingWindows. For each window,
- * it finds the lane that had the highest flow recorded, for each sensor station. It writes
- * those max values along with auxiliary info to a BigQuery table.
- *
- * <p>In batch mode, the pipeline reads traffic sensor data from {@literal --inputFile}.
- *
- * <p>In streaming mode, the pipeline reads the data from a Pub/Sub topic.
- * By default, the example will run a separate pipeline to inject the data from the default
- * {@literal --inputFile} to the Pub/Sub {@literal --pubsubTopic}, making it available for the
- * streaming pipeline to process. You may override the default {@literal --inputFile} with a file
- * of your choosing, or set {@literal --inputFile} to an empty string to disable the automatic
- * Pub/Sub injection and use a separate tool to control the input to this example. Example code
- * that publishes traffic sensor data to a Pub/Sub topic is provided in
- * <a href="https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-python/tree/master/gce-cmdline-publisher">cloud-pubsub-samples-python</a>.
- *
- * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
- * from the example common package (there are no defaults for a general Dataflow pipeline).
- * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
- * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
- * the example will try to create them.
- *
- * <p>On the signal to terminate the process (CTRL-C), the example will try to cancel the
- * pipelines and then exit.
- */
-public class TrafficMaxLaneFlow {
-
-  private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
-  private static final Integer VALID_INPUTS = 4999;
-
-  static final int WINDOW_DURATION = 60;  // Default sliding window duration in minutes
-  static final int WINDOW_SLIDE_EVERY = 5;  // Default window 'slide every' setting in minutes
-
-  /**
-   * This class holds information about each lane in a station reading, along with some general
-   * information from the reading.
-   */
-  @DefaultCoder(AvroCoder.class)
-  static class LaneInfo {
-    @Nullable String stationId;
-    @Nullable String lane;
-    @Nullable String direction;
-    @Nullable String freeway;
-    @Nullable String recordedTimestamp;
-    @Nullable Integer laneFlow;
-    @Nullable Integer totalFlow;
-    @Nullable Double laneAO;
-    @Nullable Double laneAS;
-
-    public LaneInfo() {}
-
-    public LaneInfo(String stationId, String lane, String direction, String freeway,
-        String timestamp, Integer laneFlow, Double laneAO,
-        Double laneAS, Integer totalFlow) {
-      this.stationId = stationId;
-      this.lane = lane;
-      this.direction = direction;
-      this.freeway = freeway;
-      this.recordedTimestamp = timestamp;
-      this.laneFlow = laneFlow;
-      this.laneAO = laneAO;
-      this.laneAS = laneAS;
-      this.totalFlow = totalFlow;
-    }
-
-    public String getStationId() {
-      return this.stationId;
-    }
-    public String getLane() {
-      return this.lane;
-    }
-    public String getDirection() {
-      return this.direction;
-    }
-    public String getFreeway() {
-      return this.freeway;
-    }
-    public String getRecordedTimestamp() {
-      return this.recordedTimestamp;
-    }
-    public Integer getLaneFlow() {
-      return this.laneFlow;
-    }
-    public Double getLaneAO() {
-      return this.laneAO;
-    }
-    public Double getLaneAS() {
-      return this.laneAS;
-    }
-    public Integer getTotalFlow() {
-      return this.totalFlow;
-    }
-  }
-
-  /**
-   * Extract the timestamp field from the input string, and use it as the element timestamp.
-   */
-  static class ExtractTimestamps extends DoFn<String, String> {
-    private static final DateTimeFormatter dateTimeFormat =
-        DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");
-
-    @Override
-    public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
-      String[] items = c.element().split(",");
-      if (items.length > 0) {
-        try {
-          String timestamp = items[0];
-          c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp)));
-        } catch (IllegalArgumentException e) {
-          // Skip the invalid input.
-        }
-      }
-    }
-  }
-
-  /**
-   * Extract flow information for each of the 8 lanes in a reading, and output as separate tuples.
-   * This will let us determine which lane has the max flow for that station over the span of the
-   * window, and output not only the max flow from that calculation, but other associated
-   * information. The number of lanes for which data is present depends upon which freeway the data
-   * point comes from.
-   */
-  static class ExtractFlowInfoFn extends DoFn<String, KV<String, LaneInfo>> {
-
-    @Override
-    public void processElement(ProcessContext c) {
-      String[] items = c.element().split(",");
-      if (items.length < 48) {
-        // Skip the invalid input.
-        return;
-      }
-      // extract the sensor information for the lanes from the input string fields.
-      String timestamp = items[0];
-      String stationId = items[1];
-      String freeway = items[2];
-      String direction = items[3];
-      Integer totalFlow = tryIntParse(items[7]);
-      for (int i = 1; i <= 8; ++i) {
-        Integer laneFlow = tryIntParse(items[6 + 5 * i]);
-        Double laneAvgOccupancy = tryDoubleParse(items[7 + 5 * i]);
-        Double laneAvgSpeed = tryDoubleParse(items[8 + 5 * i]);
-        if (laneFlow == null || laneAvgOccupancy == null || laneAvgSpeed == null) {
-          return;
-        }
-        LaneInfo laneInfo = new LaneInfo(stationId, "lane" + i, direction, freeway, timestamp,
-            laneFlow, laneAvgOccupancy, laneAvgSpeed, totalFlow);
-        c.output(KV.of(stationId, laneInfo));
-      }
-    }
-  }
-
-  /**
-   * A custom 'combine function' used with the Combine.perKey transform. Used to find the max lane
-   * flow over all the data points in the Window. Extracts the lane flow from each LaneInfo and
-   * determines whether it's the max seen so far. We're using a custom combiner instead of the Max
-   * transform because we want to retain the additional information we've associated with the flow
-   * value.
-   */
-  public static class MaxFlow implements SerializableFunction<Iterable<LaneInfo>, LaneInfo> {
-    @Override
-    public LaneInfo apply(Iterable<LaneInfo> input) {
-      Integer max = 0;
-      LaneInfo maxInfo = new LaneInfo();
-      for (LaneInfo item : input) {
-        Integer flow = item.getLaneFlow();
-        if (flow != null && (flow >= max)) {
-          max = flow;
-          maxInfo = item;
-        }
-      }
-      return maxInfo;
-    }
-  }
-
-  /**
-   * Format the results of the Max Lane flow calculation to a TableRow, to save to BigQuery.
-   * Add the timestamp from the window context.
-   */
-  static class FormatMaxesFn extends DoFn<KV<String, LaneInfo>, TableRow> {
-    @Override
-    public void processElement(ProcessContext c) {
-
-      LaneInfo laneInfo = c.element().getValue();
-      TableRow row = new TableRow()
-          .set("station_id", c.element().getKey())
-          .set("direction", laneInfo.getDirection())
-          .set("freeway", laneInfo.getFreeway())
-          .set("lane_max_flow", laneInfo.getLaneFlow())
-          .set("lane", laneInfo.getLane())
-          .set("avg_occ", laneInfo.getLaneAO())
-          .set("avg_speed", laneInfo.getLaneAS())
-          .set("total_flow", laneInfo.getTotalFlow())
-          .set("recorded_timestamp", laneInfo.getRecordedTimestamp())
-          .set("window_timestamp", c.timestamp().toString());
-      c.output(row);
-    }
-
-    /** Defines the BigQuery schema used for the output. */
-    static TableSchema getSchema() {
-      List<TableFieldSchema> fields = new ArrayList<>();
-      fields.add(new TableFieldSchema().setName("station_id").setType("STRING"));
-      fields.add(new TableFieldSchema().setName("direction").setType("STRING"));
-      fields.add(new TableFieldSchema().setName("freeway").setType("STRING"));
-      fields.add(new TableFieldSchema().setName("lane_max_flow").setType("INTEGER"));
-      fields.add(new TableFieldSchema().setName("lane").setType("STRING"));
-      fields.add(new TableFieldSchema().setName("avg_occ").setType("FLOAT"));
-      fields.add(new TableFieldSchema().setName("avg_speed").setType("FLOAT"));
-      fields.add(new TableFieldSchema().setName("total_flow").setType("INTEGER"));
-      fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP"));
-      fields.add(new TableFieldSchema().setName("recorded_timestamp").setType("STRING"));
-      TableSchema schema = new TableSchema().setFields(fields);
-      return schema;
-    }
-  }
-
-  /**
-   * This PTransform extracts lane info, calculates the max lane flow found for a given station (for
-   * the current Window) using a custom 'combiner', and formats the results for BigQuery.
-   */
-  static class MaxLaneFlow
-      extends PTransform<PCollection<KV<String, LaneInfo>>, PCollection<TableRow>> {
-    @Override
-    public PCollection<TableRow> apply(PCollection<KV<String, LaneInfo>> flowInfo) {
-      // stationId, LaneInfo => stationId + max lane flow info
-      PCollection<KV<String, LaneInfo>> flowMaxes =
-          flowInfo.apply(Combine.<String, LaneInfo>perKey(
-              new MaxFlow()));
-
-      // <stationId, max lane flow info>... => row...
-      PCollection<TableRow> results = flowMaxes.apply(
-          ParDo.of(new FormatMaxesFn()));
-
-      return results;
-    }
-  }
-
-  static class ReadFileAndExtractTimestamps extends PTransform<PBegin, PCollection<String>> {
-    private final String inputFile;
-
-    public ReadFileAndExtractTimestamps(String inputFile) {
-      this.inputFile = inputFile;
-    }
-
-    @Override
-    public PCollection<String> apply(PBegin begin) {
-      return begin
-          .apply(TextIO.Read.from(inputFile))
-          .apply(ParDo.of(new ExtractTimestamps()));
-    }
-  }
-
-  /**
-   * Options supported by {@link TrafficMaxLaneFlow}.
-   *
-   * <p>Inherits standard configuration options.
-   */
-  private interface TrafficMaxLaneFlowOptions extends DataflowExampleOptions,
-      ExamplePubsubTopicAndSubscriptionOptions, ExampleBigQueryTableOptions {
-    @Description("Input file to inject to Pub/Sub topic")
-    @Default.String("gs://dataflow-samples/traffic_sensor/"
-        + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
-    String getInputFile();
-    void setInputFile(String value);
-
-    @Description("Numeric value of sliding window duration, in minutes")
-    @Default.Integer(WINDOW_DURATION)
-    Integer getWindowDuration();
-    void setWindowDuration(Integer value);
-
-    @Description("Numeric value of window 'slide every' setting, in minutes")
-    @Default.Integer(WINDOW_SLIDE_EVERY)
-    Integer getWindowSlideEvery();
-    void setWindowSlideEvery(Integer value);
-
-    @Description("Whether to run the pipeline with unbounded input")
-    @Default.Boolean(false)
-    boolean isUnbounded();
-    void setUnbounded(boolean value);
-  }
-
-  /**
-   * Sets up and starts streaming pipeline.
-   *
-   * @throws IOException if there is a problem setting up resources
-   */
-  public static void main(String[] args) throws IOException {
-    TrafficMaxLaneFlowOptions options = PipelineOptionsFactory.fromArgs(args)
-        .withValidation()
-        .as(TrafficMaxLaneFlowOptions.class);
-    options.setBigQuerySchema(FormatMaxesFn.getSchema());
-    // Using DataflowExampleUtils to set up required resources.
-    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options, options.isUnbounded());
-
-    Pipeline pipeline = Pipeline.create(options);
-    TableReference tableRef = new TableReference();
-    tableRef.setProjectId(options.getProject());
-    tableRef.setDatasetId(options.getBigQueryDataset());
-    tableRef.setTableId(options.getBigQueryTable());
-
-    PCollection<String> input;
-    if (options.isUnbounded()) {
-      // Read unbounded PubSubIO.
-      input = pipeline.apply(PubsubIO.Read
-          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
-          .subscription(options.getPubsubSubscription()));
-    } else {
-      // Read bounded PubSubIO.
-      input = pipeline.apply(PubsubIO.Read
-          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
-          .subscription(options.getPubsubSubscription()).maxNumRecords(VALID_INPUTS));
-
-      // To read bounded TextIO files, use:
-      // input = pipeline.apply(new ReadFileAndExtractTimestamps(options.getInputFile()));
-    }
-    input
-        // row... => <stationId, LaneInfo> ...
-        .apply(ParDo.of(new ExtractFlowInfoFn()))
-        // map the incoming data stream into sliding windows. The default window duration values
-        // work well if you're running the accompanying Pub/Sub generator script with the
-        // --replay flag, which simulates pauses in the sensor data publication. You may want to
-        // adjust them otherwise.
-        .apply(Window.<KV<String, LaneInfo>>into(SlidingWindows.of(
-            Duration.standardMinutes(options.getWindowDuration())).
-            every(Duration.standardMinutes(options.getWindowSlideEvery()))))
-        .apply(new MaxLaneFlow())
-        .apply(BigQueryIO.Write.to(tableRef)
-            .withSchema(FormatMaxesFn.getSchema()));
-
-    // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
-    if (!Strings.isNullOrEmpty(options.getInputFile())
-        && !Strings.isNullOrEmpty(options.getPubsubTopic())) {
-      dataflowUtils.runInjectorPipeline(
-          new ReadFileAndExtractTimestamps(options.getInputFile()),
-          options.getPubsubTopic(),
-          PUBSUB_TIMESTAMP_LABEL_KEY);
-    }
-
-    // Run the pipeline.
-    PipelineResult result = pipeline.run();
-
-    // dataflowUtils will try to cancel the pipeline and the injector before the program exits.
-    dataflowUtils.waitToFinish(result);
-  }
-
-  private static Integer tryIntParse(String number) {
-    try {
-      return Integer.parseInt(number);
-    } catch (NumberFormatException e) {
-      return null;
-    }
-  }
-
-  private static Double tryDoubleParse(String number) {
-    try {
-      return Double.parseDouble(number);
-    } catch (NumberFormatException e) {
-      return null;
-    }
-  }
-}
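
The MaxFlow combiner in the deleted TrafficMaxLaneFlow pipeline above keeps the entire
LaneInfo record of the reading with the highest lane flow rather than just the maximum
number, which is why a custom SerializableFunction is passed to Combine.perKey instead of
a stock Max transform. The plain-Java sketch below shows the same "max by field, keep the
whole record" reduction outside the SDK; the Lane class and the sample readings are
illustrative only.

import java.util.Arrays;
import java.util.List;

public class MaxFlowSketch {
  // Minimal stand-in for the deleted LaneInfo class: just a lane name and its flow.
  static final class Lane {
    final String name;
    final int flow;
    Lane(String name, int flow) { this.name = name; this.flow = flow; }
    @Override public String toString() { return name + ":" + flow; }
  }

  // Returns the reading with the highest flow, keeping the whole Lane object rather
  // than only the flow value, mirroring the custom combiner above.
  static Lane maxFlow(Iterable<Lane> readings) {
    Lane best = null;
    for (Lane lane : readings) {
      if (best == null || lane.flow >= best.flow) {
        best = lane;
      }
    }
    return best;
  }

  public static void main(String[] args) {
    List<Lane> window = Arrays.asList(
        new Lane("lane1", 40), new Lane("lane2", 95), new Lane("lane3", 60));
    System.out.println(maxFlow(window));  // prints lane2:95
  }
}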