You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/10 21:03:40 UTC

[1/4] beam git commit: Re-enable UsesTimersInParDo tests in Dataflow runner

Repository: beam
Updated Branches:
  refs/heads/master 16e4c747c -> d32213bfc


Re-enable UsesTimersInParDo tests in Dataflow runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b79c888b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b79c888b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b79c888b

Branch: refs/heads/master
Commit: b79c888b01d0c42a35ecf1b98134261237e9c83e
Parents: 1a8a217
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Feb 14 14:54:11 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 10 14:00:03 2017 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml                      | 1 -
 .../src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java | 5 ++---
 2 files changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b79c888b/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index fd26ae8..3809335 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -164,7 +164,6 @@
                 org.apache.beam.sdk.testing.UsesGaugeMetrics,
                 org.apache.beam.sdk.testing.UsesSetState,
                 org.apache.beam.sdk.testing.UsesMapState,
-                org.apache.beam.sdk.testing.UsesTimersInParDo,
                 org.apache.beam.sdk.testing.UsesSplittableParDo,
                 org.apache.beam.sdk.testing.UsesUnboundedPCollections,
                 org.apache.beam.sdk.testing.UsesTestStream,

http://git-wip-us.apache.org/repos/asf/beam/blob/b79c888b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index ef27f4c..35c02ba 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -54,7 +54,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -2745,7 +2744,7 @@ public class ParDoTest implements Serializable {
         };
 
     PCollection<Integer> output = pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
-    thrown.expect(PipelineExecutionException.class);
+    thrown.expect(RuntimeException.class);
     // Note that runners can reasonably vary their message - this matcher should be flexible
     // and can be evolved.
     thrown.expectMessage("relative timers");
@@ -2775,7 +2774,7 @@ public class ParDoTest implements Serializable {
         };
 
     PCollection<Integer> output = pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
-    thrown.expect(PipelineExecutionException.class);
+    thrown.expect(RuntimeException.class);
     // Note that runners can reasonably vary their message - this matcher should be flexible
     // and can be evolved.
     thrown.expectMessage("event time timer");


[4/4] beam git commit: This closes #2988: Fix TestDataflowRunner, PAssertTest, UsesTimersInParDo

Posted by ke...@apache.org.
This closes #2988: Fix TestDataflowRunner, PAssertTest, UsesTimersInParDo

  Re-enable UsesTimersInParDo tests in Dataflow runner
  TestDataflowRunner: throw AssertionError only when assertion known failed
  Allow any throwable in PAssert to constitute adequate failure


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d32213bf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d32213bf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d32213bf

Branch: refs/heads/master
Commit: d32213bfcdf1571f25cb381221bf798d2688b8a5
Parents: 16e4c74 b79c888
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed May 10 14:02:57 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 10 14:02:57 2017 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |   1 -
 .../runners/dataflow/TestDataflowRunner.java    | 254 ++++++++++++-------
 .../dataflow/TestDataflowRunnerTest.java        |  42 +--
 .../apache/beam/sdk/testing/PAssertTest.java    |   2 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |   5 +-
 5 files changed, 175 insertions(+), 129 deletions(-)
----------------------------------------------------------------------



[2/4] beam git commit: TestDataflowRunner: throw AssertionError only when assertion known failed

Posted by ke...@apache.org.
TestDataflowRunner: throw AssertionError only when assertion known failed

It is quite confusing to receive an assertion error when in fact the pipeline
has crashed because of user error interacting with e.g. timers.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1a8a217b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1a8a217b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1a8a217b

Branch: refs/heads/master
Commit: 1a8a217bf8e291a2930c99dba58a305deea3270f
Parents: 8641675
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Apr 30 16:08:48 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 10 14:00:03 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/TestDataflowRunner.java    | 254 ++++++++++++-------
 .../dataflow/TestDataflowRunnerTest.java        |  42 +--
 2 files changed, 172 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1a8a217b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
index b81b487..1abea99 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
@@ -29,6 +29,7 @@ import com.google.common.base.Strings;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.List;
+import java.util.concurrent.Callable;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -45,8 +46,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * {@link TestDataflowRunner} is a pipeline runner that wraps a
- * {@link DataflowRunner} when running tests against the {@link TestPipeline}.
+ * {@link TestDataflowRunner} is a pipeline runner that wraps a {@link DataflowRunner} when running
+ * tests against the {@link TestPipeline}.
  *
  * @see TestPipeline
  */
@@ -65,16 +66,12 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     this.runner = DataflowRunner.fromOptions(options);
   }
 
-  /**
-   * Constructs a runner from the provided options.
-   */
+  /** Constructs a runner from the provided options. */
   public static TestDataflowRunner fromOptions(PipelineOptions options) {
     TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
-    String tempLocation = Joiner.on("/").join(
-        dataflowOptions.getTempRoot(),
-        dataflowOptions.getJobName(),
-        "output",
-        "results");
+    String tempLocation =
+        Joiner.on("/")
+            .join(dataflowOptions.getTempRoot(), dataflowOptions.getJobName(), "output", "results");
     dataflowOptions.setTempLocation(tempLocation);
 
     return new TestDataflowRunner(
@@ -99,88 +96,115 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     final DataflowPipelineJob job;
     job = runner.run(pipeline);
 
-    LOG.info("Running Dataflow job {} with {} expected assertions.",
-        job.getJobId(), expectedNumberOfAssertions);
+    LOG.info(
+        "Running Dataflow job {} with {} expected assertions.",
+        job.getJobId(),
+        expectedNumberOfAssertions);
 
     assertThat(job, testPipelineOptions.getOnCreateMatcher());
 
-    final ErrorMonitorMessagesHandler messageHandler =
+    Boolean jobSuccess;
+    Optional<Boolean> allAssertionsPassed;
+
+    ErrorMonitorMessagesHandler messageHandler =
         new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler());
 
-    try {
-      Optional<Boolean> result = Optional.absent();
-
-      if (options.isStreaming()) {
-        // In streaming, there are infinite retries, so rather than timeout
-        // we try to terminate early by polling and canceling if we see
-        // an error message
-        while (true) {
-          State state = job.waitUntilFinish(Duration.standardSeconds(3), messageHandler);
-          if (state != null && state.isTerminal()) {
-            break;
-          }
+    if (options.isStreaming()) {
+      jobSuccess = waitForStreamingJobTermination(job, messageHandler);
+      // No metrics in streaming
+      allAssertionsPassed = Optional.absent();
+    } else {
+      jobSuccess = waitForBatchJobTermination(job, messageHandler);
+      allAssertionsPassed = checkForPAssertSuccess(job);
+    }
 
-          if (messageHandler.hasSeenError()) {
-            if (!job.getState().isTerminal()) {
-              LOG.info("Cancelling Dataflow job {}", job.getJobId());
-              job.cancel();
-            }
-            break;
-          }
-        }
+    // If there is a certain assertion failure, throw the most precise exception we can.
+    // There are situations where the metric will not be available, but as long as we recover
+    // the actionable message from the logs it is acceptable.
+    if (!allAssertionsPassed.isPresent()) {
+      LOG.warn("Dataflow job {} did not output a success or failure metric.", job.getJobId());
+    } else if (!allAssertionsPassed.get()) {
+      throw new AssertionError(errorMessage(job, messageHandler));
+    }
 
-        // Whether we canceled or not, this gets the final state of the job or times out
-        State finalState =
-            job.waitUntilFinish(
-                Duration.standardSeconds(options.getTestTimeoutSeconds()), messageHandler);
+    // Other failures, or jobs where metrics fell through for some reason, will manifest
+    // as simply job failures.
+    if (!jobSuccess) {
+      throw new RuntimeException(errorMessage(job, messageHandler));
+    }
 
-        // Getting the final state timed out; it may not indicate a failure.
-        // This cancellation may be the second
-        if (finalState == null || finalState == State.RUNNING) {
-          LOG.info(
-              "Dataflow job {} took longer than {} seconds to complete, cancelling.",
-              job.getJobId(),
-              options.getTestTimeoutSeconds());
-          job.cancel();
-        }
+    // If there is no reason to immediately fail, run the success matcher.
+    assertThat(job, testPipelineOptions.getOnSuccessMatcher());
+    return job;
+  }
 
-        if (messageHandler.hasSeenError()) {
-          result = Optional.of(false);
-        }
-      } else {
-        job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler);
-        result = checkForPAssertSuccess(job);
+  /**
+   * Return {@code true} if the job succeeded or {@code false} if it terminated in any other manner.
+   */
+  private boolean waitForStreamingJobTermination(
+      final DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
+    // In streaming, there are infinite retries, so rather than timeout
+    // we try to terminate early by polling and canceling if we see
+    // an error message
+    options.getExecutorService().submit(new CancelOnError(job, messageHandler));
+
+    // Whether we canceled or not, this gets the final state of the job or times out
+    State finalState;
+    try {
+      finalState =
+          job.waitUntilFinish(
+              Duration.standardSeconds(options.getTestTimeoutSeconds()), messageHandler);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      Thread.interrupted();
+      return false;
+    }
+
+    // Getting the final state may have timed out; it may not indicate a failure.
+    // This cancellation may be the second
+    if (finalState == null || !finalState.isTerminal()) {
+      LOG.info(
+          "Dataflow job {} took longer than {} seconds to complete, cancelling.",
+          job.getJobId(),
+          options.getTestTimeoutSeconds());
+      try {
+        job.cancel();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
+      return false;
+    } else {
+      return finalState == State.DONE && !messageHandler.hasSeenError();
+    }
+  }
 
-      if (!result.isPresent()) {
-        if (options.isStreaming()) {
-          LOG.warn(
-              "Dataflow job {} did not output a success or failure metric."
-                  + " In rare situations, some PAsserts may not have run."
-                  + " This is a known limitation of Dataflow in streaming.",
-              job.getJobId());
-        } else {
-          throw new IllegalStateException(
-              String.format(
-                  "Dataflow job %s did not output a success or failure metric.", job.getJobId()));
-        }
-      } else if (!result.get()) {
-        throw new AssertionError(
-            Strings.isNullOrEmpty(messageHandler.getErrorMessage())
-                ? String.format(
-                    "Dataflow job %s terminated in state %s but did not return a failure reason.",
-                    job.getJobId(), job.getState())
-                : messageHandler.getErrorMessage());
-      } else {
-        assertThat(job, testPipelineOptions.getOnSuccessMatcher());
+  /**
+   * Return {@code true} if the job succeeded or {@code false} if it terminated in any other manner.
+   */
+  private boolean waitForBatchJobTermination(
+      DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
+    {
+      try {
+        job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } catch (InterruptedException e) {
+        Thread.interrupted();
+        return false;
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+
+      return job.getState() == State.DONE && !messageHandler.hasSeenError();
     }
-    return job;
+  }
+
+  private static String errorMessage(
+      DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
+    return Strings.isNullOrEmpty(messageHandler.getErrorMessage())
+        ? String.format(
+            "Dataflow job %s terminated in state %s but did not return a failure reason.",
+            job.getJobId(), job.getState())
+        : messageHandler.getErrorMessage();
   }
 
   @VisibleForTesting
@@ -199,19 +223,12 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
    * <p>If the pipeline is not in a failed/cancelled state and no PAsserts were used within the
    * pipeline, then this method will state that all PAsserts succeeded.
    *
-   * @return Optional.of(false) if we are certain a PAssert or some other critical thing has failed,
-   *     Optional.of(true) if we are certain all PAsserts passed, and Optional.absent() if the
-   *     evidence is inconclusive.
+   * @return Optional.of(false) if we are certain a PAssert failed. Optional.of(true) if we are
+   *     certain all PAsserts passed. Optional.absent() if the evidence is inconclusive, including
+   *     when the pipeline may have failed for other reasons.
    */
   @VisibleForTesting
-  Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job) throws IOException {
-
-    // If the job failed, this is a definite failure. We only cancel jobs when they fail.
-    State state = job.getState();
-    if (state == State.FAILED || state == State.CANCELLED) {
-      LOG.info("Dataflow job {} terminated in failure state {}", job.getJobId(), state);
-      return Optional.of(false);
-    }
+  Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job) {
 
     JobMetrics metrics = getJobMetrics(job);
     if (metrics == null || metrics.getMetrics() == null) {
@@ -236,8 +253,12 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     if (failures > 0) {
-      LOG.info("Failure result for Dataflow job {}. Found {} success, {} failures out of "
-          + "{} expected assertions.", job.getJobId(), successes, failures,
+      LOG.info(
+          "Failure result for Dataflow job {}. Found {} success, {} failures out of "
+              + "{} expected assertions.",
+          job.getJobId(),
+          successes,
+          failures,
           expectedNumberOfAssertions);
       return Optional.of(false);
     } else if (successes >= expectedNumberOfAssertions) {
@@ -251,6 +272,16 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       return Optional.of(true);
     }
 
+    // If the job failed, this is a definite failure. We only cancel jobs when they fail.
+    State state = job.getState();
+    if (state == State.FAILED || state == State.CANCELLED) {
+      LOG.info(
+          "Dataflow job {} terminated in failure state {} without reporting a failed assertion",
+          job.getJobId(),
+          state);
+      return Optional.absent();
+    }
+
     LOG.info(
         "Inconclusive results for Dataflow job {}."
             + " Found {} success, {} failures out of {} expected assertions.",
@@ -303,8 +334,10 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       for (JobMessage message : messages) {
         if (message.getMessageImportance() != null
             && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
-          LOG.info("Dataflow job {} threw exception. Failure message was: {}",
-              job.getJobId(), message.getMessageText());
+          LOG.info(
+              "Dataflow job {} threw exception. Failure message was: {}",
+              job.getJobId(),
+              message.getMessageText());
           errorMessage.append(message.getMessageText());
           hasSeenError = true;
         }
@@ -319,4 +352,37 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       return errorMessage.toString();
     }
   }
+
+  private static class CancelOnError implements Callable<Void> {
+
+    private final DataflowPipelineJob job;
+    private final ErrorMonitorMessagesHandler messageHandler;
+
+    public CancelOnError(DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
+      this.job = job;
+      this.messageHandler = messageHandler;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      while (true) {
+        State jobState = job.getState();
+
+        // If we see an error, cancel and note failure
+        if (messageHandler.hasSeenError()) {
+          if (!job.getState().isTerminal()) {
+            job.cancel();
+            LOG.info("Cancelling Dataflow job {}", job.getJobId());
+            return null;
+          }
+        }
+
+        if (jobState.isTerminal()) {
+          return null;
+        }
+
+        Thread.sleep(3000L);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/1a8a217b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
index 883d344..1c0876a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
@@ -124,6 +124,10 @@ public class TestDataflowRunnerTest {
     assertEquals(mockJob, runner.run(p, mockRunner));
   }
 
+  /**
+   * Tests that when a batch job terminates in a failure state even if all assertions
+   * passed, it throws an error to that effect.
+   */
   @Test
   public void testRunBatchJobThatFails() throws Exception {
     Pipeline p = TestPipeline.create(options);
@@ -140,12 +144,9 @@ public class TestDataflowRunnerTest {
 
     TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
     when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(false /* success */, false /* tentative */));
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      return;
-    }
+        .thenReturn(generateMockMetricResponse(true /* success */, false /* tentative */));
+    expectedException.expect(RuntimeException.class);
+    runner.run(p, mockRunner);
     // Note that fail throws an AssertionError which is why it is placed out here
     // instead of inside the try-catch block.
     fail("AssertionError expected");
@@ -357,22 +358,6 @@ public class TestDataflowRunnerTest {
   }
 
   /**
-   * Tests that if a streaming pipeline terminates with FAIL that the check for PAssert
-   * success is a conclusive failure.
-   */
-  @Test
-  public void testStreamingPipelineFailsIfServiceFails() throws Exception {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    doReturn(State.FAILED).when(job).getState();
-    assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(false)));
-  }
-
-  /**
    * Tests that if a streaming pipeline crash loops for a non-assertion reason that the test run
    * throws an {@link AssertionError}.
    *
@@ -411,12 +396,8 @@ public class TestDataflowRunnerTest {
         .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
     TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
 
-    try {
-      runner.run(pipeline, mockRunner);
-    } catch (AssertionError exc) {
-      return;
-    }
-    fail("AssertionError expected");
+    expectedException.expect(RuntimeException.class);
+    runner.run(pipeline, mockRunner);
   }
 
   @Test
@@ -581,7 +562,7 @@ public class TestDataflowRunnerTest {
    * Tests that when a streaming pipeline terminates in FAIL that the {@link
    * TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is not
    * invoked.
-   */
+   */
   @Test
   public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
     options.setStreaming(true);
@@ -603,8 +584,9 @@ public class TestDataflowRunnerTest {
     when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
         .thenReturn(State.FAILED);
 
+    expectedException.expect(RuntimeException.class);
     runner.run(p, mockRunner);
-    // If the onSuccessMatcher were invoked, it would have crashed here.
+    // If the onSuccessMatcher were invoked, it would have crashed here with AssertionError
   }
 
   static class TestSuccessMatcher extends BaseMatcher<PipelineResult> implements


[3/4] beam git commit: Allow any throwable in PAssert to constitute adequate failure

Posted by ke...@apache.org.
Allow any throwable in PAssert to constitute adequate failure

Currently, some PAssert tests require an AssertionError to be thrown. This
succeeds on all runners only because many gratuitously throw AssertionError
when they don't actually know that an assertion has failed.

The spec is just that a pipeline has to fail. We don't have a good enough story
around exception propagation to have such a strict - and fake - spec. And it
isn't cross-language anyhow FWIW, looking forward to the possibility of running
a PAssert in a pipeline combining multiple SDK harnesses.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/86416752
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/86416752
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/86416752

Branch: refs/heads/master
Commit: 86416752cd023f704ec0eefbb5ecf01f677aeee5
Parents: 16e4c74
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 8 20:07:29 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 10 14:00:03 2017 -0700

----------------------------------------------------------------------
 .../src/test/java/org/apache/beam/sdk/testing/PAssertTest.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/86416752/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index 37db4ef..491f001 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -554,7 +554,7 @@ public class PAssertTest implements Serializable {
     // is first caught by JUnit and causes a test failure.
     try {
       pipeline.run();
-    } catch (AssertionError exc) {
+    } catch (Throwable exc) {
       return exc;
     }
     fail("assertion should have failed");