You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/08 01:49:00 UTC

[1/2] beam git commit: TestDataflowRunner: better error handling

Repository: beam
Updated Branches:
  refs/heads/master 7f55746ce -> 0c3dc0051


TestDataflowRunner: better error handling

1. There was a race in which pipelines without PAsserts might
   erroneously pass because they would be canceled, which would
   in turn cause the watermarks to reach max infinity, which
   would in turn (because there are no PAsserts) cause the main
   streaming poll loop to think the pipeline succeeded.

   Fix this by making the error presence available to the main
   polling loop, and only canceling from there.

2. The fact that we were canceling from two places meant we could
   get double-cancelations that led to test failures.

Fix both these issues (I hope).


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/042b28a6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/042b28a6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/042b28a6

Branch: refs/heads/master
Commit: 042b28a681d1ee0c8942a0ec61df014c23a0fdaf
Parents: 7f55746
Author: Dan Halperin <dh...@google.com>
Authored: Fri Apr 7 14:56:09 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Apr 7 18:48:50 2017 -0700

----------------------------------------------------------------------
 .../dataflow/testing/TestDataflowRunner.java    | 34 +++++++++++---------
 .../testing/TestDataflowRunnerTest.java         |  3 +-
 2 files changed, 21 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/042b28a6/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index d220bb0..dc32466 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -111,8 +111,8 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     assertThat(job, testPipelineOptions.getOnCreateMatcher());
 
-    CancelWorkflowOnError messageHandler = new CancelWorkflowOnError(
-        job, new MonitoringUtil.LoggingHandler());
+    final ErrorMonitorMessagesHandler messageHandler =
+        new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler());
 
     try {
       final Optional<Boolean> success;
@@ -126,6 +126,10 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
               for (;;) {
                 JobMetrics metrics = getJobMetrics(job);
                 Optional<Boolean> success = checkForPAssertSuccess(job, metrics);
+                if (messageHandler.hasSeenError()) {
+                  return Optional.of(false);
+                }
+
                 if (success.isPresent() && (!success.get() || atMaxWatermark(job, metrics))) {
                   // It's possible that the streaming pipeline doesn't use PAssert.
                   // So checkForSuccess() will return true before job is finished.
@@ -312,18 +316,22 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }
 
   /**
-   * Cancels the workflow on the first error message it sees.
+   * Monitors job log output messages for errors.
    *
    * <p>Creates an error message representing the concatenation of all error messages seen.
    */
-  private static class CancelWorkflowOnError implements JobMessagesHandler {
+  private static class ErrorMonitorMessagesHandler implements JobMessagesHandler {
     private final DataflowPipelineJob job;
     private final JobMessagesHandler messageHandler;
     private final StringBuffer errorMessage;
-    private CancelWorkflowOnError(DataflowPipelineJob job, JobMessagesHandler messageHandler) {
+    private volatile boolean hasSeenError;
+
+    private ErrorMonitorMessagesHandler(
+        DataflowPipelineJob job, JobMessagesHandler messageHandler) {
       this.job = job;
       this.messageHandler = messageHandler;
       this.errorMessage = new StringBuffer();
+      this.hasSeenError = false;
     }
 
     @Override
@@ -335,20 +343,16 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
           LOG.info("Dataflow job {} threw exception. Failure message was: {}",
               job.getJobId(), message.getMessageText());
           errorMessage.append(message.getMessageText());
+          hasSeenError = true;
         }
       }
-      if (errorMessage.length() > 0) {
-        LOG.info("Cancelling Dataflow job {}", job.getJobId());
-        try {
-          job.cancel();
-        } catch (Exception ignore) {
-          // The TestDataflowRunner will thrown an AssertionError with the job failure
-          // messages.
-        }
-      }
     }
 
-    private String getErrorMessage() {
+    boolean hasSeenError() {
+      return hasSeenError;
+    }
+
+    String getErrorMessage() {
       return errorMessage.toString();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/042b28a6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index d3eccbb..e4fa788 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -32,6 +32,7 @@ import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -207,7 +208,7 @@ public class TestDataflowRunnerTest {
       runner.run(p, mockRunner);
     } catch (AssertionError expected) {
       assertThat(expected.getMessage(), containsString("FooException"));
-      verify(mockJob, atLeastOnce()).cancel();
+      verify(mockJob, never()).cancel();
       return;
     }
     // Note that fail throws an AssertionError which is why it is placed out here


[2/2] beam git commit: This closes #2465

Posted by dh...@apache.org.
This closes #2465


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0c3dc005
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0c3dc005
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0c3dc005

Branch: refs/heads/master
Commit: 0c3dc0051a5e6ef4034df83b0d09467e7c55c1f0
Parents: 7f55746 042b28a
Author: Dan Halperin <dh...@google.com>
Authored: Fri Apr 7 18:48:52 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Apr 7 18:48:52 2017 -0700

----------------------------------------------------------------------
 .../dataflow/testing/TestDataflowRunner.java    | 34 +++++++++++---------
 .../testing/TestDataflowRunnerTest.java         |  3 +-
 2 files changed, 21 insertions(+), 16 deletions(-)
----------------------------------------------------------------------