You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/12 01:34:45 UTC

[1/2] incubator-beam git commit: [BEAM-433] Change the ExampleUtils constructor to take PipelineOptions

Repository: incubator-beam
Updated Branches:
  refs/heads/master a59ddab21 -> f5a5eb34e


[BEAM-433] Change the ExampleUtils constructor to take PipelineOptions


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/46140307
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/46140307
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/46140307

Branch: refs/heads/master
Commit: 4614030729bbaf2458f6c98dc41f9cde5451624c
Parents: a59ddab
Author: Pei He <pe...@google.com>
Authored: Mon Jul 11 14:22:15 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jul 11 18:34:41 2016 -0700

----------------------------------------------------------------------
 .../beam/examples/common/ExampleUtils.java      | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46140307/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index 6b71b0f..ad00a14 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -24,6 +24,9 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
 import org.apache.beam.sdk.util.Transport;
@@ -65,7 +68,7 @@ public class ExampleUtils {
 
   private static final int SC_NOT_FOUND = 404;
 
-  private final DataflowPipelineOptions options;
+  private final PipelineOptions options;
   private Bigquery bigQueryClient = null;
   private Pubsub pubsubClient = null;
   private Dataflow dataflowClient = null;
@@ -75,7 +78,7 @@ public class ExampleUtils {
   /**
    * Do resources and runner options setup.
    */
-  public ExampleUtils(DataflowPipelineOptions options) {
+  public ExampleUtils(PipelineOptions options) {
     this.options = options;
     setupRunner();
   }
@@ -230,7 +233,7 @@ public class ExampleUtils {
 
   private void setupPubsubTopic(String topic) throws IOException {
     if (pubsubClient == null) {
-      pubsubClient = Transport.newPubsubClient(options).build();
+      pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
     }
     if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) {
       pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute();
@@ -239,7 +242,7 @@ public class ExampleUtils {
 
   private void setupPubsubSubscription(String topic, String subscription) throws IOException {
     if (pubsubClient == null) {
-      pubsubClient = Transport.newPubsubClient(options).build();
+      pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
     }
     if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) == null) {
       Subscription subInfo = new Subscription()
@@ -256,7 +259,7 @@ public class ExampleUtils {
    */
   private void deletePubsubTopic(String topic) throws IOException {
     if (pubsubClient == null) {
-      pubsubClient = Transport.newPubsubClient(options).build();
+      pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
     }
     if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) {
       pubsubClient.projects().topics().delete(topic).execute();
@@ -270,7 +273,7 @@ public class ExampleUtils {
    */
   private void deletePubsubSubscription(String subscription) throws IOException {
     if (pubsubClient == null) {
-      pubsubClient = Transport.newPubsubClient(options).build();
+      pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
     }
     if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) {
       pubsubClient.projects().subscriptions().delete(subscription).execute();
@@ -283,7 +286,8 @@ public class ExampleUtils {
    */
   private void setupRunner() {
     Class<? extends PipelineRunner<?>> runner = options.getRunner();
-    if (options.isStreaming() && runner.equals(BlockingDataflowRunner.class)) {
+    if (options.as(StreamingOptions.class).isStreaming()
+        && runner.equals(BlockingDataflowRunner.class)) {
       // In order to cancel the pipelines automatically,
       // {@literal DataflowRunner} is forced to be used.
       options.setRunner(DataflowRunner.class);
@@ -316,7 +320,7 @@ public class ExampleUtils {
 
   private void addShutdownHook(final Collection<DataflowPipelineJob> jobs) {
     if (dataflowClient == null) {
-      dataflowClient = options.getDataflowClient();
+      dataflowClient = options.as(DataflowPipelineOptions.class).getDataflowClient();
     }
 
     Runtime.getRuntime().addShutdownHook(new Thread() {


[2/2] incubator-beam git commit: Closes #631

Posted by dh...@apache.org.
Closes #631


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f5a5eb34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f5a5eb34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f5a5eb34

Branch: refs/heads/master
Commit: f5a5eb34e984ba361aceb3b84e5d3a229a3c1bc3
Parents: a59ddab 4614030
Author: Dan Halperin <dh...@google.com>
Authored: Mon Jul 11 18:34:42 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jul 11 18:34:42 2016 -0700

----------------------------------------------------------------------
 .../beam/examples/common/ExampleUtils.java      | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------