You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/06 17:20:48 UTC
[42/50] [abbrv] incubator-beam git commit: Replace PubsubIO and
injector with TextIO in traffic examples
Replace PubsubIO and injector with TextIO in traffic examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ae75a5d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ae75a5d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ae75a5d8
Branch: refs/heads/runners-spark2
Commit: ae75a5d8c18098356f5e96ed25ca543a846d8f5b
Parents: 151ff5f
Author: Pei He <pe...@google.com>
Authored: Fri Jul 1 14:45:43 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 6 10:18:53 2016 -0700
----------------------------------------------------------------------
.../examples/complete/TrafficMaxLaneFlow.java | 64 ++++----------------
.../beam/examples/complete/TrafficRoutes.java | 62 +++----------------
2 files changed, 22 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ae75a5d8/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
index cfef311..2db7c9e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -20,13 +20,11 @@ package org.apache.beam.examples.complete;
import org.apache.beam.examples.common.DataflowExampleOptions;
import org.apache.beam.examples.common.DataflowExampleUtils;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
-import org.apache.beam.examples.common.ExamplePubsubTopicAndSubscriptionOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.BigQueryIO;
-import org.apache.beam.sdk.io.PubsubIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
@@ -46,7 +44,6 @@ import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.base.Strings;
import org.apache.avro.reflect.Nullable;
import org.joda.time.Duration;
@@ -62,30 +59,19 @@ import java.util.List;
* A Dataflow Example that runs in both batch and streaming modes with traffic sensor data.
* You can configure the running mode by setting {@literal --streaming} to true or false.
*
- * <p>Concepts: The batch and streaming runners, sliding windows, Google Cloud Pub/Sub
- * topic injection, use of the AvroCoder to encode a custom class, and custom Combine transforms.
+ * <p>Concepts: The batch and streaming runners, sliding windows,
+ * use of the AvroCoder to encode a custom class, and custom Combine transforms.
*
* <p>This example analyzes traffic sensor data using SlidingWindows. For each window,
* it finds the lane that had the highest flow recorded, for each sensor station. It writes
* those max values along with auxiliary info to a BigQuery table.
*
- * <p>In batch mode, the pipeline reads traffic sensor data from {@literal --inputFile}.
+ * <p>The pipeline reads traffic sensor data from {@literal --inputFile}.
*
- * <p>In streaming mode, the pipeline reads the data from a Pub/Sub topic.
- * By default, the example will run a separate pipeline to inject the data from the default
- * {@literal --inputFile} to the Pub/Sub {@literal --pubsubTopic}. It will make it available for
- * the streaming pipeline to process. You may override the default {@literal --inputFile} with the
- * file of your choosing. You may also set {@literal --inputFile} to an empty string, which will
- * disable the automatic Pub/Sub injection, and allow you to use separate tool to control the input
- * to this example. An example code, which publishes traffic sensor data to a Pub/Sub topic,
- * is provided in
- * <a href="https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-python/tree/master/gce-cmdline-publisher"></a>.
- *
- * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
- * from the example common package (there are no defaults for a general Dataflow pipeline).
- * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
- * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
- * the example will try to create them.
+ * <p>The example is configured to use the default BigQuery table from the example common package
+ * (there are no defaults for a general Dataflow pipeline).
+ * You can override it by using the {@literal --bigQueryDataset} and {@literal --bigQueryTable}
+ * options. If the BigQuery table does not exist, the example will try to create it.
*
* <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
* and then exits.
@@ -322,8 +308,8 @@ public class TrafficMaxLaneFlow {
* <p>Inherits standard configuration options.
*/
private interface TrafficMaxLaneFlowOptions extends DataflowExampleOptions,
- ExamplePubsubTopicAndSubscriptionOptions, ExampleBigQueryTableOptions {
- @Description("Input file to inject to Pub/Sub topic")
+ ExampleBigQueryTableOptions {
+ @Description("Path of the file to read from")
@Default.String("gs://dataflow-samples/traffic_sensor/"
+ "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
String getInputFile();
@@ -364,28 +350,11 @@ public class TrafficMaxLaneFlow {
tableRef.setDatasetId(options.getBigQueryDataset());
tableRef.setTableId(options.getBigQueryTable());
- PCollection<String> input;
- if (options.isUnbounded()) {
- // Read unbounded PubSubIO.
- input = pipeline.apply(PubsubIO.Read
- .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
- .subscription(options.getPubsubSubscription()));
- } else {
- // Read bounded PubSubIO.
- input = pipeline.apply(PubsubIO.Read
- .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
- .subscription(options.getPubsubSubscription()).maxNumRecords(VALID_INPUTS));
-
- // To read bounded TextIO files, use:
- // input = pipeline.apply(new ReadFileAndExtractTimestamps(options.getInputFile()));
- }
- input
+ pipeline
+ .apply("ReadLines", new ReadFileAndExtractTimestamps(options.getInputFile()))
// row... => <station route, station speed> ...
.apply(ParDo.of(new ExtractFlowInfoFn()))
- // map the incoming data stream into sliding windows. The default window duration values
- // work well if you're running the accompanying Pub/Sub generator script with the
- // --replay flag, which simulates pauses in the sensor data publication. You may want to
- // adjust them otherwise.
+ // map the incoming data stream into sliding windows.
.apply(Window.<KV<String, LaneInfo>>into(SlidingWindows.of(
Duration.standardMinutes(options.getWindowDuration())).
every(Duration.standardMinutes(options.getWindowSlideEvery()))))
@@ -393,15 +362,6 @@ public class TrafficMaxLaneFlow {
.apply(BigQueryIO.Write.to(tableRef)
.withSchema(FormatMaxesFn.getSchema()));
- // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
- if (!Strings.isNullOrEmpty(options.getInputFile())
- && !Strings.isNullOrEmpty(options.getPubsubTopic())) {
- dataflowUtils.runInjectorPipeline(
- new ReadFileAndExtractTimestamps(options.getInputFile()),
- options.getPubsubTopic(),
- PUBSUB_TIMESTAMP_LABEL_KEY);
- }
-
// Run the pipeline.
PipelineResult result = pipeline.run();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ae75a5d8/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
index c9bada8..89cfbfc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -20,13 +20,11 @@ package org.apache.beam.examples.complete;
import org.apache.beam.examples.common.DataflowExampleOptions;
import org.apache.beam.examples.common.DataflowExampleUtils;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
-import org.apache.beam.examples.common.ExamplePubsubTopicAndSubscriptionOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.BigQueryIO;
-import org.apache.beam.sdk.io.PubsubIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
@@ -45,7 +43,6 @@ import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.avro.reflect.Nullable;
@@ -66,30 +63,18 @@ import java.util.Map;
* A Dataflow Example that runs in both batch and streaming modes with traffic sensor data.
* You can configure the running mode by setting {@literal --streaming} to true or false.
*
- * <p>Concepts: The batch and streaming runners, GroupByKey, sliding windows, and
- * Google Cloud Pub/Sub topic injection.
+ * <p>Concepts: The batch and streaming runners, GroupByKey, and sliding windows.
*
* <p>This example analyzes traffic sensor data using SlidingWindows. For each window,
* it calculates the average speed over the window for some small set of predefined 'routes',
* and looks for 'slowdowns' in those routes. It writes its results to a BigQuery table.
*
- * <p>In batch mode, the pipeline reads traffic sensor data from {@literal --inputFile}.
+ * <p>The pipeline reads traffic sensor data from {@literal --inputFile}.
*
- * <p>In streaming mode, the pipeline reads the data from a Pub/Sub topic.
- * By default, the example will run a separate pipeline to inject the data from the default
- * {@literal --inputFile} to the Pub/Sub {@literal --pubsubTopic}. It will make it available for
- * the streaming pipeline to process. You may override the default {@literal --inputFile} with the
- * file of your choosing. You may also set {@literal --inputFile} to an empty string, which will
- * disable the automatic Pub/Sub injection, and allow you to use separate tool to control the input
- * to this example. An example code, which publishes traffic sensor data to a Pub/Sub topic,
- * is provided in
- * <a href="https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-python/tree/master/gce-cmdline-publisher"></a>.
- *
- * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
- * from the example common package (there are no defaults for a general Dataflow pipeline).
- * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
- * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
- * the example will try to create them.
+ * <p>The example is configured to use the default BigQuery table from the example common package
+ * (there are no defaults for a general Dataflow pipeline).
+ * You can override it by using the {@literal --bigQueryDataset} and {@literal --bigQueryTable}
+ * options. If the BigQuery table does not exist, the example will try to create it.
*
* <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
* and then exits.
@@ -333,8 +318,8 @@ public class TrafficRoutes {
* <p>Inherits standard configuration options.
*/
private interface TrafficRoutesOptions extends DataflowExampleOptions,
- ExamplePubsubTopicAndSubscriptionOptions, ExampleBigQueryTableOptions {
- @Description("Input file to inject to Pub/Sub topic")
+ ExampleBigQueryTableOptions {
+ @Description("Path of the file to read from")
@Default.String("gs://dataflow-samples/traffic_sensor/"
+ "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
String getInputFile();
@@ -376,29 +361,11 @@ public class TrafficRoutes {
tableRef.setDatasetId(options.getBigQueryDataset());
tableRef.setTableId(options.getBigQueryTable());
- PCollection<String> input;
- if (options.isUnbounded()) {
- // Read unbounded PubSubIO.
- input = pipeline.apply(PubsubIO.Read
- .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
- .subscription(options.getPubsubSubscription()));
- } else {
- // Read bounded PubSubIO.
- input = pipeline.apply(PubsubIO.Read
- .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
- .subscription(options.getPubsubSubscription()).maxNumRecords(VALID_INPUTS));
-
- // To read bounded TextIO files, use:
- // input = pipeline.apply(TextIO.Read.from(options.getInputFile()))
- // .apply(ParDo.of(new ExtractTimestamps()));
- }
- input
+ pipeline
+ .apply("ReadLines", new ReadFileAndExtractTimestamps(options.getInputFile()))
// row... => <station route, station speed> ...
.apply(ParDo.of(new ExtractStationSpeedFn()))
// map the incoming data stream into sliding windows.
- // The default window duration values work well if you're running the accompanying Pub/Sub
- // generator script without the --replay flag, so that there are no simulated pauses in
- // the sensor data publication. You may want to adjust the values otherwise.
.apply(Window.<KV<String, StationSpeed>>into(SlidingWindows.of(
Duration.standardMinutes(options.getWindowDuration())).
every(Duration.standardMinutes(options.getWindowSlideEvery()))))
@@ -406,15 +373,6 @@ public class TrafficRoutes {
.apply(BigQueryIO.Write.to(tableRef)
.withSchema(FormatStatsFn.getSchema()));
- // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
- if (!Strings.isNullOrEmpty(options.getInputFile())
- && !Strings.isNullOrEmpty(options.getPubsubTopic())) {
- dataflowUtils.runInjectorPipeline(
- new ReadFileAndExtractTimestamps(options.getInputFile()),
- options.getPubsubTopic(),
- PUBSUB_TIMESTAMP_LABEL_KEY);
- }
-
// Run the pipeline.
PipelineResult result = pipeline.run();