You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/09 03:30:02 UTC
[3/4] incubator-beam git commit: [BEAM-151] Remove dependence on
DataflowPipelineRunner in PAssert/TestPipeline
[BEAM-151] Remove dependence on DataflowPipelineRunner in PAssert/TestPipeline
Added the ability for TestDataflowPipelineRunner to throw an AssertionError,
carrying the first job error message, when the pipeline fails.
Removing this dependence allows Dataflow to be moved to a new package.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c726cf13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c726cf13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c726cf13
Branch: refs/heads/master
Commit: c726cf1376d17489a4834780e0f4a04b6edbc603
Parents: 2ca5474
Author: Luke Cwik <lc...@google.com>
Authored: Fri Apr 8 15:07:52 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Apr 8 18:29:28 2016 -0700
----------------------------------------------------------------------
.../sdk/testing/TestDataflowPipelineRunner.java | 74 +++++++++++++-------
.../dataflow/sdk/testing/TestPipeline.java | 15 +---
.../cloud/dataflow/sdk/testing/PAssertTest.java | 46 ++++--------
3 files changed, 65 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c726cf13/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
index 8f7374f..4bd9484 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java
@@ -42,6 +42,7 @@ import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -81,8 +82,6 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
DataflowPipelineJob run(Pipeline pipeline, DataflowPipelineRunner runner) {
- final JobMessagesHandler messageHandler =
- new MonitoringUtil.PrintHandler(options.getJobMessageOutput());
final DataflowPipelineJob job;
try {
job = runner.run(pipeline);
@@ -93,8 +92,12 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
LOG.info("Running Dataflow job {} with {} expected assertions.",
job.getJobId(), expectedNumberOfAssertions);
+ CancelWorkflowOnError messageHandler = new CancelWorkflowOnError(
+ job, new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
+
try {
final Optional<Boolean> result;
+
if (options.isStreaming()) {
Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit(
new Callable<Optional<Boolean>>() {
@@ -114,24 +117,7 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
}
}
});
- State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, new JobMessagesHandler() {
- @Override
- public void process(List<JobMessage> messages) {
- messageHandler.process(messages);
- for (JobMessage message : messages) {
- if (message.getMessageImportance() != null
- && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
- LOG.info("Dataflow job {} threw exception, cancelling. Exception was: {}",
- job.getJobId(), message.getMessageText());
- try {
- job.cancel();
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
- }
- }
- });
+ State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, messageHandler);
if (finalState == null || finalState == State.RUNNING) {
LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.",
job.getJobId());
@@ -146,11 +132,18 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
throw new IllegalStateException(
"The dataflow did not output a success or failure metric.");
} else if (!result.get()) {
- throw new IllegalStateException("The dataflow failed.");
+ throw new AssertionError(messageHandler.getErrorMessage() == null ?
+ "The dataflow did not return a failure reason."
+ : messageHandler.getErrorMessage());
}
- } catch (Exception e) {
- Throwables.propagateIfPossible(e);
- throw Throwables.propagate(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ Throwables.propagateIfPossible(e.getCause());
+ throw new RuntimeException(e.getCause());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
return job;
}
@@ -218,4 +211,37 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
public String toString() {
return "TestDataflowPipelineRunner#" + options.getAppName();
}
+
+ private static class CancelWorkflowOnError implements JobMessagesHandler {
+ private final DataflowPipelineJob job;
+ private final JobMessagesHandler messageHandler;
+ private volatile String errorMessage;
+ private CancelWorkflowOnError(DataflowPipelineJob job, JobMessagesHandler messageHandler) {
+ this.job = job;
+ this.messageHandler = messageHandler;
+ }
+
+ @Override
+ public void process(List<JobMessage> messages) {
+ messageHandler.process(messages);
+ for (JobMessage message : messages) {
+ if (message.getMessageImportance() != null
+ && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
+ errorMessage = message.getMessageText();
+ LOG.info("Dataflow job {} threw exception, cancelling. Exception was: {}",
+ job.getJobId(), errorMessage);
+ try {
+ job.cancel();
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+
+ }
+ }
+ }
+
+ private String getErrorMessage() {
+ return errorMessage;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c726cf13/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
index 98d4823..9eab4b1 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java
@@ -24,7 +24,6 @@ import com.google.cloud.dataflow.sdk.options.GcpOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions.CheckEnabled;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.util.TestCredential;
import com.google.common.base.Optional;
@@ -86,14 +85,6 @@ public class TestPipeline extends Pipeline {
return new TestPipeline(PipelineRunner.fromOptions(options), options);
}
- /**
- * Returns whether a {@link TestPipeline} supports dynamic work rebalancing, and thus tests
- * of dynamic work rebalancing are expected to pass.
- */
- public boolean supportsDynamicWorkRebalancing() {
- return getRunner() instanceof DataflowPipelineRunner;
- }
-
private TestPipeline(PipelineRunner<? extends PipelineResult> runner, PipelineOptions options) {
super(runner, options);
}
@@ -136,11 +127,7 @@ public class TestPipeline extends Pipeline {
.as(PipelineOptions.class);
options.as(ApplicationNameOptions.class).setAppName(getAppName());
- if (isIntegrationTest()) {
- // TODO: adjust everyone's integration test frameworks to set the runner class via the
- // pipeline options via PROPERTY_DATAFLOW_OPTIONS
- options.setRunner(TestDataflowPipelineRunner.class);
- } else {
+ if (!isIntegrationTest()) {
options.as(GcpOptions.class).setGcpCredential(new TestCredential());
}
options.setStableUniqueNames(CheckEnabled.ERROR);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c726cf13/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java
index 2848eaa..cd82bd1 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/PAssertTest.java
@@ -19,15 +19,12 @@ package com.google.cloud.dataflow.sdk.testing;
import static com.google.cloud.dataflow.sdk.testing.SerializableMatchers.anything;
import static com.google.cloud.dataflow.sdk.testing.SerializableMatchers.not;
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
@@ -292,38 +289,23 @@ public class PAssertTest implements Serializable {
Throwable exc = runExpectingAssertionFailure(pipeline);
Pattern expectedPattern = Pattern.compile(
"Expected: iterable over \\[((<4>|<7>|<3>|<2>|<1>)(, )?){5}\\] in any order");
- if (exc != null) {
- // A loose pattern, but should get the job done.
- assertTrue(
- "Expected error message from PAssert with substring matching "
- + expectedPattern
- + " but the message was \""
- + exc.getMessage()
- + "\"",
- expectedPattern.matcher(exc.getMessage()).find());
- }
+ // A loose pattern, but should get the job done.
+ assertTrue(
+ "Expected error message from PAssert with substring matching "
+ + expectedPattern
+ + " but the message was \""
+ + exc.getMessage()
+ + "\"",
+ expectedPattern.matcher(exc.getMessage()).find());
}
private static Throwable runExpectingAssertionFailure(Pipeline pipeline) {
- // Even though this test will succeed or fail adequately whether local or on the service,
- // it results in a different exception depending on the runner.
- if (pipeline.getRunner() instanceof DirectPipelineRunner) {
- // We cannot use thrown.expect(AssertionError.class) because the AssertionError
- // is first caught by JUnit and causes a test failure.
- try {
- pipeline.run();
- } catch (AssertionError exc) {
- return exc;
- }
- } else if (pipeline.getRunner() instanceof TestDataflowPipelineRunner) {
- // Separately, if this is run on the service, then the TestDataflowPipelineRunner throws
- // an IllegalStateException with a basic message.
- try {
- pipeline.run();
- } catch (IllegalStateException exc) {
- assertThat(exc.getMessage(), containsString("The dataflow failed."));
- return null;
- }
+ // We cannot use thrown.expect(AssertionError.class) because the AssertionError
+ // is first caught by JUnit and causes a test failure.
+ try {
+ pipeline.run();
+ } catch (AssertionError exc) {
+ return exc;
}
fail("assertion should have failed");
throw new RuntimeException("unreachable");