You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/09 03:30:01 UTC

[2/4] incubator-beam git commit: [BEAM-151] Expand javadoc in TestPipeline explaining usage

[BEAM-151] Expand javadoc in TestPipeline explaining usage

Update TestDataflowPipelineRunner to capture all the failure messages.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e87e46bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e87e46bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e87e46bb

Branch: refs/heads/master
Commit: e87e46bb6e5a9cb2c8db7039fde3b5838bb8867a
Parents: c726cf1
Author: Luke Cwik <lc...@google.com>
Authored: Fri Apr 8 15:49:53 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Apr 8 18:29:28 2016 -0700

----------------------------------------------------------------------
 .../sdk/testing/TestDataflowPipelineRunner.java | 31 ++++---
 .../dataflow/sdk/testing/TestPipeline.java      | 33 +++++---
 .../cloud/dataflow/sdk/testing/PAssertTest.java |  4 -
 .../testing/TestDataflowPipelineRunnerTest.java | 87 ++++++++++++++++----
 4 files changed, 114 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e87e46bb/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
index 4bd9484..e961066 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
@@ -212,13 +212,19 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
     return "TestDataflowPipelineRunner#" + options.getAppName();
   }
 
+  /**
+   * Cancels the workflow on the first error message it sees.
+   *
+   * <p>Creates an error message representing the concatenation of all error messages seen.
+   */
   private static class CancelWorkflowOnError implements JobMessagesHandler {
     private final DataflowPipelineJob job;
     private final JobMessagesHandler messageHandler;
-    private volatile String errorMessage;
+    private final StringBuffer errorMessage;
     private CancelWorkflowOnError(DataflowPipelineJob job, JobMessagesHandler messageHandler) {
       this.job = job;
       this.messageHandler = messageHandler;
+      this.errorMessage = new StringBuffer();
     }
 
     @Override
@@ -227,21 +233,24 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
       for (JobMessage message : messages) {
         if (message.getMessageImportance() != null
             && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
-          errorMessage = message.getMessageText();
-          LOG.info("Dataflow job {} threw exception, cancelling. Exception was: {}",
-              job.getJobId(), errorMessage);
-          try {
-            job.cancel();
-          } catch (Exception e) {
-            throw Throwables.propagate(e);
-          }
-
+          LOG.info("Dataflow job {} threw exception. Failure message was: {}",
+              job.getJobId(), message.getMessageText());
+          errorMessage.append(message.getMessageText());
+        }
+      }
+      if (errorMessage.length() > 0) {
+        LOG.info("Cancelling Dataflow job {}", job.getJobId());
+        try {
+          job.cancel();
+        } catch (Exception ignore) {
+          // The TestDataflowPipelineRunner will throw an AssertionError with the job failure
+          // messages.
         }
       }
     }
 
     private String getErrorMessage() {
-      return errorMessage;
+      return errorMessage.toString();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e87e46bb/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
index 9eab4b1..fe667a4 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
@@ -31,6 +31,8 @@ import com.google.common.collect.Iterators;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import org.junit.experimental.categories.Category;
+
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.Iterator;
@@ -39,24 +41,29 @@ import javax.annotation.Nullable;
 
 /**
  * A creator of test pipelines that can be used inside of tests that can be
- * configured to run locally or against the live service.
+ * configured to run locally or against a remote pipeline runner.
  *
  * <p>It is recommended to tag hand-selected tests for this purpose using the
- * RunnableOnService Category annotation, as each test run against the service
- * will spin up and tear down a single VM.
+ * {@link RunnableOnService} {@link Category} annotation, as each test run against a pipeline runner
+ * will utilize resources of that pipeline runner.
  *
- * <p>In order to run tests on the dataflow pipeline service, the following
- * conditions must be met:
+ * <p>In order to run tests on a pipeline runner, the following conditions must be met:
  * <ul>
- * <li> runIntegrationTestOnService System property must be set to true.
- * <li> System property "projectName" must be set to your Cloud project.
- * <li> System property "temp_gcs_directory" must be set to a valid GCS bucket.
- * <li> Jars containing the SDK and test classes must be added to the test classpath.
+ *   <li>System property "runIntegrationTestOnService" must be set to true.</li>
+ *   <li>System property "dataflowOptions" must contain a JSON array of pipeline options.
+ *   For example:
+ *   <pre>{@code [
+ *     "--runner=com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineRunner",
+ *     "--project=mygcpproject",
+ *     "--stagingLocation=gs://mygcsbucket/path"
+ *     ]}</pre>
+ *     Note that the set of pipeline options required is pipeline runner specific.
+ *   </li>
+ *   <li>Jars containing the SDK and test classes must be available on the classpath.</li>
  * </ul>
  *
- * <p>Use {@link PAssert} for tests, as it integrates with this test
- * harness in both direct and remote execution modes.  For example:
- *
+ * <p>Use {@link PAssert} for tests, as it integrates with this test harness in both direct and
+ * remote execution modes. For example:
  * <pre>{@code
  * Pipeline p = TestPipeline.create();
  * PCollection<Integer> output = ...
@@ -66,6 +73,8 @@ import javax.annotation.Nullable;
  * p.run();
  * }</pre>
  *
+ * <p>Pipeline runners are required to throw an {@link AssertionError}
+ * containing the message from the {@link PAssert} that failed.
  */
 public class TestPipeline extends Pipeline {
   private static final String PROPERTY_DATAFLOW_OPTIONS = "dataflowOptions";

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e87e46bb/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java
index cd82bd1..a1f88fe 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java
@@ -45,8 +45,6 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.regex.Pattern;
 
-import javax.annotation.Nullable;
-
 /**
  * Test case for {@link PAssert}.
  */
@@ -284,8 +282,6 @@ public class PAssertTest implements Serializable {
 
     PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3, 7);
 
-    // The service runner does not give an exception we can usefully inspect.
-    @Nullable
     Throwable exc = runExpectingAssertionFailure(pipeline);
     Pattern expectedPattern = Pattern.compile(
         "Expected: iterable over \\[((<4>|<7>|<3>|<2>|<1>)(, )?){5}\\] in any order");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e87e46bb/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunnerTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunnerTest.java
index e15e7ef..0a78a6d 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunnerTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunnerTest.java
@@ -17,9 +17,13 @@
  */
 package com.google.cloud.dataflow.sdk.testing;
 
+import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
@@ -128,9 +132,6 @@ public class TestDataflowPipelineRunnerTest {
 
   @Test
   public void testRunBatchJobThatFails() throws Exception {
-    expectedException.expect(IllegalStateException.class);
-    expectedException.expectMessage("The dataflow failed.");
-
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -145,7 +146,57 @@ public class TestDataflowPipelineRunnerTest {
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
     TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    runner.run(p, mockRunner);
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      return;
+    }
+    // Note that fail throws an AssertionError which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
+  }
+
+  @Test
+  public void testBatchPipelineFailsIfException() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+        .thenAnswer(new Answer<State>() {
+          @Override
+          public State answer(InvocationOnMock invocation) {
+            JobMessage message = new JobMessage();
+            message.setMessageText("FooException");
+            message.setTime(TimeUtil.toCloudTime(Instant.now()));
+            message.setMessageImportance("JOB_MESSAGE_ERROR");
+            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
+                .process(Arrays.asList(message));
+            return State.CANCELLED;
+          }
+        });
+
+    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(false /* success */, true /* tentative */));
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      assertThat(expected.getMessage(), containsString("FooException"));
+      verify(mockJob, atLeastOnce()).cancel();
+      return;
+    }
+    // Note that fail throws an AssertionError which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
   }
 
   @Test
@@ -172,9 +223,6 @@ public class TestDataflowPipelineRunnerTest {
 
   @Test
   public void testRunStreamingJobThatFails() throws Exception {
-    expectedException.expect(IllegalStateException.class);
-    expectedException.expectMessage("The dataflow failed.");
-
     options.setStreaming(true);
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
@@ -192,7 +240,14 @@ public class TestDataflowPipelineRunnerTest {
     when(request.execute()).thenReturn(
         generateMockMetricResponse(false /* success */, true /* tentative */));
     TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    runner.run(p, mockRunner);
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      return;
+    }
+    // Note that fail throws an AssertionError which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
   }
 
   @Test
@@ -278,9 +333,6 @@ public class TestDataflowPipelineRunnerTest {
 
   @Test
   public void testStreamingPipelineFailsIfException() throws Exception {
-    expectedException.expect(IllegalStateException.class);
-    expectedException.expectMessage("The dataflow failed.");
-
     options.setStreaming(true);
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
@@ -311,8 +363,15 @@ public class TestDataflowPipelineRunnerTest {
     when(request.execute()).thenReturn(
         generateMockMetricResponse(false /* success */, true /* tentative */));
     TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    runner.run(p, mockRunner);
-
-    verify(mockJob).cancel();
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      assertThat(expected.getMessage(), containsString("FooException"));
+      verify(mockJob, atLeastOnce()).cancel();
+      return;
+    }
+    // Note that fail throws an AssertionError which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
   }
 }