You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/09 03:30:02 UTC

[3/4] incubator-beam git commit: [BEAM-151] Remove dependence on DataflowPipelineRunner in PAssert/TestPipeline

[BEAM-151] Remove dependence on DataflowPipelineRunner in PAssert/TestPipeline

Added the ability for TestDataflowPipelineRunner to throw an AssertionError
when the pipeline fails, using the first job error message as its message.

This allows for moving Dataflow to a new package.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c726cf13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c726cf13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c726cf13

Branch: refs/heads/master
Commit: c726cf1376d17489a4834780e0f4a04b6edbc603
Parents: 2ca5474
Author: Luke Cwik <lc...@google.com>
Authored: Fri Apr 8 15:07:52 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Apr 8 18:29:28 2016 -0700

----------------------------------------------------------------------
 .../sdk/testing/TestDataflowPipelineRunner.java | 74 +++++++++++++-------
 .../dataflow/sdk/testing/TestPipeline.java      | 15 +---
 .../cloud/dataflow/sdk/testing/PAssertTest.java | 46 ++++--------
 3 files changed, 65 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c726cf13/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
index 8f7374f..4bd9484 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
@@ -42,6 +42,7 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -81,8 +82,6 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
 
   DataflowPipelineJob run(Pipeline pipeline, DataflowPipelineRunner runner) {
 
-    final JobMessagesHandler messageHandler =
-        new MonitoringUtil.PrintHandler(options.getJobMessageOutput());
     final DataflowPipelineJob job;
     try {
       job = runner.run(pipeline);
@@ -93,8 +92,12 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
     LOG.info("Running Dataflow job {} with {} expected assertions.",
         job.getJobId(), expectedNumberOfAssertions);
 
+    CancelWorkflowOnError messageHandler = new CancelWorkflowOnError(
+        job, new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
+
     try {
       final Optional<Boolean> result;
+
       if (options.isStreaming()) {
         Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit(
             new Callable<Optional<Boolean>>() {
@@ -114,24 +117,7 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
             }
           }
         });
-        State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, new JobMessagesHandler() {
-            @Override
-            public void process(List<JobMessage> messages) {
-              messageHandler.process(messages);
-              for (JobMessage message : messages) {
-                if (message.getMessageImportance() != null
-                    && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
-                  LOG.info("Dataflow job {} threw exception, cancelling. Exception was: {}",
-                      job.getJobId(), message.getMessageText());
-                  try {
-                    job.cancel();
-                  } catch (Exception e) {
-                    throw Throwables.propagate(e);
-                  }
-                }
-              }
-            }
-          });
+        State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, messageHandler);
         if (finalState == null || finalState == State.RUNNING) {
           LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.",
               job.getJobId());
@@ -146,11 +132,18 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
         throw new IllegalStateException(
             "The dataflow did not output a success or failure metric.");
       } else if (!result.get()) {
-        throw new IllegalStateException("The dataflow failed.");
+        throw new AssertionError(messageHandler.getErrorMessage() == null ?
+            "The dataflow did not return a failure reason."
+            : messageHandler.getErrorMessage());
       }
-    } catch (Exception e) {
-      Throwables.propagateIfPossible(e);
-      throw Throwables.propagate(e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (ExecutionException e) {
+      Throwables.propagateIfPossible(e.getCause());
+      throw new RuntimeException(e.getCause());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
     return job;
   }
@@ -218,4 +211,37 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
   public String toString() {
     return "TestDataflowPipelineRunner#" + options.getAppName();
   }
+
+  private static class CancelWorkflowOnError implements JobMessagesHandler {
+    private final DataflowPipelineJob job;
+    private final JobMessagesHandler messageHandler;
+    private volatile String errorMessage;
+    private CancelWorkflowOnError(DataflowPipelineJob job, JobMessagesHandler messageHandler) {
+      this.job = job;
+      this.messageHandler = messageHandler;
+    }
+
+    @Override
+    public void process(List<JobMessage> messages) {
+      messageHandler.process(messages);
+      for (JobMessage message : messages) {
+        if (message.getMessageImportance() != null
+            && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
+          errorMessage = message.getMessageText();
+          LOG.info("Dataflow job {} threw exception, cancelling. Exception was: {}",
+              job.getJobId(), errorMessage);
+          try {
+            job.cancel();
+          } catch (Exception e) {
+            throw Throwables.propagate(e);
+          }
+
+        }
+      }
+    }
+
+    private String getErrorMessage() {
+      return errorMessage;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c726cf13/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
index 98d4823..9eab4b1 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
@@ -24,7 +24,6 @@ import com.google.cloud.dataflow.sdk.options.GcpOptions;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions.CheckEnabled;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
 import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
 import com.google.cloud.dataflow.sdk.util.TestCredential;
 import com.google.common.base.Optional;
@@ -86,14 +85,6 @@ public class TestPipeline extends Pipeline {
     return new TestPipeline(PipelineRunner.fromOptions(options), options);
   }
 
-  /**
-   * Returns whether a {@link TestPipeline} supports dynamic work rebalancing, and thus tests
-   * of dynamic work rebalancing are expected to pass.
-   */
-  public boolean supportsDynamicWorkRebalancing() {
-    return getRunner() instanceof DataflowPipelineRunner;
-  }
-
   private TestPipeline(PipelineRunner<? extends PipelineResult> runner, PipelineOptions options) {
     super(runner, options);
   }
@@ -136,11 +127,7 @@ public class TestPipeline extends Pipeline {
                   .as(PipelineOptions.class);
 
       options.as(ApplicationNameOptions.class).setAppName(getAppName());
-      if (isIntegrationTest()) {
-        // TODO: adjust everyone's integration test frameworks to set the runner class via the
-        // pipeline options via PROPERTY_DATAFLOW_OPTIONS
-        options.setRunner(TestDataflowPipelineRunner.class);
-      } else {
+      if (!isIntegrationTest()) {
         options.as(GcpOptions.class).setGcpCredential(new TestCredential());
       }
       options.setStableUniqueNames(CheckEnabled.ERROR);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c726cf13/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java
index 2848eaa..cd82bd1 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java
@@ -19,15 +19,12 @@ package com.google.cloud.dataflow.sdk.testing;
 
 import static com.google.cloud.dataflow.sdk.testing.SerializableMatchers.anything;
 import static com.google.cloud.dataflow.sdk.testing.SerializableMatchers.not;
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
 import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
 import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
@@ -292,38 +289,23 @@ public class PAssertTest implements Serializable {
     Throwable exc = runExpectingAssertionFailure(pipeline);
     Pattern expectedPattern = Pattern.compile(
         "Expected: iterable over \\[((<4>|<7>|<3>|<2>|<1>)(, )?){5}\\] in any order");
-    if (exc != null) {
-      // A loose pattern, but should get the job done.
-      assertTrue(
-          "Expected error message from PAssert with substring matching "
-              + expectedPattern
-              + " but the message was \""
-              + exc.getMessage()
-              + "\"",
-          expectedPattern.matcher(exc.getMessage()).find());
-    }
+    // A loose pattern, but should get the job done.
+    assertTrue(
+        "Expected error message from PAssert with substring matching "
+            + expectedPattern
+            + " but the message was \""
+            + exc.getMessage()
+            + "\"",
+        expectedPattern.matcher(exc.getMessage()).find());
   }
 
   private static Throwable runExpectingAssertionFailure(Pipeline pipeline) {
-    // Even though this test will succeed or fail adequately whether local or on the service,
-    // it results in a different exception depending on the runner.
-    if (pipeline.getRunner() instanceof DirectPipelineRunner) {
-      // We cannot use thrown.expect(AssertionError.class) because the AssertionError
-      // is first caught by JUnit and causes a test failure.
-      try {
-        pipeline.run();
-      } catch (AssertionError exc) {
-        return exc;
-      }
-    } else if (pipeline.getRunner() instanceof TestDataflowPipelineRunner) {
-      // Separately, if this is run on the service, then the TestDataflowPipelineRunner throws
-      // an IllegalStateException with a basic message.
-      try {
-        pipeline.run();
-      } catch (IllegalStateException exc) {
-        assertThat(exc.getMessage(), containsString("The dataflow failed."));
-        return null;
-      }
+    // We cannot use thrown.expect(AssertionError.class) because the AssertionError
+    // is first caught by JUnit and causes a test failure.
+    try {
+      pipeline.run();
+    } catch (AssertionError exc) {
+      return exc;
     }
     fail("assertion should have failed");
     throw new RuntimeException("unreachable");