You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:42 UTC
[49/50] [abbrv] incubator-beam git commit: Remove Pipeline from
TestDataflowPipelineRunner
Remove Pipeline from TestDataflowPipelineRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a24e5570
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a24e5570
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a24e5570
Branch: refs/heads/python-sdk
Commit: a24e557090c7fb59846b332c0c9df9d49565c808
Parents: 90d0bcf
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 17 16:36:22 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jun 20 15:14:31 2016 -0700
----------------------------------------------------------------------
examples/java/README.md | 4 +-
.../beam/runners/flink/examples/TFIDF.java | 2 +-
.../testing/TestDataflowPipelineRunner.java | 271 -------------------
.../dataflow/testing/TestDataflowRunner.java | 271 +++++++++++++++++++
.../testing/TestDataflowRunnerTest.java | 40 +--
5 files changed, 294 insertions(+), 294 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a24e5570/examples/java/README.md
----------------------------------------------------------------------
diff --git a/examples/java/README.md b/examples/java/README.md
index ef3cf07..2b5edf5 100644
--- a/examples/java/README.md
+++ b/examples/java/README.md
@@ -64,7 +64,7 @@ the same pipeline on fully managed resources in Google Cloud Platform:
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=<YOUR CLOUD PLATFORM PROJECT ID> \
--tempLocation=<YOUR CLOUD STORAGE LOCATION> \
- --runner=BlockingDataflowPipelineRunner"
+ --runner=BlockingDataflowRunner"
Make sure to use your project id, not the project number or the descriptive name.
The Cloud Storage location should be entered in the form of
@@ -86,7 +86,7 @@ Platform:
org.apache.beam.examples.WordCount \
--project=<YOUR CLOUD PLATFORM PROJECT ID> \
--tempLocation=<YOUR CLOUD STORAGE LOCATION> \
- --runner=BlockingDataflowPipelineRunner
+ --runner=BlockingDataflowRunner
Other examples can be run similarly by replacing the `WordCount` class path with the example classpath, e.g.
`org.apache.beam.examples.cookbook.BigQueryTornadoes`,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a24e5570/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 0afde0a..876ecde 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -82,7 +82,7 @@ import java.util.Set;
* <pre>{@code
* --project=YOUR_PROJECT_ID
* --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- * --runner=BlockingDataflowPipelineRunner
+ * --runner=BlockingDataflowRunner
* and an output prefix on GCS:
* --output=gs://YOUR_OUTPUT_PREFIX
* }</pre>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a24e5570/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
deleted file mode 100644
index f83a139..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.testing;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-
-import org.apache.beam.runners.dataflow.DataflowJobExecutionException;
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-/**
- * {@link TestDataflowPipelineRunner} is a pipeline runner that wraps a
- * {@link DataflowRunner} when running tests against the {@link TestPipeline}.
- *
- * @see TestPipeline
- */
-public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> {
- private static final String TENTATIVE_COUNTER = "tentative";
- private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class);
-
- private final TestDataflowPipelineOptions options;
- private final DataflowRunner runner;
- private int expectedNumberOfAssertions = 0;
-
- TestDataflowPipelineRunner(TestDataflowPipelineOptions options) {
- this.options = options;
- this.runner = DataflowRunner.fromOptions(options);
- }
-
- /**
- * Constructs a runner from the provided options.
- */
- public static TestDataflowPipelineRunner fromOptions(
- PipelineOptions options) {
- TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
- dataflowOptions.setStagingLocation(Joiner.on("/").join(
- new String[]{dataflowOptions.getTempRoot(),
- dataflowOptions.getJobName(), "output", "results"}));
-
- return new TestDataflowPipelineRunner(dataflowOptions);
- }
-
- @Override
- public DataflowPipelineJob run(Pipeline pipeline) {
- return run(pipeline, runner);
- }
-
- DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
-
- TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
- final DataflowPipelineJob job;
- try {
- job = runner.run(pipeline);
- } catch (DataflowJobExecutionException ex) {
- throw new IllegalStateException("The dataflow failed.");
- }
-
- LOG.info("Running Dataflow job {} with {} expected assertions.",
- job.getJobId(), expectedNumberOfAssertions);
-
- assertThat(job, testPipelineOptions.getOnCreateMatcher());
-
- CancelWorkflowOnError messageHandler = new CancelWorkflowOnError(
- job, new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
-
- try {
- final Optional<Boolean> result;
-
- if (options.isStreaming()) {
- Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit(
- new Callable<Optional<Boolean>>() {
- @Override
- public Optional<Boolean> call() throws Exception {
- try {
- for (;;) {
- Optional<Boolean> result = checkForSuccess(job);
- if (result.isPresent()) {
- return result;
- }
- Thread.sleep(10000L);
- }
- } finally {
- LOG.info("Cancelling Dataflow job {}", job.getJobId());
- job.cancel();
- }
- }
- });
- State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, messageHandler);
- if (finalState == null || finalState == State.RUNNING) {
- LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.",
- job.getJobId());
- job.cancel();
- }
- result = resultFuture.get();
- } else {
- job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler);
- result = checkForSuccess(job);
- }
- if (!result.isPresent()) {
- throw new IllegalStateException(
- "The dataflow did not output a success or failure metric.");
- } else if (!result.get()) {
- throw new AssertionError(messageHandler.getErrorMessage() == null
- ? "The dataflow did not return a failure reason."
- : messageHandler.getErrorMessage());
- } else {
- assertThat(job, testPipelineOptions.getOnSuccessMatcher());
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- Throwables.propagateIfPossible(e.getCause());
- throw new RuntimeException(e.getCause());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return job;
- }
-
- @Override
- public <OutputT extends POutput, InputT extends PInput> OutputT apply(
- PTransform<InputT, OutputT> transform, InputT input) {
- if (transform instanceof PAssert.OneSideInputAssert
- || transform instanceof PAssert.GroupThenAssert
- || transform instanceof PAssert.GroupThenAssertForSingleton) {
- expectedNumberOfAssertions += 1;
- }
-
- return runner.apply(transform, input);
- }
-
- Optional<Boolean> checkForSuccess(DataflowPipelineJob job)
- throws IOException {
- State state = job.getState();
- if (state == State.FAILED || state == State.CANCELLED) {
- LOG.info("The pipeline failed");
- return Optional.of(false);
- }
-
- JobMetrics metrics = job.getDataflowClient().projects().jobs()
- .getMetrics(job.getProjectId(), job.getJobId()).execute();
-
- if (metrics == null || metrics.getMetrics() == null) {
- LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
- } else {
- int successes = 0;
- int failures = 0;
- for (MetricUpdate metric : metrics.getMetrics()) {
- if (metric.getName() == null || metric.getName().getContext() == null
- || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
- // Don't double count using the non-tentative version of the metric.
- continue;
- }
- if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
- successes += ((BigDecimal) metric.getScalar()).intValue();
- } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
- failures += ((BigDecimal) metric.getScalar()).intValue();
- }
- }
-
- if (failures > 0) {
- LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
- + "{} expected assertions.", job.getJobId(), successes, failures,
- expectedNumberOfAssertions);
- return Optional.of(false);
- } else if (successes >= expectedNumberOfAssertions) {
- LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
- + "{} expected assertions.", job.getJobId(), successes, failures,
- expectedNumberOfAssertions);
- return Optional.of(true);
- }
-
- LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected "
- + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions);
- }
-
- return Optional.<Boolean>absent();
- }
-
- @Override
- public String toString() {
- return "TestDataflowPipelineRunner#" + options.getAppName();
- }
-
- /**
- * Cancels the workflow on the first error message it sees.
- *
- * <p>Creates an error message representing the concatenation of all error messages seen.
- */
- private static class CancelWorkflowOnError implements JobMessagesHandler {
- private final DataflowPipelineJob job;
- private final JobMessagesHandler messageHandler;
- private final StringBuffer errorMessage;
- private CancelWorkflowOnError(DataflowPipelineJob job, JobMessagesHandler messageHandler) {
- this.job = job;
- this.messageHandler = messageHandler;
- this.errorMessage = new StringBuffer();
- }
-
- @Override
- public void process(List<JobMessage> messages) {
- messageHandler.process(messages);
- for (JobMessage message : messages) {
- if (message.getMessageImportance() != null
- && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
- LOG.info("Dataflow job {} threw exception. Failure message was: {}",
- job.getJobId(), message.getMessageText());
- errorMessage.append(message.getMessageText());
- }
- }
- if (errorMessage.length() > 0) {
- LOG.info("Cancelling Dataflow job {}", job.getJobId());
- try {
- job.cancel();
- } catch (Exception ignore) {
- // The TestDataflowPipelineRunner will throw an AssertionError with the job failure
- // messages.
- }
- }
- }
-
- private String getErrorMessage() {
- return errorMessage.toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a24e5570/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
new file mode 100644
index 0000000..19a2178
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.testing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.apache.beam.runners.dataflow.DataflowJobExecutionException;
+import org.apache.beam.runners.dataflow.DataflowPipelineJob;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link TestDataflowRunner} is a pipeline runner that wraps a
+ * {@link DataflowRunner} when running tests against the {@link TestPipeline}.
+ *
+ * @see TestPipeline
+ */
+public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
+ private static final String TENTATIVE_COUNTER = "tentative";
+ private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class);
+
+ private final TestDataflowPipelineOptions options;
+ private final DataflowRunner runner;
+ private int expectedNumberOfAssertions = 0;
+
+ TestDataflowRunner(TestDataflowPipelineOptions options) {
+ this.options = options;
+ this.runner = DataflowRunner.fromOptions(options);
+ }
+
+ /**
+ * Constructs a runner from the provided options.
+ */
+ public static TestDataflowRunner fromOptions(
+ PipelineOptions options) {
+ TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
+ dataflowOptions.setStagingLocation(Joiner.on("/").join(
+ new String[]{dataflowOptions.getTempRoot(),
+ dataflowOptions.getJobName(), "output", "results"}));
+
+ return new TestDataflowRunner(dataflowOptions);
+ }
+
+ @Override
+ public DataflowPipelineJob run(Pipeline pipeline) {
+ return run(pipeline, runner);
+ }
+
+ DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
+
+ TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
+ final DataflowPipelineJob job;
+ try {
+ job = runner.run(pipeline);
+ } catch (DataflowJobExecutionException ex) {
+ throw new IllegalStateException("The dataflow failed.");
+ }
+
+ LOG.info("Running Dataflow job {} with {} expected assertions.",
+ job.getJobId(), expectedNumberOfAssertions);
+
+ assertThat(job, testPipelineOptions.getOnCreateMatcher());
+
+ CancelWorkflowOnError messageHandler = new CancelWorkflowOnError(
+ job, new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
+
+ try {
+ final Optional<Boolean> result;
+
+ if (options.isStreaming()) {
+ Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit(
+ new Callable<Optional<Boolean>>() {
+ @Override
+ public Optional<Boolean> call() throws Exception {
+ try {
+ for (;;) {
+ Optional<Boolean> result = checkForSuccess(job);
+ if (result.isPresent()) {
+ return result;
+ }
+ Thread.sleep(10000L);
+ }
+ } finally {
+ LOG.info("Cancelling Dataflow job {}", job.getJobId());
+ job.cancel();
+ }
+ }
+ });
+ State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, messageHandler);
+ if (finalState == null || finalState == State.RUNNING) {
+ LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.",
+ job.getJobId());
+ job.cancel();
+ }
+ result = resultFuture.get();
+ } else {
+ job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler);
+ result = checkForSuccess(job);
+ }
+ if (!result.isPresent()) {
+ throw new IllegalStateException(
+ "The dataflow did not output a success or failure metric.");
+ } else if (!result.get()) {
+ throw new AssertionError(messageHandler.getErrorMessage() == null
+ ? "The dataflow did not return a failure reason."
+ : messageHandler.getErrorMessage());
+ } else {
+ assertThat(job, testPipelineOptions.getOnSuccessMatcher());
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ Throwables.propagateIfPossible(e.getCause());
+ throw new RuntimeException(e.getCause());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return job;
+ }
+
+ @Override
+ public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+ PTransform<InputT, OutputT> transform, InputT input) {
+ if (transform instanceof PAssert.OneSideInputAssert
+ || transform instanceof PAssert.GroupThenAssert
+ || transform instanceof PAssert.GroupThenAssertForSingleton) {
+ expectedNumberOfAssertions += 1;
+ }
+
+ return runner.apply(transform, input);
+ }
+
+ Optional<Boolean> checkForSuccess(DataflowPipelineJob job)
+ throws IOException {
+ State state = job.getState();
+ if (state == State.FAILED || state == State.CANCELLED) {
+ LOG.info("The pipeline failed");
+ return Optional.of(false);
+ }
+
+ JobMetrics metrics = job.getDataflowClient().projects().jobs()
+ .getMetrics(job.getProjectId(), job.getJobId()).execute();
+
+ if (metrics == null || metrics.getMetrics() == null) {
+ LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
+ } else {
+ int successes = 0;
+ int failures = 0;
+ for (MetricUpdate metric : metrics.getMetrics()) {
+ if (metric.getName() == null || metric.getName().getContext() == null
+ || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
+ // Don't double count using the non-tentative version of the metric.
+ continue;
+ }
+ if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
+ successes += ((BigDecimal) metric.getScalar()).intValue();
+ } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
+ failures += ((BigDecimal) metric.getScalar()).intValue();
+ }
+ }
+
+ if (failures > 0) {
+ LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
+ + "{} expected assertions.", job.getJobId(), successes, failures,
+ expectedNumberOfAssertions);
+ return Optional.of(false);
+ } else if (successes >= expectedNumberOfAssertions) {
+ LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
+ + "{} expected assertions.", job.getJobId(), successes, failures,
+ expectedNumberOfAssertions);
+ return Optional.of(true);
+ }
+
+ LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected "
+ + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions);
+ }
+
+ return Optional.<Boolean>absent();
+ }
+
+ @Override
+ public String toString() {
+ return "TestDataflowRunner#" + options.getAppName();
+ }
+
+ /**
+ * Cancels the workflow on the first error message it sees.
+ *
+ * <p>Creates an error message representing the concatenation of all error messages seen.
+ */
+ private static class CancelWorkflowOnError implements JobMessagesHandler {
+ private final DataflowPipelineJob job;
+ private final JobMessagesHandler messageHandler;
+ private final StringBuffer errorMessage;
+ private CancelWorkflowOnError(DataflowPipelineJob job, JobMessagesHandler messageHandler) {
+ this.job = job;
+ this.messageHandler = messageHandler;
+ this.errorMessage = new StringBuffer();
+ }
+
+ @Override
+ public void process(List<JobMessage> messages) {
+ messageHandler.process(messages);
+ for (JobMessage message : messages) {
+ if (message.getMessageImportance() != null
+ && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
+ LOG.info("Dataflow job {} threw exception. Failure message was: {}",
+ job.getJobId(), message.getMessageText());
+ errorMessage.append(message.getMessageText());
+ }
+ }
+ if (errorMessage.length() > 0) {
+ LOG.info("Cancelling Dataflow job {}", job.getJobId());
+ try {
+ job.cancel();
+ } catch (Exception ignore) {
+ // The TestDataflowRunner will throw an AssertionError with the job failure
+ // messages.
+ }
+ }
+ }
+
+ private String getErrorMessage() {
+ return errorMessage.toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a24e5570/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index 4067f08..cd99643 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -85,7 +85,7 @@ import java.math.BigDecimal;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
-/** Tests for {@link TestDataflowPipelineRunner}. */
+/** Tests for {@link TestDataflowRunner}. */
@RunWith(JUnit4.class)
public class TestDataflowRunnerTest {
@Rule public ExpectedException expectedException = ExpectedException.none();
@@ -110,14 +110,14 @@ public class TestDataflowRunnerTest {
options.setTempRoot("gs://test");
options.setGcpCredential(new TestCredential());
options.setDataflowClient(service);
- options.setRunner(TestDataflowPipelineRunner.class);
+ options.setRunner(TestDataflowRunner.class);
options.setPathValidatorClass(NoopPathValidator.class);
}
@Test
public void testToString() {
- assertEquals("TestDataflowPipelineRunner#TestAppName",
- new TestDataflowPipelineRunner(options).toString());
+ assertEquals("TestDataflowRunner#TestAppName",
+ new TestDataflowRunner(options).toString());
}
@Test
@@ -135,7 +135,7 @@ public class TestDataflowRunnerTest {
DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
when(request.execute()).thenReturn(
generateMockMetricResponse(true /* success */, true /* tentative */));
assertEquals(mockJob, runner.run(p, mockRunner));
@@ -156,7 +156,7 @@ public class TestDataflowRunnerTest {
DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
try {
runner.run(p, mockRunner);
} catch (AssertionError expected) {
@@ -197,7 +197,7 @@ public class TestDataflowRunnerTest {
when(request.execute()).thenReturn(
generateMockMetricResponse(false /* success */, true /* tentative */));
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
try {
runner.run(p, mockRunner);
} catch (AssertionError expected) {
@@ -228,7 +228,7 @@ public class TestDataflowRunnerTest {
when(request.execute()).thenReturn(
generateMockMetricResponse(true /* success */, true /* tentative */));
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
runner.run(p, mockRunner);
}
@@ -250,7 +250,7 @@ public class TestDataflowRunnerTest {
when(request.execute()).thenReturn(
generateMockMetricResponse(false /* success */, true /* tentative */));
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
try {
runner.run(p, mockRunner);
} catch (AssertionError expected) {
@@ -269,7 +269,7 @@ public class TestDataflowRunnerTest {
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
when(request.execute()).thenReturn(
generateMockMetricResponse(true /* success */, true /* tentative */));
doReturn(State.DONE).when(job).getState();
@@ -284,7 +284,7 @@ public class TestDataflowRunnerTest {
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
when(request.execute()).thenReturn(
generateMockMetricResponse(false /* success */, true /* tentative */));
doReturn(State.DONE).when(job).getState();
@@ -299,7 +299,7 @@ public class TestDataflowRunnerTest {
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
when(request.execute()).thenReturn(
generateMockMetricResponse(true /* success */, false /* tentative */));
doReturn(State.RUNNING).when(job).getState();
@@ -335,7 +335,7 @@ public class TestDataflowRunnerTest {
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
when(request.execute()).thenReturn(
generateMockMetricResponse(true /* success */, false /* tentative */));
doReturn(State.FAILED).when(job).getState();
@@ -373,7 +373,7 @@ public class TestDataflowRunnerTest {
when(request.execute()).thenReturn(
generateMockMetricResponse(false /* success */, true /* tentative */));
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
try {
runner.run(p, mockRunner);
} catch (AssertionError expected) {
@@ -401,7 +401,7 @@ public class TestDataflowRunnerTest {
DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
p.getOptions().as(TestPipelineOptions.class)
.setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
@@ -426,7 +426,7 @@ public class TestDataflowRunnerTest {
DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
p.getOptions().as(TestPipelineOptions.class)
.setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
@@ -453,7 +453,7 @@ public class TestDataflowRunnerTest {
DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
p.getOptions().as(TestPipelineOptions.class)
.setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
@@ -478,7 +478,7 @@ public class TestDataflowRunnerTest {
DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
p.getOptions().as(TestPipelineOptions.class)
.setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
@@ -505,7 +505,7 @@ public class TestDataflowRunnerTest {
DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
p.getOptions().as(TestPipelineOptions.class)
.setOnSuccessMatcher(new TestFailureMatcher());
@@ -537,7 +537,7 @@ public class TestDataflowRunnerTest {
DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
p.getOptions().as(TestPipelineOptions.class)
.setOnSuccessMatcher(new TestFailureMatcher());