You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2017/03/20 22:09:31 UTC
[1/2] beam git commit: Support for querying metrics in Dataflow Runner
Repository: beam
Updated Branches:
refs/heads/master 1d9772a3a -> 59aa0dab7
Support for querying metrics in Dataflow Runner
Added the MetricFiltering class, which holds helper methods for matching metric
names and step names against a MetricsFilter.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6de412a5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6de412a5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6de412a5
Branch: refs/heads/master
Commit: 6de412a5dfb3000ab5d354ada8761789230d3ce3
Parents: 1d9772a
Author: Pablo <pa...@google.com>
Authored: Fri Mar 10 16:10:31 2017 -0800
Committer: bchambers <bc...@google.com>
Committed: Mon Mar 20 14:48:22 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectMetrics.java | 70 +-----
.../beam/runners/direct/DirectMetricsTest.java | 86 ++-----
.../beam/runners/dataflow/DataflowMetrics.java | 212 +++++++++++++++++
.../runners/dataflow/DataflowPipelineJob.java | 14 +-
.../runners/dataflow/DataflowMetricsTest.java | 236 +++++++++++++++++++
.../beam/sdk/metrics/MetricFiltering.java | 99 ++++++++
.../beam/sdk/metrics/MetricFilteringTest.java | 72 ++++++
7 files changed, 655 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
index fa8f9c3..f04dc21 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
@@ -20,12 +20,10 @@ package org.apache.beam.runners.direct;
import static java.util.Arrays.asList;
import com.google.auto.value.AutoValue;
-import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@@ -35,9 +33,9 @@ import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.metrics.DistributionData;
import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.MetricFiltering;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
-import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
@@ -258,7 +256,7 @@ class DirectMetrics extends MetricResults {
MetricsFilter filter,
ImmutableList.Builder<MetricResult<ResultT>> resultsBuilder,
Map.Entry<MetricKey, ? extends DirectMetric<?, ResultT>> entry) {
- if (matches(filter, entry.getKey())) {
+ if (MetricFiltering.matches(filter, entry.getKey())) {
resultsBuilder.add(DirectMetricResult.create(
entry.getKey().metricName(),
entry.getKey().stepName(),
@@ -267,70 +265,6 @@ class DirectMetrics extends MetricResults {
}
}
- // Matching logic is implemented here rather than in MetricsFilter because we would like
- // MetricsFilter to act as a "dumb" value-object, with the possibility of replacing it with
- // a Proto/JSON/etc. schema object.
- private boolean matches(MetricsFilter filter, MetricKey key) {
- return matchesName(key.metricName(), filter.names())
- && matchesScope(key.stepName(), filter.steps());
- }
-
- /**
- * {@code subPathMatches(haystack, needle)} returns true if {@code needle}
- * represents a path within {@code haystack}. For example, "foo/bar" is in "a/foo/bar/b",
- * but not "a/fool/bar/b" or "a/foo/bart/b".
- */
- public boolean subPathMatches(String haystack, String needle) {
- int location = haystack.indexOf(needle);
- int end = location + needle.length();
- if (location == -1) {
- return false; // needle not found
- } else if (location != 0 && haystack.charAt(location - 1) != '/') {
- return false; // the first entry in needle wasn't exactly matched
- } else if (end != haystack.length() && haystack.charAt(end) != '/') {
- return false; // the last entry in needle wasn't exactly matched
- } else {
- return true;
- }
- }
-
- /**
- * {@code matchesScope(actualScope, scopes)} returns true if the scope of a metric is matched
- * by any of the filters in {@code scopes}. A metric scope is a path of type "A/B/D". A
- * path is matched by a filter if the filter is equal to the path (e.g. "A/B/D", or
- * if it represents a subpath within it (e.g. "A/B" or "B/D", but not "A/D"). */
- public boolean matchesScope(String actualScope, Set<String> scopes) {
- if (scopes.isEmpty() || scopes.contains(actualScope)) {
- return true;
- }
-
- // If there is no perfect match, a stage name-level match is tried.
- // This is done by a substring search over the levels of the scope.
- // e.g. a scope "A/B/C/D" is matched by "A/B", but not by "A/C".
- for (String scope : scopes) {
- if (subPathMatches(actualScope, scope)) {
- return true;
- }
- }
-
- return false;
- }
-
- private boolean matchesName(MetricName metricName, Set<MetricNameFilter> nameFilters) {
- if (nameFilters.isEmpty()) {
- return true;
- }
-
- for (MetricNameFilter nameFilter : nameFilters) {
- if ((nameFilter.getName() == null || nameFilter.getName().equals(metricName.name()))
- && Objects.equal(metricName.namespace(), nameFilter.getNamespace())) {
- return true;
- }
- }
-
- return false;
- }
-
/** Apply metric updates that represent physical counter deltas to the current metric values. */
public void updatePhysical(CommittedBundle<?> bundle, MetricUpdates updates) {
for (MetricUpdate<Long> counter : updates.counterUpdates()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
index 77229bf..7183124 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
@@ -23,13 +23,9 @@ import static org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult;
import static org.apache.beam.sdk.metrics.MetricNameFilter.inNamespace;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
import com.google.common.collect.ImmutableList;
-import java.util.HashSet;
-import java.util.Set;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.metrics.DistributionData;
import org.apache.beam.sdk.metrics.DistributionResult;
@@ -129,97 +125,63 @@ public class DirectMetricsTest {
committedMetricsResult("ns1", "name1", "step2", 0L)));
}
- private boolean matchesSubPath(String actualScope, String subPath) {
- return metrics.subPathMatches(actualScope, subPath);
- }
-
- @Test
- public void testMatchesSubPath() {
- assertTrue("Match of the first element",
- matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1"));
- assertTrue("Match of the first elements",
- matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1"));
- assertTrue("Match of the last elements",
- matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Inner1/Bottom1"));
- assertFalse("Substring match but no subpath match",
- matchesSubPath("Top1/Outer1/Inner1/Bottom1", "op1/Outer1/Inner1"));
- assertFalse("Substring match from start - but no subpath match",
- matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top"));
- }
-
- private boolean matchesScopeWithSingleFilter(String actualScope, String filter) {
- Set<String> scopeFilter = new HashSet<String>();
- scopeFilter.add(filter);
- return metrics.matchesScope(actualScope, scopeFilter);
- }
-
- @Test
- public void testMatchesScope() {
- assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1"));
- assertTrue(matchesScopeWithSingleFilter(
- "Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1/Bottom1"));
- assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1"));
- assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1"));
- assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Inner1"));
- assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inn"));
- }
-
@SuppressWarnings("unchecked")
@Test
- public void testPartialScopeMatchingInMetricsQuery() {
+ public void testApplyAttemptedQueryCompositeScope() {
metrics.updatePhysical(bundle1, MetricUpdates.create(
ImmutableList.of(
- MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner1", NAME1), 5L),
- MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner2", NAME1), 8L)),
+ MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 5L),
+ MetricUpdate.create(MetricKey.create("Outer1/Inner2", NAME1), 8L)),
ImmutableList.<MetricUpdate<DistributionData>>of()));
metrics.updatePhysical(bundle1, MetricUpdates.create(
ImmutableList.of(
- MetricUpdate.create(MetricKey.create("Top2/Outer1/Inner1", NAME1), 12L),
- MetricUpdate.create(MetricKey.create("Top1/Outer2/Inner2", NAME1), 18L)),
+ MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 12L),
+ MetricUpdate.create(MetricKey.create("Outer2/Inner2", NAME1), 18L)),
ImmutableList.<MetricUpdate<DistributionData>>of()));
MetricQueryResults results = metrics.queryMetrics(
- MetricsFilter.builder().addStep("Top1/Outer1").build());
+ MetricsFilter.builder().addStep("Outer1").build());
assertThat(results.counters(),
containsInAnyOrder(
- attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner1", 5L),
- attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L)));
-
- results = metrics.queryMetrics(
- MetricsFilter.builder().addStep("Inner2").build());
+ attemptedMetricsResult("ns1", "name1", "Outer1/Inner1", 12L),
+ attemptedMetricsResult("ns1", "name1", "Outer1/Inner2", 8L)));
assertThat(results.counters(),
containsInAnyOrder(
- attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L),
- attemptedMetricsResult("ns1", "name1", "Top1/Outer2/Inner2", 18L)));
+ committedMetricsResult("ns1", "name1", "Outer1/Inner1", 0L),
+ committedMetricsResult("ns1", "name1", "Outer1/Inner2", 0L)));
}
+
@SuppressWarnings("unchecked")
@Test
- public void testApplyAttemptedQueryCompositeScope() {
+ public void testPartialScopeMatchingInMetricsQuery() {
metrics.updatePhysical(bundle1, MetricUpdates.create(
ImmutableList.of(
- MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 5L),
- MetricUpdate.create(MetricKey.create("Outer1/Inner2", NAME1), 8L)),
+ MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner1", NAME1), 5L),
+ MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner2", NAME1), 8L)),
ImmutableList.<MetricUpdate<DistributionData>>of()));
metrics.updatePhysical(bundle1, MetricUpdates.create(
ImmutableList.of(
- MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 12L),
- MetricUpdate.create(MetricKey.create("Outer2/Inner2", NAME1), 18L)),
+ MetricUpdate.create(MetricKey.create("Top2/Outer1/Inner1", NAME1), 12L),
+ MetricUpdate.create(MetricKey.create("Top1/Outer2/Inner2", NAME1), 18L)),
ImmutableList.<MetricUpdate<DistributionData>>of()));
MetricQueryResults results = metrics.queryMetrics(
- MetricsFilter.builder().addStep("Outer1").build());
+ MetricsFilter.builder().addStep("Top1/Outer1").build());
assertThat(results.counters(),
containsInAnyOrder(
- attemptedMetricsResult("ns1", "name1", "Outer1/Inner1", 12L),
- attemptedMetricsResult("ns1", "name1", "Outer1/Inner2", 8L)));
+ attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner1", 5L),
+ attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L)));
+
+ results = metrics.queryMetrics(
+ MetricsFilter.builder().addStep("Inner2").build());
assertThat(results.counters(),
containsInAnyOrder(
- committedMetricsResult("ns1", "name1", "Outer1/Inner1", 0L),
- committedMetricsResult("ns1", "name1", "Outer1/Inner2", 0L)));
+ attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L),
+ attemptedMetricsResult("ns1", "name1", "Top1/Outer2/Inner2", 18L)));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
new file mode 100644
index 0000000..c0d1883
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.MetricFiltering;
+import org.apache.beam.sdk.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link MetricResults} for the Dataflow Runner.
+ *
+ * <p>Metric values are fetched from the Dataflow service on demand. Only user counters are
+ * reported; the components of distribution metrics are recognized by their name suffix and
+ * skipped.
+ */
+class DataflowMetrics extends MetricResults {
+  private static final Logger LOG = LoggerFactory.getLogger(DataflowMetrics.class);
+
+  /**
+   * Client for the Dataflow service. This can be used to query the service
+   * for information about the job.
+   */
+  private final DataflowClient dataflowClient;
+
+  /**
+   * PipelineResult implementation for Dataflow Runner. It contains job state and id information.
+   */
+  private final DataflowPipelineJob dataflowPipelineJob;
+
+  /**
+   * Service response retrieved once the job was observed in a terminal state, or {@code null} if
+   * nothing has been cached yet. The raw (unfiltered) response is kept, rather than a query
+   * result, so that every later query can apply its own filter.
+   */
+  private JobMetrics cachedMetricResults = null;
+
+  /**
+   * Constructor for the DataflowMetrics class.
+   * @param dataflowPipelineJob is used to get Job state and Job ID information.
+   * @param dataflowClient is used to query user metrics from the Dataflow service.
+   */
+  public DataflowMetrics(DataflowPipelineJob dataflowPipelineJob, DataflowClient dataflowClient) {
+    this.dataflowClient = dataflowClient;
+    this.dataflowPipelineJob = dataflowPipelineJob;
+  }
+
+  /**
+   * Build the {@link MetricKey} identifying the metric that an update belongs to.
+   *
+   * <p>The step comes from the "step" entry of the update's context; when the job carries
+   * aggregator transforms it is translated to the full name of the applied transform. The
+   * namespace comes from the "namespace" context entry.
+   *
+   * @return a {@link MetricKey} that can be hashed and used to identify a metric.
+   */
+  private MetricKey metricHashKey(
+      com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
+    String fullStepName = metricUpdate.getName().getContext().get("step");
+    if (dataflowPipelineJob.aggregatorTransforms != null) {
+      // NOTE(review): assumes the lookup never yields null for a step reported by the
+      // service - confirm against DataflowAggregatorTransforms.
+      fullStepName = dataflowPipelineJob.aggregatorTransforms
+          .getAppliedTransformForStepName(fullStepName).getFullName();
+    }
+    return MetricKey.create(
+        fullStepName,
+        MetricName.named(
+            metricUpdate.getName().getContext().get("namespace"),
+            metricUpdate.getName().getName()));
+  }
+
+  /**
+   * Check whether a {@link com.google.api.services.dataflow.model.MetricUpdate} is a tentative
+   * update or not.
+   * @return true if update is tentative, false otherwise
+   */
+  private boolean isMetricTentative(
+      com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
+    // A missing "tentative" entry yields null, which never equals "true".
+    return Objects.equal(metricUpdate.getName().getContext().get("tentative"), "true");
+  }
+
+  /**
+   * Check whether an update describes a user metric: its origin is "user" and its context
+   * carries a namespace. Updates without a namespace are not actual metrics counters.
+   */
+  private boolean isUserMetric(
+      com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
+    return Objects.equal(metricUpdate.getName().getOrigin(), "user")
+        && metricUpdate.getName().getContext().containsKey("namespace");
+  }
+
+  /** Check whether a metric name carries one of the suffixes used for distribution components. */
+  private static boolean isDistributionComponent(String metricName) {
+    return metricName.endsWith("[MIN]") || metricName.endsWith("[MAX]")
+        || metricName.endsWith("[MEAN]") || metricName.endsWith("[COUNT]");
+  }
+
+  /**
+   * Extract the scalar of an update as a {@link Long}.
+   * @return the value, or {@code null} if the update is missing or has no numeric scalar.
+   */
+  private static Long scalarAsLong(
+      com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
+    if (metricUpdate == null || !(metricUpdate.getScalar() instanceof Number)) {
+      return null;
+    }
+    return ((Number) metricUpdate.getScalar()).longValue();
+  }
+
+  /** Build a query result that contains neither counters nor distributions. */
+  private static MetricQueryResults emptyResults() {
+    return DataflowMetricQueryResults.create(
+        ImmutableList.<MetricResult<Long>>of(),
+        ImmutableList.<MetricResult<DistributionResult>>of());
+  }
+
+  /**
+   * Take a list of metric updates coming from the Dataflow service, and format it into a
+   * Metrics API MetricQueryResults instance.
+   *
+   * <p>A counter is reported only when both its committed and its tentative update are present
+   * with a numeric scalar; incomplete counters are skipped instead of failing the whole query.
+   *
+   * @param metricUpdates updates as returned by the service; may be {@code null}.
+   * @param filter filter handed to {@link MetricFiltering#matches} for every candidate metric.
+   * @return a populated MetricQueryResults object.
+   */
+  private MetricQueryResults populateMetricQueryResults(
+      List<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates,
+      MetricsFilter filter) {
+    if (metricUpdates == null) {
+      return emptyResults();
+    }
+
+    // Separate metric updates by name and by tentative/committed.
+    HashMap<MetricKey, com.google.api.services.dataflow.model.MetricUpdate>
+        tentativeByName = new HashMap<>();
+    HashMap<MetricKey, com.google.api.services.dataflow.model.MetricUpdate>
+        committedByName = new HashMap<>();
+    HashSet<MetricKey> metricHashKeys = new HashSet<>();
+
+    for (com.google.api.services.dataflow.model.MetricUpdate update : metricUpdates) {
+      if (!isUserMetric(update)) {
+        continue;
+      }
+      MetricKey key = metricHashKey(update);
+      if (isMetricTentative(update)) {
+        tentativeByName.put(key, update);
+      } else {
+        committedByName.put(key, update);
+      }
+      metricHashKeys.add(key);
+    }
+
+    // Create the lists with the metric result information.
+    ImmutableList.Builder<MetricResult<Long>> counterResults = ImmutableList.builder();
+    ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults =
+        ImmutableList.builder();
+    boolean skippedDistribution = false;
+    for (MetricKey metricKey : metricHashKeys) {
+      String metricName = metricKey.metricName().name();
+      if (isDistributionComponent(metricName)) {
+        // Skip distribution metrics, as these are not yet properly supported.
+        skippedDistribution = true;
+        continue;
+      }
+      if (!MetricFiltering.matches(filter, metricKey)) {
+        continue;
+      }
+      Long committed = scalarAsLong(committedByName.get(metricKey));
+      Long attempted = scalarAsLong(tentativeByName.get(metricKey));
+      if (committed == null || attempted == null) {
+        LOG.debug("Skipping metric {}: committed or tentative value is missing.", metricKey);
+        continue;
+      }
+      counterResults.add(DataflowMetricResult.create(
+          MetricName.named(metricKey.metricName().namespace(), metricName),
+          metricKey.stepName(), committed, attempted));
+    }
+    if (skippedDistribution) {
+      // Logged once per query rather than once per distribution component.
+      LOG.warn("Distribution metrics are not yet supported. You can see them in the Dataflow "
+          + "User Interface");
+    }
+    return DataflowMetricQueryResults.create(counterResults.build(), distributionResults.build());
+  }
+
+  /**
+   * Ask the Dataflow service for the metrics of the job.
+   * @return the service response, or {@code null} if the request failed.
+   */
+  private JobMetrics fetchJobMetrics() {
+    try {
+      return dataflowClient.getJobMetrics(dataflowPipelineJob.jobId);
+    } catch (IOException e) {
+      // Best effort: callers get an empty result, but the cause stays visible in the log.
+      LOG.warn("Unable to query job metrics.\n", e);
+      return null;
+    }
+  }
+
+  /** Query all metrics of the job, without any filtering. */
+  public MetricQueryResults queryMetrics() {
+    return queryMetrics(null);
+  }
+
+  /**
+   * Query the metrics of the job that match {@code filter}.
+   *
+   * <p>The service is contacted until a response has been retrieved with the job in a terminal
+   * state; from then on that response is reused, and only the filtering is repeated.
+   *
+   * @return the matching metrics, or an empty result if the service could not be queried.
+   */
+  @Override
+  public MetricQueryResults queryMetrics(MetricsFilter filter) {
+    JobMetrics jobMetrics = cachedMetricResults;
+    if (jobMetrics == null) {
+      jobMetrics = fetchJobMetrics();
+      if (jobMetrics == null) {
+        return emptyResults();
+      }
+      if (dataflowPipelineJob.getState().isTerminal()) {
+        // Metrics no longer change after the job has finished, so keep the response.
+        cachedMetricResults = jobMetrics;
+      }
+    }
+    return populateMetricQueryResults(jobMetrics.getMetrics(), filter);
+  }
+
+  /** {@link MetricQueryResults} value class holding the counters and distributions of a query. */
+  @AutoValue
+  abstract static class DataflowMetricQueryResults implements MetricQueryResults {
+    public static MetricQueryResults create(
+        Iterable<MetricResult<Long>> counters,
+        Iterable<MetricResult<DistributionResult>> distributions) {
+      return new AutoValue_DataflowMetrics_DataflowMetricQueryResults(counters, distributions);
+    }
+  }
+
+  /** {@link MetricResult} value class pairing the committed and attempted value of a metric. */
+  @AutoValue
+  abstract static class DataflowMetricResult<T> implements MetricResult<T> {
+    // need to define these here so they appear in the correct order
+    // and the generated constructor is usable and consistent
+    public abstract MetricName name();
+    public abstract String step();
+    public abstract T committed();
+    public abstract T attempted();
+
+    public static <T> MetricResult<T> create(MetricName name, String scope,
+        T committed, T attempted) {
+      return new AutoValue_DataflowMetrics_DataflowMetricResult<T>(
+          name, scope, committed, attempted);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 950a9d3..1112fbb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -57,7 +57,7 @@ public class DataflowPipelineJob implements PipelineResult {
/**
* The id for the job.
*/
- private String jobId;
+ protected String jobId;
/**
* The {@link DataflowPipelineOptions} for the job.
@@ -71,6 +71,12 @@ public class DataflowPipelineJob implements PipelineResult {
private final DataflowClient dataflowClient;
/**
+ * MetricResults object for Dataflow Runner. It allows for querying of metrics from the Dataflow
+ * service.
+ */
+ private final DataflowMetrics dataflowMetrics;
+
+ /**
* The state the job terminated in or {@code null} if the job has not terminated.
*/
@Nullable
@@ -82,7 +88,7 @@ public class DataflowPipelineJob implements PipelineResult {
@Nullable
private DataflowPipelineJob replacedByJob = null;
- private DataflowAggregatorTransforms aggregatorTransforms;
+ protected DataflowAggregatorTransforms aggregatorTransforms;
/**
* The Metric Updates retrieved after the job was in a terminal state.
@@ -129,6 +135,7 @@ public class DataflowPipelineJob implements PipelineResult {
this.dataflowOptions = dataflowOptions;
this.dataflowClient = (dataflowOptions == null ? null : DataflowClient.create(dataflowOptions));
this.aggregatorTransforms = aggregatorTransforms;
+ this.dataflowMetrics = new DataflowMetrics(this, this.dataflowClient);
}
/**
@@ -462,8 +469,7 @@ public class DataflowPipelineJob implements PipelineResult {
@Override
public MetricResults metrics() {
- throw new UnsupportedOperationException(
- "The DataflowRunner does not currently support metrics.");
+ return dataflowMetrics;
}
private <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator)
http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
new file mode 100644
index 0000000..1017978
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult;
+import static org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricStructuredName;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.math.BigDecimal;
+import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.TestCredential;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link DataflowMetrics}.
+ */
+@RunWith(JUnit4.class)
+public class DataflowMetricsTest {
+ private static final String PROJECT_ID = "some-project";
+ private static final String JOB_ID = "1234";
+ private static final String REGION_ID = "some-region";
+ private static final String REPLACEMENT_JOB_ID = "4321";
+
+ @Mock
+ private Dataflow mockWorkflowClient;
+ @Mock
+ private Dataflow.Projects mockProjects;
+ @Mock
+ private Dataflow.Projects.Locations mockLocations;
+ @Mock
+ private Dataflow.Projects.Locations.Jobs mockJobs;
+
+ private TestDataflowPipelineOptions options;
+
+  /** Initializes the mocks and builds pipeline options backed by the mocked Dataflow client. */
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+
+    // Chain the mocked service handles: client -> projects -> locations -> jobs.
+    when(mockLocations.jobs()).thenReturn(mockJobs);
+    when(mockProjects.locations()).thenReturn(mockLocations);
+    when(mockWorkflowClient.projects()).thenReturn(mockProjects);
+
+    TestDataflowPipelineOptions testOptions =
+        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+    testOptions.setDataflowClient(mockWorkflowClient);
+    testOptions.setProject(PROJECT_ID);
+    testOptions.setRunner(DataflowRunner.class);
+    testOptions.setTempLocation("gs://fakebucket/temp");
+    testOptions.setPathValidatorClass(NoopPathValidator.class);
+    testOptions.setGcpCredential(new TestCredential());
+    options = testOptions;
+  }
+
+  /** A service response without any metric update yields no counters and no distributions. */
+  @Test
+  public void testEmptyMetricUpdates() throws IOException {
+    DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+    when(job.getState()).thenReturn(State.RUNNING);
+    job.jobId = JOB_ID;
+
+    JobMetrics jobMetrics = new JobMetrics();
+    jobMetrics.setMetrics(ImmutableList.<MetricUpdate>of());
+    DataflowClient dataflowClient = mock(DataflowClient.class);
+    when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
+
+    DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
+    MetricQueryResults result = dataflowMetrics.queryMetrics();
+    assertThat(ImmutableList.copyOf(result.counters()), is(empty()));
+    assertThat(ImmutableList.copyOf(result.distributions()), is(empty()));
+  }
+
+  /** Once the job is in a terminal state, repeated queries contact the service only once. */
+  @Test
+  public void testCachingMetricUpdates() throws IOException {
+    DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+    when(job.getState()).thenReturn(State.DONE);
+    job.jobId = JOB_ID;
+
+    JobMetrics jobMetrics = new JobMetrics();
+    jobMetrics.setMetrics(ImmutableList.<MetricUpdate>of());
+    DataflowClient dataflowClient = mock(DataflowClient.class);
+    when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
+
+    DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
+    // Constructing the object must not query the service.
+    verify(dataflowClient, times(0)).getJobMetrics(JOB_ID);
+    dataflowMetrics.queryMetrics(null);
+    verify(dataflowClient, times(1)).getJobMetrics(JOB_ID);
+    // The second query must not add a service call.
+    dataflowMetrics.queryMetrics(null);
+    verify(dataflowClient, times(1)).getJobMetrics(JOB_ID);
+  }
+
+  /**
+   * Builds a user-origin counter update as the Dataflow service would report it.
+   *
+   * @param name name of the counter.
+   * @param namespace value of the "namespace" context entry.
+   * @param step value of the "step" context entry.
+   * @param scalar value of the counter.
+   * @param tentative whether to mark the update with a "tentative" context entry set to "true".
+   */
+  private MetricUpdate makeCounterMetricUpdate(String name, String namespace, String step,
+      long scalar, boolean tentative) {
+    MetricUpdate update = new MetricUpdate();
+    update.setScalar(new BigDecimal(scalar));
+
+    MetricStructuredName structuredName = new MetricStructuredName();
+    structuredName.setName(name);
+    structuredName.setOrigin("user");
+    // Parameterized builder instead of a raw type, so the context map is type-checked.
+    ImmutableMap.Builder<String, String> contextBuilder = ImmutableMap.builder();
+    contextBuilder.put("step", step)
+        .put("namespace", namespace);
+    if (tentative) {
+      contextBuilder.put("tentative", "true");
+    }
+    structuredName.setContext(contextBuilder.build());
+    update.setName(structuredName);
+    return update;
+  }
+
+  /** A single counter is reported with its tentative value as attempted and the other as committed. */
+  @Test
+  public void testSingleCounterUpdates() throws IOException {
+    JobMetrics jobMetrics = new JobMetrics();
+    DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+    when(job.getState()).thenReturn(State.RUNNING);
+    job.jobId = JOB_ID;
+
+    // Provide both a committed and a tentative update for the counter, so that both values
+    // can be reported.
+    MetricUpdate mu1 = makeCounterMetricUpdate("counterName", "counterNamespace",
+        "s2", 1234L, false);
+    MetricUpdate mu1Tentative = makeCounterMetricUpdate("counterName",
+        "counterNamespace", "s2", 1233L, true);
+    jobMetrics.setMetrics(ImmutableList.of(mu1, mu1Tentative));
+    DataflowClient dataflowClient = mock(DataflowClient.class);
+    when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
+
+    DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
+    MetricQueryResults result = dataflowMetrics.queryMetrics(null);
+    assertThat(result.counters(), containsInAnyOrder(
+        attemptedMetricsResult("counterNamespace", "counterName", "s2", 1233L)));
+    assertThat(result.counters(), containsInAnyOrder(
+        committedMetricsResult("counterNamespace", "counterName", "s2", 1234L)));
+  }
+
+ @Test
+ public void testIgnoreDistributionButGetCounterUpdates() throws IOException {
+   DataflowPipelineJob runningJob = mock(DataflowPipelineJob.class);
+   when(runningJob.getState()).thenReturn(State.RUNNING);
+   runningJob.jobId = JOB_ID;
+
+   // Each metric is listed as a committed/tentative pair, which is what the parser expects.
+   // The "[MIN]"-suffixed pair must not surface among the counter results.
+   JobMetrics reported = new JobMetrics();
+   reported.setMetrics(ImmutableList.of(
+       makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1233L, false),
+       makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1234L, true),
+       makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s3", 0L, false),
+       makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s3", 0L, true)));
+
+   DataflowClient client = mock(DataflowClient.class);
+   when(client.getJobMetrics(JOB_ID)).thenReturn(reported);
+
+   MetricQueryResults queried = new DataflowMetrics(runningJob, client).queryMetrics(null);
+   assertThat(queried.counters(), containsInAnyOrder(
+       attemptedMetricsResult("counterNamespace", "counterName", "s2", 1234L)));
+   assertThat(queried.counters(), containsInAnyOrder(
+       committedMetricsResult("counterNamespace", "counterName", "s2", 1233L)));
+ }
+
+ @Test
+ public void testMultipleCounterUpdates() throws IOException {
+   DataflowPipelineJob runningJob = mock(DataflowPipelineJob.class);
+   when(runningJob.getState()).thenReturn(State.RUNNING);
+   runningJob.jobId = JOB_ID;
+
+   // Three distinct counters, each listed as a committed/tentative pair, which is what the
+   // parser expects to find in the job metrics results.
+   JobMetrics reported = new JobMetrics();
+   reported.setMetrics(ImmutableList.of(
+       makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1233L, false),
+       makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1234L, true),
+       makeCounterMetricUpdate("otherCounter", "otherNamespace", "s3", 12L, false),
+       makeCounterMetricUpdate("otherCounter", "otherNamespace", "s3", 12L, true),
+       makeCounterMetricUpdate("counterName", "otherNamespace", "s4", 1200L, false),
+       makeCounterMetricUpdate("counterName", "otherNamespace", "s4", 1233L, true)));
+
+   DataflowClient client = mock(DataflowClient.class);
+   when(client.getJobMetrics(JOB_ID)).thenReturn(reported);
+
+   MetricQueryResults queried = new DataflowMetrics(runningJob, client).queryMetrics(null);
+   assertThat(queried.counters(), containsInAnyOrder(
+       attemptedMetricsResult("counterNamespace", "counterName", "s2", 1234L),
+       attemptedMetricsResult("otherNamespace", "otherCounter", "s3", 12L),
+       attemptedMetricsResult("otherNamespace", "counterName", "s4", 1233L)));
+   assertThat(queried.counters(), containsInAnyOrder(
+       committedMetricsResult("counterNamespace", "counterName", "s2", 1233L),
+       committedMetricsResult("otherNamespace", "otherCounter", "s3", 12L),
+       committedMetricsResult("otherNamespace", "counterName", "s4", 1200L)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java
new file mode 100644
index 0000000..a3e43e1
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import com.google.common.base.Objects;
+import java.util.Set;
+
+/**
+ * Implements matching for metrics filters. Specifically, matching for metric name,
+ * namespace, and step name.
+ */
+public class MetricFiltering {
+
+ private MetricFiltering() { } // static helpers only; not meant to be instantiated
+
+ /**
+  * Tells whether {@code key} passes {@code filter}: a null filter accepts every key, otherwise
+  * its metric name and step name must both match. Kept out of MetricsFilter so that class can
+  * stay a "dumb" value object, replaceable by a Proto/JSON/etc. schema object.
+  */
+ public static boolean matches(MetricsFilter filter, MetricKey key) {
+   if (filter == null) {
+     return true;
+   }
+   return matchesName(key.metricName(), filter.names())
+       && matchesScope(key.stepName(), filter.steps());
+ }
+
+ /**
+  * Returns true if {@code needle} is a run of whole '/'-separated entries of {@code haystack}:
+  * "foo/bar" is in "a/foo/bar/b", but not in "a/fool/bar/b" or "a/foo/bart/b". Every occurrence
+  * of {@code needle} is checked, so "foo" is found in "food/foo" despite the earlier partial hit.
+  */
+ public static boolean subPathMatches(String haystack, String needle) {
+   int at = haystack.indexOf(needle);
+   while (at != -1) {
+     int end = at + needle.length();
+     if ((at == 0 || haystack.charAt(at - 1) == '/')
+         && (end == haystack.length() || haystack.charAt(end) == '/')) {
+       return true; // this occurrence starts and ends on entry boundaries
+     }
+     // An empty needle is "found" at every index up to length(); stop there instead of looping.
+     at = at < haystack.length() ? haystack.indexOf(needle, at + 1) : -1;
+   }
+   return false; // needle absent, or every occurrence cuts an entry short
+ }
+
+ /**
+ * {@code matchesScope(actualScope, scopes)} returns true if the scope of a metric is matched
+ * by any of the filters in {@code scopes}. A metric scope is a path of type "A/B/D". A
+ * path is matched by a filter if the filter is equal to the path (e.g. "A/B/D", or
+ * if it represents a subpath within it (e.g. "A/B" or "B/D", but not "A/D"). */
+ public static boolean matchesScope(String actualScope, Set<String> scopes) {
+ if (scopes.isEmpty() || scopes.contains(actualScope)) {
+ return true;
+ }
+
+ // If there is no perfect match, a stage name-level match is tried.
+ // This is done by a substring search over the levels of the scope.
+ // e.g. a scope "A/B/C/D" is matched by "A/B", but not by "A/C".
+ for (String scope : scopes) {
+ if (subPathMatches(actualScope, scope)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /** Returns true when no name filters are given or when at least one of them fits the metric. */
+ private static boolean matchesName(MetricName metricName, Set<MetricNameFilter> nameFilters) {
+   for (MetricNameFilter candidate : nameFilters) {
+     // A filter without a name accepts every metric name; the namespace must always agree.
+     boolean nameFits =
+         candidate.getName() == null || candidate.getName().equals(metricName.name());
+     if (nameFits && Objects.equal(metricName.namespace(), candidate.getNamespace())) {
+       return true;
+     }
+   }
+   return nameFilters.isEmpty();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/6de412a5/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java
new file mode 100644
index 0000000..3e6a499
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link MetricFiltering}.
+ */
+@RunWith(JUnit4.class)
+public class MetricFilteringTest {
+ // NOTE(review): no test in this class references NAME1.
+ private static final MetricName NAME1 = MetricName.named("ns1", "name1");
+
+ private boolean matchesSubPath(String actualScope, String subPath) {
+ return MetricFiltering.subPathMatches(actualScope, subPath); // delegates unchanged
+ }
+
+ @Test
+ public void testMatchesSubPath() {
+ assertTrue("Match of the first element",
+ matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1"));
+ assertTrue("Match of the first elements",
+ matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1"));
+ assertTrue("Match of the last elements",
+ matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Inner1/Bottom1"));
+ assertFalse("Substring match but no subpath match",
+ matchesSubPath("Top1/Outer1/Inner1/Bottom1", "op1/Outer1/Inner1"));
+ assertFalse("Substring match from start - but no subpath match",
+ matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top"));
+ }
+
+ private boolean matchesScopeWithSingleFilter(String actualScope, String filter) {
+   // The method under test takes a set of filters, so the lone filter is wrapped in one.
+   Set<String> soleFilter = new HashSet<String>(java.util.Collections.singleton(filter));
+   return MetricFiltering.matchesScope(actualScope, soleFilter);
+ }
+
+ @Test
+ public void testMatchesScope() {
+ assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1"));
+ assertTrue(matchesScopeWithSingleFilter(
+ "Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1/Bottom1"));
+ assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1"));
+ assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1"));
+ assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Inner1"));
+ assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inn"));
+ }
+}
[2/2] beam git commit: Closes #2223
Posted by bc...@apache.org.
Closes #2223
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/59aa0dab
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/59aa0dab
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/59aa0dab
Branch: refs/heads/master
Commit: 59aa0dab728213cfdb049892b09ac09a1c3b3846
Parents: 1d9772a 6de412a
Author: bchambers <bc...@google.com>
Authored: Mon Mar 20 14:48:56 2017 -0700
Committer: bchambers <bc...@google.com>
Committed: Mon Mar 20 14:48:56 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectMetrics.java | 70 +-----
.../beam/runners/direct/DirectMetricsTest.java | 86 ++-----
.../beam/runners/dataflow/DataflowMetrics.java | 212 +++++++++++++++++
.../runners/dataflow/DataflowPipelineJob.java | 14 +-
.../runners/dataflow/DataflowMetricsTest.java | 236 +++++++++++++++++++
.../beam/sdk/metrics/MetricFiltering.java | 99 ++++++++
.../beam/sdk/metrics/MetricFilteringTest.java | 72 ++++++
7 files changed, 655 insertions(+), 134 deletions(-)
----------------------------------------------------------------------