Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/12 16:48:00 UTC

[1/2] incubator-beam git commit: Remove DataflowPipelineJob from examples

Repository: incubator-beam
Updated Branches:
  refs/heads/master 2bcbf379f -> def2526e4


Remove DataflowPipelineJob from examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a83738ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a83738ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a83738ba

Branch: refs/heads/master
Commit: a83738ba5fc631cc9be8c5294963e2ac2e82429d
Parents: 2bcbf37
Author: Pei He <pe...@google.com>
Authored: Mon Aug 1 13:41:59 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 12 09:47:41 2016 -0700

----------------------------------------------------------------------
 examples/java/pom.xml                           |  7 +-
 .../beam/examples/common/ExampleUtils.java      | 74 +++++---------------
 .../runners/dataflow/DataflowPipelineJob.java   | 14 +++-
 3 files changed, 31 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a83738ba/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index cdf80cb..6efbc54 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -221,11 +221,6 @@
 
     <dependency>
       <groupId>com.google.apis</groupId>
-      <artifactId>google-api-services-dataflow</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.apis</groupId>
       <artifactId>google-api-services-bigquery</artifactId>
     </dependency>
 
@@ -286,6 +281,8 @@
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
       <version>${project.version}</version>
+      <scope>runtime</scope>
+      <optional>true</optional>
     </dependency>
 
     <dependency>
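
With google-api-services-dataflow removed from the compile-time dependencies and the
Dataflow runner demoted to a runtime-scoped, optional dependency, the examples module
no longer touches any Dataflow-specific classes at compile time. As a rough sketch only
(the class name and input path below are invented, not part of this commit), an example
can stay runner-agnostic like this:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class RunnerAgnosticExample {
      public static void main(String[] args) {
        // The runner is chosen on the command line (e.g. --runner=DataflowRunner);
        // nothing in this class compiles against the Dataflow API client.
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply(TextIO.Read.from("gs://your-bucket/input-*.txt"));
        PipelineResult result = pipeline.run();
        result.waitUntilFinish();  // may not be supported by every runner
      }
    }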

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a83738ba/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index 8b66861..7f03fc0 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -17,17 +17,10 @@
  */
 package org.apache.beam.examples.common;
 
-import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PubsubOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
 import org.apache.beam.sdk.util.Transport;
 
@@ -44,7 +37,6 @@ import com.google.api.services.bigquery.model.DatasetReference;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
-import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.pubsub.Pubsub;
 import com.google.api.services.pubsub.model.Subscription;
 import com.google.api.services.pubsub.model.Topic;
@@ -71,8 +63,7 @@ public class ExampleUtils {
   private final PipelineOptions options;
   private Bigquery bigQueryClient = null;
   private Pubsub pubsubClient = null;
-  private Dataflow dataflowClient = null;
-  private Set<DataflowPipelineJob> jobsToCancel = Sets.newHashSet();
+  private Set<PipelineResult> pipelinesToCancel = Sets.newHashSet();
   private List<String> pendingMessages = Lists.newArrayList();
 
   /**
@@ -80,7 +71,6 @@ public class ExampleUtils {
    */
   public ExampleUtils(PipelineOptions options) {
     this.options = options;
-    setupRunner();
   }
 
   /**
@@ -281,71 +271,46 @@ public class ExampleUtils {
   }
 
   /**
-   * Do some runner setup: check that the DirectRunner is not used in conjunction with
-   * streaming, and if streaming is specified, use the DataflowRunner.
-   */
-  private void setupRunner() {
-    Class<? extends PipelineRunner<?>> runner = options.getRunner();
-    if (options.as(StreamingOptions.class).isStreaming()
-        && runner.equals(BlockingDataflowRunner.class)) {
-      // In order to cancel the pipelines automatically,
-      // {@literal DataflowRunner} is forced to be used.
-      options.setRunner(DataflowRunner.class);
-    }
-  }
-
-  /**
    * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used,
    * waits for the pipeline to finish and cancels it (and the injector) before the program exits.
    */
   public void waitToFinish(PipelineResult result) {
-    if (result instanceof DataflowPipelineJob) {
-      final DataflowPipelineJob job = (DataflowPipelineJob) result;
-      jobsToCancel.add(job);
-      if (!options.as(ExampleOptions.class).getKeepJobsRunning()) {
-        addShutdownHook(jobsToCancel);
-      }
-      try {
-        job.waitUntilFinish();
-      } catch (Exception e) {
-        throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId());
-      }
-    } else {
+    pipelinesToCancel.add(result);
+    if (!options.as(ExampleOptions.class).getKeepJobsRunning()) {
+      addShutdownHook(pipelinesToCancel);
+    }
+    try {
+      result.waitUntilFinish();
+    } catch (UnsupportedOperationException e) {
       // Do nothing if the given PipelineResult doesn't support waitUntilFinish(),
       // such as EvaluationResults returned by DirectRunner.
       tearDown();
       printPendingMessages();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to wait for the pipeline to finish: " + result);
     }
   }
 
-  private void addShutdownHook(final Collection<DataflowPipelineJob> jobs) {
-    if (dataflowClient == null) {
-      dataflowClient = options.as(DataflowPipelineOptions.class).getDataflowClient();
-    }
-
+  private void addShutdownHook(final Collection<PipelineResult> pipelineResults) {
     Runtime.getRuntime().addShutdownHook(new Thread() {
       @Override
       public void run() {
         tearDown();
         printPendingMessages();
-        for (DataflowPipelineJob job : jobs) {
-          System.out.println("Canceling example pipeline: " + job.getJobId());
+        for (PipelineResult pipelineResult : pipelineResults) {
           try {
-            job.cancel();
+            pipelineResult.cancel();
           } catch (IOException e) {
-            System.out.println("Failed to cancel the job,"
-                + " please go to the Developers Console to cancel it manually");
-            System.out.println(
-                MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
+            System.out.println("Failed to cancel the job.");
+            System.out.println(e.getMessage());
           }
         }
 
-        for (DataflowPipelineJob job : jobs) {
+        for (PipelineResult pipelineResult : pipelineResults) {
           boolean cancellationVerified = false;
           for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) {
-            if (job.getState().isTerminal()) {
+            if (pipelineResult.getState().isTerminal()) {
               cancellationVerified = true;
-              System.out.println("Canceled example pipeline: " + job.getJobId());
               break;
             } else {
               System.out.println(
@@ -354,10 +319,7 @@ public class ExampleUtils {
             Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
           }
           if (!cancellationVerified) {
-            System.out.println("Failed to verify the cancellation for job: " + job.getJobId());
-            System.out.println("Please go to the Developers Console to verify manually:");
-            System.out.println(
-                MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId()));
+            System.out.println("Failed to verify the cancellation for job: " + pipelineResult);
           }
         }
       }
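
The net effect in ExampleUtils is that waiting, cancellation, and cancellation
verification all go through the runner-agnostic PipelineResult interface instead of
DataflowPipelineJob. A condensed, hypothetical sketch of that pattern (the class and
method names below are invented for illustration; the Beam SDK calls are the ones the
new code relies on):

    import java.io.IOException;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;

    public class WaitAndCancel {
      public static void run(Pipeline pipeline) {
        final PipelineResult result = pipeline.run();
        Runtime.getRuntime().addShutdownHook(new Thread() {
          @Override
          public void run() {
            try {
              // Cancellation no longer needs a Dataflow client; any runner's
              // PipelineResult can be cancelled through the common interface.
              result.cancel();
            } catch (IOException e) {
              // See the DataflowPipelineJob change below for what the message
              // now carries for Dataflow jobs.
              System.out.println("Failed to cancel the job: " + e.getMessage());
            }
          }
        });
        try {
          result.waitUntilFinish();
        } catch (UnsupportedOperationException e) {
          // Some results (e.g. the DirectRunner's EvaluationResults) cannot
          // block; the examples simply skip waiting in that case.
        }
      }
    }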

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a83738ba/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index e043e23..3d0f145 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -271,9 +271,17 @@ public class DataflowPipelineJob implements PipelineResult {
     content.setProjectId(projectId);
     content.setId(jobId);
     content.setRequestedState("JOB_STATE_CANCELLED");
-    dataflowOptions.getDataflowClient().projects().jobs()
-        .update(projectId, jobId, content)
-        .execute();
+    try {
+      dataflowOptions.getDataflowClient().projects().jobs()
+          .update(projectId, jobId, content)
+          .execute();
+    } catch (IOException e) {
+      String errorMsg = String.format(
+          "Failed to cancel the job, please go to the Developers Console to cancel it manually: %s",
+          MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId()));
+      LOG.warn(errorMsg);
+      throw new IOException(errorMsg, e);
+    }
     return State.CANCELLED;
   }
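
On the runner side, cancel() now catches the failure from the Cloud Dataflow API call,
logs a warning, and rethrows an IOException whose message already contains the
Developers Console monitoring URL. That is what lets the example code stay generic: a
caller that only knows about PipelineResult can surface the URL just by printing the
exception message. A small hypothetical caller (not part of this commit) for
illustration:

    import java.io.IOException;
    import org.apache.beam.sdk.PipelineResult;

    class CancelReporting {
      static void cancelAndReport(PipelineResult result) {
        try {
          result.cancel();
        } catch (IOException e) {
          // For a Dataflow job, the message now includes the monitoring page
          // URL, so no Dataflow-specific MonitoringUtil lookup is needed here.
          System.out.println("Failed to cancel the job.");
          System.out.println(e.getMessage());
        }
      }
    }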
 


[2/2] incubator-beam git commit: Closes #771

Posted by dh...@apache.org.
Closes #771


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/def2526e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/def2526e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/def2526e

Branch: refs/heads/master
Commit: def2526e4512ff31b727192007bf3410b69bcc5d
Parents: 2bcbf37 a83738b
Author: Dan Halperin <dh...@google.com>
Authored: Fri Aug 12 09:47:43 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 12 09:47:43 2016 -0700

----------------------------------------------------------------------
 examples/java/pom.xml                           |  7 +-
 .../beam/examples/common/ExampleUtils.java      | 74 +++++---------------
 .../runners/dataflow/DataflowPipelineJob.java   | 14 +++-
 3 files changed, 31 insertions(+), 64 deletions(-)
----------------------------------------------------------------------