You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/09 03:30:01 UTC
[2/4] incubator-beam git commit: [BEAM-151] Expand javadoc in
TestPipeline explaining usage
[BEAM-151] Expand javadoc in TestPipeline explaining usage
Update TestDataflowPipelineRunner to capture all the failure messages.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e87e46bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e87e46bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e87e46bb
Branch: refs/heads/master
Commit: e87e46bb6e5a9cb2c8db7039fde3b5838bb8867a
Parents: c726cf1
Author: Luke Cwik <lc...@google.com>
Authored: Fri Apr 8 15:49:53 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Apr 8 18:29:28 2016 -0700
----------------------------------------------------------------------
.../sdk/testing/TestDataflowPipelineRunner.java | 31 ++++---
.../dataflow/sdk/testing/TestPipeline.java | 33 +++++---
.../cloud/dataflow/sdk/testing/PAssertTest.java | 4 -
.../testing/TestDataflowPipelineRunnerTest.java | 87 ++++++++++++++++----
4 files changed, 114 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e87e46bb/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
index 4bd9484..e961066 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
@@ -212,13 +212,19 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
return "TestDataflowPipelineRunner#" + options.getAppName();
}
+ /**
+ * Cancels the workflow on the first error message it sees.
+ *
+ * <p>Creates an error message representing the concatenation of all error messages seen.
+ */
private static class CancelWorkflowOnError implements JobMessagesHandler {
private final DataflowPipelineJob job;
private final JobMessagesHandler messageHandler;
- private volatile String errorMessage;
+ private final StringBuffer errorMessage;
private CancelWorkflowOnError(DataflowPipelineJob job, JobMessagesHandler messageHandler) {
this.job = job;
this.messageHandler = messageHandler;
+ this.errorMessage = new StringBuffer();
}
@Override
@@ -227,21 +233,24 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
for (JobMessage message : messages) {
if (message.getMessageImportance() != null
&& message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
- errorMessage = message.getMessageText();
- LOG.info("Dataflow job {} threw exception, cancelling. Exception was: {}",
- job.getJobId(), errorMessage);
- try {
- job.cancel();
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
-
+ LOG.info("Dataflow job {} threw exception. Failure message was: {}",
+ job.getJobId(), message.getMessageText());
+ errorMessage.append(message.getMessageText());
+ }
+ }
+ if (errorMessage.length() > 0) {
+ LOG.info("Cancelling Dataflow job {}", job.getJobId());
+ try {
+ job.cancel();
+ } catch (Exception ignore) {
+ // Ignore cancellation failures; the TestDataflowPipelineRunner will throw an AssertionError
+ // with the job failure messages.
}
}
}
private String getErrorMessage() {
- return errorMessage;
+ return errorMessage.toString();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e87e46bb/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
index 9eab4b1..fe667a4 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
@@ -31,6 +31,8 @@ import com.google.common.collect.Iterators;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.experimental.categories.Category;
+
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Iterator;
@@ -39,24 +41,29 @@ import javax.annotation.Nullable;
/**
* A creator of test pipelines that can be used inside of tests that can be
- * configured to run locally or against the live service.
+ * configured to run locally or against a remote pipeline runner.
*
* <p>It is recommended to tag hand-selected tests for this purpose using the
- * RunnableOnService Category annotation, as each test run against the service
- * will spin up and tear down a single VM.
+ * {@link RunnableOnService} {@link Category} annotation, as each test run against a pipeline runner
+ * will consume resources of that pipeline runner.
*
- * <p>In order to run tests on the dataflow pipeline service, the following
- * conditions must be met:
+ * <p>In order to run tests on a pipeline runner, the following conditions must be met:
* <ul>
- * <li> runIntegrationTestOnService System property must be set to true.
- * <li> System property "projectName" must be set to your Cloud project.
- * <li> System property "temp_gcs_directory" must be set to a valid GCS bucket.
- * <li> Jars containing the SDK and test classes must be added to the test classpath.
+ * <li>System property "runIntegrationTestOnService" must be set to true.</li>
+ * <li>System property "dataflowOptions" must contain a JSON array of pipeline option strings.
+ * For example:
+ * <pre>{@code [
+ * "--runner=com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineRunner",
+ * "--project=mygcpproject",
+ * "--stagingLocation=gs://mygcsbucket/path"
+ * ]}</pre>
+ * Note that the set of required pipeline options is specific to each pipeline runner.
+ * </li>
+ * <li>Jars containing the SDK and test classes must be available on the classpath.</li>
* </ul>
*
- * <p>Use {@link PAssert} for tests, as it integrates with this test
- * harness in both direct and remote execution modes. For example:
- *
+ * <p>Use {@link PAssert} for tests, as it integrates with this test harness in both direct and
+ * remote execution modes. For example:
* <pre>{@code
* Pipeline p = TestPipeline.create();
* PCollection<Integer> output = ...
@@ -66,6 +73,8 @@ import javax.annotation.Nullable;
* p.run();
* }</pre>
*
+ * <p>Pipeline runners are required to throw an {@link AssertionError} containing the message
+ * from the {@link PAssert} that failed.
*/
public class TestPipeline extends Pipeline {
private static final String PROPERTY_DATAFLOW_OPTIONS = "dataflowOptions";
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e87e46bb/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java
index cd82bd1..a1f88fe 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java
@@ -45,8 +45,6 @@ import java.io.OutputStream;
import java.io.Serializable;
import java.util.regex.Pattern;
-import javax.annotation.Nullable;
-
/**
* Test case for {@link PAssert}.
*/
@@ -284,8 +282,6 @@ public class PAssertTest implements Serializable {
PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3, 7);
- // The service runner does not give an exception we can usefully inspect.
- @Nullable
Throwable exc = runExpectingAssertionFailure(pipeline);
Pattern expectedPattern = Pattern.compile(
"Expected: iterable over \\[((<4>|<7>|<3>|<2>|<1>)(, )?){5}\\] in any order");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e87e46bb/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunnerTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunnerTest.java
index e15e7ef..0a78a6d 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunnerTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunnerTest.java
@@ -17,9 +17,13 @@
*/
package com.google.cloud.dataflow.sdk.testing;
+import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
@@ -128,9 +132,6 @@ public class TestDataflowPipelineRunnerTest {
@Test
public void testRunBatchJobThatFails() throws Exception {
- expectedException.expect(IllegalStateException.class);
- expectedException.expectMessage("The dataflow failed.");
-
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -145,7 +146,57 @@ public class TestDataflowPipelineRunnerTest {
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- runner.run(p, mockRunner);
+ try {
+ runner.run(p, mockRunner);
+ } catch (AssertionError expected) {
+ return;
+ }
+ // Note that fail throws an AssertionError which is why it is placed out here
+ // instead of inside the try-catch block.
+ fail("AssertionError expected");
+ }
+
+ @Test
+ public void testBatchPipelineFailsIfException() throws Exception { // a JOB_MESSAGE_ERROR must cancel the job and surface as an AssertionError
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.RUNNING);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+ when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+ .thenAnswer(new Answer<State>() { // simulate the service delivering a single error message while waiting
+ @Override
+ public State answer(InvocationOnMock invocation) {
+ JobMessage message = new JobMessage();
+ message.setMessageText("FooException");
+ message.setTime(TimeUtil.toCloudTime(Instant.now()));
+ message.setMessageImportance("JOB_MESSAGE_ERROR"); // the importance value the runner's error handler matches on
+ ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2]) // index 2 is the JobMessagesHandler argument of waitToFinish
+ .process(Arrays.asList(message));
+ return State.CANCELLED; // report the job as cancelled, as if the handler's cancel() took effect
+ }
+ });
+
+ DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(false /* success */, true /* tentative */));
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ try {
+ runner.run(p, mockRunner);
+ } catch (AssertionError expected) {
+ assertThat(expected.getMessage(), containsString("FooException")); // the reported error text must reach the failure message
+ verify(mockJob, atLeastOnce()).cancel(); // atLeastOnce tolerates cancel() being requested more than once
+ return;
+ }
+ // fail() itself throws an AssertionError, so it must sit outside the try block, where
+ // the catch clause above would otherwise swallow it and let the test pass silently.
+ fail("AssertionError expected");
}
@Test
@@ -172,9 +223,6 @@ public class TestDataflowPipelineRunnerTest {
@Test
public void testRunStreamingJobThatFails() throws Exception {
- expectedException.expect(IllegalStateException.class);
- expectedException.expectMessage("The dataflow failed.");
-
options.setStreaming(true);
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
@@ -192,7 +240,14 @@ public class TestDataflowPipelineRunnerTest {
when(request.execute()).thenReturn(
generateMockMetricResponse(false /* success */, true /* tentative */));
TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- runner.run(p, mockRunner);
+ try {
+ runner.run(p, mockRunner);
+ } catch (AssertionError expected) {
+ return;
+ }
+ // Note that fail throws an AssertionError which is why it is placed out here
+ // instead of inside the try-catch block.
+ fail("AssertionError expected");
}
@Test
@@ -278,9 +333,6 @@ public class TestDataflowPipelineRunnerTest {
@Test
public void testStreamingPipelineFailsIfException() throws Exception {
- expectedException.expect(IllegalStateException.class);
- expectedException.expectMessage("The dataflow failed.");
-
options.setStreaming(true);
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
@@ -311,8 +363,15 @@ public class TestDataflowPipelineRunnerTest {
when(request.execute()).thenReturn(
generateMockMetricResponse(false /* success */, true /* tentative */));
TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- runner.run(p, mockRunner);
-
- verify(mockJob).cancel();
+ try {
+ runner.run(p, mockRunner);
+ } catch (AssertionError expected) {
+ assertThat(expected.getMessage(), containsString("FooException"));
+ verify(mockJob, atLeastOnce()).cancel();
+ return;
+ }
+ // Note that fail throws an AssertionError which is why it is placed out here
+ // instead of inside the try-catch block.
+ fail("AssertionError expected");
}
}