You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 20:23:28 UTC
[20/50] incubator-beam git commit: Change counter name in TestDataflowRunner
Change counter name in TestDataflowRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6b055d2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6b055d2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6b055d2d
Branch: refs/heads/gearpump-runner
Commit: 6b055d2debe879816808b4c1ee847e34cc1df5c0
Parents: 1ee191f
Author: Joshua Litt <jo...@google.com>
Authored: Sat Dec 17 11:12:12 2016 -0800
Committer: Joshua Litt <jo...@google.com>
Committed: Sat Dec 17 11:12:12 2016 -0800
----------------------------------------------------------------------
.../dataflow/testing/TestDataflowRunner.java | 29 ++++++++++++++++----
.../testing/TestDataflowRunnerTest.java | 16 ++++++++++-
2 files changed, 39 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b055d2d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index 4b0fcf2..0564448 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -61,7 +61,12 @@ import org.slf4j.LoggerFactory;
*/
public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
private static final String TENTATIVE_COUNTER = "tentative";
- private static final String WATERMARK_METRIC_SUFFIX = "windmill-data-watermark";
+ // See https://issues.apache.org/jira/browse/BEAM-1170
+ // we need to either fix the API or pipe the DRAINED signal through
+ @VisibleForTesting
+ static final String LEGACY_WATERMARK_METRIC_SUFFIX = "windmill-data-watermark";
+ @VisibleForTesting
+ static final String WATERMARK_METRIC_SUFFIX = "DataWatermark";
private static final long MAX_WATERMARK_VALUE = -2L;
private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class);
@@ -248,6 +253,23 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
/**
+ * Checks whether a metric is a streaming watermark.
+ *
+ * @return true if the metric is a watermark.
+ */
+ boolean isWatermark(MetricUpdate metric) {
+ if (metric.getName() == null || metric.getName().getName() == null) {
+ return false; // no name -> shouldn't happen, not the watermark
+ }
+ if (metric.getScalar() == null) {
+ return false; // no scalar value -> not the watermark
+ }
+ String name = metric.getName().getName();
+ return name.endsWith(LEGACY_WATERMARK_METRIC_SUFFIX)
+ || name.endsWith(WATERMARK_METRIC_SUFFIX);
+ }
+
+ /**
* Check watermarks of the streaming job. At least one watermark metric must exist.
*
* @return true if all watermarks are at max, false otherwise.
@@ -256,10 +278,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
boolean atMaxWatermark(DataflowPipelineJob job, JobMetrics metrics) {
boolean hasMaxWatermark = false;
for (MetricUpdate metric : metrics.getMetrics()) {
- if (metric.getName() == null
- || metric.getName().getName() == null
- || !metric.getName().getName().endsWith(WATERMARK_METRIC_SUFFIX)
- || metric.getScalar() == null) {
+ if (!isWatermark(metric)) {
continue;
}
BigDecimal watermark = (BigDecimal) metric.getScalar();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b055d2d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index 366c6a1..da5630b 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.dataflow.testing;
+import static org.apache.beam.runners.dataflow.testing.TestDataflowRunner.LEGACY_WATERMARK_METRIC_SUFFIX;
+import static org.apache.beam.runners.dataflow.testing.TestDataflowRunner.WATERMARK_METRIC_SUFFIX;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -95,7 +97,6 @@ public class TestDataflowRunnerTest {
@Mock private MockLowLevelHttpRequest request;
@Mock private GcsUtil mockGcsUtil;
- private static final String WATERMARK_METRIC_SUFFIX = "windmill-data-watermark";
private static final BigDecimal DEFAULT_MAX_WATERMARK = new BigDecimal(-2);
private TestDataflowPipelineOptions options;
@@ -411,6 +412,19 @@ public class TestDataflowRunnerTest {
}
@Test
+ public void testCheckMaxWatermarkWithLegacyWatermarkAtMax() throws IOException {
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
+ Pipeline p = TestPipeline.create(options);
+ p.apply(Create.of(1, 2, 3));
+
+ TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+ JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
+ ImmutableMap.of(LEGACY_WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
+ doReturn(State.RUNNING).when(job).getState();
+ assertTrue(runner.atMaxWatermark(job, metrics));
+ }
+
+ @Test
public void testCheckMaxWatermarkWithSingleWatermarkNotAtMax() throws IOException {
DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);