Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/06 17:20:48 UTC

[42/50] [abbrv] incubator-beam git commit: Replace PubsubIO and injector with TextIO in traffic examples

Replace PubsubIO and injector with TextIO in traffic examples
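
The change has the same shape in both examples below: the runtime choice between an
unbounded and a bounded Pub/Sub read is collapsed into a single bounded, TextIO-based
read. A minimal before/after sketch of the pattern, assembled only from identifiers
visible in the diff (ReadFileAndExtractTimestamps is the examples' own composite
transform; the "input" variable and the import selection are illustrative):

    import org.apache.beam.sdk.io.PubsubIO;
    import org.apache.beam.sdk.values.PCollection;

    // Before: pick an unbounded or bounded Pub/Sub read based on --streaming.
    PCollection<String> input;
    if (options.isUnbounded()) {
      // Unbounded read; element timestamps come from a message attribute.
      input = pipeline.apply(PubsubIO.Read
          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
          .subscription(options.getPubsubSubscription()));
    } else {
      // Bounded read: same source, capped at VALID_INPUTS records.
      input = pipeline.apply(PubsubIO.Read
          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
          .subscription(options.getPubsubSubscription())
          .maxNumRecords(VALID_INPUTS));
    }

    // After: one bounded read from --inputFile; timestamps are attached by
    // the examples' TextIO-based ReadFileAndExtractTimestamps transform.
    PCollection<String> input = pipeline.apply(
        "ReadLines", new ReadFileAndExtractTimestamps(options.getInputFile()));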


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ae75a5d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ae75a5d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ae75a5d8

Branch: refs/heads/runners-spark2
Commit: ae75a5d8c18098356f5e96ed25ca543a846d8f5b
Parents: 151ff5f
Author: Pei He <pe...@google.com>
Authored: Fri Jul 1 14:45:43 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 6 10:18:53 2016 -0700

----------------------------------------------------------------------
 .../examples/complete/TrafficMaxLaneFlow.java   | 64 ++++----------------
 .../beam/examples/complete/TrafficRoutes.java   | 62 +++----------------
 2 files changed, 22 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ae75a5d8/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
index cfef311..2db7c9e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -20,13 +20,11 @@ package org.apache.beam.examples.complete;
 import org.apache.beam.examples.common.DataflowExampleOptions;
 import org.apache.beam.examples.common.DataflowExampleUtils;
 import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
-import org.apache.beam.examples.common.ExamplePubsubTopicAndSubscriptionOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.BigQueryIO;
-import org.apache.beam.sdk.io.PubsubIO;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
@@ -46,7 +44,6 @@ import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.base.Strings;
 
 import org.apache.avro.reflect.Nullable;
 import org.joda.time.Duration;
@@ -62,30 +59,19 @@ import java.util.List;
  * A Dataflow Example that runs in both batch and streaming modes with traffic sensor data.
  * You can configure the running mode by setting {@literal --streaming} to true or false.
  *
- * <p>Concepts: The batch and streaming runners, sliding windows, Google Cloud Pub/Sub
- * topic injection, use of the AvroCoder to encode a custom class, and custom Combine transforms.
+ * <p>Concepts: The batch and streaming runners, sliding windows,
+ * use of the AvroCoder to encode a custom class, and custom Combine transforms.
  *
  * <p>This example analyzes traffic sensor data using SlidingWindows. For each window,
  * it finds the lane that had the highest flow recorded, for each sensor station. It writes
  * those max values along with auxiliary info to a BigQuery table.
  *
- * <p>In batch mode, the pipeline reads traffic sensor data from {@literal --inputFile}.
+ * <p>The pipeline reads traffic sensor data from {@literal --inputFile}.
  *
- * <p>In streaming mode, the pipeline reads the data from a Pub/Sub topic.
- * By default, the example will run a separate pipeline to inject the data from the default
- * {@literal --inputFile} to the Pub/Sub {@literal --pubsubTopic}. It will make it available for
- * the streaming pipeline to process. You may override the default {@literal --inputFile} with the
- * file of your choosing. You may also set {@literal --inputFile} to an empty string, which will
- * disable the automatic Pub/Sub injection, and allow you to use separate tool to control the input
- * to this example. An example code, which publishes traffic sensor data to a Pub/Sub topic,
- * is provided in
- * <a href="https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-python/tree/master/gce-cmdline-publisher"></a>.
- *
- * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
- * from the example common package (there are no defaults for a general Dataflow pipeline).
- * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
- * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
- * the example will try to create them.
+ * <p>The example is configured to use the default BigQuery table from the example common package
+ * (there are no defaults for a general Dataflow pipeline).
+ * You can override it by using the {@literal --bigQueryDataset} and {@literal --bigQueryTable}
+ * options. If the BigQuery table does not exist, the example will try to create it.
  *
  * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
 * and then exit.
@@ -322,8 +308,8 @@ public class TrafficMaxLaneFlow {
     * <p>Inherits standard configuration options.
     */
   private interface TrafficMaxLaneFlowOptions extends DataflowExampleOptions,
-      ExamplePubsubTopicAndSubscriptionOptions, ExampleBigQueryTableOptions {
-        @Description("Input file to inject to Pub/Sub topic")
+      ExampleBigQueryTableOptions {
+    @Description("Path of the file to read from")
     @Default.String("gs://dataflow-samples/traffic_sensor/"
         + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
     String getInputFile();
@@ -364,28 +350,11 @@ public class TrafficMaxLaneFlow {
     tableRef.setDatasetId(options.getBigQueryDataset());
     tableRef.setTableId(options.getBigQueryTable());
 
-    PCollection<String> input;
-    if (options.isUnbounded()) {
-      // Read unbounded PubSubIO.
-      input = pipeline.apply(PubsubIO.Read
-          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
-          .subscription(options.getPubsubSubscription()));
-    } else {
-      // Read bounded PubSubIO.
-      input = pipeline.apply(PubsubIO.Read
-          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
-          .subscription(options.getPubsubSubscription()).maxNumRecords(VALID_INPUTS));
-
-      // To read bounded TextIO files, use:
-      // input = pipeline.apply(new ReadFileAndExtractTimestamps(options.getInputFile()));
-    }
-    input
+    pipeline
+        .apply("ReadLines", new ReadFileAndExtractTimestamps(options.getInputFile()))
         // row... => <station route, station speed> ...
         .apply(ParDo.of(new ExtractFlowInfoFn()))
-        // map the incoming data stream into sliding windows. The default window duration values
-        // work well if you're running the accompanying Pub/Sub generator script with the
-        // --replay flag, which simulates pauses in the sensor data publication. You may want to
-        // adjust them otherwise.
+        // map the incoming data stream into sliding windows.
         .apply(Window.<KV<String, LaneInfo>>into(SlidingWindows.of(
             Duration.standardMinutes(options.getWindowDuration())).
             every(Duration.standardMinutes(options.getWindowSlideEvery()))))
@@ -393,15 +362,6 @@ public class TrafficMaxLaneFlow {
         .apply(BigQueryIO.Write.to(tableRef)
             .withSchema(FormatMaxesFn.getSchema()));
 
-    // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
-    if (!Strings.isNullOrEmpty(options.getInputFile())
-        && !Strings.isNullOrEmpty(options.getPubsubTopic())) {
-      dataflowUtils.runInjectorPipeline(
-          new ReadFileAndExtractTimestamps(options.getInputFile()),
-          options.getPubsubTopic(),
-          PUBSUB_TIMESTAMP_LABEL_KEY);
-    }
-
     // Run the pipeline.
     PipelineResult result = pipeline.run();
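
One concept from the javadoc above that survives the change unmodified is the sliding
window assignment: each keyed sensor reading lands in every overlapping window that
covers its timestamp, so a single value contributes to several per-window maxima. A
standalone sketch of the idiom, assuming illustrative durations in place of the
--windowDuration and --windowSlideEvery options (the "keyed" collection is hypothetical):

    import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    // Assign each element to overlapping windows: 60 minutes wide, starting
    // every 5 minutes, so each element falls into 12 windows.
    PCollection<KV<String, LaneInfo>> windowed = keyed.apply(
        Window.<KV<String, LaneInfo>>into(
            SlidingWindows.of(Duration.standardMinutes(60))
                .every(Duration.standardMinutes(5))));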
 
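The javadoc also lists AvroCoder as a concept, and the imports kept in this file
(AvroCoder, DefaultCoder, and Avro's reflect @Nullable) point at the standard pattern
for making a custom record type encodable. A hedged sketch of that pattern; the field
names are illustrative, not the example's actual ones:

    import org.apache.avro.reflect.Nullable;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.coders.DefaultCoder;

    // Registers AvroCoder as the default coder for this type, so instances
    // can flow between transforms without an explicit setCoder(...) call.
    @DefaultCoder(AvroCoder.class)
    static class LaneInfo {
      @Nullable String stationId;   // @Nullable marks optional Avro fields
      @Nullable Double laneFlow;    // field names here are illustrative

      public LaneInfo() {}          // AvroCoder's reflection needs a no-arg constructor
    }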

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ae75a5d8/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
index c9bada8..89cfbfc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -20,13 +20,11 @@ package org.apache.beam.examples.complete;
 import org.apache.beam.examples.common.DataflowExampleOptions;
 import org.apache.beam.examples.common.DataflowExampleUtils;
 import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
-import org.apache.beam.examples.common.ExamplePubsubTopicAndSubscriptionOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.BigQueryIO;
-import org.apache.beam.sdk.io.PubsubIO;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
@@ -45,7 +43,6 @@ import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 
 import org.apache.avro.reflect.Nullable;
@@ -66,30 +63,18 @@ import java.util.Map;
  * A Dataflow Example that runs in both batch and streaming modes with traffic sensor data.
  * You can configure the running mode by setting {@literal --streaming} to true or false.
  *
- * <p>Concepts: The batch and streaming runners, GroupByKey, sliding windows, and
- * Google Cloud Pub/Sub topic injection.
+ * <p>Concepts: The batch and streaming runners, GroupByKey, and sliding windows.
  *
  * <p>This example analyzes traffic sensor data using SlidingWindows. For each window,
  * it calculates the average speed over the window for some small set of predefined 'routes',
  * and looks for 'slowdowns' in those routes. It writes its results to a BigQuery table.
  *
- * <p>In batch mode, the pipeline reads traffic sensor data from {@literal --inputFile}.
+ * <p>The pipeline reads traffic sensor data from {@literal --inputFile}.
  *
- * <p>In streaming mode, the pipeline reads the data from a Pub/Sub topic.
- * By default, the example will run a separate pipeline to inject the data from the default
- * {@literal --inputFile} to the Pub/Sub {@literal --pubsubTopic}. It will make it available for
- * the streaming pipeline to process. You may override the default {@literal --inputFile} with the
- * file of your choosing. You may also set {@literal --inputFile} to an empty string, which will
- * disable the automatic Pub/Sub injection, and allow you to use separate tool to control the input
- * to this example. An example code, which publishes traffic sensor data to a Pub/Sub topic,
- * is provided in
- * <a href="https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-python/tree/master/gce-cmdline-publisher"></a>.
- *
- * <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
- * from the example common package (there are no defaults for a general Dataflow pipeline).
- * You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
- * {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
- * the example will try to create them.
+ * <p>The example is configured to use the default BigQuery table from the example common package
+ * (there are no defaults for a general Dataflow pipeline).
+ * You can override it by using the {@literal --bigQueryDataset} and {@literal --bigQueryTable}
+ * options. If the BigQuery table does not exist, the example will try to create it.
  *
  * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
 * and then exit.
@@ -333,8 +318,8 @@ public class TrafficRoutes {
   * <p>Inherits standard configuration options.
   */
   private interface TrafficRoutesOptions extends DataflowExampleOptions,
-      ExamplePubsubTopicAndSubscriptionOptions, ExampleBigQueryTableOptions {
-    @Description("Input file to inject to Pub/Sub topic")
+      ExampleBigQueryTableOptions {
+    @Description("Path of the file to read from")
     @Default.String("gs://dataflow-samples/traffic_sensor/"
         + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
     String getInputFile();
@@ -376,29 +361,11 @@ public class TrafficRoutes {
     tableRef.setDatasetId(options.getBigQueryDataset());
     tableRef.setTableId(options.getBigQueryTable());
 
-    PCollection<String> input;
-    if (options.isUnbounded()) {
-      // Read unbounded PubSubIO.
-      input = pipeline.apply(PubsubIO.Read
-          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
-          .subscription(options.getPubsubSubscription()));
-    } else {
-      // Read bounded PubSubIO.
-      input = pipeline.apply(PubsubIO.Read
-          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
-          .subscription(options.getPubsubSubscription()).maxNumRecords(VALID_INPUTS));
-
-      // To read bounded TextIO files, use:
-      // input = pipeline.apply(TextIO.Read.from(options.getInputFile()))
-      //    .apply(ParDo.of(new ExtractTimestamps()));
-    }
-    input
+    pipeline
+        .apply("ReadLines", new ReadFileAndExtractTimestamps(options.getInputFile()))
         // row... => <station route, station speed> ...
         .apply(ParDo.of(new ExtractStationSpeedFn()))
         // map the incoming data stream into sliding windows.
-        // The default window duration values work well if you're running the accompanying Pub/Sub
-        // generator script without the --replay flag, so that there are no simulated pauses in
-        // the sensor data publication. You may want to adjust the values otherwise.
         .apply(Window.<KV<String, StationSpeed>>into(SlidingWindows.of(
             Duration.standardMinutes(options.getWindowDuration())).
             every(Duration.standardMinutes(options.getWindowSlideEvery()))))
@@ -406,15 +373,6 @@ public class TrafficRoutes {
         .apply(BigQueryIO.Write.to(tableRef)
             .withSchema(FormatStatsFn.getSchema()));
 
-    // Inject the data into the Pub/Sub topic with a Dataflow batch pipeline.
-    if (!Strings.isNullOrEmpty(options.getInputFile())
-        && !Strings.isNullOrEmpty(options.getPubsubTopic())) {
-      dataflowUtils.runInjectorPipeline(
-          new ReadFileAndExtractTimestamps(options.getInputFile()),
-          options.getPubsubTopic(),
-          PUBSUB_TIMESTAMP_LABEL_KEY);
-    }
-
     // Run the pipeline.
     PipelineResult result = pipeline.run();
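
Both pipelines end with the same sink: the aggregated rows are written to BigQuery with
a schema supplied by the formatting step, and per the javadoc the table is created on
demand if it is missing. A sketch assembled from the fragments visible in the hunks
above; the setProjectId(...) line and the "results" collection are assumptions, since
the hunks begin below that point:

    import com.google.api.services.bigquery.model.TableReference;
    import org.apache.beam.sdk.io.BigQueryIO;

    // Build the destination table reference from the pipeline options.
    TableReference tableRef = new TableReference();
    tableRef.setProjectId(options.getProject());  // assumed; not shown in the hunk
    tableRef.setDatasetId(options.getBigQueryDataset());
    tableRef.setTableId(options.getBigQueryTable());

    // Write the formatted rows; the schema comes from the formatting DoFn,
    // e.g. FormatStatsFn in TrafficRoutes.
    results.apply(BigQueryIO.Write.to(tableRef)
        .withSchema(FormatStatsFn.getSchema()));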