You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/08 21:15:13 UTC

[1/2] incubator-beam git commit: Remove isUnbounded option, and the corresponding ExampleUtils constructor

Repository: incubator-beam
Updated Branches:
  refs/heads/master 994febef4 -> d9632b7f9


Remove isUnbounded option, and the corresponding ExampleUtils constructor


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/626240a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/626240a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/626240a4

Branch: refs/heads/master
Commit: 626240a4dc28b643a7f25fb71285918233e27da6
Parents: 994febe
Author: Pei He <pe...@google.com>
Authored: Fri Jul 8 12:46:27 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 8 14:15:03 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/examples/WindowedWordCount.java |  9 +++----
 .../beam/examples/common/ExampleUtils.java      | 26 +++-----------------
 .../beam/examples/complete/AutoComplete.java    |  6 ++---
 .../examples/complete/StreamingWordExtract.java |  6 ++---
 .../examples/complete/TrafficMaxLaneFlow.java   | 13 +++-------
 .../beam/examples/complete/TrafficRoutes.java   | 13 +++-------
 .../beam/examples/cookbook/TriggerExample.java  |  8 +++---
 7 files changed, 22 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/626240a4/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index b32128a..882ef7c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -181,10 +181,6 @@ public class WindowedWordCount {
     @Default.Integer(WINDOW_SIZE)
     Integer getWindowSize();
     void setWindowSize(Integer value);
-
-    @Description("Whether to run the pipeline with unbounded input")
-    boolean isUnbounded();
-    void setUnbounded(boolean value);
   }
 
   public static void main(String[] args) throws IOException {
@@ -192,7 +188,8 @@ public class WindowedWordCount {
     options.setBigQuerySchema(getSchema());
     // DataflowExampleUtils creates the necessary input sources to simplify execution of this
     // Pipeline.
-    ExampleUtils exampleDataflowUtils = new ExampleUtils(options, options.isUnbounded());
+    ExampleUtils exampleUtils = new ExampleUtils(options);
+    exampleUtils.setup();
 
     Pipeline pipeline = Pipeline.create(options);
 
@@ -237,6 +234,6 @@ public class WindowedWordCount {
     PipelineResult result = pipeline.run();
 
     // dataflowUtils will try to cancel the pipeline before the program exists.
-    exampleDataflowUtils.waitToFinish(result);
+    exampleUtils.waitToFinish(result);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/626240a4/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index e30b1e4..6b71b0f 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -72,17 +72,12 @@ public class ExampleUtils {
   private Set<DataflowPipelineJob> jobsToCancel = Sets.newHashSet();
   private List<String> pendingMessages = Lists.newArrayList();
 
-  public ExampleUtils(DataflowPipelineOptions options) {
-    this.options = options;
-  }
-
   /**
    * Do resources and runner options setup.
    */
-  public ExampleUtils(DataflowPipelineOptions options, boolean isUnbounded)
-      throws IOException {
+  public ExampleUtils(DataflowPipelineOptions options) {
     this.options = options;
-    setupResourcesAndRunner(isUnbounded);
+    setupRunner();
   }
 
   /**
@@ -113,17 +108,6 @@ public class ExampleUtils {
   }
 
   /**
-   * Set up external resources, and configure the runner appropriately.
-   */
-  public void setupResourcesAndRunner(boolean isUnbounded) throws IOException {
-    if (isUnbounded) {
-      options.setStreaming(true);
-    }
-    setup();
-    setupRunner();
-  }
-
-  /**
    * Sets up the Google Cloud Pub/Sub topic.
    *
    * <p>If the topic doesn't exist, a new topic with the given name will be created.
@@ -297,11 +281,9 @@ public class ExampleUtils {
    * Do some runner setup: check that the DirectRunner is not used in conjunction with
    * streaming, and if streaming is specified, use the DataflowRunner.
    */
-  public void setupRunner() {
+  private void setupRunner() {
     Class<? extends PipelineRunner<?>> runner = options.getRunner();
-    if (options.isStreaming()
-        && (runner.equals(DataflowRunner.class)
-            || runner.equals(BlockingDataflowRunner.class))) {
+    if (options.isStreaming() && runner.equals(BlockingDataflowRunner.class)) {
       // In order to cancel the pipelines automatically,
       // {@literal DataflowRunner} is forced to be used.
       options.setRunner(DataflowRunner.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/626240a4/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index f8cd0f1..708aa87 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -451,7 +451,7 @@ public class AutoComplete {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
 
     options.setBigQuerySchema(FormatForBigquery.getSchema());
-    ExampleUtils dataflowUtils = new ExampleUtils(options);
+    ExampleUtils exampleUtils = new ExampleUtils(options);
 
     // We support running the same pipeline in either
     // batch or windowed streaming mode.
@@ -480,7 +480,7 @@ public class AutoComplete {
           options.getOutputProject(), options.getProject())));
     }
     if (options.getOutputToBigQuery()) {
-      dataflowUtils.setupBigQueryTable();
+      exampleUtils.setupBigQueryTable();
 
       TableReference tableRef = new TableReference();
       tableRef.setProjectId(options.getProject());
@@ -502,6 +502,6 @@ public class AutoComplete {
     PipelineResult result = p.run();
 
     // dataflowUtils will try to cancel the pipeline and the injector before the program exists.
-    dataflowUtils.waitToFinish(result);
+    exampleUtils.waitToFinish(result);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/626240a4/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
index 046428c..df2e36e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
@@ -120,8 +120,8 @@ public class StreamingWordExtract {
     options.setStreaming(true);
 
     options.setBigQuerySchema(StringToRowConverter.getSchema());
-    ExampleUtils dataflowUtils = new ExampleUtils(options);
-    dataflowUtils.setup();
+    ExampleUtils exampleUtils = new ExampleUtils(options);
+    exampleUtils.setup();
 
     Pipeline pipeline = Pipeline.create(options);
 
@@ -141,6 +141,6 @@ public class StreamingWordExtract {
     PipelineResult result = pipeline.run();
 
     // dataflowUtils will try to cancel the pipeline before the program exists.
-    dataflowUtils.waitToFinish(result);
+    exampleUtils.waitToFinish(result);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/626240a4/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
index 1bbc68b..ae7b8cc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -78,9 +78,6 @@ import java.util.List;
  */
 public class TrafficMaxLaneFlow {
 
-  private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
-  private static final Integer VALID_INPUTS = 4999;
-
   static final int WINDOW_DURATION = 60;  // Default sliding window duration in minutes
   static final int WINDOW_SLIDE_EVERY = 5;  // Default window 'slide every' setting in minutes
 
@@ -323,11 +320,6 @@ public class TrafficMaxLaneFlow {
     @Default.Integer(WINDOW_SLIDE_EVERY)
     Integer getWindowSlideEvery();
     void setWindowSlideEvery(Integer value);
-
-    @Description("Whether to run the pipeline with unbounded input")
-    @Default.Boolean(false)
-    boolean isUnbounded();
-    void setUnbounded(boolean value);
   }
 
   /**
@@ -341,7 +333,8 @@ public class TrafficMaxLaneFlow {
         .as(TrafficMaxLaneFlowOptions.class);
     options.setBigQuerySchema(FormatMaxesFn.getSchema());
     // Using DataflowExampleUtils to set up required resources.
-    ExampleUtils dataflowUtils = new ExampleUtils(options, options.isUnbounded());
+    ExampleUtils exampleUtils = new ExampleUtils(options);
+    exampleUtils.setup();
 
     Pipeline pipeline = Pipeline.create(options);
     TableReference tableRef = new TableReference();
@@ -365,7 +358,7 @@ public class TrafficMaxLaneFlow {
     PipelineResult result = pipeline.run();
 
     // dataflowUtils will try to cancel the pipeline and the injector before the program exists.
-    dataflowUtils.waitToFinish(result);
+    exampleUtils.waitToFinish(result);
   }
 
   private static Integer tryIntParse(String number) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/626240a4/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
index 8af0922..1a3d46d 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -82,9 +82,6 @@ import java.util.Map;
 
 public class TrafficRoutes {
 
-  private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
-  private static final Integer VALID_INPUTS = 4999;
-
   // Instantiate some small predefined San Diego routes to analyze
   static Map<String, String> sdStations = buildStationInfo();
   static final int WINDOW_DURATION = 3;  // Default sliding window duration in minutes
@@ -333,11 +330,6 @@ public class TrafficRoutes {
     @Default.Integer(WINDOW_SLIDE_EVERY)
     Integer getWindowSlideEvery();
     void setWindowSlideEvery(Integer value);
-
-    @Description("Whether to run the pipeline with unbounded input")
-    @Default.Boolean(false)
-    boolean isUnbounded();
-    void setUnbounded(boolean value);
   }
 
   /**
@@ -352,7 +344,8 @@ public class TrafficRoutes {
 
     options.setBigQuerySchema(FormatStatsFn.getSchema());
     // Using DataflowExampleUtils to set up required resources.
-    ExampleUtils dataflowUtils = new ExampleUtils(options, options.isUnbounded());
+    ExampleUtils exampleUtils = new ExampleUtils(options);
+    exampleUtils.setup();
 
     Pipeline pipeline = Pipeline.create(options);
     TableReference tableRef = new TableReference();
@@ -376,7 +369,7 @@ public class TrafficRoutes {
     PipelineResult result = pipeline.run();
 
     // dataflowUtils will try to cancel the pipeline and the injector before the program exists.
-    dataflowUtils.waitToFinish(result);
+    exampleUtils.waitToFinish(result);
   }
 
   private static Double tryParseAvgSpeed(String[] inputItems) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/626240a4/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index aa91ac6..922cf23 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -433,8 +433,6 @@ public class TriggerExample {
     void setWindowDuration(Integer value);
   }
 
-  private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
-
   public static void main(String[] args) throws Exception {
     TrafficFlowOptions options = PipelineOptionsFactory.fromArgs(args)
         .withValidation()
@@ -443,8 +441,8 @@ public class TriggerExample {
 
     options.setBigQuerySchema(getSchema());
 
-    ExampleUtils dataflowUtils = new ExampleUtils(options);
-    dataflowUtils.setup();
+    ExampleUtils exampleUtils = new ExampleUtils(options);
+    exampleUtils.setup();
 
     Pipeline pipeline = Pipeline.create(options);
 
@@ -464,7 +462,7 @@ public class TriggerExample {
     PipelineResult result = pipeline.run();
 
     // dataflowUtils will try to cancel the pipeline and the injector before the program exits.
-    dataflowUtils.waitToFinish(result);
+    exampleUtils.waitToFinish(result);
   }
 
   /**


[2/2] incubator-beam git commit: Closes #612

Posted by dh...@apache.org.
Closes #612


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d9632b7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d9632b7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d9632b7f

Branch: refs/heads/master
Commit: d9632b7f978759f65ab505e1b2a6cacac4c420c7
Parents: 994febe 626240a
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 8 14:15:04 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 8 14:15:04 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/examples/WindowedWordCount.java |  9 +++----
 .../beam/examples/common/ExampleUtils.java      | 26 +++-----------------
 .../beam/examples/complete/AutoComplete.java    |  6 ++---
 .../examples/complete/StreamingWordExtract.java |  6 ++---
 .../examples/complete/TrafficMaxLaneFlow.java   | 13 +++-------
 .../beam/examples/complete/TrafficRoutes.java   | 13 +++-------
 .../beam/examples/cookbook/TriggerExample.java  |  8 +++---
 7 files changed, 22 insertions(+), 59 deletions(-)
----------------------------------------------------------------------