You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/08 21:15:13 UTC
[1/2] incubator-beam git commit: Remove isUnbounded option,
and the corresponding ExampleUtils constructor
Repository: incubator-beam
Updated Branches:
refs/heads/master 994febef4 -> d9632b7f9
Remove isUnbounded option, and the corresponding ExampleUtils constructor
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/626240a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/626240a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/626240a4
Branch: refs/heads/master
Commit: 626240a4dc28b643a7f25fb71285918233e27da6
Parents: 994febe
Author: Pei He <pe...@google.com>
Authored: Fri Jul 8 12:46:27 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 8 14:15:03 2016 -0700
----------------------------------------------------------------------
.../apache/beam/examples/WindowedWordCount.java | 9 +++----
.../beam/examples/common/ExampleUtils.java | 26 +++-----------------
.../beam/examples/complete/AutoComplete.java | 6 ++---
.../examples/complete/StreamingWordExtract.java | 6 ++---
.../examples/complete/TrafficMaxLaneFlow.java | 13 +++-------
.../beam/examples/complete/TrafficRoutes.java | 13 +++-------
.../beam/examples/cookbook/TriggerExample.java | 8 +++---
7 files changed, 22 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/626240a4/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index b32128a..882ef7c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -181,10 +181,6 @@ public class WindowedWordCount {
@Default.Integer(WINDOW_SIZE)
Integer getWindowSize();
void setWindowSize(Integer value);
-
- @Description("Whether to run the pipeline with unbounded input")
- boolean isUnbounded();
- void setUnbounded(boolean value);
}
public static void main(String[] args) throws IOException {
@@ -192,7 +188,8 @@ public class WindowedWordCount {
options.setBigQuerySchema(getSchema());
// DataflowExampleUtils creates the necessary input sources to simplify execution of this
// Pipeline.
- ExampleUtils exampleDataflowUtils = new ExampleUtils(options, options.isUnbounded());
+ ExampleUtils exampleUtils = new ExampleUtils(options);
+ exampleUtils.setup();
Pipeline pipeline = Pipeline.create(options);
@@ -237,6 +234,6 @@ public class WindowedWordCount {
PipelineResult result = pipeline.run();
// dataflowUtils will try to cancel the pipeline before the program exists.
- exampleDataflowUtils.waitToFinish(result);
+ exampleUtils.waitToFinish(result);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/626240a4/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index e30b1e4..6b71b0f 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -72,17 +72,12 @@ public class ExampleUtils {
private Set<DataflowPipelineJob> jobsToCancel = Sets.newHashSet();
private List<String> pendingMessages = Lists.newArrayList();
- public ExampleUtils(DataflowPipelineOptions options) {
- this.options = options;
- }
-
/**
* Do resources and runner options setup.
*/
- public ExampleUtils(DataflowPipelineOptions options, boolean isUnbounded)
- throws IOException {
+ public ExampleUtils(DataflowPipelineOptions options) {
this.options = options;
- setupResourcesAndRunner(isUnbounded);
+ setupRunner();
}
/**
@@ -113,17 +108,6 @@ public class ExampleUtils {
}
/**
- * Set up external resources, and configure the runner appropriately.
- */
- public void setupResourcesAndRunner(boolean isUnbounded) throws IOException {
- if (isUnbounded) {
- options.setStreaming(true);
- }
- setup();
- setupRunner();
- }
-
- /**
* Sets up the Google Cloud Pub/Sub topic.
*
* <p>If the topic doesn't exist, a new topic with the given name will be created.
@@ -297,11 +281,9 @@ public class ExampleUtils {
* Do some runner setup: check that the DirectRunner is not used in conjunction with
* streaming, and if streaming is specified, use the DataflowRunner.
*/
- public void setupRunner() {
+ private void setupRunner() {
Class<? extends PipelineRunner<?>> runner = options.getRunner();
- if (options.isStreaming()
- && (runner.equals(DataflowRunner.class)
- || runner.equals(BlockingDataflowRunner.class))) {
+ if (options.isStreaming() && runner.equals(BlockingDataflowRunner.class)) {
// In order to cancel the pipelines automatically,
// {@literal DataflowRunner} is forced to be used.
options.setRunner(DataflowRunner.class);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/626240a4/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index f8cd0f1..708aa87 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -451,7 +451,7 @@ public class AutoComplete {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setBigQuerySchema(FormatForBigquery.getSchema());
- ExampleUtils dataflowUtils = new ExampleUtils(options);
+ ExampleUtils exampleUtils = new ExampleUtils(options);
// We support running the same pipeline in either
// batch or windowed streaming mode.
@@ -480,7 +480,7 @@ public class AutoComplete {
options.getOutputProject(), options.getProject())));
}
if (options.getOutputToBigQuery()) {
- dataflowUtils.setupBigQueryTable();
+ exampleUtils.setupBigQueryTable();
TableReference tableRef = new TableReference();
tableRef.setProjectId(options.getProject());
@@ -502,6 +502,6 @@ public class AutoComplete {
PipelineResult result = p.run();
// dataflowUtils will try to cancel the pipeline and the injector before the program exists.
- dataflowUtils.waitToFinish(result);
+ exampleUtils.waitToFinish(result);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/626240a4/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
index 046428c..df2e36e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
@@ -120,8 +120,8 @@ public class StreamingWordExtract {
options.setStreaming(true);
options.setBigQuerySchema(StringToRowConverter.getSchema());
- ExampleUtils dataflowUtils = new ExampleUtils(options);
- dataflowUtils.setup();
+ ExampleUtils exampleUtils = new ExampleUtils(options);
+ exampleUtils.setup();
Pipeline pipeline = Pipeline.create(options);
@@ -141,6 +141,6 @@ public class StreamingWordExtract {
PipelineResult result = pipeline.run();
// dataflowUtils will try to cancel the pipeline before the program exists.
- dataflowUtils.waitToFinish(result);
+ exampleUtils.waitToFinish(result);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/626240a4/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
index 1bbc68b..ae7b8cc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -78,9 +78,6 @@ import java.util.List;
*/
public class TrafficMaxLaneFlow {
- private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
- private static final Integer VALID_INPUTS = 4999;
-
static final int WINDOW_DURATION = 60; // Default sliding window duration in minutes
static final int WINDOW_SLIDE_EVERY = 5; // Default window 'slide every' setting in minutes
@@ -323,11 +320,6 @@ public class TrafficMaxLaneFlow {
@Default.Integer(WINDOW_SLIDE_EVERY)
Integer getWindowSlideEvery();
void setWindowSlideEvery(Integer value);
-
- @Description("Whether to run the pipeline with unbounded input")
- @Default.Boolean(false)
- boolean isUnbounded();
- void setUnbounded(boolean value);
}
/**
@@ -341,7 +333,8 @@ public class TrafficMaxLaneFlow {
.as(TrafficMaxLaneFlowOptions.class);
options.setBigQuerySchema(FormatMaxesFn.getSchema());
// Using DataflowExampleUtils to set up required resources.
- ExampleUtils dataflowUtils = new ExampleUtils(options, options.isUnbounded());
+ ExampleUtils exampleUtils = new ExampleUtils(options);
+ exampleUtils.setup();
Pipeline pipeline = Pipeline.create(options);
TableReference tableRef = new TableReference();
@@ -365,7 +358,7 @@ public class TrafficMaxLaneFlow {
PipelineResult result = pipeline.run();
// dataflowUtils will try to cancel the pipeline and the injector before the program exists.
- dataflowUtils.waitToFinish(result);
+ exampleUtils.waitToFinish(result);
}
private static Integer tryIntParse(String number) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/626240a4/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
index 8af0922..1a3d46d 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -82,9 +82,6 @@ import java.util.Map;
public class TrafficRoutes {
- private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
- private static final Integer VALID_INPUTS = 4999;
-
// Instantiate some small predefined San Diego routes to analyze
static Map<String, String> sdStations = buildStationInfo();
static final int WINDOW_DURATION = 3; // Default sliding window duration in minutes
@@ -333,11 +330,6 @@ public class TrafficRoutes {
@Default.Integer(WINDOW_SLIDE_EVERY)
Integer getWindowSlideEvery();
void setWindowSlideEvery(Integer value);
-
- @Description("Whether to run the pipeline with unbounded input")
- @Default.Boolean(false)
- boolean isUnbounded();
- void setUnbounded(boolean value);
}
/**
@@ -352,7 +344,8 @@ public class TrafficRoutes {
options.setBigQuerySchema(FormatStatsFn.getSchema());
// Using DataflowExampleUtils to set up required resources.
- ExampleUtils dataflowUtils = new ExampleUtils(options, options.isUnbounded());
+ ExampleUtils exampleUtils = new ExampleUtils(options);
+ exampleUtils.setup();
Pipeline pipeline = Pipeline.create(options);
TableReference tableRef = new TableReference();
@@ -376,7 +369,7 @@ public class TrafficRoutes {
PipelineResult result = pipeline.run();
// dataflowUtils will try to cancel the pipeline and the injector before the program exists.
- dataflowUtils.waitToFinish(result);
+ exampleUtils.waitToFinish(result);
}
private static Double tryParseAvgSpeed(String[] inputItems) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/626240a4/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index aa91ac6..922cf23 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -433,8 +433,6 @@ public class TriggerExample {
void setWindowDuration(Integer value);
}
- private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
-
public static void main(String[] args) throws Exception {
TrafficFlowOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
@@ -443,8 +441,8 @@ public class TriggerExample {
options.setBigQuerySchema(getSchema());
- ExampleUtils dataflowUtils = new ExampleUtils(options);
- dataflowUtils.setup();
+ ExampleUtils exampleUtils = new ExampleUtils(options);
+ exampleUtils.setup();
Pipeline pipeline = Pipeline.create(options);
@@ -464,7 +462,7 @@ public class TriggerExample {
PipelineResult result = pipeline.run();
// dataflowUtils will try to cancel the pipeline and the injector before the program exits.
- dataflowUtils.waitToFinish(result);
+ exampleUtils.waitToFinish(result);
}
/**
[2/2] incubator-beam git commit: Closes #612
Posted by dh...@apache.org.
Closes #612
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d9632b7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d9632b7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d9632b7f
Branch: refs/heads/master
Commit: d9632b7f978759f65ab505e1b2a6cacac4c420c7
Parents: 994febe 626240a
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 8 14:15:04 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 8 14:15:04 2016 -0700
----------------------------------------------------------------------
.../apache/beam/examples/WindowedWordCount.java | 9 +++----
.../beam/examples/common/ExampleUtils.java | 26 +++-----------------
.../beam/examples/complete/AutoComplete.java | 6 ++---
.../examples/complete/StreamingWordExtract.java | 6 ++---
.../examples/complete/TrafficMaxLaneFlow.java | 13 +++-------
.../beam/examples/complete/TrafficRoutes.java | 13 +++-------
.../beam/examples/cookbook/TriggerExample.java | 8 +++---
7 files changed, 22 insertions(+), 59 deletions(-)
----------------------------------------------------------------------