You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:48:34 UTC
[47/74] [partial] incubator-beam git commit: Rename
com/google/cloud/dataflow->org/apache/beam
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java
deleted file mode 100644
index f1d8d1a..0000000
--- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessions.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.complete;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SerializableComparator;
-import com.google.cloud.dataflow.sdk.transforms.Top;
-import com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.util.List;
-
-/**
- * An example that reads Wikipedia edit data from Cloud Storage and computes the user with
- * the longest string of edits separated by no more than an hour within each month.
- *
- * <p>Concepts: Using Windowing to perform time-based aggregations of data.
- *
- * <p>It is not recommended to execute this pipeline locally, given the size of the default input
- * data.
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- * --runner=BlockingDataflowPipelineRunner
- * }
- * </pre>
- * and an output prefix on GCS:
- * <pre>{@code
- * --output=gs://YOUR_OUTPUT_PREFIX
- * }</pre>
- *
- * <p>The default input is {@code gs://dataflow-samples/wikipedia_edits/*.json} and can be
- * overridden with {@code --input}.
- *
- * <p>The input for this example is large enough that it's a good place to enable (experimental)
- * autoscaling:
- * <pre>{@code
- * --autoscalingAlgorithm=BASIC
- * --maxNumWorkers=20
- * }
- * </pre>
- * This will automatically scale the number of workers up over time until the job completes.
- */
-public class TopWikipediaSessions {
- private static final String EXPORTED_WIKI_TABLE = "gs://dataflow-samples/wikipedia_edits/*.json";
-
- /**
- * Extracts user and timestamp from a TableRow representing a Wikipedia edit.
- */
- static class ExtractUserAndTimestamp extends DoFn<TableRow, String> {
- @Override
- public void processElement(ProcessContext c) {
- TableRow row = c.element();
- int timestamp = (Integer) row.get("timestamp");
- String userName = (String) row.get("contributor_username");
- if (userName != null) {
- // Sets the implicit timestamp field to be used in windowing.
- c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
- }
- }
- }
-
- /**
- * Computes the number of edits in each user session. A session is defined as
- * a string of edits where each is separated from the next by less than an hour.
- */
- static class ComputeSessions
- extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
- @Override
- public PCollection<KV<String, Long>> apply(PCollection<String> actions) {
- return actions
- .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardHours(1))))
-
- .apply(Count.<String>perElement());
- }
- }
-
- /**
- * Computes the longest session ending in each month.
- */
- private static class TopPerMonth
- extends PTransform<PCollection<KV<String, Long>>, PCollection<List<KV<String, Long>>>> {
- @Override
- public PCollection<List<KV<String, Long>>> apply(PCollection<KV<String, Long>> sessions) {
- return sessions
- .apply(Window.<KV<String, Long>>into(CalendarWindows.months(1)))
-
- .apply(Top.of(1, new SerializableComparator<KV<String, Long>>() {
- @Override
- public int compare(KV<String, Long> o1, KV<String, Long> o2) {
- return Long.compare(o1.getValue(), o2.getValue());
- }
- }).withoutDefaults());
- }
- }
-
- static class SessionsToStringsDoFn extends DoFn<KV<String, Long>, KV<String, Long>>
- implements RequiresWindowAccess {
-
- @Override
- public void processElement(ProcessContext c) {
- c.output(KV.of(
- c.element().getKey() + " : " + c.window(), c.element().getValue()));
- }
- }
-
- static class FormatOutputDoFn extends DoFn<List<KV<String, Long>>, String>
- implements RequiresWindowAccess {
- @Override
- public void processElement(ProcessContext c) {
- for (KV<String, Long> item : c.element()) {
- String session = item.getKey();
- long count = item.getValue();
- c.output(session + " : " + count + " : " + ((IntervalWindow) c.window()).start());
- }
- }
- }
-
- static class ComputeTopSessions extends PTransform<PCollection<TableRow>, PCollection<String>> {
-
- private final double samplingThreshold;
-
- public ComputeTopSessions(double samplingThreshold) {
- this.samplingThreshold = samplingThreshold;
- }
-
- @Override
- public PCollection<String> apply(PCollection<TableRow> input) {
- return input
- .apply(ParDo.of(new ExtractUserAndTimestamp()))
-
- .apply(ParDo.named("SampleUsers").of(
- new DoFn<String, String>() {
- @Override
- public void processElement(ProcessContext c) {
- if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * samplingThreshold) {
- c.output(c.element());
- }
- }
- }))
-
- .apply(new ComputeSessions())
-
- .apply(ParDo.named("SessionsToStrings").of(new SessionsToStringsDoFn()))
- .apply(new TopPerMonth())
- .apply(ParDo.named("FormatOutput").of(new FormatOutputDoFn()));
- }
- }
-
- /**
- * Options supported by this class.
- *
- * <p>Inherits standard Dataflow configuration options.
- */
- private static interface Options extends PipelineOptions {
- @Description(
- "Input specified as a GCS path containing a BigQuery table exported as json")
- @Default.String(EXPORTED_WIKI_TABLE)
- String getInput();
- void setInput(String value);
-
- @Description("File to output results to")
- @Validation.Required
- String getOutput();
- void setOutput(String value);
- }
-
- public static void main(String[] args) {
- Options options = PipelineOptionsFactory.fromArgs(args)
- .withValidation()
- .as(Options.class);
- DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
-
- Pipeline p = Pipeline.create(dataflowOptions);
-
- double samplingThreshold = 0.1;
-
- p.apply(TextIO.Read
- .from(options.getInput())
- .withCoder(TableRowJsonCoder.of()))
- .apply(new ComputeTopSessions(samplingThreshold))
- .apply(TextIO.Write.named("Write").withoutSharding().to(options.getOutput()));
-
- p.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java
deleted file mode 100644
index 2c3c857..0000000
--- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficMaxLaneFlow.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.complete;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
-import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
-import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
-import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicAndSubscriptionOptions;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.coders.AvroCoder;
-import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Strings;
-
-import org.apache.avro.reflect.Nullable;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A Dataflow Example that runs in both batch and streaming modes with traffic sensor data.
- * You can configure the running mode by setting {@literal --streaming} to true or false.
- *
- * <p>Concepts: The batch and streaming runners, sliding windows, Google Cloud Pub/Sub
- * topic injection, use of the AvroCoder to encode a custom class, and custom Combine transforms.
- *
- * <p>This example analyzes traffic sensor data using SlidingWindows. For each window,
- * it finds the lane that had the highest flow recorded, for each sensor station. It writes
- * those max values along with auxiliary info to a BigQuery table.
- *
- * <p>In batch mode, the pipeline reads traffic sensor data from {@literal --inputFile}.
- *
- * <p>In streaming mode, the pipeline reads the data from a Pub/Sub topic.
- * By default, the example will run a separate pipeline to inject the data from the default
- * {@literal --inputFile} to the Pub/Sub {@literal --pubsubTopic}. It will make it available for
- * the streaming pipeline to process. You may override the default {@literal --inputFile} with the
- * file of your choosing. You may also set {@literal --inputFile} to an empty string, which will
- * disable the automatic Pub/Sub injection, and allow you to use separate tool to control the input
- * to this example. An example code, which publishes traffic sensor data to a Pub/Sub topic,
- * is provided in
- * <a href="https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-python/tree/master/gce-cmdline-publisher"></a>.
- *
- * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
- * from the example common package (there are no defaults for a general Dataflow pipeline).
- * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
- * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
- * the example will try to create them.
- *
- * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
- * and then exits.
- */
-public class TrafficMaxLaneFlow {
-
- private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
- private static final Integer VALID_INPUTS = 4999;
-
- static final int WINDOW_DURATION = 60; // Default sliding window duration in minutes
- static final int WINDOW_SLIDE_EVERY = 5; // Default window 'slide every' setting in minutes
-
- /**
- * This class holds information about each lane in a station reading, along with some general
- * information from the reading.
- */
- @DefaultCoder(AvroCoder.class)
- static class LaneInfo {
- @Nullable String stationId;
- @Nullable String lane;
- @Nullable String direction;
- @Nullable String freeway;
- @Nullable String recordedTimestamp;
- @Nullable Integer laneFlow;
- @Nullable Integer totalFlow;
- @Nullable Double laneAO;
- @Nullable Double laneAS;
-
- public LaneInfo() {}
-
- public LaneInfo(String stationId, String lane, String direction, String freeway,
- String timestamp, Integer laneFlow, Double laneAO,
- Double laneAS, Integer totalFlow) {
- this.stationId = stationId;
- this.lane = lane;
- this.direction = direction;
- this.freeway = freeway;
- this.recordedTimestamp = timestamp;
- this.laneFlow = laneFlow;
- this.laneAO = laneAO;
- this.laneAS = laneAS;
- this.totalFlow = totalFlow;
- }
-
- public String getStationId() {
- return this.stationId;
- }
- public String getLane() {
- return this.lane;
- }
- public String getDirection() {
- return this.direction;
- }
- public String getFreeway() {
- return this.freeway;
- }
- public String getRecordedTimestamp() {
- return this.recordedTimestamp;
- }
- public Integer getLaneFlow() {
- return this.laneFlow;
- }
- public Double getLaneAO() {
- return this.laneAO;
- }
- public Double getLaneAS() {
- return this.laneAS;
- }
- public Integer getTotalFlow() {
- return this.totalFlow;
- }
- }
-
- /**
- * Extract the timestamp field from the input string, and use it as the element timestamp.
- */
- static class ExtractTimestamps extends DoFn<String, String> {
- private static final DateTimeFormatter dateTimeFormat =
- DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");
-
- @Override
- public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
- String[] items = c.element().split(",");
- if (items.length > 0) {
- try {
- String timestamp = items[0];
- c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp)));
- } catch (IllegalArgumentException e) {
- // Skip the invalid input.
- }
- }
- }
- }
-
- /**
- * Extract flow information for each of the 8 lanes in a reading, and output as separate tuples.
- * This will let us determine which lane has the max flow for that station over the span of the
- * window, and output not only the max flow from that calculation, but other associated
- * information. The number of lanes for which data is present depends upon which freeway the data
- * point comes from.
- */
- static class ExtractFlowInfoFn extends DoFn<String, KV<String, LaneInfo>> {
-
- @Override
- public void processElement(ProcessContext c) {
- String[] items = c.element().split(",");
- if (items.length < 48) {
- // Skip the invalid input.
- return;
- }
- // extract the sensor information for the lanes from the input string fields.
- String timestamp = items[0];
- String stationId = items[1];
- String freeway = items[2];
- String direction = items[3];
- Integer totalFlow = tryIntParse(items[7]);
- for (int i = 1; i <= 8; ++i) {
- Integer laneFlow = tryIntParse(items[6 + 5 * i]);
- Double laneAvgOccupancy = tryDoubleParse(items[7 + 5 * i]);
- Double laneAvgSpeed = tryDoubleParse(items[8 + 5 * i]);
- if (laneFlow == null || laneAvgOccupancy == null || laneAvgSpeed == null) {
- return;
- }
- LaneInfo laneInfo = new LaneInfo(stationId, "lane" + i, direction, freeway, timestamp,
- laneFlow, laneAvgOccupancy, laneAvgSpeed, totalFlow);
- c.output(KV.of(stationId, laneInfo));
- }
- }
- }
-
- /**
- * A custom 'combine function' used with the Combine.perKey transform. Used to find the max lane
- * flow over all the data points in the Window. Extracts the lane flow from the input string and
- * determines whether it's the max seen so far. We're using a custom combiner instead of the Max
- * transform because we want to retain the additional information we've associated with the flow
- * value.
- */
- public static class MaxFlow implements SerializableFunction<Iterable<LaneInfo>, LaneInfo> {
- @Override
- public LaneInfo apply(Iterable<LaneInfo> input) {
- Integer max = 0;
- LaneInfo maxInfo = new LaneInfo();
- for (LaneInfo item : input) {
- Integer flow = item.getLaneFlow();
- if (flow != null && (flow >= max)) {
- max = flow;
- maxInfo = item;
- }
- }
- return maxInfo;
- }
- }
-
- /**
- * Format the results of the Max Lane flow calculation to a TableRow, to save to BigQuery.
- * Add the timestamp from the window context.
- */
- static class FormatMaxesFn extends DoFn<KV<String, LaneInfo>, TableRow> {
- @Override
- public void processElement(ProcessContext c) {
-
- LaneInfo laneInfo = c.element().getValue();
- TableRow row = new TableRow()
- .set("station_id", c.element().getKey())
- .set("direction", laneInfo.getDirection())
- .set("freeway", laneInfo.getFreeway())
- .set("lane_max_flow", laneInfo.getLaneFlow())
- .set("lane", laneInfo.getLane())
- .set("avg_occ", laneInfo.getLaneAO())
- .set("avg_speed", laneInfo.getLaneAS())
- .set("total_flow", laneInfo.getTotalFlow())
- .set("recorded_timestamp", laneInfo.getRecordedTimestamp())
- .set("window_timestamp", c.timestamp().toString());
- c.output(row);
- }
-
- /** Defines the BigQuery schema used for the output. */
- static TableSchema getSchema() {
- List<TableFieldSchema> fields = new ArrayList<>();
- fields.add(new TableFieldSchema().setName("station_id").setType("STRING"));
- fields.add(new TableFieldSchema().setName("direction").setType("STRING"));
- fields.add(new TableFieldSchema().setName("freeway").setType("STRING"));
- fields.add(new TableFieldSchema().setName("lane_max_flow").setType("INTEGER"));
- fields.add(new TableFieldSchema().setName("lane").setType("STRING"));
- fields.add(new TableFieldSchema().setName("avg_occ").setType("FLOAT"));
- fields.add(new TableFieldSchema().setName("avg_speed").setType("FLOAT"));
- fields.add(new TableFieldSchema().setName("total_flow").setType("INTEGER"));
- fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP"));
- fields.add(new TableFieldSchema().setName("recorded_timestamp").setType("STRING"));
- TableSchema schema = new TableSchema().setFields(fields);
- return schema;
- }
- }
-
- /**
- * This PTransform extracts lane info, calculates the max lane flow found for a given station (for
- * the current Window) using a custom 'combiner', and formats the results for BigQuery.
- */
- static class MaxLaneFlow
- extends PTransform<PCollection<KV<String, LaneInfo>>, PCollection<TableRow>> {
- @Override
- public PCollection<TableRow> apply(PCollection<KV<String, LaneInfo>> flowInfo) {
- // stationId, LaneInfo => stationId + max lane flow info
- PCollection<KV<String, LaneInfo>> flowMaxes =
- flowInfo.apply(Combine.<String, LaneInfo>perKey(
- new MaxFlow()));
-
- // <stationId, max lane flow info>... => row...
- PCollection<TableRow> results = flowMaxes.apply(
- ParDo.of(new FormatMaxesFn()));
-
- return results;
- }
- }
-
- static class ReadFileAndExtractTimestamps extends PTransform<PBegin, PCollection<String>> {
- private final String inputFile;
-
- public ReadFileAndExtractTimestamps(String inputFile) {
- this.inputFile = inputFile;
- }
-
- @Override
- public PCollection<String> apply(PBegin begin) {
- return begin
- .apply(TextIO.Read.from(inputFile))
- .apply(ParDo.of(new ExtractTimestamps()));
- }
- }
-
- /**
- * Options supported by {@link TrafficMaxLaneFlow}.
- *
- * <p>Inherits standard configuration options.
- */
- private interface TrafficMaxLaneFlowOptions extends DataflowExampleOptions,
- ExamplePubsubTopicAndSubscriptionOptions, ExampleBigQueryTableOptions {
- @Description("Input file to inject to Pub/Sub topic")
- @Default.String("gs://dataflow-samples/traffic_sensor/"
- + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
- String getInputFile();
- void setInputFile(String value);
-
- @Description("Numeric value of sliding window duration, in minutes")
- @Default.Integer(WINDOW_DURATION)
- Integer getWindowDuration();
- void setWindowDuration(Integer value);
-
- @Description("Numeric value of window 'slide every' setting, in minutes")
- @Default.Integer(WINDOW_SLIDE_EVERY)
- Integer getWindowSlideEvery();
- void setWindowSlideEvery(Integer value);
-
- @Description("Whether to run the pipeline with unbounded input")
- @Default.Boolean(false)
- boolean isUnbounded();
- void setUnbounded(boolean value);
- }
-
- /**
- * Sets up and starts streaming pipeline.
- *
- * @throws IOException if there is a problem setting up resources
- */
- public static void main(String[] args) throws IOException {
- TrafficMaxLaneFlowOptions options = PipelineOptionsFactory.fromArgs(args)
- .withValidation()
- .as(TrafficMaxLaneFlowOptions.class);
- options.setBigQuerySchema(FormatMaxesFn.getSchema());
- // Using DataflowExampleUtils to set up required resources.
- DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options, options.isUnbounded());
-
- Pipeline pipeline = Pipeline.create(options);
- TableReference tableRef = new TableReference();
- tableRef.setProjectId(options.getProject());
- tableRef.setDatasetId(options.getBigQueryDataset());
- tableRef.setTableId(options.getBigQueryTable());
-
- PCollection<String> input;
- if (options.isUnbounded()) {
- // Read unbounded PubSubIO.
- input = pipeline.apply(PubsubIO.Read
- .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
- .subscription(options.getPubsubSubscription()));
- } else {
- // Read bounded PubSubIO.
- input = pipeline.apply(PubsubIO.Read
- .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
- .subscription(options.getPubsubSubscription()).maxNumRecords(VALID_INPUTS));
-
- // To read bounded TextIO files, use:
- // input = pipeline.apply(new ReadFileAndExtractTimestamps(options.getInputFile()));
- }
- input
- // row... => <station route, station speed> ...
- .apply(ParDo.of(new ExtractFlowInfoFn()))
- // map the incoming data stream into sliding windows. The default window duration values
- // work well if you're running the accompanying Pub/Sub generator script with the
- // --replay flag, which simulates pauses in the sensor data publication. You may want to
- // adjust them otherwise.
- .apply(Window.<KV<String, LaneInfo>>into(SlidingWindows.of(
- Duration.standardMinutes(options.getWindowDuration())).
- every(Duration.standardMinutes(options.getWindowSlideEvery()))))
- .apply(new MaxLaneFlow())
- .apply(BigQueryIO.Write.to(tableRef)
- .withSchema(FormatMaxesFn.getSchema()));
-
- // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
- if (!Strings.isNullOrEmpty(options.getInputFile())
- && !Strings.isNullOrEmpty(options.getPubsubTopic())) {
- dataflowUtils.runInjectorPipeline(
- new ReadFileAndExtractTimestamps(options.getInputFile()),
- options.getPubsubTopic(),
- PUBSUB_TIMESTAMP_LABEL_KEY);
- }
-
- // Run the pipeline.
- PipelineResult result = pipeline.run();
-
- // dataflowUtils will try to cancel the pipeline and the injector before the program exists.
- dataflowUtils.waitToFinish(result);
- }
-
- private static Integer tryIntParse(String number) {
- try {
- return Integer.parseInt(number);
- } catch (NumberFormatException e) {
- return null;
- }
- }
-
- private static Double tryDoubleParse(String number) {
- try {
- return Double.parseDouble(number);
- } catch (NumberFormatException e) {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java
deleted file mode 100644
index b1c72e6..0000000
--- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.complete;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
-import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
-import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
-import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicAndSubscriptionOptions;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.coders.AvroCoder;
-import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
-import org.apache.avro.reflect.Nullable;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A Dataflow Example that runs in both batch and streaming modes with traffic sensor data.
- * You can configure the running mode by setting {@literal --streaming} to true or false.
- *
- * <p>Concepts: The batch and streaming runners, GroupByKey, sliding windows, and
- * Google Cloud Pub/Sub topic injection.
- *
- * <p>This example analyzes traffic sensor data using SlidingWindows. For each window,
- * it calculates the average speed over the window for some small set of predefined 'routes',
- * and looks for 'slowdowns' in those routes. It writes its results to a BigQuery table.
- *
- * <p>In batch mode, the pipeline reads traffic sensor data from {@literal --inputFile}.
- *
- * <p>In streaming mode, the pipeline reads the data from a Pub/Sub topic.
- * By default, the example will run a separate pipeline to inject the data from the default
- * {@literal --inputFile} to the Pub/Sub {@literal --pubsubTopic}. It will make it available for
- * the streaming pipeline to process. You may override the default {@literal --inputFile} with the
- * file of your choosing. You may also set {@literal --inputFile} to an empty string, which will
- * disable the automatic Pub/Sub injection, and allow you to use separate tool to control the input
- * to this example. An example code, which publishes traffic sensor data to a Pub/Sub topic,
- * is provided in
- * <a href="https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-python/tree/master/gce-cmdline-publisher"></a>.
- *
- * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
- * from the example common package (there are no defaults for a general Dataflow pipeline).
- * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
- * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
- * the example will try to create them.
- *
- * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
- * and then exits.
- */
-
-public class TrafficRoutes {
-
- private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
- private static final Integer VALID_INPUTS = 4999;
-
- // Instantiate some small predefined San Diego routes to analyze
- static Map<String, String> sdStations = buildStationInfo();
- static final int WINDOW_DURATION = 3; // Default sliding window duration in minutes
- static final int WINDOW_SLIDE_EVERY = 1; // Default window 'slide every' setting in minutes
-
- /**
- * This class holds information about a station reading's average speed.
- */
- @DefaultCoder(AvroCoder.class)
- static class StationSpeed implements Comparable<StationSpeed> {
- @Nullable String stationId;
- @Nullable Double avgSpeed;
- @Nullable Long timestamp;
-
- public StationSpeed() {}
-
- public StationSpeed(String stationId, Double avgSpeed, Long timestamp) {
- this.stationId = stationId;
- this.avgSpeed = avgSpeed;
- this.timestamp = timestamp;
- }
-
- public String getStationId() {
- return this.stationId;
- }
- public Double getAvgSpeed() {
- return this.avgSpeed;
- }
-
- @Override
- public int compareTo(StationSpeed other) {
- return Long.compare(this.timestamp, other.timestamp);
- }
- }
-
- /**
- * This class holds information about a route's speed/slowdown.
- */
- @DefaultCoder(AvroCoder.class)
- static class RouteInfo {
- @Nullable String route;
- @Nullable Double avgSpeed;
- @Nullable Boolean slowdownEvent;
-
-
- public RouteInfo() {}
-
- public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent) {
- this.route = route;
- this.avgSpeed = avgSpeed;
- this.slowdownEvent = slowdownEvent;
- }
-
- public String getRoute() {
- return this.route;
- }
- public Double getAvgSpeed() {
- return this.avgSpeed;
- }
- public Boolean getSlowdownEvent() {
- return this.slowdownEvent;
- }
- }
-
- /**
- * Extract the timestamp field from the input string, and use it as the element timestamp.
- */
- static class ExtractTimestamps extends DoFn<String, String> {
- private static final DateTimeFormatter dateTimeFormat =
- DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");
-
- @Override
- public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
- String[] items = c.element().split(",");
- String timestamp = tryParseTimestamp(items);
- if (timestamp != null) {
- try {
- c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp)));
- } catch (IllegalArgumentException e) {
- // Skip the invalid input.
- }
- }
- }
- }
-
- /**
- * Filter out readings for the stations along predefined 'routes', and output
- * (station, speed info) keyed on route.
- */
- static class ExtractStationSpeedFn extends DoFn<String, KV<String, StationSpeed>> {
-
- @Override
- public void processElement(ProcessContext c) {
- String[] items = c.element().split(",");
- String stationType = tryParseStationType(items);
- // For this analysis, use only 'main line' station types
- if (stationType != null && stationType.equals("ML")) {
- Double avgSpeed = tryParseAvgSpeed(items);
- String stationId = tryParseStationId(items);
- // For this simple example, filter out everything but some hardwired routes.
- if (avgSpeed != null && stationId != null && sdStations.containsKey(stationId)) {
- StationSpeed stationSpeed =
- new StationSpeed(stationId, avgSpeed, c.timestamp().getMillis());
- // The tuple key is the 'route' name stored in the 'sdStations' hash.
- KV<String, StationSpeed> outputValue = KV.of(sdStations.get(stationId), stationSpeed);
- c.output(outputValue);
- }
- }
- }
- }
-
- /**
- * For a given route, track average speed for the window. Calculate whether
- * traffic is currently slowing down, via a predefined threshold. If a supermajority of
- * speeds in this sliding window are less than the previous reading we call this a 'slowdown'.
- * Note: these calculations are for example purposes only, and are unrealistic and oversimplified.
- */
- static class GatherStats
- extends DoFn<KV<String, Iterable<StationSpeed>>, KV<String, RouteInfo>> {
- @Override
- public void processElement(ProcessContext c) throws IOException {
- String route = c.element().getKey();
- double speedSum = 0.0;
- int speedCount = 0;
- int speedups = 0;
- int slowdowns = 0;
- List<StationSpeed> infoList = Lists.newArrayList(c.element().getValue());
- // StationSpeeds sort by embedded timestamp.
- Collections.sort(infoList);
- Map<String, Double> prevSpeeds = new HashMap<>();
- // For all stations in the route, sum (non-null) speeds. Keep a count of the non-null speeds.
- for (StationSpeed item : infoList) {
- Double speed = item.getAvgSpeed();
- if (speed != null) {
- speedSum += speed;
- speedCount++;
- Double lastSpeed = prevSpeeds.get(item.getStationId());
- if (lastSpeed != null) {
- if (lastSpeed < speed) {
- speedups += 1;
- } else {
- slowdowns += 1;
- }
- }
- prevSpeeds.put(item.getStationId(), speed);
- }
- }
- if (speedCount == 0) {
- // No average to compute.
- return;
- }
- double speedAvg = speedSum / speedCount;
- boolean slowdownEvent = slowdowns >= 2 * speedups;
- RouteInfo routeInfo = new RouteInfo(route, speedAvg, slowdownEvent);
- c.output(KV.of(route, routeInfo));
- }
- }
-
- /**
- * Format the results of the slowdown calculations to a TableRow, to save to BigQuery.
- */
- static class FormatStatsFn extends DoFn<KV<String, RouteInfo>, TableRow> {
- @Override
- public void processElement(ProcessContext c) {
- RouteInfo routeInfo = c.element().getValue();
- TableRow row = new TableRow()
- .set("avg_speed", routeInfo.getAvgSpeed())
- .set("slowdown_event", routeInfo.getSlowdownEvent())
- .set("route", c.element().getKey())
- .set("window_timestamp", c.timestamp().toString());
- c.output(row);
- }
-
- /**
- * Defines the BigQuery schema used for the output.
- */
- static TableSchema getSchema() {
- List<TableFieldSchema> fields = new ArrayList<>();
- fields.add(new TableFieldSchema().setName("route").setType("STRING"));
- fields.add(new TableFieldSchema().setName("avg_speed").setType("FLOAT"));
- fields.add(new TableFieldSchema().setName("slowdown_event").setType("BOOLEAN"));
- fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP"));
- TableSchema schema = new TableSchema().setFields(fields);
- return schema;
- }
- }
-
- /**
- * This PTransform extracts speed info from traffic station readings.
- * It groups the readings by 'route' and analyzes traffic slowdown for that route.
- * Lastly, it formats the results for BigQuery.
- */
- static class TrackSpeed extends
- PTransform<PCollection<KV<String, StationSpeed>>, PCollection<TableRow>> {
- @Override
- public PCollection<TableRow> apply(PCollection<KV<String, StationSpeed>> stationSpeed) {
- // Apply a GroupByKey transform to collect a list of all station
- // readings for a given route.
- PCollection<KV<String, Iterable<StationSpeed>>> timeGroup = stationSpeed.apply(
- GroupByKey.<String, StationSpeed>create());
-
- // Analyze 'slowdown' over the route readings.
- PCollection<KV<String, RouteInfo>> stats = timeGroup.apply(ParDo.of(new GatherStats()));
-
- // Format the results for writing to BigQuery
- PCollection<TableRow> results = stats.apply(
- ParDo.of(new FormatStatsFn()));
-
- return results;
- }
- }
-
- static class ReadFileAndExtractTimestamps extends PTransform<PBegin, PCollection<String>> {
- private final String inputFile;
-
- public ReadFileAndExtractTimestamps(String inputFile) {
- this.inputFile = inputFile;
- }
-
- @Override
- public PCollection<String> apply(PBegin begin) {
- return begin
- .apply(TextIO.Read.from(inputFile))
- .apply(ParDo.of(new ExtractTimestamps()));
- }
- }
-
- /**
- * Options supported by {@link TrafficRoutes}.
- *
- * <p>Inherits standard configuration options.
- */
- private interface TrafficRoutesOptions extends DataflowExampleOptions,
- ExamplePubsubTopicAndSubscriptionOptions, ExampleBigQueryTableOptions {
- @Description("Input file to inject to Pub/Sub topic")
- @Default.String("gs://dataflow-samples/traffic_sensor/"
- + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
- String getInputFile();
- void setInputFile(String value);
-
- @Description("Numeric value of sliding window duration, in minutes")
- @Default.Integer(WINDOW_DURATION)
- Integer getWindowDuration();
- void setWindowDuration(Integer value);
-
- @Description("Numeric value of window 'slide every' setting, in minutes")
- @Default.Integer(WINDOW_SLIDE_EVERY)
- Integer getWindowSlideEvery();
- void setWindowSlideEvery(Integer value);
-
- @Description("Whether to run the pipeline with unbounded input")
- @Default.Boolean(false)
- boolean isUnbounded();
- void setUnbounded(boolean value);
- }
-
- /**
- * Sets up and starts streaming pipeline.
- *
- * @throws IOException if there is a problem setting up resources
- */
- public static void main(String[] args) throws IOException {
- TrafficRoutesOptions options = PipelineOptionsFactory.fromArgs(args)
- .withValidation()
- .as(TrafficRoutesOptions.class);
-
- options.setBigQuerySchema(FormatStatsFn.getSchema());
- // Using DataflowExampleUtils to set up required resources.
- DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options, options.isUnbounded());
-
- Pipeline pipeline = Pipeline.create(options);
- TableReference tableRef = new TableReference();
- tableRef.setProjectId(options.getProject());
- tableRef.setDatasetId(options.getBigQueryDataset());
- tableRef.setTableId(options.getBigQueryTable());
-
- PCollection<String> input;
- if (options.isUnbounded()) {
- // Read unbounded PubSubIO.
- input = pipeline.apply(PubsubIO.Read
- .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
- .subscription(options.getPubsubSubscription()));
- } else {
- // Read bounded PubSubIO.
- input = pipeline.apply(PubsubIO.Read
- .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
- .subscription(options.getPubsubSubscription()).maxNumRecords(VALID_INPUTS));
-
- // To read bounded TextIO files, use:
- // input = pipeline.apply(TextIO.Read.from(options.getInputFile()))
- // .apply(ParDo.of(new ExtractTimestamps()));
- }
- input
- // row... => <station route, station speed> ...
- .apply(ParDo.of(new ExtractStationSpeedFn()))
- // map the incoming data stream into sliding windows.
- // The default window duration values work well if you're running the accompanying Pub/Sub
- // generator script without the --replay flag, so that there are no simulated pauses in
- // the sensor data publication. You may want to adjust the values otherwise.
- .apply(Window.<KV<String, StationSpeed>>into(SlidingWindows.of(
- Duration.standardMinutes(options.getWindowDuration())).
- every(Duration.standardMinutes(options.getWindowSlideEvery()))))
- .apply(new TrackSpeed())
- .apply(BigQueryIO.Write.to(tableRef)
- .withSchema(FormatStatsFn.getSchema()));
-
- // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
- if (!Strings.isNullOrEmpty(options.getInputFile())
- && !Strings.isNullOrEmpty(options.getPubsubTopic())) {
- dataflowUtils.runInjectorPipeline(
- new ReadFileAndExtractTimestamps(options.getInputFile()),
- options.getPubsubTopic(),
- PUBSUB_TIMESTAMP_LABEL_KEY);
- }
-
- // Run the pipeline.
- PipelineResult result = pipeline.run();
-
- // dataflowUtils will try to cancel the pipeline and the injector before the program exists.
- dataflowUtils.waitToFinish(result);
- }
-
- private static Double tryParseAvgSpeed(String[] inputItems) {
- try {
- return Double.parseDouble(tryParseString(inputItems, 9));
- } catch (NumberFormatException e) {
- return null;
- } catch (NullPointerException e) {
- return null;
- }
- }
-
- private static String tryParseStationType(String[] inputItems) {
- return tryParseString(inputItems, 4);
- }
-
- private static String tryParseStationId(String[] inputItems) {
- return tryParseString(inputItems, 1);
- }
-
- private static String tryParseTimestamp(String[] inputItems) {
- return tryParseString(inputItems, 0);
- }
-
- private static String tryParseString(String[] inputItems, int index) {
- return inputItems.length >= index ? inputItems[index] : null;
- }
-
- /**
- * Define some small hard-wired San Diego 'routes' to track based on sensor station ID.
- */
- private static Map<String, String> buildStationInfo() {
- Map<String, String> stations = new Hashtable<String, String>();
- stations.put("1108413", "SDRoute1"); // from freeway 805 S
- stations.put("1108699", "SDRoute2"); // from freeway 78 E
- stations.put("1108702", "SDRoute2");
- return stations;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoes.java
deleted file mode 100644
index e5fd015..0000000
--- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoes.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.cookbook;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * An example that reads the public samples of weather data from BigQuery, counts the number of
- * tornadoes that occur in each month, and writes the results to BigQuery.
- *
- * <p>Concepts: Reading/writing BigQuery; counting a PCollection; user-defined PTransforms
- *
- * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
- * table.
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- * }
- * </pre>
- * and the BigQuery table for the output, with the form
- * <pre>{@code
- *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
- * }</pre>
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- * }
- * </pre>
- * and the BigQuery table for the output:
- * <pre>{@code
- *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
- * }</pre>
- *
- * <p>The BigQuery input table defaults to {@code clouddataflow-readonly:samples.weather_stations}
- * and can be overridden with {@code --input}.
- */
-public class BigQueryTornadoes {
-  // Default to using a 1000 row subset of the public weather station table publicdata:samples.gsod.
-  private static final String WEATHER_SAMPLES_TABLE =
-      "clouddataflow-readonly:samples.weather_stations";
-
-  /**
-   * Examines each row in the input table. If a tornado was recorded
-   * in that sample, the month in which it occurred is output.
-   */
-  static class ExtractTornadoesFn extends DoFn<TableRow, Integer> {
-    @Override
-    public void processElement(ProcessContext c) {
-      TableRow row = c.element();
-      // A row without a 'tornado' value is treated as "no tornado"; unboxing a null Boolean
-      // directly would throw NullPointerException and fail the bundle.
-      Object tornado = row.get("tornado");
-      if (tornado != null && (Boolean) tornado) {
-        c.output(Integer.parseInt((String) row.get("month")));
-      }
-    }
-  }
-
-  /**
-   * Prepares the data for writing to BigQuery by building a TableRow object containing an
-   * integer representation of month and the number of tornadoes that occurred in each month.
-   */
-  static class FormatCountsFn extends DoFn<KV<Integer, Long>, TableRow> {
-    @Override
-    public void processElement(ProcessContext c) {
-      TableRow row = new TableRow()
-          .set("month", c.element().getKey())
-          .set("tornado_count", c.element().getValue());
-      c.output(row);
-    }
-  }
-
-  /**
-   * Takes rows from a table and generates a table of counts.
-   *
-   * <p>The input schema is described by
-   * https://developers.google.com/bigquery/docs/dataset-gsod .
-   * The output contains the total number of tornadoes found in each month in
-   * the following schema:
-   * <ul>
-   *   <li>month: integer</li>
-   *   <li>tornado_count: integer</li>
-   * </ul>
-   */
-  static class CountTornadoes
-      extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
-    @Override
-    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
-
-      // row... => month...
-      PCollection<Integer> tornadoes = rows.apply(
-          ParDo.of(new ExtractTornadoesFn()));
-
-      // month... => <month,count>...
-      PCollection<KV<Integer, Long>> tornadoCounts =
-          tornadoes.apply(Count.<Integer>perElement());
-
-      // <month,count>... => row...
-      PCollection<TableRow> results = tornadoCounts.apply(
-          ParDo.of(new FormatCountsFn()));
-
-      return results;
-    }
-  }
-
-  /**
-   * Options supported by {@link BigQueryTornadoes}.
-   *
-   * <p>Inherits standard configuration options.
-   */
-  private interface Options extends PipelineOptions {
-    @Description("Table to read from, specified as "
-        + "<project_id>:<dataset_id>.<table_id>")
-    @Default.String(WEATHER_SAMPLES_TABLE)
-    String getInput();
-    void setInput(String value);
-
-    @Description("BigQuery table to write to, specified as "
-        + "<project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
-    @Validation.Required
-    String getOutput();
-    void setOutput(String value);
-  }
-
-  /**
-   * Builds and runs the pipeline: reads {@code --input}, counts tornadoes per month, and writes
-   * the counts to {@code --output}, creating the table if needed and truncating it otherwise.
-   */
-  public static void main(String[] args) {
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-
-    Pipeline p = Pipeline.create(options);
-
-    // Build the table schema for the output table.
-    List<TableFieldSchema> fields = new ArrayList<>();
-    fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
-    fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
-    TableSchema schema = new TableSchema().setFields(fields);
-
-    p.apply(BigQueryIO.Read.from(options.getInput()))
-        .apply(new CountTornadoes())
-        .apply(BigQueryIO.Write
-            .to(options.getOutput())
-            .withSchema(schema)
-            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
-            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
-
-    p.run();
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamples.java
deleted file mode 100644
index 93304eb..0000000
--- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamples.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.cookbook;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * An example that reads the public 'Shakespeare' data, and for each word in
- * the dataset that is over a given length, generates a string containing the
- * list of play names in which that word appears, and saves this information
- * to a bigquery table.
- *
- * <p>Concepts: the Combine.perKey transform, which lets you combine the values in a
- * key-grouped Collection, and how to use an Aggregator to track information in the
- * Monitoring UI.
- *
- * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
- * table.
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * }
- * </pre>
- * and the BigQuery table for the output:
- * <pre>{@code
- * --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
- * }</pre>
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * --stagingLocation=gs://<STAGING DIRECTORY>
- * --runner=BlockingDataflowPipelineRunner
- * }
- * </pre>
- * and the BigQuery table for the output:
- * <pre>{@code
- * --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
- * }</pre>
- *
- * <p>The BigQuery input table defaults to {@code publicdata:samples.shakespeare} and can
- * be overridden with {@code --input}.
- */
-public class CombinePerKeyExamples {
- // Use the shakespeare public BigQuery sample
- private static final String SHAKESPEARE_TABLE =
- "publicdata:samples.shakespeare";
- // We'll track words >= this word length across all plays in the table.
- private static final int MIN_WORD_LENGTH = 9;
-
- /**
- * Examines each row in the input table. If the word is greater than or equal to MIN_WORD_LENGTH,
- * outputs word, play_name.
- */
- static class ExtractLargeWordsFn extends DoFn<TableRow, KV<String, String>> {
- private final Aggregator<Long, Long> smallerWords =
- createAggregator("smallerWords", new Sum.SumLongFn());
-
- @Override
- public void processElement(ProcessContext c){
- TableRow row = c.element();
- String playName = (String) row.get("corpus");
- String word = (String) row.get("word");
- if (word.length() >= MIN_WORD_LENGTH) {
- c.output(KV.of(word, playName));
- } else {
- // Track how many smaller words we're not including. This information will be
- // visible in the Monitoring UI.
- smallerWords.addValue(1L);
- }
- }
- }
-
-
- /**
- * Prepares the data for writing to BigQuery by building a TableRow object
- * containing a word with a string listing the plays in which it appeared.
- */
- static class FormatShakespeareOutputFn extends DoFn<KV<String, String>, TableRow> {
- @Override
- public void processElement(ProcessContext c) {
- TableRow row = new TableRow()
- .set("word", c.element().getKey())
- .set("all_plays", c.element().getValue());
- c.output(row);
- }
- }
-
- /**
- * Reads the public 'Shakespeare' data, and for each word in the dataset
- * over a given length, generates a string containing the list of play names
- * in which that word appears. It does this via the Combine.perKey
- * transform, with the ConcatWords combine function.
- *
- * <p>Combine.perKey is similar to a GroupByKey followed by a ParDo, but
- * has more restricted semantics that allow it to be executed more
- * efficiently. These records are then formatted as BQ table rows.
- */
- static class PlaysForWord
- extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
- @Override
- public PCollection<TableRow> apply(PCollection<TableRow> rows) {
-
- // row... => <word, play_name> ...
- PCollection<KV<String, String>> words = rows.apply(
- ParDo.of(new ExtractLargeWordsFn()));
-
- // word, play_name => word, all_plays ...
- PCollection<KV<String, String>> wordAllPlays =
- words.apply(Combine.<String, String>perKey(
- new ConcatWords()));
-
- // <word, all_plays>... => row...
- PCollection<TableRow> results = wordAllPlays.apply(
- ParDo.of(new FormatShakespeareOutputFn()));
-
- return results;
- }
- }
-
- /**
- * A 'combine function' used with the Combine.perKey transform. Builds a
- * comma-separated string of all input items. So, it will build a string
- * containing all the different Shakespeare plays in which the given input
- * word has appeared.
- */
- public static class ConcatWords implements SerializableFunction<Iterable<String>, String> {
- @Override
- public String apply(Iterable<String> input) {
- StringBuilder all = new StringBuilder();
- for (String item : input) {
- if (!item.isEmpty()) {
- if (all.length() == 0) {
- all.append(item);
- } else {
- all.append(",");
- all.append(item);
- }
- }
- }
- return all.toString();
- }
- }
-
- /**
- * Options supported by {@link CombinePerKeyExamples}.
- *
- * <p>Inherits standard configuration options.
- */
- private static interface Options extends PipelineOptions {
- @Description("Table to read from, specified as "
- + "<project_id>:<dataset_id>.<table_id>")
- @Default.String(SHAKESPEARE_TABLE)
- String getInput();
- void setInput(String value);
-
- @Description("Table to write to, specified as "
- + "<project_id>:<dataset_id>.<table_id>. "
- + "The dataset_id must already exist")
- @Validation.Required
- String getOutput();
- void setOutput(String value);
- }
-
- public static void main(String[] args)
- throws Exception {
-
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
- Pipeline p = Pipeline.create(options);
-
- // Build the table schema for the output table.
- List<TableFieldSchema> fields = new ArrayList<>();
- fields.add(new TableFieldSchema().setName("word").setType("STRING"));
- fields.add(new TableFieldSchema().setName("all_plays").setType("STRING"));
- TableSchema schema = new TableSchema().setFields(fields);
-
- p.apply(BigQueryIO.Read.from(options.getInput()))
- .apply(new PlaysForWord())
- .apply(BigQueryIO.Write
- .to(options.getOutput())
- .withSchema(schema)
- .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
- .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
-
- p.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/DatastoreWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/DatastoreWordCount.java
deleted file mode 100644
index 9dddb5d..0000000
--- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/DatastoreWordCount.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.cookbook;
-
-import static com.google.api.services.datastore.client.DatastoreHelper.getPropertyMap;
-import static com.google.api.services.datastore.client.DatastoreHelper.getString;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeFilter;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeKey;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeValue;
-
-import com.google.api.services.datastore.DatastoreV1.Entity;
-import com.google.api.services.datastore.DatastoreV1.Key;
-import com.google.api.services.datastore.DatastoreV1.Property;
-import com.google.api.services.datastore.DatastoreV1.PropertyFilter;
-import com.google.api.services.datastore.DatastoreV1.Query;
-import com.google.api.services.datastore.DatastoreV1.Value;
-import com.google.cloud.dataflow.examples.WordCount;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.DatastoreIO;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.MapElements;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-
-import java.util.Map;
-import java.util.UUID;
-
-import javax.annotation.Nullable;
-
-/**
- * A WordCount example using DatastoreIO.
- *
- * <p>This example shows how to use DatastoreIO to read from Datastore and
- * write the word counts to a text file (local or on Cloud Storage). Unless
- * {@literal --readOnly} is set, it first writes the input text to Datastore,
- * which may incur charges for Datastore operations.
- *
- * <p>To run this example, use gcloud to obtain credentials for Datastore:
- * <pre>{@code
- * $ gcloud auth login
- * }</pre>
- *
- * <p>To run this pipeline locally, the following options must be provided:
- * <pre>{@code
- * --project=YOUR_PROJECT_ID
- * --dataset=YOUR_DATASET_ID
- * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PATH]
- * }</pre>
- *
- * <p>To run this example using Dataflow service, you must additionally
- * provide either {@literal --stagingLocation} or {@literal --tempLocation}, and
- * select one of the Dataflow pipeline runners, e.g.
- * {@literal --runner=BlockingDataflowPipelineRunner}.
- *
- * <p><b>Note:</b> this example creates entities with <i>Ancestor keys</i> to ensure that all
- * entities created are in the same entity group. Similarly, the query used to read from the Cloud
- * Datastore uses an <i>Ancestor filter</i>. Ancestors are used to ensure strongly consistent
- * results in Cloud Datastore. For more information, see the Cloud Datastore documentation on
- * <a href="https://cloud.google.com/datastore/docs/concepts/structuring_for_strong_consistency">
- * Structuring Data for Strong Consistency</a>.
- */
-public class DatastoreWordCount {
-
- /**
- * A DoFn that reads the {@code "content"} property of each entity (one line of a
- * Shakespeare play) and emits it as a string.
- *
- * <p>Entities without a {@code "content"} property are skipped: nothing is emitted for them.
- */
- static class GetContentFn extends DoFn<Entity, String> {
- @Override
- public void processElement(ProcessContext c) {
- Map<String, Value> props = getPropertyMap(c.element());
- Value value = props.get("content");
- if (value != null) {
- c.output(getString(value));
- }
- }
- }
-
- /**
- * A helper function to create the ancestor key for all created and queried entities.
- *
- * <p>The key is built from the single path element ({@code kind}, {@code "root"}). When
- * {@code namespace} is non-null, it is set on the key's partition ID.
- *
- * <p>We use ancestor keys and ancestor queries for strong consistency. See
- * {@link DatastoreWordCount} javadoc for more information.
- *
- * @param namespace Datastore namespace for the key, or {@code null} to leave it unset
- * @param kind entity kind used in the key's path element
- * @return the ancestor key used both by {@link CreateEntityFn} and by
- * {@link #makeAncestorKindQuery}
- */
- static Key makeAncestorKey(@Nullable String namespace, String kind) {
- Key.Builder keyBuilder = makeKey(kind, "root");
- if (namespace != null) {
- keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
- }
- return keyBuilder.build();
- }
-
- /**
- * A DoFn that creates one Datastore entity for every input line of text.
- *
- * <p>Each entity is keyed under a shared ancestor key (see
- * {@link DatastoreWordCount#makeAncestorKey}) with a random UUID name, and stores the line in
- * a {@code "content"} property.
- */
- static class CreateEntityFn extends DoFn<String, Entity> {
- // Namespace applied to every created key; may be null, in which case none is set.
- private final String namespace;
- // Kind of every created entity and of the ancestor key.
- private final String kind;
- // Parent key shared by all created entities, computed once in the constructor.
- private final Key ancestorKey;
-
- /**
- * @param namespace Datastore namespace for created keys, or {@code null} for none
- * @param kind entity kind for created entities
- */
- CreateEntityFn(String namespace, String kind) {
- this.namespace = namespace;
- this.kind = kind;
-
- // Build the ancestor key for all created entities once, including the namespace.
- ancestorKey = makeAncestorKey(namespace, kind);
- }
-
- /**
- * Builds an entity whose key is a child of {@code ancestorKey} (kind {@code kind}, random
- * UUID name), with {@code content} stored in its {@code "content"} property.
- */
- public Entity makeEntity(String content) {
- Entity.Builder entityBuilder = Entity.newBuilder();
-
- // All created entities have the same ancestor Key.
- Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString());
- // NOTE: Namespace is not inherited between keys created with DatastoreHelper.makeKey, so
- // we must set the namespace on keyBuilder. TODO: Once partitionId inheritance is added,
- // we can simplify this code.
- if (namespace != null) {
- keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
- }
-
- entityBuilder.setKey(keyBuilder.build());
- entityBuilder.addProperty(Property.newBuilder().setName("content")
- .setValue(Value.newBuilder().setStringValue(content)));
- return entityBuilder.build();
- }
-
- @Override
- public void processElement(ProcessContext c) {
- c.output(makeEntity(c.element()));
- }
- }
-
- /**
- * Options supported by {@link DatastoreWordCount}.
- *
- * <p>Inherits standard configuration options.
- */
- public static interface Options extends PipelineOptions {
- // Text file whose lines are written to Datastore (skipped when --readOnly is set).
- @Description("Path of the file to read from and store to Datastore")
- @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
- String getInput();
- void setInput(String value);
-
- // Destination of the formatted word counts produced by readDataFromDatastore.
- @Description("Path of the file to write to")
- @Validation.Required
- String getOutput();
- void setOutput(String value);
-
- // Dataset used by both the write (DatastoreIO.writeTo) and the read (DatastoreIO.source).
- @Description("Dataset ID to read from datastore")
- @Validation.Required
- String getDataset();
- void setDataset(String value);
-
- // Kind of the written entities; also used for the ancestor key and the read query.
- @Description("Dataset entity kind")
- @Default.String("shakespeare-demo")
- String getKind();
- void setKind(String value);
-
- // Optional namespace. Callers skip setting a namespace when this is null.
- // NOTE(review): the setter accepts @Nullable but the getter is not annotated — consider
- // annotating both consistently.
- @Description("Dataset namespace")
- String getNamespace();
- void setNamespace(@Nullable String value);
-
- // When true, main skips writeDataToDatastore and only runs the read pipeline. No @Default
- // is declared; presumably false when unset (primitive default) — confirm.
- @Description("Read an existing dataset, do not write first")
- boolean isReadOnly();
- void setReadOnly(boolean value);
-
- @Description("Number of output shards")
- @Default.Integer(0) // 0 means the system chooses the number of shards automatically.
- int getNumShards();
- void setNumShards(int value);
- }
-
- /**
- * Runs a pipeline that reads lines from {@code options.getInput()}, turns each line into an
- * entity with {@link CreateEntityFn}, and writes the entities to the Datastore dataset
- * {@code options.getDataset()}.
- *
- * <p>The pipeline runs on whichever runner {@code options} selects; this method does not
- * force a particular runner.
- */
- public static void writeDataToDatastore(Options options) {
- Pipeline p = Pipeline.create(options);
- p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
- .apply(ParDo.of(new CreateEntityFn(options.getNamespace(), options.getKind())))
- .apply(DatastoreIO.writeTo(options.getDataset()));
-
- // NOTE(review): the result of run() is discarded. If the configured runner's run() returns
- // before the job finishes, main may start the read pipeline before this write completes.
- p.run();
- }
-
- /**
- * Build a Cloud Datastore ancestor query for the specified {@link Options#getNamespace} and
- * {@link Options#getKind}.
- *
- * <p>We use ancestor keys and ancestor queries for strong consistency. See
- * {@link DatastoreWordCount} javadoc for more information.
- *
- * @param options supplies the kind and namespace used to build the ancestor key
- * @return a query over entities of kind {@code options.getKind()} whose {@code __key__} has
- * the ancestor returned by {@link #makeAncestorKey}
- * @see <a href="https://cloud.google.com/datastore/docs/concepts/queries#Datastore_Ancestor_filters">Ancestor filters</a>
- */
- static Query makeAncestorKindQuery(Options options) {
- Query.Builder q = Query.newBuilder();
- q.addKindBuilder().setName(options.getKind());
- q.setFilter(makeFilter(
- "__key__",
- PropertyFilter.Operator.HAS_ANCESTOR,
- makeValue(makeAncestorKey(options.getNamespace(), options.getKind()))));
- return q.build();
- }
-
- /**
- * Runs a pipeline that:
- * <ol>
- * <li>reads the entities matched by {@link #makeAncestorKindQuery} from Datastore;</li>
- * <li>extracts their {@code "content"} text with {@link GetContentFn};</li>
- * <li>counts words with {@link WordCount.CountWords};</li>
- * <li>writes the formatted counts to {@code options.getOutput()} using
- * {@code options.getNumShards()} shards.</li>
- * </ol>
- */
- public static void readDataFromDatastore(Options options) {
- Query query = makeAncestorKindQuery(options);
-
- // For Datastore sources, the read namespace can be set on the entire query.
- DatastoreIO.Source source = DatastoreIO.source()
- .withDataset(options.getDataset())
- .withQuery(query)
- .withNamespace(options.getNamespace());
-
- Pipeline p = Pipeline.create(options);
- p.apply("ReadShakespeareFromDatastore", Read.from(source))
- .apply("StringifyEntity", ParDo.of(new GetContentFn()))
- .apply("CountWords", new WordCount.CountWords())
- .apply("PrintWordCount", MapElements.via(new WordCount.FormatAsTextFn()))
- .apply("WriteLines", TextIO.Write.to(options.getOutput())
- .withNumShards(options.getNumShards()));
- p.run();
- }
-
- /**
- * An example to demo how to use {@link DatastoreIO}. The runner here is
- * customizable, which means users could pass either {@code DirectPipelineRunner}
- * or {@code DataflowPipelineRunner} in the pipeline options.
- *
- * <p>Unless {@code --readOnly} is set, this first writes the input file to Datastore. It then
- * reads the entities back and writes word counts to {@code --output}.
- *
- * @param args command-line pipeline options, parsed into {@link Options} and validated
- */
- // NOTE(review): C-style array declarator; {@code String[] args} is the conventional form.
- public static void main(String args[]) {
- // The same options configure both pipelines below (write, then read) and supply the
- // dataset, kind, and namespace used by DatastoreIO.
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-
- if (!options.isReadOnly()) {
- // First example: write data to Datastore for reading later.
- //
- // NOTE: this write does not delete any existing Entities in the Datastore, so if run
- // multiple times with the same output dataset, there may be duplicate entries. The
- // Datastore Query tool in the Google Developers Console can be used to inspect or erase all
- // entries with a particular namespace and/or kind.
- DatastoreWordCount.writeDataToDatastore(options);
- }
-
- // Second example: do parallel read from Datastore.
- DatastoreWordCount.readDataFromDatastore(options);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java
deleted file mode 100644
index 40d1f76..0000000
--- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.cookbook;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-
-/**
- * This example uses as input Shakespeare's plays as plaintext files, and will remove any
- * duplicate lines across all the files. (The output does not preserve any input order).
- *
- * <p>Concepts: the RemoveDuplicates transform, and how to wire transforms together.
- * Demonstrates {@link com.google.cloud.dataflow.sdk.io.TextIO.Read}/
- * {@link RemoveDuplicates}/{@link com.google.cloud.dataflow.sdk.io.TextIO.Write}.
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * --project=YOUR_PROJECT_ID
- * and a local output file or output prefix on GCS:
- * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * --project=YOUR_PROJECT_ID
- * --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- * --runner=BlockingDataflowPipelineRunner
- * and an output prefix on GCS:
- * --output=gs://YOUR_OUTPUT_PREFIX
- *
- * <p>The input defaults to {@code gs://dataflow-samples/shakespeare/*} and can be
- * overridden with {@code --input}.
- */
-public class DeDupExample {
-
- /**
- * Options supported by {@link DeDupExample}.
- *
- * <p>Inherits standard configuration options.
- */
- private static interface Options extends PipelineOptions {
- // Files to de-duplicate; defaults to all Shakespeare sample files.
- @Description("Path to the directory or GCS prefix containing files to read from")
- @Default.String("gs://dataflow-samples/shakespeare/*")
- String getInput();
- void setInput(String value);
-
- // Output path; when not given, the default comes from OutputFactory below.
- @Description("Path of the file to write to")
- @Default.InstanceFactory(OutputFactory.class)
- String getOutput();
- void setOutput(String value);
-
- /**
- * Supplies the default for {@code --output}: {@code "deduped.txt"} resolved against the
- * staging location from {@link DataflowPipelineOptions#getStagingLocation}.
- *
- * <p>Throws {@link IllegalArgumentException} when no staging location is set, because there
- * is then no default to fall back on. NOTE(review): {@code GcsPath.fromUri} presumably
- * requires the staging location to be a {@code gs://} URI — confirm.
- */
- public static class OutputFactory implements DefaultValueFactory<String> {
- @Override
- public String create(PipelineOptions options) {
- DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
- if (dataflowOptions.getStagingLocation() != null) {
- return GcsPath.fromUri(dataflowOptions.getStagingLocation())
- .resolve("deduped.txt").toString();
- } else {
- throw new IllegalArgumentException("Must specify --output or --stagingLocation");
- }
- }
- }
- }
-
-
- /**
- * Entry point: reads every line of {@code --input} and removes duplicate lines with
- * {@link RemoveDuplicates}. It then writes the distinct lines to {@code --output}.
- *
- * @param args command-line pipeline options, parsed into {@link Options} and validated
- */
- public static void main(String[] args)
- throws Exception {
-
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
- Pipeline p = Pipeline.create(options);
-
- p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
- .apply(RemoveDuplicates.<String>create())
- .apply(TextIO.Write.named("DedupedShakespeare")
- .to(options.getOutput()));
-
- p.run();
- }
-}