You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/26 20:49:10 UTC
[1/2] beam git commit: Make it possible to test runners that don't
support all metrics
Repository: beam
Updated Branches:
refs/heads/master de9d89c1e -> f23dd6709
Make it possible to test runners that don't support all metrics
Runners may only partially support metrics. This partial support runs along two axes:
- attempted & committed
- counters & distributions & gauges & ...
Prior to this PR, we had categories for the first axis, but not the second.
This means that a runner that supports only part of the second axis has to blacklist tests
on the first axis. Adding categories for both axes lets runners build a matrix of supported
features.
(This also lets the DataflowRunner run more tests by accurately identifying its test
matrix.)
* MetricsMatchers: refactor matchers for committed/attempted code reuse
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4faa8feb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4faa8feb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4faa8feb
Branch: refs/heads/master
Commit: 4faa8feba822db000b4b42636408245422ed324d
Parents: de9d89c
Author: Dan Halperin <dh...@google.com>
Authored: Fri Apr 21 10:03:50 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 26 13:48:42 2017 -0700
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 4 +-
.../beam/sdk/testing/UsesCounterMetrics.java | 25 +++
.../sdk/testing/UsesDistributionMetrics.java | 26 ++++
.../beam/sdk/testing/UsesGaugeMetrics.java | 25 +++
.../apache/beam/sdk/metrics/MetricMatchers.java | 144 ++++++-----------
.../apache/beam/sdk/metrics/MetricsTest.java | 154 ++++++++++++-------
6 files changed, 225 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4faa8feb/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 64dc71e..75aac43 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -122,8 +122,8 @@
<id>validates-runner-tests</id>
<configuration>
<excludedGroups>
- org.apache.beam.sdk.testing.UsesAttemptedMetrics,
- org.apache.beam.sdk.testing.UsesCommittedMetrics,
+ org.apache.beam.sdk.testing.UsesDistributionMetrics,
+ org.apache.beam.sdk.testing.UsesGaugeMetrics,
org.apache.beam.sdk.testing.UsesSetState,
org.apache.beam.sdk.testing.UsesMapState,
org.apache.beam.sdk.testing.UsesTimersInParDo,
http://git-wip-us.apache.org/repos/asf/beam/blob/4faa8feb/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCounterMetrics.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCounterMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCounterMetrics.java
new file mode 100644
index 0000000..0d0ed6f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCounterMetrics.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+/**
+ * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.Counter}.
+ * Tests tagged with {@link UsesCounterMetrics} should be run for runners which support counters.
+ */
+public class UsesCounterMetrics {}
http://git-wip-us.apache.org/repos/asf/beam/blob/4faa8feb/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesDistributionMetrics.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesDistributionMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesDistributionMetrics.java
new file mode 100644
index 0000000..6422024
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesDistributionMetrics.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+/**
+ * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.Distribution}.
+ * Tests tagged with {@link UsesDistributionMetrics} should be run for runners which support
+ * distributions.
+ */
+public class UsesDistributionMetrics {}
http://git-wip-us.apache.org/repos/asf/beam/blob/4faa8feb/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesGaugeMetrics.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesGaugeMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesGaugeMetrics.java
new file mode 100644
index 0000000..9d6455e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesGaugeMetrics.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.testing;
+
+/**
+ * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.Gauge}.
+ * Tests tagged with {@link UsesGaugeMetrics} should be run for runners which support gauges.
+ */
+public class UsesGaugeMetrics {}
http://git-wip-us.apache.org/repos/asf/beam/blob/4faa8feb/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
index 2251c82..a0dd119 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
@@ -81,61 +81,39 @@ public class MetricMatchers {
}
/**
- * Matches a {@link MetricResult} with the given namespace, name and step, and whose attempted
- * value equals the given value.
+ * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals
+ * the given value for attempted metrics.
*/
public static <T> Matcher<MetricResult<T>> attemptedMetricsResult(
- final String namespace, final String name, final String step, final T attempted) {
- return new TypeSafeMatcher<MetricResult<T>>() {
- @Override
- protected boolean matchesSafely(MetricResult<T> item) {
- return Objects.equals(namespace, item.name().namespace())
- && Objects.equals(name, item.name().name())
- && item.step().contains(step)
- && metricResultsEqual(attempted, item.attempted());
- }
-
- @Override
- public void describeTo(Description description) {
- description
- .appendText("MetricResult{inNamespace=").appendValue(namespace)
- .appendText(", name=").appendValue(name)
- .appendText(", step=").appendValue(step)
- .appendText(", attempted=").appendValue(attempted)
- .appendText("}");
- }
-
- @Override
- protected void describeMismatchSafely(MetricResult<T> item, Description mismatchDescription) {
- mismatchDescription.appendText("MetricResult{");
-
- describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step);
-
- if (!Objects.equals(attempted, item.attempted())) {
- mismatchDescription
- .appendText("attempted: ").appendValue(attempted)
- .appendText(" != ").appendValue(item.attempted());
- }
-
- mismatchDescription.appendText("}");
- }
- };
+ final String namespace, final String name, final String step, final T value) {
+ return metricsResult(namespace, name, step, value, false);
}
/**
- * Matches a {@link MetricResult} with the given namespace, name and step, and whose committed
- * value equals the given value.
+ * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals
+ * the given value for committed metrics.
*/
public static <T> Matcher<MetricResult<T>> committedMetricsResult(
- final String namespace, final String name, final String step,
- final T committed) {
+ final String namespace, final String name, final String step, final T value) {
+ return metricsResult(namespace, name, step, value, true);
+ }
+
+ /**
+ * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals
+ * the given value for either committed or attempted (based on {@code isCommitted}) metrics.
+ */
+ public static <T> Matcher<MetricResult<T>> metricsResult(
+ final String namespace, final String name, final String step, final T value,
+ final boolean isCommitted) {
+ final String metricState = isCommitted ? "committed" : "attempted";
return new TypeSafeMatcher<MetricResult<T>>() {
@Override
protected boolean matchesSafely(MetricResult<T> item) {
+ final T metricValue = isCommitted ? item.committed() : item.attempted();
return Objects.equals(namespace, item.name().namespace())
&& Objects.equals(name, item.name().name())
&& item.step().contains(step)
- && metricResultsEqual(committed, item.committed());
+ && metricResultsEqual(value, metricValue);
}
@Override
@@ -144,20 +122,21 @@ public class MetricMatchers {
.appendText("MetricResult{inNamespace=").appendValue(namespace)
.appendText(", name=").appendValue(name)
.appendText(", step=").appendValue(step)
- .appendText(", committed=").appendValue(committed)
+ .appendText(String.format(", %s=", metricState)).appendValue(value)
.appendText("}");
}
@Override
protected void describeMismatchSafely(MetricResult<T> item, Description mismatchDescription) {
mismatchDescription.appendText("MetricResult{");
+ final T metricValue = isCommitted ? item.committed() : item.attempted();
describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step);
- if (!Objects.equals(committed, item.committed())) {
+ if (!Objects.equals(value, metricValue)) {
mismatchDescription
- .appendText("committed: ").appendValue(committed)
- .appendText(" != ").appendValue(item.committed());
+ .appendText(String.format("%s: ", metricState)).appendValue(value)
+ .appendText(" != ").appendValue(metricValue);
}
mismatchDescription.appendText("}");
@@ -176,62 +155,28 @@ public class MetricMatchers {
static Matcher<MetricResult<DistributionResult>> distributionAttemptedMinMax(
final String namespace, final String name, final String step,
final Long attemptedMin, final Long attemptedMax) {
- return new TypeSafeMatcher<MetricResult<DistributionResult>>() {
- @Override
- protected boolean matchesSafely(MetricResult<DistributionResult> item) {
- return Objects.equals(namespace, item.name().namespace())
- && Objects.equals(name, item.name().name())
- && item.step().contains(step)
- && Objects.equals(attemptedMin, item.attempted().min())
- && Objects.equals(attemptedMax, item.attempted().max());
- }
-
- @Override
- public void describeTo(Description description) {
- description
- .appendText("MetricResult{inNamespace=").appendValue(namespace)
- .appendText(", name=").appendValue(name)
- .appendText(", step=").appendValue(step)
- .appendText(", attemptedMin=").appendValue(attemptedMin)
- .appendText(", attemptedMax=").appendValue(attemptedMax)
- .appendText("}");
- }
-
- @Override
- protected void describeMismatchSafely(MetricResult<DistributionResult> item,
- Description mismatchDescription) {
- mismatchDescription.appendText("MetricResult{");
-
- describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step);
-
- if (!Objects.equals(attemptedMin, item.attempted())) {
- mismatchDescription
- .appendText("attemptedMin: ").appendValue(attemptedMin)
- .appendText(" != ").appendValue(item.attempted());
- }
-
- if (!Objects.equals(attemptedMax, item.attempted())) {
- mismatchDescription
- .appendText("attemptedMax: ").appendValue(attemptedMax)
- .appendText(" != ").appendValue(item.attempted());
- }
-
- mismatchDescription.appendText("}");
- }
- };
+ return distributionMinMax(namespace, name, step, attemptedMin, attemptedMax, false);
}
static Matcher<MetricResult<DistributionResult>> distributionCommittedMinMax(
final String namespace, final String name, final String step,
final Long committedMin, final Long committedMax) {
+ return distributionMinMax(namespace, name, step, committedMin, committedMax, true);
+ }
+
+ static Matcher<MetricResult<DistributionResult>> distributionMinMax(
+ final String namespace, final String name, final String step,
+ final Long min, final Long max, final boolean isCommitted) {
+ final String metricState = isCommitted ? "committed" : "attempted";
return new TypeSafeMatcher<MetricResult<DistributionResult>>() {
@Override
protected boolean matchesSafely(MetricResult<DistributionResult> item) {
+ DistributionResult metricValue = isCommitted ? item.committed() : item.attempted();
return Objects.equals(namespace, item.name().namespace())
&& Objects.equals(name, item.name().name())
&& item.step().contains(step)
- && Objects.equals(committedMin, item.committed().min())
- && Objects.equals(committedMax, item.committed().max());
+ && Objects.equals(min, metricValue.min())
+ && Objects.equals(max, metricValue.max());
}
@Override
@@ -240,8 +185,8 @@ public class MetricMatchers {
.appendText("MetricResult{inNamespace=").appendValue(namespace)
.appendText(", name=").appendValue(name)
.appendText(", step=").appendValue(step)
- .appendText(", committedMin=").appendValue(committedMin)
- .appendText(", committedMax=").appendValue(committedMax)
+ .appendText(String.format(", %sMin=", metricState)).appendValue(min)
+ .appendText(String.format(", %sMax=", metricState)).appendValue(max)
.appendText("}");
}
@@ -251,17 +196,18 @@ public class MetricMatchers {
mismatchDescription.appendText("MetricResult{");
describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step);
+ DistributionResult metricValue = isCommitted ? item.committed() : item.attempted();
- if (!Objects.equals(committedMin, item.committed())) {
+ if (!Objects.equals(min, metricValue.min())) {
mismatchDescription
- .appendText("committedMin: ").appendValue(committedMin)
- .appendText(" != ").appendValue(item.committed());
+ .appendText(String.format("%sMin: ", metricState)).appendValue(min)
+ .appendText(" != ").appendValue(metricValue.min());
}
- if (!Objects.equals(committedMax, item.committed())) {
+ if (!Objects.equals(max, metricValue.max())) {
mismatchDescription
- .appendText("committedMax: ").appendValue(committedMax)
- .appendText(" != ").appendValue(item.committed());
+ .appendText(String.format("%sMax: ", metricState)).appendValue(max)
+ .appendText(" != ").appendValue(metricValue.max());
}
mismatchDescription.appendText("}");
http://git-wip-us.apache.org/repos/asf/beam/blob/4faa8feb/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index c7068e1..8077c27 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -19,9 +19,8 @@
package org.apache.beam.sdk.metrics;
import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult;
-import static org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult;
-import static org.apache.beam.sdk.metrics.MetricMatchers.distributionAttemptedMinMax;
-import static org.apache.beam.sdk.metrics.MetricMatchers.distributionCommittedMinMax;
+import static org.apache.beam.sdk.metrics.MetricMatchers.distributionMinMax;
+import static org.apache.beam.sdk.metrics.MetricMatchers.metricsResult;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertNull;
@@ -33,6 +32,9 @@ import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesAttemptedMetrics;
import org.apache.beam.sdk.testing.UsesCommittedMetrics;
+import org.apache.beam.sdk.testing.UsesCounterMetrics;
+import org.apache.beam.sdk.testing.UsesDistributionMetrics;
+import org.apache.beam.sdk.testing.UsesGaugeMetrics;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -58,6 +60,13 @@ public class MetricsTest implements Serializable {
private static final String NAMESPACE = MetricsTest.class.getName();
private static final MetricName ELEMENTS_READ = SourceMetrics.elementsRead().getName();
+ private static MetricQueryResults queryTestMetrics(PipelineResult result) {
+ return result.metrics().queryMetrics(
+ MetricsFilter.builder()
+ .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class))
+ .build());
+ }
+
@Rule
public final transient TestPipeline pipeline = TestPipeline.create();
@@ -67,14 +76,14 @@ public class MetricsTest implements Serializable {
}
@Test
- public void distributionWithoutContainer() {
+ public void testDistributionWithoutContainer() {
assertNull(MetricsEnvironment.getCurrentContainer());
// Should not fail even though there is no metrics container.
Metrics.distribution(NS, NAME).update(5L);
}
@Test
- public void counterWithoutContainer() {
+ public void testCounterWithoutContainer() {
assertNull(MetricsEnvironment.getCurrentContainer());
// Should not fail even though there is no metrics container.
Counter counter = Metrics.counter(NS, NAME);
@@ -85,7 +94,7 @@ public class MetricsTest implements Serializable {
}
@Test
- public void distributionToCell() {
+ public void testDistributionToCell() {
MetricsContainer container = new MetricsContainer("step");
MetricsEnvironment.setCurrentContainer(container);
@@ -104,7 +113,7 @@ public class MetricsTest implements Serializable {
}
@Test
- public void counterToCell() {
+ public void testCounterToCell() {
MetricsContainer container = new MetricsContainer("step");
MetricsEnvironment.setCurrentContainer(container);
Counter counter = Metrics.counter(NS, NAME);
@@ -122,62 +131,73 @@ public class MetricsTest implements Serializable {
assertThat(cell.getCumulative(), CoreMatchers.equalTo(42L));
}
- @Category({ValidatesRunner.class, UsesCommittedMetrics.class})
+ @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesCounterMetrics.class,
+ UsesDistributionMetrics.class, UsesGaugeMetrics.class})
@Test
- public void committedMetricsReportToQuery() {
+ public void testAllCommittedMetrics() {
PipelineResult result = runPipelineWithMetrics();
+ MetricQueryResults metrics = queryTestMetrics(result);
- MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder()
- .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class))
- .build());
-
- assertThat(metrics.counters(), hasItem(
- committedMetricsResult(NAMESPACE, "count", "MyStep1", 3L)));
- assertThat(metrics.distributions(), hasItem(
- committedMetricsResult(NAMESPACE, "input", "MyStep1",
- DistributionResult.create(26L, 3L, 5L, 13L))));
+ assertAllMetrics(metrics, true);
+ }
- assertThat(metrics.counters(), hasItem(
- committedMetricsResult(NAMESPACE, "count", "MyStep2", 6L)));
- assertThat(metrics.distributions(), hasItem(
- committedMetricsResult(NAMESPACE, "input", "MyStep2",
- DistributionResult.create(52L, 6L, 5L, 13L))));
- assertThat(metrics.gauges(), hasItem(
- committedMetricsResult(NAMESPACE, "my-gauge", "MyStep2",
- GaugeResult.create(12L, Instant.now()))));
+ @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class,
+ UsesDistributionMetrics.class, UsesGaugeMetrics.class})
+ @Test
+ public void testAllAttemptedMetrics() {
+ PipelineResult result = runPipelineWithMetrics();
+ MetricQueryResults metrics = queryTestMetrics(result);
- assertThat(metrics.distributions(), hasItem(
- distributionCommittedMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L)));
+ // TODO: BEAM-1169: Metrics shouldn't verify the physical values tightly.
+ assertAllMetrics(metrics, false);
}
+ @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesCounterMetrics.class})
+ @Test
+ public void testCommittedCounterMetrics() {
+ PipelineResult result = runPipelineWithMetrics();
+ MetricQueryResults metrics = queryTestMetrics(result);
+ assertCounterMetrics(metrics, true);
+ }
- @Category({ValidatesRunner.class, UsesAttemptedMetrics.class})
+ @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class})
@Test
- public void attemptedMetricsReportToQuery() {
+ public void testAttemptedCounterMetrics() {
PipelineResult result = runPipelineWithMetrics();
+ MetricQueryResults metrics = queryTestMetrics(result);
+ assertCounterMetrics(metrics, false);
+ }
- MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder()
- .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class))
- .build());
+ @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesDistributionMetrics.class})
+ @Test
+ public void testCommittedDistributionMetrics() {
+ PipelineResult result = runPipelineWithMetrics();
+ MetricQueryResults metrics = queryTestMetrics(result);
+ assertDistributionMetrics(metrics, true);
+ }
- // TODO: BEAM-1169: Metrics shouldn't verify the physical values tightly.
- assertThat(metrics.counters(), hasItem(
- attemptedMetricsResult(NAMESPACE, "count", "MyStep1", 3L)));
- assertThat(metrics.distributions(), hasItem(
- attemptedMetricsResult(NAMESPACE, "input", "MyStep1",
- DistributionResult.create(26L, 3L, 5L, 13L))));
+ @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesDistributionMetrics.class})
+ @Test
+ public void testAttemptedDistributionMetrics() {
+ PipelineResult result = runPipelineWithMetrics();
+ MetricQueryResults metrics = queryTestMetrics(result);
+ assertDistributionMetrics(metrics, false);
+ }
- assertThat(metrics.counters(), hasItem(
- attemptedMetricsResult(NAMESPACE, "count", "MyStep2", 6L)));
- assertThat(metrics.distributions(), hasItem(
- attemptedMetricsResult(NAMESPACE, "input", "MyStep2",
- DistributionResult.create(52L, 6L, 5L, 13L))));
- assertThat(metrics.gauges(), hasItem(
- attemptedMetricsResult(NAMESPACE, "my-gauge", "MyStep2",
- GaugeResult.create(12L, Instant.now()))));
+ @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesGaugeMetrics.class})
+ @Test
+ public void testCommittedGaugeMetrics() {
+ PipelineResult result = runPipelineWithMetrics();
+ MetricQueryResults metrics = queryTestMetrics(result);
+ assertGaugeMetrics(metrics, true);
+ }
- assertThat(metrics.distributions(), hasItem(
- distributionAttemptedMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L)));
+ @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesGaugeMetrics.class})
+ @Test
+ public void testAttemptedGaugeMetrics() {
+ PipelineResult result = runPipelineWithMetrics();
+ MetricQueryResults metrics = queryTestMetrics(result);
+ assertGaugeMetrics(metrics, false);
}
private PipelineResult runPipelineWithMetrics() {
@@ -232,8 +252,39 @@ public class MetricsTest implements Serializable {
return result;
}
+ private static void assertCounterMetrics(MetricQueryResults metrics, boolean isCommitted) {
+ assertThat(metrics.counters(), hasItem(
+ metricsResult(NAMESPACE, "count", "MyStep1", 3L, isCommitted)));
+ assertThat(metrics.counters(), hasItem(
+ metricsResult(NAMESPACE, "count", "MyStep2", 6L, isCommitted)));
+ }
+
+ private static void assertGaugeMetrics(MetricQueryResults metrics, boolean isCommitted) {
+ assertThat(metrics.gauges(), hasItem(
+ metricsResult(NAMESPACE, "my-gauge", "MyStep2",
+ GaugeResult.create(12L, Instant.now()), isCommitted)));
+ }
+
+ private static void assertDistributionMetrics(MetricQueryResults metrics, boolean isCommitted) {
+ assertThat(metrics.distributions(), hasItem(
+ metricsResult(NAMESPACE, "input", "MyStep1",
+ DistributionResult.create(26L, 3L, 5L, 13L), isCommitted)));
+
+ assertThat(metrics.distributions(), hasItem(
+ metricsResult(NAMESPACE, "input", "MyStep2",
+ DistributionResult.create(52L, 6L, 5L, 13L), isCommitted)));
+ assertThat(metrics.distributions(), hasItem(
+ distributionMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L, isCommitted)));
+ }
+
+ private static void assertAllMetrics(MetricQueryResults metrics, boolean isCommitted) {
+ assertCounterMetrics(metrics, isCommitted);
+ assertDistributionMetrics(metrics, isCommitted);
+ assertGaugeMetrics(metrics, isCommitted);
+ }
+
@Test
- @Category({ValidatesRunner.class, UsesAttemptedMetrics.class})
+ @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class})
public void testBoundedSourceMetrics() {
long numElements = 1000;
@@ -259,11 +310,10 @@ public class MetricsTest implements Serializable {
}
@Test
- @Category({ValidatesRunner.class, UsesAttemptedMetrics.class})
+ @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class})
public void testUnboundedSourceMetrics() {
long numElements = 1000;
-
// Use withMaxReadTime to force unbounded mode.
pipeline.apply(
GenerateSequence.from(0).to(numElements).withMaxReadTime(Duration.standardDays(1)));
[2/2] beam git commit: This closes #2633
Posted by dh...@apache.org.
This closes #2633
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f23dd670
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f23dd670
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f23dd670
Branch: refs/heads/master
Commit: f23dd6709bff3cf3827bfeeab0c122ca23ac09ca
Parents: de9d89c 4faa8fe
Author: Dan Halperin <dh...@google.com>
Authored: Wed Apr 26 13:49:00 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 26 13:49:00 2017 -0700
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 4 +-
.../beam/sdk/testing/UsesCounterMetrics.java | 25 +++
.../sdk/testing/UsesDistributionMetrics.java | 26 ++++
.../beam/sdk/testing/UsesGaugeMetrics.java | 25 +++
.../apache/beam/sdk/metrics/MetricMatchers.java | 144 ++++++-----------
.../apache/beam/sdk/metrics/MetricsTest.java | 154 ++++++++++++-------
6 files changed, 225 insertions(+), 153 deletions(-)
----------------------------------------------------------------------