Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:48:22 UTC

[58/67] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java
new file mode 100644
index 0000000..c57a5f2
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java
@@ -0,0 +1,223 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.SerializableComparator;
+import com.google.cloud.dataflow.sdk.transforms.Top;
+import com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.util.List;
+
+/**
+ * An example that reads Wikipedia edit data from Cloud Storage and computes, for each month,
+ * the user with the longest run of edits in which consecutive edits are less than an hour apart.
+ *
+ * <p>Concepts: Using Windowing to perform time-based aggregations of data.
+ *
+ * <p>It is not recommended to execute this pipeline locally, given the size of the default input
+ * data.
+ *
+ * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ * }
+ * </pre>
+ * and an output prefix on GCS:
+ * <pre>{@code
+ *   --output=gs://YOUR_OUTPUT_PREFIX
+ * }</pre>
+ *
+ * <p>The default input is {@code gs://dataflow-samples/wikipedia_edits/*.json} and can be
+ * overridden with {@code --input}.
+ *
+ * <p>The input for this example is large enough that it's a good place to enable (experimental)
+ * autoscaling:
+ * <pre>{@code
+ *   --autoscalingAlgorithm=BASIC
+ *   --maxNumWorkers=20
+ * }
+ * </pre>
+ * This will automatically scale the number of workers up over time until the job completes.
+ */
+public class TopWikipediaSessions {
+  private static final String EXPORTED_WIKI_TABLE = "gs://dataflow-samples/wikipedia_edits/*.json";
+
+  /**
+   * Extracts user and timestamp from a TableRow representing a Wikipedia edit.
+   */
+  static class ExtractUserAndTimestamp extends DoFn<TableRow, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      int timestamp = (Integer) row.get("timestamp");
+      String userName = (String) row.get("contributor_username");
+      if (userName != null) {
+        // Sets the implicit timestamp field to be used in windowing.
+        c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
+      }
+    }
+  }
+
+  /**
+   * Computes the number of edits in each user session.  A session is defined as
+   * a sequence of edits where each is separated from the next by less than an hour.
+   */
+  static class ComputeSessions
+      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
+    @Override
+    public PCollection<KV<String, Long>> apply(PCollection<String> actions) {
+      return actions
+          .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardHours(1))))
+
+          .apply(Count.<String>perElement());
+    }
+  }
+
+  /**
+   * Computes the longest session ending in each month.
+   */
+  private static class TopPerMonth
+      extends PTransform<PCollection<KV<String, Long>>, PCollection<List<KV<String, Long>>>> {
+    @Override
+    public PCollection<List<KV<String, Long>>> apply(PCollection<KV<String, Long>> sessions) {
+      return sessions
+          .apply(Window.<KV<String, Long>>into(CalendarWindows.months(1)))
+
+          .apply(Top.of(1, new SerializableComparator<KV<String, Long>>() {
+                    @Override
+                    public int compare(KV<String, Long> o1, KV<String, Long> o2) {
+                      return Long.compare(o1.getValue(), o2.getValue());
+                    }
+                  }).withoutDefaults());
+    }
+  }
+
+  static class SessionsToStringsDoFn extends DoFn<KV<String, Long>, KV<String, Long>>
+      implements RequiresWindowAccess {
+
+    @Override
+    public void processElement(ProcessContext c) {
+      c.output(KV.of(
+          c.element().getKey() + " : " + c.window(), c.element().getValue()));
+    }
+  }
+
+  static class FormatOutputDoFn extends DoFn<List<KV<String, Long>>, String>
+      implements RequiresWindowAccess {
+    @Override
+    public void processElement(ProcessContext c) {
+      for (KV<String, Long> item : c.element()) {
+        String session = item.getKey();
+        long count = item.getValue();
+        c.output(session + " : " + count + " : " + ((IntervalWindow) c.window()).start());
+      }
+    }
+  }
+
+  static class ComputeTopSessions extends PTransform<PCollection<TableRow>, PCollection<String>> {
+
+    private final double samplingThreshold;
+
+    public ComputeTopSessions(double samplingThreshold) {
+      this.samplingThreshold = samplingThreshold;
+    }
+
+    @Override
+    public PCollection<String> apply(PCollection<TableRow> input) {
+      return input
+          .apply(ParDo.of(new ExtractUserAndTimestamp()))
+
+          .apply(ParDo.named("SampleUsers").of(
+              new DoFn<String, String>() {
+                @Override
+                public void processElement(ProcessContext c) {
+                  if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * samplingThreshold) {
+                    c.output(c.element());
+                  }
+                }
+              }))
+
+          .apply(new ComputeSessions())
+
+          .apply(ParDo.named("SessionsToStrings").of(new SessionsToStringsDoFn()))
+          .apply(new TopPerMonth())
+          .apply(ParDo.named("FormatOutput").of(new FormatOutputDoFn()));
+    }
+  }
+
+  /**
+   * Options supported by this class.
+   *
+   * <p>Inherits standard Dataflow configuration options.
+   */
+  private interface Options extends PipelineOptions {
+    @Description(
+      "Input specified as a GCS path containing a BigQuery table exported as json")
+    @Default.String(EXPORTED_WIKI_TABLE)
+    String getInput();
+    void setInput(String value);
+
+    @Description("File to output results to")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  public static void main(String[] args) {
+    Options options = PipelineOptionsFactory.fromArgs(args)
+        .withValidation()
+        .as(Options.class);
+    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+
+    Pipeline p = Pipeline.create(dataflowOptions);
+
+    double samplingThreshold = 0.1;
+
+    p.apply(TextIO.Read
+        .from(options.getInput())
+        .withCoder(TableRowJsonCoder.of()))
+     .apply(new ComputeTopSessions(samplingThreshold))
+     .apply(TextIO.Write.named("Write").withoutSharding().to(options.getOutput()));
+
+    p.run();
+  }
+}
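
For readers new to session windows, here is a minimal, self-contained sketch of the
Sessions concept used by ComputeSessions above. It assumes the same SDK classes; the
user name and timestamps are illustrative only, and Create and TimestampedValue come
from com.google.cloud.dataflow.sdk.transforms and .values respectively.

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    PCollection<String> edits = p.apply(Create.timestamped(
        TimestampedValue.of("alice", new Instant(0L)),                    // first edit
        TimestampedValue.of("alice", new Instant(30 * 60 * 1000L)),       // 30 min later: same session
        TimestampedValue.of("alice", new Instant(5 * 60 * 60 * 1000L)))); // 5 h later: new session

    PCollection<KV<String, Long>> sessionCounts = edits
        .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardHours(1))))
        .apply(Count.<String>perElement());
    // The 1-hour gap splits alice's edits into two session windows, so this yields
    // ("alice", 2L) for the first session and ("alice", 1L) for the second.

    p.run();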

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java
new file mode 100644
index 0000000..2d54252
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java
@@ -0,0 +1,425 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
+import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
+import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
+import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicAndSubscriptionOptions;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.coders.AvroCoder;
+import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Strings;
+
+import org.apache.avro.reflect.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A Dataflow example that runs in both batch and streaming modes with traffic sensor data.
+ * You can configure the running mode by setting {@literal --streaming} to true or false.
+ *
+ * <p>Concepts: The batch and streaming runners, sliding windows, Google Cloud Pub/Sub
+ * topic injection, use of the AvroCoder to encode a custom class, and custom Combine transforms.
+ *
+ * <p>This example analyzes traffic sensor data using SlidingWindows. For each window,
+ * it finds the lane that had the highest flow recorded, for each sensor station. It writes
+ * those max values along with auxiliary info to a BigQuery table.
+ *
+ * <p>In batch mode, the pipeline reads traffic sensor data from {@literal --inputFile}.
+ *
+ * <p>In streaming mode, the pipeline reads the data from a Pub/Sub topic.
+ * By default, the example runs a separate pipeline to inject the data from the default
+ * {@literal --inputFile} into the Pub/Sub {@literal --pubsubTopic}, making it available for the
+ * streaming pipeline to process. You may override the default {@literal --inputFile} with a file
+ * of your choosing, or set {@literal --inputFile} to an empty string to disable the automatic
+ * Pub/Sub injection and use a separate tool to control the input to this example. Example code
+ * that publishes traffic sensor data to a Pub/Sub topic is provided in
+ * <a href="https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-python/tree/master/gce-cmdline-publisher">cloud-pubsub-samples-python</a>.
+ *
+ * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
+ * from the example common package (there are no defaults for a general Dataflow pipeline).
+ * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
+ * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
+ * the example will try to create them.
+ *
+ * <p>On receiving the signal to terminate the process (CTRL-C), the example will try to cancel
+ * the pipelines and then exit.
+ */
+public class TrafficMaxLaneFlow {
+
+  private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
+  private static final Integer VALID_INPUTS = 4999;
+
+  static final int WINDOW_DURATION = 60;  // Default sliding window duration in minutes
+  static final int WINDOW_SLIDE_EVERY = 5;  // Default window 'slide every' setting in minutes
+
+  /**
+   * This class holds information about each lane in a station reading, along with some general
+   * information from the reading.
+   */
+  @DefaultCoder(AvroCoder.class)
+  static class LaneInfo {
+    @Nullable String stationId;
+    @Nullable String lane;
+    @Nullable String direction;
+    @Nullable String freeway;
+    @Nullable String recordedTimestamp;
+    @Nullable Integer laneFlow;
+    @Nullable Integer totalFlow;
+    @Nullable Double laneAO;
+    @Nullable Double laneAS;
+
+    public LaneInfo() {}
+
+    public LaneInfo(String stationId, String lane, String direction, String freeway,
+        String timestamp, Integer laneFlow, Double laneAO,
+        Double laneAS, Integer totalFlow) {
+      this.stationId = stationId;
+      this.lane = lane;
+      this.direction = direction;
+      this.freeway = freeway;
+      this.recordedTimestamp = timestamp;
+      this.laneFlow = laneFlow;
+      this.laneAO = laneAO;
+      this.laneAS = laneAS;
+      this.totalFlow = totalFlow;
+    }
+
+    public String getStationId() {
+      return this.stationId;
+    }
+    public String getLane() {
+      return this.lane;
+    }
+    public String getDirection() {
+      return this.direction;
+    }
+    public String getFreeway() {
+      return this.freeway;
+    }
+    public String getRecordedTimestamp() {
+      return this.recordedTimestamp;
+    }
+    public Integer getLaneFlow() {
+      return this.laneFlow;
+    }
+    public Double getLaneAO() {
+      return this.laneAO;
+    }
+    public Double getLaneAS() {
+      return this.laneAS;
+    }
+    public Integer getTotalFlow() {
+      return this.totalFlow;
+    }
+  }
+
+  /**
+   * Extract the timestamp field from the input string, and use it as the element timestamp.
+   */
+  static class ExtractTimestamps extends DoFn<String, String> {
+    private static final DateTimeFormatter dateTimeFormat =
+        DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");
+
+    @Override
+    public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
+      String[] items = c.element().split(",");
+      if (items.length > 0) {
+        try {
+          String timestamp = items[0];
+          c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp)));
+        } catch (IllegalArgumentException e) {
+          // Skip the invalid input.
+        }
+      }
+    }
+  }
+
+  /**
+   * Extract flow information for each of the 8 lanes in a reading, and output as separate tuples.
+   * This will let us determine which lane has the max flow for that station over the span of the
+   * window, and output not only the max flow from that calculation, but other associated
+   * information. The number of lanes for which data is present depends upon which freeway the data
+   * point comes from.
+   */
+  static class ExtractFlowInfoFn extends DoFn<String, KV<String, LaneInfo>> {
+
+    @Override
+    public void processElement(ProcessContext c) {
+      String[] items = c.element().split(",");
+      if (items.length < 48) {
+        // Skip the invalid input.
+        return;
+      }
+      // extract the sensor information for the lanes from the input string fields.
+      String timestamp = items[0];
+      String stationId = items[1];
+      String freeway = items[2];
+      String direction = items[3];
+      Integer totalFlow = tryIntParse(items[7]);
+      for (int i = 1; i <= 8; ++i) {
+        Integer laneFlow = tryIntParse(items[6 + 5 * i]);
+        Double laneAvgOccupancy = tryDoubleParse(items[7 + 5 * i]);
+        Double laneAvgSpeed = tryDoubleParse(items[8 + 5 * i]);
+        if (laneFlow == null || laneAvgOccupancy == null || laneAvgSpeed == null) {
+          return;
+        }
+        LaneInfo laneInfo = new LaneInfo(stationId, "lane" + i, direction, freeway, timestamp,
+            laneFlow, laneAvgOccupancy, laneAvgSpeed, totalFlow);
+        c.output(KV.of(stationId, laneInfo));
+      }
+    }
+  }
+
+  /**
+   * A custom 'combine function' used with the Combine.perKey transform. Used to find the max lane
+   * flow over all the data points in the Window. Extracts the lane flow from each LaneInfo and
+   * determines whether it's the max seen so far. We're using a custom combiner instead of the Max
+   * transform because we want to retain the additional information we've associated with the flow
+   * value.
+   */
+  public static class MaxFlow implements SerializableFunction<Iterable<LaneInfo>, LaneInfo> {
+    @Override
+    public LaneInfo apply(Iterable<LaneInfo> input) {
+      Integer max = 0;
+      LaneInfo maxInfo = new LaneInfo();
+      for (LaneInfo item : input) {
+        Integer flow = item.getLaneFlow();
+        if (flow != null && (flow >= max)) {
+          max = flow;
+          maxInfo = item;
+        }
+      }
+      return maxInfo;
+    }
+  }
+
+  /**
+   * Format the results of the Max Lane flow calculation to a TableRow, to save to BigQuery.
+   * Add the timestamp from the window context.
+   */
+  static class FormatMaxesFn extends DoFn<KV<String, LaneInfo>, TableRow> {
+    @Override
+    public void processElement(ProcessContext c) {
+
+      LaneInfo laneInfo = c.element().getValue();
+      TableRow row = new TableRow()
+          .set("station_id", c.element().getKey())
+          .set("direction", laneInfo.getDirection())
+          .set("freeway", laneInfo.getFreeway())
+          .set("lane_max_flow", laneInfo.getLaneFlow())
+          .set("lane", laneInfo.getLane())
+          .set("avg_occ", laneInfo.getLaneAO())
+          .set("avg_speed", laneInfo.getLaneAS())
+          .set("total_flow", laneInfo.getTotalFlow())
+          .set("recorded_timestamp", laneInfo.getRecordedTimestamp())
+          .set("window_timestamp", c.timestamp().toString());
+      c.output(row);
+    }
+
+    /** Defines the BigQuery schema used for the output. */
+    static TableSchema getSchema() {
+      List<TableFieldSchema> fields = new ArrayList<>();
+      fields.add(new TableFieldSchema().setName("station_id").setType("STRING"));
+      fields.add(new TableFieldSchema().setName("direction").setType("STRING"));
+      fields.add(new TableFieldSchema().setName("freeway").setType("STRING"));
+      fields.add(new TableFieldSchema().setName("lane_max_flow").setType("INTEGER"));
+      fields.add(new TableFieldSchema().setName("lane").setType("STRING"));
+      fields.add(new TableFieldSchema().setName("avg_occ").setType("FLOAT"));
+      fields.add(new TableFieldSchema().setName("avg_speed").setType("FLOAT"));
+      fields.add(new TableFieldSchema().setName("total_flow").setType("INTEGER"));
+      fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP"));
+      fields.add(new TableFieldSchema().setName("recorded_timestamp").setType("STRING"));
+      TableSchema schema = new TableSchema().setFields(fields);
+      return schema;
+    }
+  }
+
+  /**
+   * This PTransform extracts lane info, calculates the max lane flow found for a given station (for
+   * the current Window) using a custom 'combiner', and formats the results for BigQuery.
+   */
+  static class MaxLaneFlow
+      extends PTransform<PCollection<KV<String, LaneInfo>>, PCollection<TableRow>> {
+    @Override
+    public PCollection<TableRow> apply(PCollection<KV<String, LaneInfo>> flowInfo) {
+      // stationId, LaneInfo => stationId + max lane flow info
+      PCollection<KV<String, LaneInfo>> flowMaxes =
+          flowInfo.apply(Combine.<String, LaneInfo>perKey(
+              new MaxFlow()));
+
+      // <stationId, max lane flow info>... => row...
+      PCollection<TableRow> results = flowMaxes.apply(
+          ParDo.of(new FormatMaxesFn()));
+
+      return results;
+    }
+  }
+
+  static class ReadFileAndExtractTimestamps extends PTransform<PBegin, PCollection<String>> {
+    private final String inputFile;
+
+    public ReadFileAndExtractTimestamps(String inputFile) {
+      this.inputFile = inputFile;
+    }
+
+    @Override
+    public PCollection<String> apply(PBegin begin) {
+      return begin
+          .apply(TextIO.Read.from(inputFile))
+          .apply(ParDo.of(new ExtractTimestamps()));
+    }
+  }
+
+  /**
+   * Options supported by {@link TrafficMaxLaneFlow}.
+   *
+   * <p>Inherits standard configuration options.
+   */
+  private interface TrafficMaxLaneFlowOptions extends DataflowExampleOptions,
+      ExamplePubsubTopicAndSubscriptionOptions, ExampleBigQueryTableOptions {
+        @Description("Input file to inject to Pub/Sub topic")
+    @Default.String("gs://dataflow-samples/traffic_sensor/"
+        + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
+    String getInputFile();
+    void setInputFile(String value);
+
+    @Description("Numeric value of sliding window duration, in minutes")
+    @Default.Integer(WINDOW_DURATION)
+    Integer getWindowDuration();
+    void setWindowDuration(Integer value);
+
+    @Description("Numeric value of window 'slide every' setting, in minutes")
+    @Default.Integer(WINDOW_SLIDE_EVERY)
+    Integer getWindowSlideEvery();
+    void setWindowSlideEvery(Integer value);
+
+    @Description("Whether to run the pipeline with unbounded input")
+    @Default.Boolean(false)
+    boolean isUnbounded();
+    void setUnbounded(boolean value);
+  }
+
+  /**
+   * Sets up and starts streaming pipeline.
+   *
+   * @throws IOException if there is a problem setting up resources
+   */
+  public static void main(String[] args) throws IOException {
+    TrafficMaxLaneFlowOptions options = PipelineOptionsFactory.fromArgs(args)
+        .withValidation()
+        .as(TrafficMaxLaneFlowOptions.class);
+    options.setBigQuerySchema(FormatMaxesFn.getSchema());
+    // Using DataflowExampleUtils to set up required resources.
+    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options, options.isUnbounded());
+
+    Pipeline pipeline = Pipeline.create(options);
+    TableReference tableRef = new TableReference();
+    tableRef.setProjectId(options.getProject());
+    tableRef.setDatasetId(options.getBigQueryDataset());
+    tableRef.setTableId(options.getBigQueryTable());
+
+    PCollection<String> input;
+    if (options.isUnbounded()) {
+      // Read from Pub/Sub as an unbounded source.
+      input = pipeline.apply(PubsubIO.Read
+          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
+          .subscription(options.getPubsubSubscription()));
+    } else {
+      // Read from Pub/Sub as a bounded source, capped at VALID_INPUTS records.
+      input = pipeline.apply(PubsubIO.Read
+          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
+          .subscription(options.getPubsubSubscription()).maxNumRecords(VALID_INPUTS));
+
+      // To read from a bounded text file instead, use:
+      // input = pipeline.apply(new ReadFileAndExtractTimestamps(options.getInputFile()));
+    }
+    input
+        // row... => <stationId, LaneInfo> ...
+        .apply(ParDo.of(new ExtractFlowInfoFn()))
+        // map the incoming data stream into sliding windows. The default window duration values
+        // work well if you're running the accompanying Pub/Sub generator script with the
+        // --replay flag, which simulates pauses in the sensor data publication. You may want to
+        // adjust them otherwise.
+        .apply(Window.<KV<String, LaneInfo>>into(SlidingWindows.of(
+            Duration.standardMinutes(options.getWindowDuration())).
+            every(Duration.standardMinutes(options.getWindowSlideEvery()))))
+        .apply(new MaxLaneFlow())
+        .apply(BigQueryIO.Write.to(tableRef)
+            .withSchema(FormatMaxesFn.getSchema()));
+
+    // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
+    if (!Strings.isNullOrEmpty(options.getInputFile())
+        && !Strings.isNullOrEmpty(options.getPubsubTopic())) {
+      dataflowUtils.runInjectorPipeline(
+          new ReadFileAndExtractTimestamps(options.getInputFile()),
+          options.getPubsubTopic(),
+          PUBSUB_TIMESTAMP_LABEL_KEY);
+    }
+
+    // Run the pipeline.
+    PipelineResult result = pipeline.run();
+
+    // dataflowUtils will try to cancel the pipeline and the injector before the program exits.
+    dataflowUtils.waitToFinish(result);
+  }
+
+  private static Integer tryIntParse(String number) {
+    try {
+      return Integer.parseInt(number);
+    } catch (NumberFormatException e) {
+      return null;
+    }
+  }
+
+  private static Double tryDoubleParse(String number) {
+    try {
+      return Double.parseDouble(number);
+    } catch (NumberFormatException e) {
+      return null;
+    }
+  }
+}
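
The MaxFlow combiner above illustrates a general pattern: Combine.perKey accepts a
plain SerializableFunction from Iterable<V> to V, which is handy when the Max
transform would discard fields you want to keep. A minimal sketch of the same
pattern with a simpler value type follows; the station IDs and flow values are
illustrative, and Create comes from com.google.cloud.dataflow.sdk.transforms.

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<KV<String, Integer>> readings = p.apply(Create.of(
        KV.of("station1", 20), KV.of("station1", 45), KV.of("station2", 7)));

    // The function may be applied to partial groups in any order, so it should
    // be commutative and associative over the values it combines.
    PCollection<KV<String, Integer>> maxPerStation = readings.apply(
        Combine.<String, Integer>perKey(
            new SerializableFunction<Iterable<Integer>, Integer>() {
              @Override
              public Integer apply(Iterable<Integer> values) {
                int max = Integer.MIN_VALUE;
                for (Integer value : values) {
                  max = Math.max(max, value);
                }
                return max;
              }
            }));
    // Yields ("station1", 45) and ("station2", 7).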

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java
new file mode 100644
index 0000000..e3e88c2
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java
@@ -0,0 +1,459 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.complete;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
+import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
+import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
+import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicAndSubscriptionOptions;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.coders.AvroCoder;
+import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
+import org.apache.avro.reflect.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A Dataflow example that runs in both batch and streaming modes with traffic sensor data.
+ * You can configure the running mode by setting {@literal --streaming} to true or false.
+ *
+ * <p>Concepts: The batch and streaming runners, GroupByKey, sliding windows, and
+ * Google Cloud Pub/Sub topic injection.
+ *
+ * <p>This example analyzes traffic sensor data using SlidingWindows. For each window,
+ * it calculates the average speed over the window for some small set of predefined 'routes',
+ * and looks for 'slowdowns' in those routes. It writes its results to a BigQuery table.
+ *
+ * <p>In batch mode, the pipeline reads traffic sensor data from {@literal --inputFile}.
+ *
+ * <p>In streaming mode, the pipeline reads the data from a Pub/Sub topic.
+ * By default, the example runs a separate pipeline to inject the data from the default
+ * {@literal --inputFile} into the Pub/Sub {@literal --pubsubTopic}, making it available for the
+ * streaming pipeline to process. You may override the default {@literal --inputFile} with a file
+ * of your choosing, or set {@literal --inputFile} to an empty string to disable the automatic
+ * Pub/Sub injection and use a separate tool to control the input to this example. Example code
+ * that publishes traffic sensor data to a Pub/Sub topic is provided in
+ * <a href="https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-python/tree/master/gce-cmdline-publisher">cloud-pubsub-samples-python</a>.
+ *
+ * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
+ * from the example common package (there are no defaults for a general Dataflow pipeline).
+ * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
+ * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
+ * the example will try to create them.
+ *
+ * <p>On receiving the signal to terminate the process (CTRL-C), the example will try to cancel
+ * the pipelines and then exit.
+ */
+public class TrafficRoutes {
+
+  private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
+  private static final Integer VALID_INPUTS = 4999;
+
+  // Instantiate some small predefined San Diego routes to analyze
+  static Map<String, String> sdStations = buildStationInfo();
+  static final int WINDOW_DURATION = 3;  // Default sliding window duration in minutes
+  static final int WINDOW_SLIDE_EVERY = 1;  // Default window 'slide every' setting in minutes
+
+  /**
+   * This class holds information about a station reading's average speed.
+   */
+  @DefaultCoder(AvroCoder.class)
+  static class StationSpeed implements Comparable<StationSpeed> {
+    @Nullable String stationId;
+    @Nullable Double avgSpeed;
+    @Nullable Long timestamp;
+
+    public StationSpeed() {}
+
+    public StationSpeed(String stationId, Double avgSpeed, Long timestamp) {
+      this.stationId = stationId;
+      this.avgSpeed = avgSpeed;
+      this.timestamp = timestamp;
+    }
+
+    public String getStationId() {
+      return this.stationId;
+    }
+    public Double getAvgSpeed() {
+      return this.avgSpeed;
+    }
+
+    @Override
+    public int compareTo(StationSpeed other) {
+      return Long.compare(this.timestamp, other.timestamp);
+    }
+  }
+
+  /**
+   * This class holds information about a route's speed/slowdown.
+   */
+  @DefaultCoder(AvroCoder.class)
+  static class RouteInfo {
+    @Nullable String route;
+    @Nullable Double avgSpeed;
+    @Nullable Boolean slowdownEvent;
+
+
+    public RouteInfo() {}
+
+    public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent) {
+      this.route = route;
+      this.avgSpeed = avgSpeed;
+      this.slowdownEvent = slowdownEvent;
+    }
+
+    public String getRoute() {
+      return this.route;
+    }
+    public Double getAvgSpeed() {
+      return this.avgSpeed;
+    }
+    public Boolean getSlowdownEvent() {
+      return this.slowdownEvent;
+    }
+  }
+
+  /**
+   * Extract the timestamp field from the input string, and use it as the element timestamp.
+   */
+  static class ExtractTimestamps extends DoFn<String, String> {
+    private static final DateTimeFormatter dateTimeFormat =
+        DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");
+
+    @Override
+    public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
+      String[] items = c.element().split(",");
+      String timestamp = tryParseTimestamp(items);
+      if (timestamp != null) {
+        try {
+          c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp)));
+        } catch (IllegalArgumentException e) {
+          // Skip the invalid input.
+        }
+      }
+    }
+  }
+
+  /**
+   * Filter out readings for the stations along predefined 'routes', and output
+   * (station, speed info) keyed on route.
+   */
+  static class ExtractStationSpeedFn extends DoFn<String, KV<String, StationSpeed>> {
+
+    @Override
+    public void processElement(ProcessContext c) {
+      String[] items = c.element().split(",");
+      String stationType = tryParseStationType(items);
+      // For this analysis, use only 'main line' station types
+      if (stationType != null && stationType.equals("ML")) {
+        Double avgSpeed = tryParseAvgSpeed(items);
+        String stationId = tryParseStationId(items);
+        // For this simple example, filter out everything but some hardwired routes.
+        if (avgSpeed != null && stationId != null && sdStations.containsKey(stationId)) {
+          StationSpeed stationSpeed =
+              new StationSpeed(stationId, avgSpeed, c.timestamp().getMillis());
+          // The tuple key is the 'route' name stored in the 'sdStations' hash.
+          KV<String, StationSpeed> outputValue = KV.of(sdStations.get(stationId), stationSpeed);
+          c.output(outputValue);
+        }
+      }
+    }
+  }
+
+  /**
+   * For a given route, track the average speed for the window. Calculate whether
+   * traffic is currently slowing down, via a predefined threshold: if, comparing consecutive
+   * readings per station within this sliding window, slowdowns outnumber speedups by at least
+   * two to one, we call this a 'slowdown'.
+   * Note: these calculations are for example purposes only, and are unrealistic and oversimplified.
+   */
+  static class GatherStats
+      extends DoFn<KV<String, Iterable<StationSpeed>>, KV<String, RouteInfo>> {
+    @Override
+    public void processElement(ProcessContext c) throws IOException {
+      String route = c.element().getKey();
+      double speedSum = 0.0;
+      int speedCount = 0;
+      int speedups = 0;
+      int slowdowns = 0;
+      List<StationSpeed> infoList = Lists.newArrayList(c.element().getValue());
+      // StationSpeeds sort by embedded timestamp.
+      Collections.sort(infoList);
+      Map<String, Double> prevSpeeds = new HashMap<>();
+      // For all stations in the route, sum (non-null) speeds. Keep a count of the non-null speeds.
+      for (StationSpeed item : infoList) {
+        Double speed = item.getAvgSpeed();
+        if (speed != null) {
+          speedSum += speed;
+          speedCount++;
+          Double lastSpeed = prevSpeeds.get(item.getStationId());
+          if (lastSpeed != null) {
+            if (lastSpeed < speed) {
+              speedups += 1;
+            } else {
+              slowdowns += 1;
+            }
+          }
+          prevSpeeds.put(item.getStationId(), speed);
+        }
+      }
+      if (speedCount == 0) {
+        // No average to compute.
+        return;
+      }
+      double speedAvg = speedSum / speedCount;
+      boolean slowdownEvent = slowdowns >= 2 * speedups;
+      RouteInfo routeInfo = new RouteInfo(route, speedAvg, slowdownEvent);
+      c.output(KV.of(route, routeInfo));
+    }
+  }
+
+  /**
+   * Format the results of the slowdown calculations to a TableRow, to save to BigQuery.
+   */
+  static class FormatStatsFn extends DoFn<KV<String, RouteInfo>, TableRow> {
+    @Override
+    public void processElement(ProcessContext c) {
+      RouteInfo routeInfo = c.element().getValue();
+      TableRow row = new TableRow()
+          .set("avg_speed", routeInfo.getAvgSpeed())
+          .set("slowdown_event", routeInfo.getSlowdownEvent())
+          .set("route", c.element().getKey())
+          .set("window_timestamp", c.timestamp().toString());
+      c.output(row);
+    }
+
+    /**
+     * Defines the BigQuery schema used for the output.
+     */
+    static TableSchema getSchema() {
+      List<TableFieldSchema> fields = new ArrayList<>();
+      fields.add(new TableFieldSchema().setName("route").setType("STRING"));
+      fields.add(new TableFieldSchema().setName("avg_speed").setType("FLOAT"));
+      fields.add(new TableFieldSchema().setName("slowdown_event").setType("BOOLEAN"));
+      fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP"));
+      TableSchema schema = new TableSchema().setFields(fields);
+      return schema;
+    }
+  }
+
+  /**
+   * This PTransform extracts speed info from traffic station readings.
+   * It groups the readings by 'route' and analyzes traffic slowdown for that route.
+   * Lastly, it formats the results for BigQuery.
+   */
+  static class TrackSpeed extends
+      PTransform<PCollection<KV<String, StationSpeed>>, PCollection<TableRow>> {
+    @Override
+    public PCollection<TableRow> apply(PCollection<KV<String, StationSpeed>> stationSpeed) {
+      // Apply a GroupByKey transform to collect a list of all station
+      // readings for a given route.
+      PCollection<KV<String, Iterable<StationSpeed>>> timeGroup = stationSpeed.apply(
+        GroupByKey.<String, StationSpeed>create());
+
+      // Analyze 'slowdown' over the route readings.
+      PCollection<KV<String, RouteInfo>> stats = timeGroup.apply(ParDo.of(new GatherStats()));
+
+      // Format the results for writing to BigQuery
+      PCollection<TableRow> results = stats.apply(
+          ParDo.of(new FormatStatsFn()));
+
+      return results;
+    }
+  }
+
+  static class ReadFileAndExtractTimestamps extends PTransform<PBegin, PCollection<String>> {
+    private final String inputFile;
+
+    public ReadFileAndExtractTimestamps(String inputFile) {
+      this.inputFile = inputFile;
+    }
+
+    @Override
+    public PCollection<String> apply(PBegin begin) {
+      return begin
+          .apply(TextIO.Read.from(inputFile))
+          .apply(ParDo.of(new ExtractTimestamps()));
+    }
+  }
+
+  /**
+   * Options supported by {@link TrafficRoutes}.
+   *
+   * <p>Inherits standard configuration options.
+   */
+  private interface TrafficRoutesOptions extends DataflowExampleOptions,
+      ExamplePubsubTopicAndSubscriptionOptions, ExampleBigQueryTableOptions {
+    @Description("Input file to inject to Pub/Sub topic")
+    @Default.String("gs://dataflow-samples/traffic_sensor/"
+        + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
+    String getInputFile();
+    void setInputFile(String value);
+
+    @Description("Numeric value of sliding window duration, in minutes")
+    @Default.Integer(WINDOW_DURATION)
+    Integer getWindowDuration();
+    void setWindowDuration(Integer value);
+
+    @Description("Numeric value of window 'slide every' setting, in minutes")
+    @Default.Integer(WINDOW_SLIDE_EVERY)
+    Integer getWindowSlideEvery();
+    void setWindowSlideEvery(Integer value);
+
+    @Description("Whether to run the pipeline with unbounded input")
+    @Default.Boolean(false)
+    boolean isUnbounded();
+    void setUnbounded(boolean value);
+  }
+
+  /**
+   * Sets up and starts streaming pipeline.
+   *
+   * @throws IOException if there is a problem setting up resources
+   */
+  public static void main(String[] args) throws IOException {
+    TrafficRoutesOptions options = PipelineOptionsFactory.fromArgs(args)
+        .withValidation()
+        .as(TrafficRoutesOptions.class);
+
+    options.setBigQuerySchema(FormatStatsFn.getSchema());
+    // Using DataflowExampleUtils to set up required resources.
+    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options, options.isUnbounded());
+
+    Pipeline pipeline = Pipeline.create(options);
+    TableReference tableRef = new TableReference();
+    tableRef.setProjectId(options.getProject());
+    tableRef.setDatasetId(options.getBigQueryDataset());
+    tableRef.setTableId(options.getBigQueryTable());
+
+    PCollection<String> input;
+    if (options.isUnbounded()) {
+      // Read from Pub/Sub as an unbounded source.
+      input = pipeline.apply(PubsubIO.Read
+          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
+          .subscription(options.getPubsubSubscription()));
+    } else {
+      // Read from Pub/Sub as a bounded source, capped at VALID_INPUTS records.
+      input = pipeline.apply(PubsubIO.Read
+          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
+          .subscription(options.getPubsubSubscription()).maxNumRecords(VALID_INPUTS));
+
+      // To read from a bounded text file instead, use:
+      // input = pipeline.apply(TextIO.Read.from(options.getInputFile()))
+      //    .apply(ParDo.of(new ExtractTimestamps()));
+    }
+    input
+        // row... => <station route, station speed> ...
+        .apply(ParDo.of(new ExtractStationSpeedFn()))
+        // map the incoming data stream into sliding windows.
+        // The default window duration values work well if you're running the accompanying Pub/Sub
+        // generator script without the --replay flag, so that there are no simulated pauses in
+        // the sensor data publication. You may want to adjust the values otherwise.
+        .apply(Window.<KV<String, StationSpeed>>into(SlidingWindows.of(
+            Duration.standardMinutes(options.getWindowDuration())).
+            every(Duration.standardMinutes(options.getWindowSlideEvery()))))
+        .apply(new TrackSpeed())
+        .apply(BigQueryIO.Write.to(tableRef)
+            .withSchema(FormatStatsFn.getSchema()));
+
+    // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
+    if (!Strings.isNullOrEmpty(options.getInputFile())
+        && !Strings.isNullOrEmpty(options.getPubsubTopic())) {
+      dataflowUtils.runInjectorPipeline(
+          new ReadFileAndExtractTimestamps(options.getInputFile()),
+          options.getPubsubTopic(),
+          PUBSUB_TIMESTAMP_LABEL_KEY);
+    }
+
+    // Run the pipeline.
+    PipelineResult result = pipeline.run();
+
+    // dataflowUtils will try to cancel the pipeline and the injector before the program exits.
+    dataflowUtils.waitToFinish(result);
+  }
+
+  private static Double tryParseAvgSpeed(String[] inputItems) {
+    try {
+      return Double.parseDouble(tryParseString(inputItems, 9));
+    } catch (NumberFormatException e) {
+      return null;
+    } catch (NullPointerException e) {
+      return null;
+    }
+  }
+
+  private static String tryParseStationType(String[] inputItems) {
+    return tryParseString(inputItems, 4);
+  }
+
+  private static String tryParseStationId(String[] inputItems) {
+    return tryParseString(inputItems, 1);
+  }
+
+  private static String tryParseTimestamp(String[] inputItems) {
+    return tryParseString(inputItems, 0);
+  }
+
+  private static String tryParseString(String[] inputItems, int index) {
+    return inputItems.length > index ? inputItems[index] : null;
+  }
+
+  /**
+   * Define some small hard-wired San Diego 'routes' to track based on sensor station ID.
+   */
+  private static Map<String, String> buildStationInfo() {
+    Map<String, String> stations = new Hashtable<String, String>();
+      stations.put("1108413", "SDRoute1"); // from freeway 805 S
+      stations.put("1108699", "SDRoute2"); // from freeway 78 E
+      stations.put("1108702", "SDRoute2");
+    return stations;
+  }
+}
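
A note on the sliding windows used by both traffic examples: with a window duration
of D minutes sliding every S minutes, each element is assigned to D/S overlapping
windows. A minimal sketch with this example's defaults follows; the reading and its
timestamp are illustrative, and Create and TimestampedValue come from
com.google.cloud.dataflow.sdk.transforms and .values respectively.

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<String> readings = p.apply(Create.timestamped(
        TimestampedValue.of("1108413,45.0", Instant.parse("2010-01-01T12:05:00Z"))));

    // With 3-minute windows sliding every 1 minute, the element above lands in
    // three windows: [12:03, 12:06), [12:04, 12:07), and [12:05, 12:08).
    PCollection<String> windowed = readings.apply(
        Window.<String>into(SlidingWindows.of(Duration.standardMinutes(3))
            .every(Duration.standardMinutes(1))));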

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoes.java
new file mode 100644
index 0000000..503bcad
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoes.java
@@ -0,0 +1,179 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An example that reads the public samples of weather data from BigQuery, counts the number of
+ * tornadoes that occur in each month, and writes the results to BigQuery.
+ *
+ * <p>Concepts: Reading/writing BigQuery; counting a PCollection; user-defined PTransforms
+ *
+ * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
+ * table.
+ *
+ * <p>To execute this pipeline locally, specify general pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ * }
+ * </pre>
+ * and the BigQuery table for the output, with the form
+ * <pre>{@code
+ *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ * }</pre>
+ *
+ * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ * }
+ * </pre>
+ * and the BigQuery table for the output:
+ * <pre>{@code
+ *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ * }</pre>
+ *
+ * <p>The BigQuery input table defaults to {@code clouddataflow-readonly:samples.weather_stations}
+ * and can be overridden with {@code --input}.
+ */
+public class BigQueryTornadoes {
+  // Default to the public weather station sample table, a small subset of publicdata:samples.gsod.
+  private static final String WEATHER_SAMPLES_TABLE =
+      "clouddataflow-readonly:samples.weather_stations";
+
+  /**
+   * Examines each row in the input table. If a tornado was recorded
+   * in that sample, the month in which it occurred is output.
+   */
+  static class ExtractTornadoesFn extends DoFn<TableRow, Integer> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      if ((Boolean) row.get("tornado")) {
+        c.output(Integer.parseInt((String) row.get("month")));
+      }
+    }
+  }
+
+  /**
+   * Prepares the data for writing to BigQuery by building a TableRow object containing an
+   * integer representation of month and the number of tornadoes that occurred in each month.
+   */
+  static class FormatCountsFn extends DoFn<KV<Integer, Long>, TableRow> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = new TableRow()
+          .set("month", c.element().getKey())
+          .set("tornado_count", c.element().getValue());
+      c.output(row);
+    }
+  }
+
+  /**
+   * Takes rows from a table and generates a table of counts.
+   *
+   * <p>The input schema is described by
+   * https://developers.google.com/bigquery/docs/dataset-gsod .
+   * The output contains the total number of tornadoes found in each month in
+   * the following schema:
+   * <ul>
+   *   <li>month: integer</li>
+   *   <li>tornado_count: integer</li>
+   * </ul>
+   */
+  static class CountTornadoes
+      extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
+    @Override
+    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+
+      // row... => month...
+      PCollection<Integer> tornadoes = rows.apply(
+          ParDo.of(new ExtractTornadoesFn()));
+
+      // month... => <month,count>...
+      PCollection<KV<Integer, Long>> tornadoCounts =
+          tornadoes.apply(Count.<Integer>perElement());
+
+      // <month,count>... => row...
+      PCollection<TableRow> results = tornadoCounts.apply(
+          ParDo.of(new FormatCountsFn()));
+
+      return results;
+    }
+  }
+
+  /**
+   * Options supported by {@link BigQueryTornadoes}.
+   *
+   * <p>Inherits standard configuration options.
+   */
+  private interface Options extends PipelineOptions {
+    @Description("Table to read from, specified as "
+        + "<project_id>:<dataset_id>.<table_id>")
+    @Default.String(WEATHER_SAMPLES_TABLE)
+    String getInput();
+    void setInput(String value);
+
+    @Description("BigQuery table to write to, specified as "
+        + "<project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  public static void main(String[] args) {
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
+    Pipeline p = Pipeline.create(options);
+
+    // Build the table schema for the output table.
+    List<TableFieldSchema> fields = new ArrayList<>();
+    fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
+    fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
+    TableSchema schema = new TableSchema().setFields(fields);
+
+    p.apply(BigQueryIO.Read.from(options.getInput()))
+     .apply(new CountTornadoes())
+     .apply(BigQueryIO.Write
+        .to(options.getOutput())
+        .withSchema(schema)
+        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
+
+    p.run();
+  }
+}
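
A quick way to sanity-check a DoFn like ExtractTornadoesFn in isolation is the SDK's
DoFnTester (com.google.cloud.dataflow.sdk.transforms.DoFnTester). A minimal sketch,
with illustrative field values:

    DoFnTester<TableRow, Integer> fnTester = DoFnTester.of(new ExtractTornadoesFn());

    TableRow tornadoRow = new TableRow().set("month", "6").set("tornado", true);
    TableRow calmRow = new TableRow().set("month", "7").set("tornado", false);

    // Only the row with tornado == true produces output, so months is [6].
    List<Integer> months = fnTester.processBatch(tornadoRow, calmRow);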

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamples.java
new file mode 100644
index 0000000..9540dd4
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamples.java
@@ -0,0 +1,223 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An example that reads the public 'Shakespeare' data, and for each word in
+ * the dataset that is over a given length, generates a string containing the
+ * list of play names in which that word appears, and saves this information
+ * to a BigQuery table.
+ *
+ * <p>Concepts: the Combine.perKey transform, which lets you combine the values in a
+ * key-grouped PCollection, and how to use an Aggregator to track information in the
+ * Monitoring UI.
+ *
+ * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
+ * table.
+ *
+ * <p>To execute this pipeline locally, specify general pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ * }
+ * </pre>
+ * and the BigQuery table for the output:
+ * <pre>{@code
+ *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ * }</pre>
+ *
+ * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://<STAGING DIRECTORY>
+ *   --runner=BlockingDataflowPipelineRunner
+ * }
+ * </pre>
+ * and the BigQuery table for the output:
+ * <pre>{@code
+ *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ * }</pre>
+ *
+ * <p>The BigQuery input table defaults to {@code publicdata:samples.shakespeare} and can
+ * be overridden with {@code --input}.
+ */
+public class CombinePerKeyExamples {
+  // Use the shakespeare public BigQuery sample
+  private static final String SHAKESPEARE_TABLE =
+      "publicdata:samples.shakespeare";
+  // We'll track words >= this word length across all plays in the table.
+  private static final int MIN_WORD_LENGTH = 9;
+
+  /**
+   * Examines each row in the input table. If the word's length is greater than or equal to
+   * MIN_WORD_LENGTH, outputs the (word, play_name) pair.
+   */
+  static class ExtractLargeWordsFn extends DoFn<TableRow, KV<String, String>> {
+    private final Aggregator<Long, Long> smallerWords =
+        createAggregator("smallerWords", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      String playName = (String) row.get("corpus");
+      String word = (String) row.get("word");
+      if (word.length() >= MIN_WORD_LENGTH) {
+        c.output(KV.of(word, playName));
+      } else {
+        // Track how many smaller words we're not including. This information will be
+        // visible in the Monitoring UI.
+        smallerWords.addValue(1L);
+      }
+    }
+  }
+
+
+  /**
+   * Prepares the data for writing to BigQuery by building a TableRow object
+   * containing a word with a string listing the plays in which it appeared.
+   */
+  static class FormatShakespeareOutputFn extends DoFn<KV<String, String>, TableRow> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = new TableRow()
+          .set("word", c.element().getKey())
+          .set("all_plays", c.element().getValue());
+      c.output(row);
+    }
+  }
+
+  /**
+   * Reads the public 'Shakespeare' data, and for each word in the dataset
+   * over a given length, generates a string containing the list of play names
+   * in which that word appears. It does this via the Combine.perKey
+   * transform, with the ConcatWords combine function.
+   *
+   * <p>Combine.perKey is similar to a GroupByKey followed by a ParDo, but
+   * has more restricted semantics that allow it to be executed more
+   * efficiently. These records are then formatted as BigQuery table rows.
+   */
+  static class PlaysForWord
+      extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
+    @Override
+    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+
+      // row... => <word, play_name> ...
+      PCollection<KV<String, String>> words = rows.apply(
+          ParDo.of(new ExtractLargeWordsFn()));
+
+      // word, play_name => word, all_plays ...
+      PCollection<KV<String, String>> wordAllPlays =
+          words.apply(Combine.<String, String>perKey(
+              new ConcatWords()));
+
+      // <word, all_plays>... => row...
+      PCollection<TableRow> results = wordAllPlays.apply(
+          ParDo.of(new FormatShakespeareOutputFn()));
+
+      return results;
+    }
+  }
+
+  /**
+   * A 'combine function' used with the Combine.perKey transform. Builds a
+   * comma-separated string of all input items, so for this pipeline it builds a
+   * string containing all the different Shakespeare plays in which the given
+   * input word has appeared.
+   */
+  public static class ConcatWords implements SerializableFunction<Iterable<String>, String> {
+    @Override
+    public String apply(Iterable<String> input) {
+      StringBuilder all = new StringBuilder();
+      for (String item : input) {
+        if (!item.isEmpty()) {
+          if (all.length() == 0) {
+            all.append(item);
+          } else {
+            all.append(",");
+            all.append(item);
+          }
+        }
+      }
+      return all.toString();
+    }
+  }
+
+  /**
+   * Options supported by {@link CombinePerKeyExamples}.
+   *
+   * <p>Inherits standard configuration options.
+   */
+  private interface Options extends PipelineOptions {
+    @Description("Table to read from, specified as "
+        + "<project_id>:<dataset_id>.<table_id>")
+    @Default.String(SHAKESPEARE_TABLE)
+    String getInput();
+    void setInput(String value);
+
+    @Description("Table to write to, specified as "
+        + "<project_id>:<dataset_id>.<table_id>. "
+        + "The dataset_id must already exist")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+
+    // Build the table schema for the output table.
+    List<TableFieldSchema> fields = new ArrayList<>();
+    fields.add(new TableFieldSchema().setName("word").setType("STRING"));
+    fields.add(new TableFieldSchema().setName("all_plays").setType("STRING"));
+    TableSchema schema = new TableSchema().setFields(fields);
+
+    p.apply(BigQueryIO.Read.from(options.getInput()))
+     .apply(new PlaysForWord())
+     .apply(BigQueryIO.Write
+        .to(options.getOutput())
+        .withSchema(schema)
+        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
+
+    p.run();
+  }
+}
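
To make the Combine.perKey semantics described in the PlaysForWord javadoc concrete, here is a standalone sketch, separate from the commit, that applies the same ConcatWords combine function to a tiny in-memory input. It assumes the SDK's Create transform and the default local runner:

    import com.google.cloud.dataflow.examples.cookbook.CombinePerKeyExamples;
    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.transforms.Combine;
    import com.google.cloud.dataflow.sdk.transforms.Create;
    import com.google.cloud.dataflow.sdk.values.KV;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    public class ConcatWordsSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Two values share the key "brevity"; Combine.perKey folds them into one.
        PCollection<KV<String, String>> words = p.apply(Create.of(
            KV.of("brevity", "hamlet"),
            KV.of("brevity", "macbeth"),
            KV.of("honorificabilitudinitatibus", "loveslabourlost")));

        PCollection<KV<String, String>> plays = words.apply(
            Combine.<String, String>perKey(new CombinePerKeyExamples.ConcatWords()));
        // plays contains KV("brevity", "hamlet,macbeth") (the order of items inside
        // the concatenated string is not guaranteed) and
        // KV("honorificabilitudinitatibus", "loveslabourlost").

        p.run();
      }
    }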

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/DatastoreWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/DatastoreWordCount.java
new file mode 100644
index 0000000..eaf1e20
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/DatastoreWordCount.java
@@ -0,0 +1,269 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.cookbook;
+
+import static com.google.api.services.datastore.client.DatastoreHelper.getPropertyMap;
+import static com.google.api.services.datastore.client.DatastoreHelper.getString;
+import static com.google.api.services.datastore.client.DatastoreHelper.makeFilter;
+import static com.google.api.services.datastore.client.DatastoreHelper.makeKey;
+import static com.google.api.services.datastore.client.DatastoreHelper.makeValue;
+
+import com.google.api.services.datastore.DatastoreV1.Entity;
+import com.google.api.services.datastore.DatastoreV1.Key;
+import com.google.api.services.datastore.DatastoreV1.Property;
+import com.google.api.services.datastore.DatastoreV1.PropertyFilter;
+import com.google.api.services.datastore.DatastoreV1.Query;
+import com.google.api.services.datastore.DatastoreV1.Value;
+import com.google.cloud.dataflow.examples.WordCount;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.DatastoreIO;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+
+import java.util.Map;
+import java.util.UUID;
+
+import javax.annotation.Nullable;
+
+/**
+ * A WordCount example using DatastoreIO.
+ *
+ * <p>This example shows how to use DatastoreIO to read from Datastore and
+ * write the results to Cloud Storage.  Note that this example will write
+ * data to Datastore, which may incur charges for Datastore operations.
+ *
+ * <p>To run this example, users need to use gcloud to obtain credentials for Datastore:
+ * <pre>{@code
+ * $ gcloud auth login
+ * }</pre>
+ *
+ * <p>To run this pipeline locally, the following options must be provided:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --dataset=YOUR_DATASET_ID
+ *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PATH]
+ * }</pre>
+ *
+ * <p>To run this example using the Dataflow service, you must additionally
+ * provide either {@literal --stagingLocation} or {@literal --tempLocation}, and
+ * select one of the Dataflow pipeline runners, e.g.
+ * {@literal --runner=BlockingDataflowPipelineRunner}.
+ *
+ * <p><b>Note:</b> this example creates entities with <i>Ancestor keys</i> to ensure that all
+ * entities created are in the same entity group. Similarly, the query used to read from the Cloud
+ * Datastore uses an <i>Ancestor filter</i>. Ancestors are used to ensure strongly consistent
+ * results in Cloud Datastore. For more information, see the Cloud Datastore documentation on
+ * <a href="https://cloud.google.com/datastore/docs/concepts/structuring_for_strong_consistency">
+ * Structuring Data for Strong Consistency</a>.
+ */
+public class DatastoreWordCount {
+
+  /**
+   * A DoFn that gets the content of an entity (one line in a
+   * Shakespeare play) and converts it to a string.
+   */
+  static class GetContentFn extends DoFn<Entity, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      Map<String, Value> props = getPropertyMap(c.element());
+      Value value = props.get("content");
+      if (value != null) {
+        c.output(getString(value));
+      }
+    }
+  }
+
+  /**
+   * A helper function to create the ancestor key for all created and queried entities.
+   *
+   * <p>We use ancestor keys and ancestor queries for strong consistency. See
+   * {@link DatastoreWordCount} javadoc for more information.
+   */
+  static Key makeAncestorKey(@Nullable String namespace, String kind) {
+    Key.Builder keyBuilder = makeKey(kind, "root");
+    if (namespace != null) {
+      keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
+    }
+    return keyBuilder.build();
+  }
+
+  /**
+   * A DoFn that creates an entity for every line in Shakespeare.
+   */
+  static class CreateEntityFn extends DoFn<String, Entity> {
+    private final String namespace;
+    private final String kind;
+    private final Key ancestorKey;
+
+    CreateEntityFn(String namespace, String kind) {
+      this.namespace = namespace;
+      this.kind = kind;
+
+      // Build the ancestor key for all created entities once, including the namespace.
+      ancestorKey = makeAncestorKey(namespace, kind);
+    }
+
+    public Entity makeEntity(String content) {
+      Entity.Builder entityBuilder = Entity.newBuilder();
+
+      // All created entities have the same ancestor Key.
+      Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString());
+      // NOTE: Namespace is not inherited between keys created with DatastoreHelper.makeKey, so
+      // we must set the namespace on keyBuilder. TODO: Once partitionId inheritance is added,
+      // we can simplify this code.
+      if (namespace != null) {
+        keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
+      }
+
+      entityBuilder.setKey(keyBuilder.build());
+      entityBuilder.addProperty(Property.newBuilder().setName("content")
+          .setValue(Value.newBuilder().setStringValue(content)));
+      return entityBuilder.build();
+    }
+
+    @Override
+    public void processElement(ProcessContext c) {
+      c.output(makeEntity(c.element()));
+    }
+  }
+
+  /**
+   * Options supported by {@link DatastoreWordCount}.
+   *
+   * <p>Inherits standard configuration options.
+   */
+  public interface Options extends PipelineOptions {
+    @Description("Path of the file to read from and store to Datastore")
+    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
+    String getInput();
+    void setInput(String value);
+
+    @Description("Path of the file to write to")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+
+    @Description("Dataset ID to read from datastore")
+    @Validation.Required
+    String getDataset();
+    void setDataset(String value);
+
+    @Description("Dataset entity kind")
+    @Default.String("shakespeare-demo")
+    String getKind();
+    void setKind(String value);
+
+    @Description("Dataset namespace")
+    String getNamespace();
+    void setNamespace(@Nullable String value);
+
+    @Description("Read an existing dataset, do not write first")
+    boolean isReadOnly();
+    void setReadOnly(boolean value);
+
+    @Description("Number of output shards")
+    @Default.Integer(0) // 0 lets the system choose the number of shards automatically.
+    int getNumShards();
+    void setNumShards(int value);
+  }
+
+  /**
+   * An example that creates a pipeline to populate Datastore from a
+   * text input, using DatastoreIO.
+   */
+  public static void writeDataToDatastore(Options options) {
+    Pipeline p = Pipeline.create(options);
+    p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
+     .apply(ParDo.of(new CreateEntityFn(options.getNamespace(), options.getKind())))
+     .apply(DatastoreIO.writeTo(options.getDataset()));
+
+    p.run();
+  }
+
+  /**
+   * Build a Cloud Datastore ancestor query for the specified {@link Options#getNamespace} and
+   * {@link Options#getKind}.
+   *
+   * <p>We use ancestor keys and ancestor queries for strong consistency. See
+   * {@link DatastoreWordCount} javadoc for more information.
+   *
+   * @see <a href="https://cloud.google.com/datastore/docs/concepts/queries#Datastore_Ancestor_filters">Ancestor filters</a>
+   */
+  static Query makeAncestorKindQuery(Options options) {
+    Query.Builder q = Query.newBuilder();
+    q.addKindBuilder().setName(options.getKind());
+    q.setFilter(makeFilter(
+        "__key__",
+        PropertyFilter.Operator.HAS_ANCESTOR,
+        makeValue(makeAncestorKey(options.getNamespace(), options.getKind()))));
+    return q.build();
+  }
+
+  /**
+   * An example that creates a pipeline to read from Datastore using DatastoreIO.Read.
+   */
+  public static void readDataFromDatastore(Options options) {
+    Query query = makeAncestorKindQuery(options);
+
+    // For Datastore sources, the read namespace can be set on the entire query.
+    DatastoreIO.Source source = DatastoreIO.source()
+        .withDataset(options.getDataset())
+        .withQuery(query)
+        .withNamespace(options.getNamespace());
+
+    Pipeline p = Pipeline.create(options);
+    p.apply("ReadShakespeareFromDatastore", Read.from(source))
+        .apply("StringifyEntity", ParDo.of(new GetContentFn()))
+        .apply("CountWords", new WordCount.CountWords())
+        .apply("PrintWordCount", MapElements.via(new WordCount.FormatAsTextFn()))
+        .apply("WriteLines", TextIO.Write.to(options.getOutput())
+            .withNumShards(options.getNumShards()));
+    p.run();
+  }
+
+  /**
+   * An example demonstrating how to use {@link DatastoreIO}.  The runner here is
+   * customizable, which means users can pass either {@code DirectPipelineRunner}
+   * or {@code DataflowPipelineRunner} in the pipeline options.
+   */
+  public static void main(String[] args) {
+    // The options are used in two places: by the Dataflow service, and
+    // to build the DatastoreIO.Read object.
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
+    if (!options.isReadOnly()) {
+      // First example: write data to Datastore for reading later.
+      //
+      // NOTE: this write does not delete any existing Entities in the Datastore, so if run
+      // multiple times with the same output dataset, there may be duplicate entries. The
+      // Datastore Query tool in the Google Developers Console can be used to inspect or erase all
+      // entries with a particular namespace and/or kind.
+      DatastoreWordCount.writeDataToDatastore(options);
+    }
+
+    // Second example: do parallel read from Datastore.
+    DatastoreWordCount.readDataFromDatastore(options);
+  }
+}
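
The strong-consistency notes above rest on every entity key sharing the same ancestor path. Here is a standalone sketch, separate from the commit, of the key shapes involved, built with the same DatastoreHelper.makeKey calls the example uses; the protobuf-generated path accessor used at the end is assumed:

    import static com.google.api.services.datastore.client.DatastoreHelper.makeKey;

    import com.google.api.services.datastore.DatastoreV1.Key;

    public class AncestorKeySketch {
      public static void main(String[] args) {
        // The shared ancestor: kind "shakespeare-demo", fixed name "root",
        // as built by makeAncestorKey above (namespace omitted here).
        Key ancestor = makeKey("shakespeare-demo", "root").build();

        // A child key as CreateEntityFn builds it: the ancestor path comes
        // first, followed by the entity's own (kind, name) element.
        Key child = makeKey(ancestor, "shakespeare-demo", "some-uuid").build();

        // Path element 0 is the ancestor, element 1 the entity itself, so an
        // ancestor query filtering with HAS_ANCESTOR on `ancestor` matches `child`.
        System.out.println(child.getPathElementCount()); // 2
      }
    }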

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java
new file mode 100644
index 0000000..9873561
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
+import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
+
+/**
+ * This example takes Shakespeare's plays as plaintext input files, and removes any
+ * duplicate lines across all the files. (The output does not preserve the input order.)
+ *
+ * <p>Concepts: the RemoveDuplicates transform, and how to wire transforms together.
+ * Demonstrates {@link com.google.cloud.dataflow.sdk.io.TextIO.Read}/
+ * {@link RemoveDuplicates}/{@link com.google.cloud.dataflow.sdk.io.TextIO.Write}.
+ *
+ * <p>To execute this pipeline locally, specify general pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ * }</pre>
+ * and a local output file or output prefix on GCS:
+ * <pre>{@code
+ *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
+ * }</pre>
+ *
+ * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ * }</pre>
+ * and an output prefix on GCS:
+ * <pre>{@code
+ *   --output=gs://YOUR_OUTPUT_PREFIX
+ * }</pre>
+ *
+ * <p>The input defaults to {@code gs://dataflow-samples/shakespeare/*} and can be
+ * overridden with {@code --input}.
+ */
+public class DeDupExample {
+
+  /**
+   * Options supported by {@link DeDupExample}.
+   *
+   * <p>Inherits standard configuration options.
+   */
+  private interface Options extends PipelineOptions {
+    @Description("Path to the directory or GCS prefix containing files to read from")
+    @Default.String("gs://dataflow-samples/shakespeare/*")
+    String getInput();
+    void setInput(String value);
+
+    @Description("Path of the file to write to")
+    @Default.InstanceFactory(OutputFactory.class)
+    String getOutput();
+    void setOutput(String value);
+
+    /** Returns {@code gs://${STAGING_LOCATION}/deduped.txt}. */
+    public static class OutputFactory implements DefaultValueFactory<String> {
+      @Override
+      public String create(PipelineOptions options) {
+        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+        if (dataflowOptions.getStagingLocation() != null) {
+          return GcsPath.fromUri(dataflowOptions.getStagingLocation())
+              .resolve("deduped.txt").toString();
+        } else {
+          throw new IllegalArgumentException("Must specify --output or --stagingLocation");
+        }
+      }
+    }
+  }
+
+
+  public static void main(String[] args)
+      throws Exception {
+
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
+     .apply(RemoveDuplicates.<String>create())
+     .apply(TextIO.Write.named("DedupedShakespeare")
+         .to(options.getOutput()));
+
+    p.run();
+  }
+}
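
As a concrete illustration of the RemoveDuplicates transform used above, here is a standalone sketch, separate from the commit, run against a small in-memory input (assuming the SDK's Create transform and the default local runner):

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.transforms.Create;
    import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    public class DeDupSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        PCollection<String> lines = p.apply(
            Create.of("to be", "or not to be", "to be"));

        // Keeps one copy of each distinct line; ordering is not preserved.
        PCollection<String> deduped = lines.apply(RemoveDuplicates.<String>create());
        // deduped contains exactly: "to be", "or not to be".

        p.run();
      }
    }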

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2eaa709c/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java
new file mode 100644
index 0000000..781873a
--- /dev/null
+++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java
@@ -0,0 +1,266 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Mean;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+/**
+ * This is an example that demonstrates several approaches to filtering, and the use of the
+ * Mean transform. It shows how to dynamically set parameters by defining and using new
+ * pipeline options, and how to use a value derived by the pipeline.
+ *
+ * <p>Concepts: The Mean transform; Options configuration; using pipeline-derived data as a side
+ * input; approaches to filtering, selection, and projection.
+ *
+ * <p>The example reads public samples of weather data from BigQuery. It performs a
+ * projection on the data, finds the global mean of the temperature readings, filters on readings
+ * for a single given month, and then outputs only data (for that month) that has a mean temp
+ * smaller than the derived global mean.
+ *
+ * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
+ * table.
+ *
+ * <p>To execute this pipeline locally, specify general pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ * }
+ * </pre>
+ * and the BigQuery table for the output:
+ * <pre>{@code
+ *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ *   [--monthFilter=<month_number>]
+ * }
+ * </pre>
+ * where optional parameter {@code --monthFilter} is set to a number 1-12.
+ *
+ * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ * }
+ * </pre>
+ * and the BigQuery table for the output:
+ * <pre>{@code
+ *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ *   [--monthFilter=<month_number>]
+ * }
+ * </pre>
+ * where optional parameter {@code --monthFilter} is set to a number 1-12.
+ *
+ * <p>The BigQuery input table defaults to {@code clouddataflow-readonly:samples.weather_stations}
+ * and can be overridden with {@code --input}.
+ */
+public class FilterExamples {
+  // Default to using a 1000-row subset of the public weather station table publicdata:samples.gsod.
+  private static final String WEATHER_SAMPLES_TABLE =
+      "clouddataflow-readonly:samples.weather_stations";
+  static final Logger LOG = Logger.getLogger(FilterExamples.class.getName());
+  static final int MONTH_TO_FILTER = 7;
+
+  /**
+   * Examines each row in the input table. Outputs only the subset of the cells this example
+   * is interested in (the mean_temp, year, month, and day) as a BigQuery table row.
+   */
+  static class ProjectionFn extends DoFn<TableRow, TableRow> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      // Grab year, month, day, mean_temp from the row
+      Integer year = Integer.parseInt((String) row.get("year"));
+      Integer month = Integer.parseInt((String) row.get("month"));
+      Integer day = Integer.parseInt((String) row.get("day"));
+      Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
+      // Prepares the data for writing to BigQuery by building a TableRow object
+      TableRow outRow = new TableRow()
+          .set("year", year).set("month", month)
+          .set("day", day).set("mean_temp", meanTemp);
+      c.output(outRow);
+    }
+  }
+
+  /**
+   * Implements 'filter' functionality.
+   *
+   * <p>Examines each row in the input table. Outputs only rows from the month
+   * monthFilter, which is passed in as a parameter during construction of this DoFn.
+   */
+  static class FilterSingleMonthDataFn extends DoFn<TableRow, TableRow> {
+    Integer monthFilter;
+
+    public FilterSingleMonthDataFn(Integer monthFilter) {
+      this.monthFilter = monthFilter;
+    }
+
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      Integer month = (Integer) row.get("month");
+      if (month.equals(this.monthFilter)) {
+        c.output(row);
+      }
+    }
+  }
+
+  /**
+   * Examines each row (weather reading) in the input table. Outputs the temperature
+   * reading for that row ('mean_temp').
+   */
+  static class ExtractTempFn extends DoFn<TableRow, Double> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
+      c.output(meanTemp);
+    }
+  }
+
+
+
+  /**
+   * Finds the global mean of the mean_temp for each day/record, and outputs
+   * only data that has a mean temp smaller than this global mean.
+   */
+  static class BelowGlobalMean
+      extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
+    Integer monthFilter;
+
+    public BelowGlobalMean(Integer monthFilter) {
+      this.monthFilter = monthFilter;
+    }
+
+
+    @Override
+    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+
+      // Extract the mean_temp from each row.
+      PCollection<Double> meanTemps = rows.apply(
+          ParDo.of(new ExtractTempFn()));
+
+      // Find the global mean, of all the mean_temp readings in the weather data,
+      // and prepare this singleton PCollectionView for use as a side input.
+      final PCollectionView<Double> globalMeanTemp =
+          meanTemps.apply(Mean.<Double>globally())
+               .apply(View.<Double>asSingleton());
+
+      // Rows filtered to remove all but a single month
+      PCollection<TableRow> monthFilteredRows = rows
+          .apply(ParDo.of(new FilterSingleMonthDataFn(monthFilter)));
+
+      // Then, use the global mean as a side input, to further filter the weather data.
+      // By using a side input to pass in the filtering criteria, we can use a value
+      // that is computed earlier in pipeline execution.
+      // We'll only output readings with temperatures below this mean.
+      PCollection<TableRow> filteredRows = monthFilteredRows
+          .apply(ParDo
+              .named("ParseAndFilter")
+              .withSideInputs(globalMeanTemp)
+              .of(new DoFn<TableRow, TableRow>() {
+                @Override
+                public void processElement(ProcessContext c) {
+                  Double meanTemp = Double.parseDouble(c.element().get("mean_temp").toString());
+                  Double gTemp = c.sideInput(globalMeanTemp);
+                  if (meanTemp < gTemp) {
+                    c.output(c.element());
+                  }
+                }
+              }));
+
+      return filteredRows;
+    }
+  }
+
+
+  /**
+   * Options supported by {@link FilterExamples}.
+   *
+   * <p>Inherits standard configuration options.
+   */
+  private interface Options extends PipelineOptions {
+    @Description("Table to read from, specified as "
+        + "<project_id>:<dataset_id>.<table_id>")
+    @Default.String(WEATHER_SAMPLES_TABLE)
+    String getInput();
+    void setInput(String value);
+
+    @Description("Table to write to, specified as "
+        + "<project_id>:<dataset_id>.<table_id>. "
+        + "The dataset_id must already exist")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+
+    @Description("Numeric value of month to filter on")
+    @Default.Integer(MONTH_TO_FILTER)
+    Integer getMonthFilter();
+    void setMonthFilter(Integer value);
+  }
+
+  /**
+   * Helper method to build the table schema for the output table.
+   */
+  private static TableSchema buildWeatherSchemaProjection() {
+    List<TableFieldSchema> fields = new ArrayList<>();
+    fields.add(new TableFieldSchema().setName("year").setType("INTEGER"));
+    fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
+    fields.add(new TableFieldSchema().setName("day").setType("INTEGER"));
+    fields.add(new TableFieldSchema().setName("mean_temp").setType("FLOAT"));
+    TableSchema schema = new TableSchema().setFields(fields);
+    return schema;
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+
+    TableSchema schema = buildWeatherSchemaProjection();
+
+    p.apply(BigQueryIO.Read.from(options.getInput()))
+     .apply(ParDo.of(new ProjectionFn()))
+     .apply(new BelowGlobalMean(options.getMonthFilter()))
+     .apply(BigQueryIO.Write
+        .to(options.getOutput())
+        .withSchema(schema)
+        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
+
+    p.run();
+  }
+}
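
To isolate the side-input pattern that BelowGlobalMean uses, here is a standalone sketch, separate from the commit, that computes a global mean with Mean.globally, materializes it with View.asSingleton, and reads it inside a ParDo (assuming the SDK's Create transform and the default local runner):

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.transforms.Create;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.dataflow.sdk.transforms.Mean;
    import com.google.cloud.dataflow.sdk.transforms.ParDo;
    import com.google.cloud.dataflow.sdk.transforms.View;
    import com.google.cloud.dataflow.sdk.values.PCollection;
    import com.google.cloud.dataflow.sdk.values.PCollectionView;

    public class SideInputMeanSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        PCollection<Double> temps = p.apply(Create.of(10.0, 20.0, 30.0));

        // Mean.globally() yields a single element (20.0 here); View.asSingleton()
        // makes it readable as a side input by later transforms.
        final PCollectionView<Double> globalMean =
            temps.apply(Mean.<Double>globally()).apply(View.<Double>asSingleton());

        // Keep only readings strictly below the pipeline-computed mean.
        temps.apply(ParDo
            .named("FilterBelowMean")
            .withSideInputs(globalMean)
            .of(new DoFn<Double, Double>() {
              @Override
              public void processElement(ProcessContext c) {
                if (c.element() < c.sideInput(globalMean)) {
                  c.output(c.element()); // emits only 10.0
                }
              }
            }));

        p.run();
      }
    }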