You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/27 17:28:32 UTC

[44/50] [abbrv] incubator-beam git commit: Remove checked exceptions from PipelineResult.waitUntilFinish interface.

Remove checked exceptions from PipelineResult.waitUntilFinish interface.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1559a7f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1559a7f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1559a7f4

Branch: refs/heads/python-sdk
Commit: 1559a7f40fb96d3c22de93391792a298f9a84480
Parents: 09a80ec
Author: Pei He <pe...@google.com>
Authored: Fri Oct 21 17:38:21 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Oct 26 14:47:10 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/DirectRunner.java   |  2 +-
 .../runners/dataflow/BlockingDataflowRunner.java   | 13 +------------
 .../beam/runners/dataflow/DataflowPipelineJob.java | 17 +++++++++++++----
 .../spark/translation/EvaluationContext.java       |  6 ++----
 .../streaming/StreamingEvaluationContext.java      |  6 ++----
 .../java/org/apache/beam/sdk/PipelineResult.java   | 10 ++--------
 6 files changed, 21 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1559a7f4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index e02c8a6..44d1986 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -431,7 +431,7 @@ public class DirectRunner
     }
 
     @Override
-    public State waitUntilFinish(Duration duration) throws IOException {
+    public State waitUntilFinish(Duration duration) {
       throw new UnsupportedOperationException(
           "DirectPipelineResult does not support waitUntilFinish with a Duration parameter. See"
               + " BEAM-596.");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1559a7f4/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
index d265361..5285ade 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.dataflow;
 
-import java.io.IOException;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.options.BlockingDataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -111,17 +110,7 @@ public class BlockingDataflowRunner extends
       Runtime.getRuntime().addShutdownHook(shutdownHook);
 
       @Nullable
-      State result;
-      try {
-        result = job.waitUntilFinish(Duration.standardSeconds(BUILTIN_JOB_TIMEOUT_SEC));
-      } catch (IOException | InterruptedException ex) {
-        if (ex instanceof InterruptedException) {
-          Thread.currentThread().interrupt();
-        }
-        LOG.debug("Exception caught while retrieving status for job {}", job.getJobId(), ex);
-        throw new DataflowServiceException(
-            job, "Exception caught while retrieving status for job " + job.getJobId(), ex);
-      }
+      State result = job.waitUntilFinish(Duration.standardSeconds(BUILTIN_JOB_TIMEOUT_SEC));
 
       if (result == null) {
         throw new DataflowServiceException(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1559a7f4/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index bbcf11f..c3be192 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -167,15 +167,24 @@ public class DataflowPipelineJob implements PipelineResult {
 
   @Override
   @Nullable
-  public State waitUntilFinish() throws IOException, InterruptedException {
+  public State waitUntilFinish() {
     return waitUntilFinish(Duration.millis(-1));
   }
 
   @Override
   @Nullable
-  public State waitUntilFinish(Duration duration)
-          throws IOException, InterruptedException {
-    return waitUntilFinish(duration, new MonitoringUtil.LoggingHandler());
+  public State waitUntilFinish(Duration duration) {
+    try {
+      return waitUntilFinish(duration, new MonitoringUtil.LoggingHandler());
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      if (e instanceof RuntimeException) {
+        throw (RuntimeException) e;
+      }
+      throw new RuntimeException(e);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1559a7f4/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index c1c65dd..6ccec85 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -301,14 +301,12 @@ public class EvaluationContext implements EvaluationResult {
   }
 
   @Override
-  public State waitUntilFinish()
-      throws IOException, InterruptedException {
+  public State waitUntilFinish() {
     return waitUntilFinish(Duration.millis(-1));
   }
 
   @Override
-  public State waitUntilFinish(Duration duration)
-      throws IOException, InterruptedException {
+  public State waitUntilFinish(Duration duration) {
     // This is no-op, since Spark runner in batch is blocking.
     // It needs to be updated once SparkRunner supports non-blocking execution:
     // https://issues.apache.org/jira/browse/BEAM-595

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1559a7f4/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
index 49afa26..bfba316 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
@@ -212,15 +212,13 @@ public class StreamingEvaluationContext extends EvaluationContext {
   }
 
   @Override
-  public State waitUntilFinish()
-      throws IOException, InterruptedException {
+  public State waitUntilFinish() {
     throw new UnsupportedOperationException(
         "Spark runner StreamingEvaluationContext does not support waitUntilFinish.");
   }
 
   @Override
-  public State waitUntilFinish(Duration duration)
-      throws IOException, InterruptedException {
+  public State waitUntilFinish(Duration duration) {
     throw new UnsupportedOperationException(
         "Spark runner StreamingEvaluationContext does not support waitUntilFinish.");
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1559a7f4/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
index cd09c4d..35f11eb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
@@ -52,23 +52,17 @@ public interface PipelineResult {
    *     Provide a value less than 1 ms for an infinite wait.
    *
    * @return The final state of the pipeline or null on timeout.
-   * @throws IOException If there is a persistent problem getting job
-   *   information.
-   * @throws InterruptedException if the thread is interrupted.
    * @throws UnsupportedOperationException if the runner does not support cancellation.
    */
-  State waitUntilFinish(Duration duration) throws IOException, InterruptedException;
+  State waitUntilFinish(Duration duration);
 
   /**
    * Waits until the pipeline finishes and returns the final status.
    *
    * @return The final state of the pipeline.
-   * @throws IOException If there is a persistent problem getting job
-   *   information.
-   * @throws InterruptedException if the thread is interrupted.
    * @throws UnsupportedOperationException if the runner does not support cancellation.
    */
-  State waitUntilFinish() throws IOException, InterruptedException;
+  State waitUntilFinish();
 
   /**
    * Retrieves the current value of the provided {@link Aggregator}.