You are viewing a plain text version of this content; the canonical (HTML) version is available via the original mailing-list archive link, which was lost in this plain-text conversion.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 19:53:17 UTC
[21/50] [abbrv] beam git commit: [BEAM-2084] Adding querying facility
for distribution metrics in Java
[BEAM-2084] Adding querying facility for distribution metrics in Java
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a48eefac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a48eefac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a48eefac
Branch: refs/heads/DSL_SQL
Commit: a48eeface8c5257f34e85c22f312ec03801b0f82
Parents: 7c36318
Author: Pablo <pa...@google.com>
Authored: Thu May 4 14:56:14 2017 -0700
Committer: Ben Chambers <bc...@bchambers-macbookpro2.roam.corp.google.com>
Committed: Tue Jul 18 09:58:47 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/examples/WordCount.java | 4 +
pom.xml | 2 +-
.../beam/runners/dataflow/DataflowMetrics.java | 310 +++++++++++++------
.../runners/dataflow/DataflowPipelineJob.java | 4 +
.../runners/dataflow/DataflowMetricsTest.java | 174 ++++++++++-
.../beam/sdk/metrics/MetricResultsMatchers.java | 2 +-
6 files changed, 388 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a48eefac/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index bfa7eb3..2d568ce 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -21,6 +21,7 @@ import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
@@ -88,9 +89,12 @@ public class WordCount {
*/
static class ExtractWordsFn extends DoFn<String, String> {
private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
+ private final Distribution lineLenDist = Metrics.distribution(
+ ExtractWordsFn.class, "lineLenDistro");
@ProcessElement
public void processElement(ProcessContext c) {
+ lineLenDist.update(c.element().length());
if (c.element().trim().isEmpty()) {
emptyLines.inc();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a48eefac/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d9ab9ae..d27d367 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,7 +112,7 @@
<cloudresourcemanager.version>v1-rev6-1.22.0</cloudresourcemanager.version>
<pubsubgrpc.version>0.1.0</pubsubgrpc.version>
<clouddebugger.version>v2-rev8-1.22.0</clouddebugger.version>
- <dataflow.version>v1b3-rev196-1.22.0</dataflow.version>
+ <dataflow.version>v1b3-rev198-1.20.0</dataflow.version>
<dataflow.proto.version>0.5.160222</dataflow.proto.version>
<datastore.client.version>1.4.0</datastore.client.version>
<datastore.proto.version>1.3.0</datastore.proto.version>
http://git-wip-us.apache.org/repos/asf/beam/blob/a48eefac/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
index 330cc7e..4c9c493 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
@@ -19,7 +19,9 @@ package org.apache.beam.runners.dataflow;
import static com.google.common.base.MoreObjects.firstNonNull;
+import com.google.api.client.util.ArrayMap;
import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.auto.value.AutoValue;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
@@ -28,6 +30,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import javax.annotation.Nullable;
import org.apache.beam.runners.core.construction.metrics.MetricFiltering;
import org.apache.beam.runners.core.construction.metrics.MetricKey;
import org.apache.beam.sdk.metrics.DistributionResult;
@@ -73,39 +76,6 @@ class DataflowMetrics extends MetricResults {
}
/**
- * Build an immutable map that serves as a hash key for a metric update.
- * @return a {@link MetricKey} that can be hashed and used to identify a metric.
- */
- private MetricKey metricHashKey(
- com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
- String fullStepName = metricUpdate.getName().getContext().get("step");
- if (dataflowPipelineJob.transformStepNames == null
- || !dataflowPipelineJob.transformStepNames.inverse().containsKey(fullStepName)) {
- // If we can't translate internal step names to user step names, we just skip them
- // altogether.
- return null;
- }
- fullStepName = dataflowPipelineJob.transformStepNames
- .inverse().get(fullStepName).getFullName();
- return MetricKey.create(
- fullStepName,
- MetricName.named(
- metricUpdate.getName().getContext().get("namespace"),
- metricUpdate.getName().getName()));
- }
-
- /**
- * Check whether a {@link com.google.api.services.dataflow.model.MetricUpdate} is a tentative
- * update or not.
- * @return true if update is tentative, false otherwise
- */
- private boolean isMetricTentative(
- com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
- return (metricUpdate.getName().getContext().containsKey("tentative")
- && Objects.equal(metricUpdate.getName().getContext().get("tentative"), "true"));
- }
-
- /**
* Take a list of metric updates coming from the Dataflow service, and format it into a
* Metrics API MetricQueryResults instance.
* @param metricUpdates
@@ -114,68 +84,8 @@ class DataflowMetrics extends MetricResults {
private MetricQueryResults populateMetricQueryResults(
List<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates,
MetricsFilter filter) {
- // Separate metric updates by name and by tentative/committed.
- HashMap<MetricKey, com.google.api.services.dataflow.model.MetricUpdate>
- tentativeByName = new HashMap<>();
- HashMap<MetricKey, com.google.api.services.dataflow.model.MetricUpdate>
- committedByName = new HashMap<>();
- HashSet<MetricKey> metricHashKeys = new HashSet<>();
-
- // If the Context of the metric update does not have a namespace, then these are not
- // actual metrics counters.
- for (com.google.api.services.dataflow.model.MetricUpdate update : metricUpdates) {
- if (Objects.equal(update.getName().getOrigin(), "user")
- && update.getName().getContext().containsKey("namespace")) {
- MetricKey key = metricHashKey(update);
- if (key == null) {
- continue;
- }
- metricHashKeys.add(key);
- if (isMetricTentative(update)) {
- tentativeByName.put(key, update);
- } else {
- committedByName.put(key, update);
- }
- }
- }
- // Create the lists with the metric result information.
- ImmutableList.Builder<MetricResult<Long>> counterResults = ImmutableList.builder();
- ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults =
- ImmutableList.builder();
- ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults = ImmutableList.builder();
- for (MetricKey metricKey : metricHashKeys) {
- if (!MetricFiltering.matches(filter, metricKey)) {
- // Skip unmatched metrics early.
- continue;
- }
-
- // This code is not robust to evolutions in the types of metrics that can be returned, so
- // wrap it in a try-catch and log errors.
- try {
- String metricName = metricKey.metricName().name();
- if (metricName.endsWith("[MIN]") || metricName.endsWith("[MAX]")
- || metricName.endsWith("[MEAN]") || metricName.endsWith("[COUNT]")) {
- // Skip distribution metrics, as these are not yet properly supported.
- LOG.warn("Distribution metrics are not yet supported. You can see them in the Dataflow"
- + " User Interface");
- continue;
- }
-
- String namespace = metricKey.metricName().namespace();
- String step = metricKey.stepName();
- Long committed = ((Number) committedByName.get(metricKey).getScalar()).longValue();
- Long attempted = ((Number) tentativeByName.get(metricKey).getScalar()).longValue();
- counterResults.add(
- DataflowMetricResult.create(
- MetricName.named(namespace, metricName), step, committed, attempted));
- } catch (Exception e) {
- LOG.warn("Error handling metric {} for filter {}, skipping result.", metricKey, filter);
- }
- }
- return DataflowMetricQueryResults.create(
- counterResults.build(),
- distributionResults.build(),
- gaugeResults.build());
+ return DataflowMetricQueryResultsFactory.create(dataflowPipelineJob, metricUpdates, filter)
+ .build();
}
private MetricQueryResults queryServiceForMetrics(MetricsFilter filter) {
@@ -214,6 +124,214 @@ class DataflowMetrics extends MetricResults {
return result;
}
+ private static class DataflowMetricResultExtractor {
+ private final ImmutableList.Builder<MetricResult<Long>> counterResults;
+ private final ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults;
+ private final ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults;
+ private final boolean isStreamingJob;
+
+ DataflowMetricResultExtractor(boolean isStreamingJob) {
+ counterResults = ImmutableList.builder();
+ distributionResults = ImmutableList.builder();
+ gaugeResults = ImmutableList.builder();
+ this.isStreamingJob = isStreamingJob;
+ }
+
+ public void addMetricResult(
+ MetricKey metricKey,
+ @Nullable com.google.api.services.dataflow.model.MetricUpdate committed,
+ @Nullable com.google.api.services.dataflow.model.MetricUpdate attempted) {
+ if (committed == null || attempted == null) {
+ LOG.warn(
+ "Metric {} did not have both a committed ({}) and tentative value ({}).",
+ metricKey, committed, attempted);
+ } else if (committed.getDistribution() != null && attempted.getDistribution() != null) {
+ // distribution metric
+ DistributionResult value = getDistributionValue(committed);
+ distributionResults.add(
+ DataflowMetricResult.create(
+ metricKey.metricName(),
+ metricKey.stepName(),
+ isStreamingJob ? null : value, // Committed
+ isStreamingJob ? value : null)); // Attempted
+ /* In Dataflow streaming jobs, only ATTEMPTED metrics are available.
+ * In Dataflow batch jobs, only COMMITTED metrics are available.
+ * Reporting the appropriate metric depending on whether it's a batch/streaming job.
+ */
+ } else if (committed.getScalar() != null && attempted.getScalar() != null) {
+ // counter metric
+ Long value = getCounterValue(committed);
+ counterResults.add(
+ DataflowMetricResult.create(
+ metricKey.metricName(),
+ metricKey.stepName(),
+ isStreamingJob ? null : value, // Committed
+ isStreamingJob ? value : null)); // Attempted
+ /* In Dataflow streaming jobs, only ATTEMPTED metrics are available.
+ * In Dataflow batch jobs, only COMMITTED metrics are available.
+ * Reporting the appropriate metric depending on whether it's a batch/streaming job.
+ */
+ } else {
+ // This is exceptionally unexpected. We expect matching user metrics to only have the
+ // value types provided by the Metrics API.
+ LOG.warn("Unexpected / mismatched metric types."
+ + " Please report JOB ID to Dataflow Support. Metric key: {}."
+ + " Committed / attempted Metric updates: {} / {}",
+ metricKey.toString(), committed.toString(), attempted.toString());
+ }
+ }
+
+ private Long getCounterValue(com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
+ if (metricUpdate.getScalar() == null) {
+ return 0L;
+ }
+ return ((Number) metricUpdate.getScalar()).longValue();
+ }
+
+ private DistributionResult getDistributionValue(
+ com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
+ if (metricUpdate.getDistribution() == null) {
+ return DistributionResult.ZERO;
+ }
+ ArrayMap distributionMap = (ArrayMap) metricUpdate.getDistribution();
+ Long count = ((Number) distributionMap.get("count")).longValue();
+ Long min = ((Number) distributionMap.get("min")).longValue();
+ Long max = ((Number) distributionMap.get("max")).longValue();
+ Long sum = ((Number) distributionMap.get("sum")).longValue();
+ return DistributionResult.create(sum, count, min, max);
+ }
+
+ public Iterable<MetricResult<DistributionResult>> getDistributionResults() {
+ return distributionResults.build();
+ }
+
+ public Iterable<MetricResult<Long>> getCounterResults() {
+ return counterResults.build();
+ }
+
+ public Iterable<MetricResult<GaugeResult>> getGaugeResults() {
+ return gaugeResults.build();
+ }
+ }
+
+ private static class DataflowMetricQueryResultsFactory {
+ private final Iterable<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates;
+ private final MetricsFilter filter;
+ private final HashMap<MetricKey, com.google.api.services.dataflow.model.MetricUpdate>
+ tentativeByName;
+ private final HashMap<MetricKey, com.google.api.services.dataflow.model.MetricUpdate>
+ committedByName;
+ private final HashSet<MetricKey> metricHashKeys;
+ private final DataflowPipelineJob dataflowPipelineJob;
+
+ public static DataflowMetricQueryResultsFactory create(DataflowPipelineJob dataflowPipelineJob,
+ Iterable<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates,
+ MetricsFilter filter) {
+ return new DataflowMetricQueryResultsFactory(dataflowPipelineJob, metricUpdates, filter);
+ }
+
+ private DataflowMetricQueryResultsFactory(DataflowPipelineJob dataflowPipelineJob,
+ Iterable<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates,
+ MetricsFilter filter) {
+ this.dataflowPipelineJob = dataflowPipelineJob;
+ this.metricUpdates = metricUpdates;
+ this.filter = filter;
+
+ tentativeByName = new HashMap<>();
+ committedByName = new HashMap<>();
+ metricHashKeys = new HashSet<>();
+ }
+
+ /**
+ * Check whether a {@link com.google.api.services.dataflow.model.MetricUpdate} is a tentative
+ * update or not.
+ * @return true if update is tentative, false otherwise
+ */
+ private boolean isMetricTentative(
+ com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
+ return (metricUpdate.getName().getContext().containsKey("tentative")
+ && Objects.equal(metricUpdate.getName().getContext().get("tentative"), "true"));
+ }
+
+ /**
+ * Build an {@link MetricKey} that serves as a hash key for a metric update.
+ * @return a {@link MetricKey} that can be hashed and used to identify a metric.
+ */
+ private MetricKey getMetricHashKey(
+ com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
+ String fullStepName = metricUpdate.getName().getContext().get("step");
+ if (dataflowPipelineJob.transformStepNames == null
+ || !dataflowPipelineJob.transformStepNames.inverse().containsKey(fullStepName)) {
+ // If we can't translate internal step names to user step names, we just skip them
+ // altogether.
+ return null;
+ }
+ fullStepName = dataflowPipelineJob.transformStepNames
+ .inverse().get(fullStepName).getFullName();
+ return MetricKey.create(
+ fullStepName,
+ MetricName.named(
+ metricUpdate.getName().getContext().get("namespace"),
+ metricUpdate.getName().getName()));
+ }
+
+ private void buildMetricsIndex() {
+ // If the Context of the metric update does not have a namespace, then these are not
+ // actual metrics counters.
+ for (com.google.api.services.dataflow.model.MetricUpdate update : metricUpdates) {
+ if (update.getName().getOrigin() != null
+ && (!update.getName().getOrigin().toLowerCase().equals("user")
+ || !update.getName().getContext().containsKey("namespace"))) {
+ // Skip non-user metrics, which should have both a "user" origin and a namespace.
+ continue;
+ }
+
+ MetricKey updateKey = getMetricHashKey(update);
+ if (updateKey == null || !MetricFiltering.matches(filter, updateKey)) {
+ // Skip unmatched metrics early.
+ continue;
+ }
+
+ metricHashKeys.add(updateKey);
+ if (isMetricTentative(update)) {
+ MetricUpdate previousUpdate = tentativeByName.put(updateKey, update);
+ if (previousUpdate != null) {
+ LOG.warn("Metric {} already had a tentative value of {}", updateKey, previousUpdate);
+ }
+ } else {
+ MetricUpdate previousUpdate = committedByName.put(updateKey, update);
+ if (previousUpdate != null) {
+ LOG.warn("Metric {} already had a committed value of {}", updateKey, previousUpdate);
+ }
+ }
+ }
+ }
+
+ public MetricQueryResults build() {
+ buildMetricsIndex();
+
+ DataflowMetricResultExtractor extractor = new DataflowMetricResultExtractor(
+ dataflowPipelineJob.getDataflowOptions().isStreaming());
+ for (MetricKey metricKey : metricHashKeys) {
+ String metricName = metricKey.metricName().name();
+ if (metricName.endsWith("[MIN]") || metricName.endsWith("[MAX]")
+ || metricName.endsWith("[MEAN]") || metricName.endsWith("[COUNT]")) {
+ // Skip distribution metrics, as these are not yet properly supported.
+ // TODO: remove this when distributions stop being broken up for the UI.
+ continue;
+ }
+
+ extractor.addMetricResult(metricKey,
+ committedByName.get(metricKey),
+ tentativeByName.get(metricKey));
+ }
+ return DataflowMetricQueryResults.create(
+ extractor.getCounterResults(),
+ extractor.getDistributionResults(),
+ extractor.getGaugeResults());
+ }
+ }
+
@AutoValue
abstract static class DataflowMetricQueryResults implements MetricQueryResults {
public static MetricQueryResults create(
@@ -231,7 +349,9 @@ class DataflowMetrics extends MetricResults {
// and the generated constructor is usable and consistent
public abstract MetricName name();
public abstract String step();
+ @Nullable
public abstract T committed();
+ @Nullable
public abstract T attempted();
public static <T> MetricResult<T> create(MetricName name, String scope,
http://git-wip-us.apache.org/repos/asf/beam/blob/a48eefac/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 2d23983..e30d426 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -164,6 +164,10 @@ public class DataflowPipelineJob implements PipelineResult {
return dataflowOptions.getProject();
}
+ public DataflowPipelineOptions getDataflowOptions() {
+ return dataflowOptions;
+ }
+
/**
* Returns a new {@link DataflowPipelineJob} for the job that replaced this one, if applicable.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/a48eefac/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
index c3c741c..05fe687 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow;
import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult;
import static org.apache.beam.sdk.metrics.MetricResultsMatchers.committedMetricsResult;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
@@ -28,6 +29,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.google.api.client.util.ArrayMap;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.JobMetrics;
@@ -38,9 +40,11 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.math.BigDecimal;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
+import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -95,6 +99,9 @@ public class DataflowMetricsTest {
modelJob.setCurrentState(State.RUNNING.toString());
DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+ DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
+ when(options.isStreaming()).thenReturn(false);
+ when(job.getDataflowOptions()).thenReturn(options);
when(job.getState()).thenReturn(State.RUNNING);
job.jobId = JOB_ID;
@@ -115,6 +122,9 @@ public class DataflowMetricsTest {
modelJob.setCurrentState(State.RUNNING.toString());
DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+ DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
+ when(options.isStreaming()).thenReturn(false);
+ when(job.getDataflowOptions()).thenReturn(options);
when(job.getState()).thenReturn(State.DONE);
job.jobId = JOB_ID;
@@ -131,11 +141,8 @@ public class DataflowMetricsTest {
verify(dataflowClient, times(1)).getJobMetrics(JOB_ID);
}
- private MetricUpdate makeCounterMetricUpdate(String name, String namespace, String step,
- long scalar, boolean tentative) {
- MetricUpdate update = new MetricUpdate();
- update.setScalar(new BigDecimal(scalar));
-
+ private MetricUpdate setStructuredName(MetricUpdate update, String name, String namespace,
+ String step, boolean tentative) {
MetricStructuredName structuredName = new MetricStructuredName();
structuredName.setName(name);
structuredName.setOrigin("user");
@@ -150,10 +157,34 @@ public class DataflowMetricsTest {
return update;
}
+ private MetricUpdate makeDistributionMetricUpdate(String name, String namespace, String step,
+ Long sum, Long count, Long min, Long max, boolean tentative) {
+ MetricUpdate update = new MetricUpdate();
+ ArrayMap<String, BigDecimal> distribution = ArrayMap.create();
+ distribution.add("count", new BigDecimal(count));
+ distribution.add("mean", new BigDecimal(sum / count));
+ distribution.add("sum", new BigDecimal(sum));
+ distribution.add("min", new BigDecimal(min));
+ distribution.add("max", new BigDecimal(max));
+ update.setDistribution(distribution);
+ return setStructuredName(update, name, namespace, step, tentative);
+ }
+
+ private MetricUpdate makeCounterMetricUpdate(String name, String namespace, String step,
+ long scalar, boolean tentative) {
+ MetricUpdate update = new MetricUpdate();
+ update.setScalar(new BigDecimal(scalar));
+ return setStructuredName(update, name, namespace, step, tentative);
+
+ }
+
@Test
public void testSingleCounterUpdates() throws IOException {
JobMetrics jobMetrics = new JobMetrics();
DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+ DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
+ when(options.isStreaming()).thenReturn(false);
+ when(job.getDataflowOptions()).thenReturn(options);
when(job.getState()).thenReturn(State.RUNNING);
job.jobId = JOB_ID;
@@ -179,7 +210,7 @@ public class DataflowMetricsTest {
DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
MetricQueryResults result = dataflowMetrics.queryMetrics(null);
assertThat(result.counters(), containsInAnyOrder(
- attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L)));
+ attemptedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null)));
assertThat(result.counters(), containsInAnyOrder(
committedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L)));
}
@@ -190,6 +221,9 @@ public class DataflowMetricsTest {
DataflowClient dataflowClient = mock(DataflowClient.class);
when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+ DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
+ when(options.isStreaming()).thenReturn(false);
+ when(job.getDataflowOptions()).thenReturn(options);
when(job.getState()).thenReturn(State.RUNNING);
job.jobId = JOB_ID;
@@ -202,24 +236,97 @@ public class DataflowMetricsTest {
// the job metrics results.
jobMetrics.setMetrics(ImmutableList.of(
makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1233L, false),
- makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1234L, true),
+ makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1233L, true),
makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s2", 0L, false),
makeCounterMetricUpdate("otherCounter[MIN]", "otherNamespace", "s2", 0L, true)));
DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
MetricQueryResults result = dataflowMetrics.queryMetrics(null);
assertThat(result.counters(), containsInAnyOrder(
- attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L)));
+ attemptedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null)));
assertThat(result.counters(), containsInAnyOrder(
committedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L)));
}
@Test
+ public void testDistributionUpdates() throws IOException {
+ JobMetrics jobMetrics = new JobMetrics();
+ DataflowClient dataflowClient = mock(DataflowClient.class);
+ when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
+ DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+ DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
+ when(options.isStreaming()).thenReturn(false);
+ when(job.getDataflowOptions()).thenReturn(options);
+ when(job.getState()).thenReturn(State.RUNNING);
+ job.jobId = JOB_ID;
+
+ AppliedPTransform<?, ?, ?> myStep2 = mock(AppliedPTransform.class);
+ when(myStep2.getFullName()).thenReturn("myStepName");
+ job.transformStepNames = HashBiMap.create();
+ job.transformStepNames.put(myStep2, "s2");
+
+ // The parser relies on the fact that one tentative and one committed metric update exist in
+ // the job metrics results.
+ jobMetrics.setMetrics(ImmutableList.of(
+ makeDistributionMetricUpdate("distributionName", "distributionNamespace", "s2",
+ 18L, 2L, 2L, 16L, false),
+ makeDistributionMetricUpdate("distributionName", "distributionNamespace", "s2",
+ 18L, 2L, 2L, 16L, true)));
+
+ DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
+ MetricQueryResults result = dataflowMetrics.queryMetrics(null);
+ assertThat(result.distributions(), contains(
+ attemptedMetricsResult("distributionNamespace", "distributionName", "myStepName",
+ (DistributionResult) null)));
+ assertThat(result.distributions(), contains(
+ committedMetricsResult("distributionNamespace", "distributionName", "myStepName",
+ DistributionResult.create(18, 2, 2, 16))));
+ }
+
+ @Test
+ public void testDistributionUpdatesStreaming() throws IOException {
+ JobMetrics jobMetrics = new JobMetrics();
+ DataflowClient dataflowClient = mock(DataflowClient.class);
+ when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
+ DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+ DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
+ when(options.isStreaming()).thenReturn(true);
+ when(job.getDataflowOptions()).thenReturn(options);
+ when(job.getState()).thenReturn(State.RUNNING);
+ job.jobId = JOB_ID;
+
+ AppliedPTransform<?, ?, ?> myStep2 = mock(AppliedPTransform.class);
+ when(myStep2.getFullName()).thenReturn("myStepName");
+ job.transformStepNames = HashBiMap.create();
+ job.transformStepNames.put(myStep2, "s2");
+
+ // The parser relies on the fact that one tentative and one committed metric update exist in
+ // the job metrics results.
+ jobMetrics.setMetrics(ImmutableList.of(
+ makeDistributionMetricUpdate("distributionName", "distributionNamespace", "s2",
+ 18L, 2L, 2L, 16L, false),
+ makeDistributionMetricUpdate("distributionName", "distributionNamespace", "s2",
+ 18L, 2L, 2L, 16L, true)));
+
+ DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
+ MetricQueryResults result = dataflowMetrics.queryMetrics(null);
+ assertThat(result.distributions(), contains(
+ committedMetricsResult("distributionNamespace", "distributionName", "myStepName",
+ (DistributionResult) null)));
+ assertThat(result.distributions(), contains(
+ attemptedMetricsResult("distributionNamespace", "distributionName", "myStepName",
+ DistributionResult.create(18, 2, 2, 16))));
+ }
+
+ @Test
public void testMultipleCounterUpdates() throws IOException {
JobMetrics jobMetrics = new JobMetrics();
DataflowClient dataflowClient = mock(DataflowClient.class);
when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+ DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
+ when(options.isStreaming()).thenReturn(false);
+ when(job.getDataflowOptions()).thenReturn(options);
when(job.getState()).thenReturn(State.RUNNING);
job.jobId = JOB_ID;
@@ -251,12 +358,57 @@ public class DataflowMetricsTest {
DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
MetricQueryResults result = dataflowMetrics.queryMetrics(null);
assertThat(result.counters(), containsInAnyOrder(
- attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L),
- attemptedMetricsResult("otherNamespace", "otherCounter", "myStepName3", 12L),
- attemptedMetricsResult("otherNamespace", "counterName", "myStepName4", 1233L)));
+ attemptedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null),
+ attemptedMetricsResult("otherNamespace", "otherCounter", "myStepName3", (Long) null),
+ attemptedMetricsResult("otherNamespace", "counterName", "myStepName4", (Long) null)));
assertThat(result.counters(), containsInAnyOrder(
committedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L),
committedMetricsResult("otherNamespace", "otherCounter", "myStepName3", 12L),
committedMetricsResult("otherNamespace", "counterName", "myStepName4", 1200L)));
}
+
+ @Test
+ public void testMultipleCounterUpdatesStreaming() throws IOException {
+ JobMetrics jobMetrics = new JobMetrics();
+ DataflowClient dataflowClient = mock(DataflowClient.class);
+ when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
+ DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+ DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
+ when(options.isStreaming()).thenReturn(true);
+ when(job.getDataflowOptions()).thenReturn(options);
+ when(job.getState()).thenReturn(State.RUNNING);
+ job.jobId = JOB_ID;
+
+ AppliedPTransform<?, ?, ?> myStep2 = mock(AppliedPTransform.class);
+ when(myStep2.getFullName()).thenReturn("myStepName");
+ job.transformStepNames = HashBiMap.create();
+ job.transformStepNames.put(myStep2, "s2");
+ AppliedPTransform<?, ?, ?> myStep3 = mock(AppliedPTransform.class);
+ when(myStep3.getFullName()).thenReturn("myStepName3");
+ job.transformStepNames.put(myStep3, "s3");
+ AppliedPTransform<?, ?, ?> myStep4 = mock(AppliedPTransform.class);
+ when(myStep4.getFullName()).thenReturn("myStepName4");
+ job.transformStepNames.put(myStep4, "s4");
+
+ // The parser relies on the fact that one tentative and one committed metric update exist in
+ // the job metrics results.
+ jobMetrics.setMetrics(ImmutableList.of(
+ makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1233L, false),
+ makeCounterMetricUpdate("counterName", "counterNamespace", "s2", 1234L, true),
+ makeCounterMetricUpdate("otherCounter", "otherNamespace", "s3", 12L, false),
+ makeCounterMetricUpdate("otherCounter", "otherNamespace", "s3", 12L, true),
+ makeCounterMetricUpdate("counterName", "otherNamespace", "s4", 1200L, false),
+ makeCounterMetricUpdate("counterName", "otherNamespace", "s4", 1233L, true)));
+
+ DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
+ MetricQueryResults result = dataflowMetrics.queryMetrics(null);
+ assertThat(result.counters(), containsInAnyOrder(
+ committedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null),
+ committedMetricsResult("otherNamespace", "otherCounter", "myStepName3", (Long) null),
+ committedMetricsResult("otherNamespace", "counterName", "myStepName4", (Long) null)));
+ assertThat(result.counters(), containsInAnyOrder(
+ attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L),
+ attemptedMetricsResult("otherNamespace", "otherCounter", "myStepName3", 12L),
+ attemptedMetricsResult("otherNamespace", "counterName", "myStepName4", 1200L)));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a48eefac/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java
index 5031952..030a759 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricResultsMatchers.java
@@ -96,7 +96,7 @@ public class MetricResultsMatchers {
if (result1 instanceof GaugeResult) {
return (((GaugeResult) result1).value()) == (((GaugeResult) result2).value());
} else {
- return result1.equals(result2);
+ return Objects.equals(result1, result2);
}
}