You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/09/28 00:10:48 UTC

[2/5] incubator-beam git commit: Fix checkMaxWatermark causing batch tests to fail

Fix checkMaxWatermark causing batch tests to fail


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f0588a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f0588a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f0588a2

Branch: refs/heads/master
Commit: 9f0588a2d63dd8538675b128a488ea5fa9c491f2
Parents: f6bd47b
Author: Mark Liu <ma...@markliu-macbookpro.roam.corp.google.com>
Authored: Wed Sep 7 11:59:02 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Sep 27 17:03:56 2016 -0700

----------------------------------------------------------------------
 .../dataflow/testing/TestDataflowRunner.java    | 20 +++-------
 .../testing/TestDataflowRunnerTest.java         | 40 ++++++++++++++------
 2 files changed, 33 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f0588a2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index c569cd4..b8b4eaf 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -120,11 +120,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
             try {
               for (;;) {
                 Optional<Boolean> result = checkForSuccess(job);
-                if (result.isPresent()) {
-                  return result;
-                }
-                result = checkMaxWatermark(job);
-                if (result.isPresent()) {
+                if (result.isPresent() && (!result.get() || checkMaxWatermark(job))) {
                   return result;
                 }
                 Thread.sleep(10000L);
@@ -217,7 +213,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
             + "{} expected assertions.", job.getJobId(), successes, failures,
             expectedNumberOfAssertions);
         return Optional.of(false);
-      } else if (successes > 0 && successes >= expectedNumberOfAssertions) {
+      } else if (successes >= expectedNumberOfAssertions) {
         LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
             + "{} expected assertions.", job.getJobId(), successes, failures,
             expectedNumberOfAssertions);
@@ -231,13 +227,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     return Optional.<Boolean>absent();
   }
 
-  Optional<Boolean> checkMaxWatermark(DataflowPipelineJob job) throws IOException {
-    State state = job.getState();
-    if (state == State.FAILED || state == State.CANCELLED) {
-      LOG.info("The pipeline {}", state);
-      return Optional.of(false);
-    }
-
+  boolean checkMaxWatermark(DataflowPipelineJob job) throws IOException {
     JobMetrics metrics = options.getDataflowClient().projects().jobs()
         .getMetrics(job.getProjectId(), job.getJobId()).execute();
 
@@ -260,10 +250,10 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       }
       if (hasMaxWatermark) {
         LOG.info("All watermarks of job {} reach to max value.", job.getJobId());
-        return Optional.of(true);
+        return true;
       }
     }
-    return Optional.absent();
+    return false;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f0588a2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index 70c4562..3818b35 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -19,8 +19,10 @@ package org.apache.beam.runners.dataflow.testing;
 
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
@@ -220,8 +222,13 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, true /* tentative */));
+    when(request.execute())
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */))
+        .thenReturn(generateMockStreamingMetricResponse(
+            true /* hasWatermark */,
+            true /* maxWatermark */,
+            false /* multipleWatermarks */,
+            false /* multipleMaxWatermark */));
     TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
     runner.run(p, mockRunner);
   }
@@ -401,7 +408,7 @@ public class TestDataflowRunnerTest {
             false /* multipleWatermarks */,
             false /* multipleMaxWatermark */));
     doReturn(State.RUNNING).when(job).getState();
-    assertEquals(Optional.absent(), runner.checkMaxWatermark(job));
+    assertFalse(runner.checkMaxWatermark(job));
   }
 
   @Test
@@ -419,7 +426,7 @@ public class TestDataflowRunnerTest {
             false /* multipleWatermarks */,
             false /* multipleMaxWatermark */));
     doReturn(State.RUNNING).when(job).getState();
-    assertEquals(Optional.of(true), runner.checkMaxWatermark(job));
+    assertTrue(runner.checkMaxWatermark(job));
   }
 
   @Test
@@ -437,7 +444,7 @@ public class TestDataflowRunnerTest {
             false /* multipleWatermarks */,
             false /* multipleMaxWatermark */));
     doReturn(State.RUNNING).when(job).getState();
-    assertEquals(Optional.absent(), runner.checkMaxWatermark(job));
+    assertFalse(runner.checkMaxWatermark(job));
   }
 
   @Test
@@ -455,7 +462,7 @@ public class TestDataflowRunnerTest {
             true /* multipleWatermarks */,
             true /* multipleMaxWatermark */));
     doReturn(State.RUNNING).when(job).getState();
-    assertEquals(Optional.of(true), runner.checkMaxWatermark(job));
+    assertTrue(runner.checkMaxWatermark(job));
   }
 
   @Test
@@ -473,7 +480,7 @@ public class TestDataflowRunnerTest {
             true /* multipleWatermarks */,
             false /* multipleMaxWatermark */));
     doReturn(State.RUNNING).when(job).getState();
-    assertEquals(Optional.absent(), runner.checkMaxWatermark(job));
+    assertFalse(runner.checkMaxWatermark(job));
   }
 
   @Test
@@ -489,7 +496,6 @@ public class TestDataflowRunnerTest {
         generateMockMetricResponse(true /* success */, false /* tentative */));
     doReturn(State.FAILED).when(job).getState();
     assertEquals(Optional.of(false), runner.checkForSuccess(job));
-    assertEquals(Optional.of(false), runner.checkMaxWatermark(job));
   }
 
   @Test
@@ -580,8 +586,13 @@ public class TestDataflowRunnerTest {
     when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
         .thenReturn(State.DONE);
 
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, true /* tentative */));
+    when(request.execute())
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */))
+        .thenReturn(generateMockStreamingMetricResponse(
+            true /* hasWatermark */,
+            true /* maxWatermark */,
+            false /* multipleWatermarks */,
+            false /* multipleMaxWatermark */));
     runner.run(p, mockRunner);
   }
 
@@ -630,8 +641,13 @@ public class TestDataflowRunnerTest {
     when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
         .thenReturn(State.DONE);
 
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, true /* tentative */));
+    when(request.execute())
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */))
+        .thenReturn(generateMockStreamingMetricResponse(
+            true /* hasWatermark */,
+            true /* maxWatermark */,
+            false /* multipleWatermarks */,
+            false /* multipleMaxWatermark */));
     runner.run(p, mockRunner);
   }