You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/12 01:34:45 UTC
[1/2] incubator-beam git commit: [BEAM-433] Change the ExampleUtils
constructor takes PipelineOptions
Repository: incubator-beam
Updated Branches:
refs/heads/master a59ddab21 -> f5a5eb34e
[BEAM-433] Change the ExampleUtils constructor takes PipelineOptions
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/46140307
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/46140307
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/46140307
Branch: refs/heads/master
Commit: 4614030729bbaf2458f6c98dc41f9cde5451624c
Parents: a59ddab
Author: Pei He <pe...@google.com>
Authored: Mon Jul 11 14:22:15 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jul 11 18:34:41 2016 -0700
----------------------------------------------------------------------
.../beam/examples/common/ExampleUtils.java | 20 ++++++++++++--------
1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46140307/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index 6b71b0f..ad00a14 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -24,6 +24,9 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
import org.apache.beam.sdk.util.Transport;
@@ -65,7 +68,7 @@ public class ExampleUtils {
private static final int SC_NOT_FOUND = 404;
- private final DataflowPipelineOptions options;
+ private final PipelineOptions options;
private Bigquery bigQueryClient = null;
private Pubsub pubsubClient = null;
private Dataflow dataflowClient = null;
@@ -75,7 +78,7 @@ public class ExampleUtils {
/**
* Do resources and runner options setup.
*/
- public ExampleUtils(DataflowPipelineOptions options) {
+ public ExampleUtils(PipelineOptions options) {
this.options = options;
setupRunner();
}
@@ -230,7 +233,7 @@ public class ExampleUtils {
private void setupPubsubTopic(String topic) throws IOException {
if (pubsubClient == null) {
- pubsubClient = Transport.newPubsubClient(options).build();
+ pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
}
if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) {
pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute();
@@ -239,7 +242,7 @@ public class ExampleUtils {
private void setupPubsubSubscription(String topic, String subscription) throws IOException {
if (pubsubClient == null) {
- pubsubClient = Transport.newPubsubClient(options).build();
+ pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
}
if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) == null) {
Subscription subInfo = new Subscription()
@@ -256,7 +259,7 @@ public class ExampleUtils {
*/
private void deletePubsubTopic(String topic) throws IOException {
if (pubsubClient == null) {
- pubsubClient = Transport.newPubsubClient(options).build();
+ pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
}
if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) {
pubsubClient.projects().topics().delete(topic).execute();
@@ -270,7 +273,7 @@ public class ExampleUtils {
*/
private void deletePubsubSubscription(String subscription) throws IOException {
if (pubsubClient == null) {
- pubsubClient = Transport.newPubsubClient(options).build();
+ pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
}
if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) {
pubsubClient.projects().subscriptions().delete(subscription).execute();
@@ -283,7 +286,8 @@ public class ExampleUtils {
*/
private void setupRunner() {
Class<? extends PipelineRunner<?>> runner = options.getRunner();
- if (options.isStreaming() && runner.equals(BlockingDataflowRunner.class)) {
+ if (options.as(StreamingOptions.class).isStreaming()
+ && runner.equals(BlockingDataflowRunner.class)) {
// In order to cancel the pipelines automatically,
// {@literal DataflowRunner} is forced to be used.
options.setRunner(DataflowRunner.class);
@@ -316,7 +320,7 @@ public class ExampleUtils {
private void addShutdownHook(final Collection<DataflowPipelineJob> jobs) {
if (dataflowClient == null) {
- dataflowClient = options.getDataflowClient();
+ dataflowClient = options.as(DataflowPipelineOptions.class).getDataflowClient();
}
Runtime.getRuntime().addShutdownHook(new Thread() {
[2/2] incubator-beam git commit: Closes #631
Posted by dh...@apache.org.
Closes #631
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f5a5eb34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f5a5eb34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f5a5eb34
Branch: refs/heads/master
Commit: f5a5eb34e984ba361aceb3b84e5d3a229a3c1bc3
Parents: a59ddab 4614030
Author: Dan Halperin <dh...@google.com>
Authored: Mon Jul 11 18:34:42 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jul 11 18:34:42 2016 -0700
----------------------------------------------------------------------
.../beam/examples/common/ExampleUtils.java | 20 ++++++++++++--------
1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------