You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 20:23:28 UTC

[20/50] incubator-beam git commit: Change counter name in TestDataflowRunner

Change counter name in TestDataflowRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6b055d2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6b055d2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6b055d2d

Branch: refs/heads/gearpump-runner
Commit: 6b055d2debe879816808b4c1ee847e34cc1df5c0
Parents: 1ee191f
Author: Joshua Litt <jo...@google.com>
Authored: Sat Dec 17 11:12:12 2016 -0800
Committer: Joshua Litt <jo...@google.com>
Committed: Sat Dec 17 11:12:12 2016 -0800

----------------------------------------------------------------------
 .../dataflow/testing/TestDataflowRunner.java    | 29 ++++++++++++++++----
 .../testing/TestDataflowRunnerTest.java         | 16 ++++++++++-
 2 files changed, 39 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b055d2d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index 4b0fcf2..0564448 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -61,7 +61,12 @@ import org.slf4j.LoggerFactory;
  */
 public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   private static final String TENTATIVE_COUNTER = "tentative";
-  private static final String WATERMARK_METRIC_SUFFIX = "windmill-data-watermark";
+  // See https://issues.apache.org/jira/browse/BEAM-1170
+  // we need to either fix the API or pipe the DRAINED signal through
+  @VisibleForTesting
+  static final String LEGACY_WATERMARK_METRIC_SUFFIX = "windmill-data-watermark";
+  @VisibleForTesting
+  static final String WATERMARK_METRIC_SUFFIX = "DataWatermark";
   private static final long MAX_WATERMARK_VALUE = -2L;
   private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class);
 
@@ -248,6 +253,23 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }
 
   /**
+   * Checks wether a metric is a streaming watermark.
+   *
+   * @return true if the metric is a watermark.
+   */
+  boolean isWatermark(MetricUpdate metric) {
+    if (metric.getName() == null || metric.getName().getName() == null) {
+      return false; // no name -> shouldn't happen, not the watermark
+    }
+    if (metric.getScalar() == null) {
+      return false; // no scalar value -> not the watermark
+    }
+    String name = metric.getName().getName();
+    return name.endsWith(LEGACY_WATERMARK_METRIC_SUFFIX)
+        || name.endsWith(WATERMARK_METRIC_SUFFIX);
+  }
+
+  /**
    * Check watermarks of the streaming job. At least one watermark metric must exist.
    *
    * @return true if all watermarks are at max, false otherwise.
@@ -256,10 +278,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   boolean atMaxWatermark(DataflowPipelineJob job, JobMetrics metrics) {
     boolean hasMaxWatermark = false;
     for (MetricUpdate metric : metrics.getMetrics()) {
-      if (metric.getName() == null
-          || metric.getName().getName() == null
-          || !metric.getName().getName().endsWith(WATERMARK_METRIC_SUFFIX)
-          || metric.getScalar() == null) {
+      if (!isWatermark(metric)) {
         continue;
       }
       BigDecimal watermark = (BigDecimal) metric.getScalar();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b055d2d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index 366c6a1..da5630b 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.dataflow.testing;
 
+import static org.apache.beam.runners.dataflow.testing.TestDataflowRunner.LEGACY_WATERMARK_METRIC_SUFFIX;
+import static org.apache.beam.runners.dataflow.testing.TestDataflowRunner.WATERMARK_METRIC_SUFFIX;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -95,7 +97,6 @@ public class TestDataflowRunnerTest {
   @Mock private MockLowLevelHttpRequest request;
   @Mock private GcsUtil mockGcsUtil;
 
-  private static final String WATERMARK_METRIC_SUFFIX = "windmill-data-watermark";
   private static final BigDecimal DEFAULT_MAX_WATERMARK = new BigDecimal(-2);
 
   private TestDataflowPipelineOptions options;
@@ -411,6 +412,19 @@ public class TestDataflowRunnerTest {
   }
 
   @Test
+  public void testCheckMaxWatermarkWithLegacyWatermarkAtMax() throws IOException {
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    p.apply(Create.of(1, 2, 3));
+
+    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
+        ImmutableMap.of(LEGACY_WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
+    doReturn(State.RUNNING).when(job).getState();
+    assertTrue(runner.atMaxWatermark(job, metrics));
+  }
+
+  @Test
   public void testCheckMaxWatermarkWithSingleWatermarkNotAtMax() throws IOException {
     DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);