You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/09/28 00:10:48 UTC
[2/5] incubator-beam git commit: Fix checkMaxWatermark causing batch
test failures
Fix checkMaxWatermark causing batch test failures
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f0588a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f0588a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f0588a2
Branch: refs/heads/master
Commit: 9f0588a2d63dd8538675b128a488ea5fa9c491f2
Parents: f6bd47b
Author: Mark Liu <ma...@markliu-macbookpro.roam.corp.google.com>
Authored: Wed Sep 7 11:59:02 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Sep 27 17:03:56 2016 -0700
----------------------------------------------------------------------
.../dataflow/testing/TestDataflowRunner.java | 20 +++-------
.../testing/TestDataflowRunnerTest.java | 40 ++++++++++++++------
2 files changed, 33 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f0588a2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index c569cd4..b8b4eaf 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -120,11 +120,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
try {
for (;;) {
Optional<Boolean> result = checkForSuccess(job);
- if (result.isPresent()) {
- return result;
- }
- result = checkMaxWatermark(job);
- if (result.isPresent()) {
+ if (result.isPresent() && (!result.get() || checkMaxWatermark(job))) {
return result;
}
Thread.sleep(10000L);
@@ -217,7 +213,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
+ "{} expected assertions.", job.getJobId(), successes, failures,
expectedNumberOfAssertions);
return Optional.of(false);
- } else if (successes > 0 && successes >= expectedNumberOfAssertions) {
+ } else if (successes >= expectedNumberOfAssertions) {
LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
+ "{} expected assertions.", job.getJobId(), successes, failures,
expectedNumberOfAssertions);
@@ -231,13 +227,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
return Optional.<Boolean>absent();
}
- Optional<Boolean> checkMaxWatermark(DataflowPipelineJob job) throws IOException {
- State state = job.getState();
- if (state == State.FAILED || state == State.CANCELLED) {
- LOG.info("The pipeline {}", state);
- return Optional.of(false);
- }
-
+ boolean checkMaxWatermark(DataflowPipelineJob job) throws IOException {
JobMetrics metrics = options.getDataflowClient().projects().jobs()
.getMetrics(job.getProjectId(), job.getJobId()).execute();
@@ -260,10 +250,10 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
if (hasMaxWatermark) {
LOG.info("All watermarks of job {} reach to max value.", job.getJobId());
- return Optional.of(true);
+ return true;
}
}
- return Optional.absent();
+ return false;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f0588a2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index 70c4562..3818b35 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -19,8 +19,10 @@ package org.apache.beam.runners.dataflow.testing;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
@@ -220,8 +222,13 @@ public class TestDataflowRunnerTest {
DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
- when(request.execute()).thenReturn(
- generateMockMetricResponse(true /* success */, true /* tentative */));
+ when(request.execute())
+ .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */))
+ .thenReturn(generateMockStreamingMetricResponse(
+ true /* hasWatermark */,
+ true /* maxWatermark */,
+ false /* multipleWatermarks */,
+ false /* multipleMaxWatermark */));
TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
runner.run(p, mockRunner);
}
@@ -401,7 +408,7 @@ public class TestDataflowRunnerTest {
false /* multipleWatermarks */,
false /* multipleMaxWatermark */));
doReturn(State.RUNNING).when(job).getState();
- assertEquals(Optional.absent(), runner.checkMaxWatermark(job));
+ assertFalse(runner.checkMaxWatermark(job));
}
@Test
@@ -419,7 +426,7 @@ public class TestDataflowRunnerTest {
false /* multipleWatermarks */,
false /* multipleMaxWatermark */));
doReturn(State.RUNNING).when(job).getState();
- assertEquals(Optional.of(true), runner.checkMaxWatermark(job));
+ assertTrue(runner.checkMaxWatermark(job));
}
@Test
@@ -437,7 +444,7 @@ public class TestDataflowRunnerTest {
false /* multipleWatermarks */,
false /* multipleMaxWatermark */));
doReturn(State.RUNNING).when(job).getState();
- assertEquals(Optional.absent(), runner.checkMaxWatermark(job));
+ assertFalse(runner.checkMaxWatermark(job));
}
@Test
@@ -455,7 +462,7 @@ public class TestDataflowRunnerTest {
true /* multipleWatermarks */,
true /* multipleMaxWatermark */));
doReturn(State.RUNNING).when(job).getState();
- assertEquals(Optional.of(true), runner.checkMaxWatermark(job));
+ assertTrue(runner.checkMaxWatermark(job));
}
@Test
@@ -473,7 +480,7 @@ public class TestDataflowRunnerTest {
true /* multipleWatermarks */,
false /* multipleMaxWatermark */));
doReturn(State.RUNNING).when(job).getState();
- assertEquals(Optional.absent(), runner.checkMaxWatermark(job));
+ assertFalse(runner.checkMaxWatermark(job));
}
@Test
@@ -489,7 +496,6 @@ public class TestDataflowRunnerTest {
generateMockMetricResponse(true /* success */, false /* tentative */));
doReturn(State.FAILED).when(job).getState();
assertEquals(Optional.of(false), runner.checkForSuccess(job));
- assertEquals(Optional.of(false), runner.checkMaxWatermark(job));
}
@Test
@@ -580,8 +586,13 @@ public class TestDataflowRunnerTest {
when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
.thenReturn(State.DONE);
- when(request.execute()).thenReturn(
- generateMockMetricResponse(true /* success */, true /* tentative */));
+ when(request.execute())
+ .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */))
+ .thenReturn(generateMockStreamingMetricResponse(
+ true /* hasWatermark */,
+ true /* maxWatermark */,
+ false /* multipleWatermarks */,
+ false /* multipleMaxWatermark */));
runner.run(p, mockRunner);
}
@@ -630,8 +641,13 @@ public class TestDataflowRunnerTest {
when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
.thenReturn(State.DONE);
- when(request.execute()).thenReturn(
- generateMockMetricResponse(true /* success */, true /* tentative */));
+ when(request.execute())
+ .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */))
+ .thenReturn(generateMockStreamingMetricResponse(
+ true /* hasWatermark */,
+ true /* maxWatermark */,
+ false /* multipleWatermarks */,
+ false /* multipleMaxWatermark */));
runner.run(p, mockRunner);
}