You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by av...@apache.org on 2017/03/27 16:38:33 UTC
[1/2] beam git commit: [BEAM-1617] Add Gauge metric type to Java SDK
Repository: beam
Updated Branches:
refs/heads/master 026aec856 -> b26e10b44
[BEAM-1617] Add Gauge metric type to Java SDK
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/63e953c6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/63e953c6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/63e953c6
Branch: refs/heads/master
Commit: 63e953c6026192e5e027f0bac183b86992480127
Parents: 026aec8
Author: Aviem Zur <av...@gmail.com>
Authored: Fri Mar 3 14:42:23 2017 +0200
Committer: Aviem Zur <av...@gmail.com>
Committed: Mon Mar 27 19:01:58 2017 +0300
----------------------------------------------------------------------
.../beam/runners/direct/DirectMetrics.java | 59 +++++++++++++-
.../beam/runners/direct/DirectMetricsTest.java | 42 ++++++++--
.../beam/runners/dataflow/DataflowMetrics.java | 16 +++-
.../runners/spark/metrics/SparkBeamMetric.java | 4 +
.../spark/metrics/SparkMetricResults.java | 27 +++++++
.../spark/metrics/SparkMetricsContainer.java | 20 +++++
.../java/org/apache/beam/sdk/metrics/Gauge.java | 32 ++++++++
.../org/apache/beam/sdk/metrics/GaugeCell.java | 60 +++++++++++++++
.../org/apache/beam/sdk/metrics/GaugeData.java | 81 ++++++++++++++++++++
.../apache/beam/sdk/metrics/GaugeResult.java | 61 +++++++++++++++
.../beam/sdk/metrics/MetricQueryResults.java | 3 +
.../apache/beam/sdk/metrics/MetricUpdates.java | 11 ++-
.../org/apache/beam/sdk/metrics/Metrics.java | 35 +++++++++
.../beam/sdk/metrics/MetricsContainer.java | 26 ++++++-
.../apache/beam/sdk/metrics/GaugeCellTest.java | 48 ++++++++++++
.../apache/beam/sdk/metrics/MetricMatchers.java | 12 ++-
.../apache/beam/sdk/metrics/MetricsTest.java | 37 +++++----
17 files changed, 539 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
index f04dc21..fb126fb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
@@ -33,6 +33,8 @@ import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.metrics.DistributionData;
import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeData;
+import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricFiltering;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
@@ -193,6 +195,28 @@ class DirectMetrics extends MetricResults {
}
};
+ /**
+  * Aggregation strategy for gauge metrics: starts from the empty gauge, folds updates together
+  * with {@link GaugeData#combine}, and reports the outcome as a {@link GaugeResult}.
+  */
+ private static final MetricAggregation<GaugeData, GaugeResult> GAUGE =
+     new MetricAggregation<GaugeData, GaugeResult>() {
+       /** The identity element of the aggregation: a gauge with no reported value. */
+       @Override
+       public GaugeData zero() {
+         return GaugeData.empty();
+       }
+
+       /** Folds all {@code updates} into a single value, starting from the empty gauge. */
+       @Override
+       public GaugeData combine(Iterable<GaugeData> updates) {
+         GaugeData latest = zero();
+         for (GaugeData candidate : updates) {
+           latest = latest.combine(candidate);
+         }
+         return latest;
+       }
+
+       /** Converts the aggregated data into its result form. */
+       @Override
+       public GaugeResult extract(GaugeData data) {
+         return data.extractResult();
+       }
+     };
+
/** The current values of counters in memory. */
private MetricsMap<MetricKey, DirectMetric<Long, Long>> counters =
new MetricsMap<>(new MetricsMap.Factory<MetricKey, DirectMetric<Long, Long>>() {
@@ -210,13 +234,23 @@ class DirectMetrics extends MetricResults {
return new DirectMetric<>(DISTRIBUTION);
}
});
+ private MetricsMap<MetricKey, DirectMetric<GaugeData, GaugeResult>> gauges =
+ new MetricsMap<>(
+ new MetricsMap.Factory<MetricKey, DirectMetric<GaugeData, GaugeResult>>() {
+ @Override
+ public DirectMetric<GaugeData, GaugeResult> createInstance(
+ MetricKey unusedKey) {
+ return new DirectMetric<>(GAUGE);
+ }
+ });
@AutoValue
abstract static class DirectMetricQueryResults implements MetricQueryResults {
public static MetricQueryResults create(
Iterable<MetricResult<Long>> counters,
- Iterable<MetricResult<DistributionResult>> distributions) {
- return new AutoValue_DirectMetrics_DirectMetricQueryResults(counters, distributions);
+ Iterable<MetricResult<DistributionResult>> distributions,
+ Iterable<MetricResult<GaugeResult>> gauges) {
+ return new AutoValue_DirectMetrics_DirectMetricQueryResults(counters, distributions, gauges);
}
}
@@ -248,8 +282,15 @@ class DirectMetrics extends MetricResults {
: distributions.entries()) {
maybeExtractResult(filter, distributionResults, distribution);
}
+ ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults =
+ ImmutableList.builder();
+ for (Entry<MetricKey, DirectMetric<GaugeData, GaugeResult>> gauge
+ : gauges.entries()) {
+ maybeExtractResult(filter, gaugeResults, gauge);
+ }
- return DirectMetricQueryResults.create(counterResults.build(), distributionResults.build());
+ return DirectMetricQueryResults.create(counterResults.build(), distributionResults.build(),
+ gaugeResults.build());
}
private <ResultT> void maybeExtractResult(
@@ -274,6 +315,10 @@ class DirectMetrics extends MetricResults {
distributions.get(distribution.getKey())
.updatePhysical(bundle, distribution.getUpdate());
}
+ for (MetricUpdate<GaugeData> gauge : updates.gaugeUpdates()) {
+ gauges.get(gauge.getKey())
+ .updatePhysical(bundle, gauge.getUpdate());
+ }
}
public void commitPhysical(CommittedBundle<?> bundle, MetricUpdates updates) {
@@ -284,6 +329,10 @@ class DirectMetrics extends MetricResults {
distributions.get(distribution.getKey())
.commitPhysical(bundle, distribution.getUpdate());
}
+ for (MetricUpdate<GaugeData> gauge : updates.gaugeUpdates()) {
+ gauges.get(gauge.getKey())
+ .commitPhysical(bundle, gauge.getUpdate());
+ }
}
/** Apply metric updates that represent new logical values from a bundle being committed. */
@@ -295,5 +344,9 @@ class DirectMetrics extends MetricResults {
distributions.get(distribution.getKey())
.commitLogical(bundle, distribution.getUpdate());
}
+ for (MetricUpdate<GaugeData> gauge : updates.gaugeUpdates()) {
+ gauges.get(gauge.getKey())
+ .commitLogical(bundle, gauge.getUpdate());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
index 7183124..ee51e9a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
@@ -29,12 +29,15 @@ import com.google.common.collect.ImmutableList;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.metrics.DistributionData;
import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeData;
+import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricUpdates;
import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -56,6 +59,7 @@ public class DirectMetricsTest {
private static final MetricName NAME1 = MetricName.named("ns1", "name1");
private static final MetricName NAME2 = MetricName.named("ns1", "name2");
private static final MetricName NAME3 = MetricName.named("ns2", "name1");
+ private static final MetricName NAME4 = MetricName.named("ns2", "name2");
private DirectMetrics metrics = new DirectMetrics();
@@ -73,14 +77,20 @@ public class DirectMetricsTest {
MetricUpdate.create(MetricKey.create("step1", NAME2), 8L)),
ImmutableList.of(
MetricUpdate.create(MetricKey.create("step1", NAME1),
- DistributionData.create(8, 2, 3, 5)))));
+ DistributionData.create(8, 2, 3, 5))),
+ ImmutableList.of(
+ MetricUpdate.create(MetricKey.create("step1", NAME4), GaugeData.create(15L)))
+ ));
metrics.commitLogical(bundle1, MetricUpdates.create(
ImmutableList.of(
MetricUpdate.create(MetricKey.create("step2", NAME1), 7L),
MetricUpdate.create(MetricKey.create("step1", NAME2), 4L)),
ImmutableList.of(
MetricUpdate.create(MetricKey.create("step1", NAME1),
- DistributionData.create(4, 1, 4, 4)))));
+ DistributionData.create(4, 1, 4, 4))),
+ ImmutableList.of(
+ MetricUpdate.create(MetricKey.create("step1", NAME4), GaugeData.create(27L)))
+ ));
MetricQueryResults results = metrics.queryMetrics(MetricsFilter.builder().build());
assertThat(results.counters(), containsInAnyOrder(
@@ -95,6 +105,12 @@ public class DirectMetricsTest {
attemptedMetricsResult("ns1", "name1", "step1", DistributionResult.ZERO)));
assertThat(results.distributions(), contains(
committedMetricsResult("ns1", "name1", "step1", DistributionResult.create(12, 3, 3, 5))));
+ assertThat(results.gauges(), contains(
+ attemptedMetricsResult("ns2", "name2", "step1", GaugeResult.empty())
+ ));
+ assertThat(results.gauges(), contains(
+ committedMetricsResult("ns2", "name2", "step1", GaugeResult.create(27L, Instant.now()))
+ ));
}
@SuppressWarnings("unchecked")
@@ -104,12 +120,16 @@ public class DirectMetricsTest {
ImmutableList.of(
MetricUpdate.create(MetricKey.create("step1", NAME1), 5L),
MetricUpdate.create(MetricKey.create("step1", NAME3), 8L)),
- ImmutableList.<MetricUpdate<DistributionData>>of()));
+ ImmutableList.<MetricUpdate<DistributionData>>of(),
+ ImmutableList.<MetricUpdate<GaugeData>>of()
+ ));
metrics.updatePhysical(bundle1, MetricUpdates.create(
ImmutableList.of(
MetricUpdate.create(MetricKey.create("step2", NAME1), 7L),
MetricUpdate.create(MetricKey.create("step1", NAME3), 4L)),
- ImmutableList.<MetricUpdate<DistributionData>>of()));
+ ImmutableList.<MetricUpdate<DistributionData>>of(),
+ ImmutableList.<MetricUpdate<GaugeData>>of()
+ ));
MetricQueryResults results = metrics.queryMetrics(
MetricsFilter.builder().addNameFilter(inNamespace("ns1")).build());
@@ -132,12 +152,14 @@ public class DirectMetricsTest {
ImmutableList.of(
MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 5L),
MetricUpdate.create(MetricKey.create("Outer1/Inner2", NAME1), 8L)),
- ImmutableList.<MetricUpdate<DistributionData>>of()));
+ ImmutableList.<MetricUpdate<DistributionData>>of(),
+ ImmutableList.<MetricUpdate<GaugeData>>of()));
metrics.updatePhysical(bundle1, MetricUpdates.create(
ImmutableList.of(
MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 12L),
MetricUpdate.create(MetricKey.create("Outer2/Inner2", NAME1), 18L)),
- ImmutableList.<MetricUpdate<DistributionData>>of()));
+ ImmutableList.<MetricUpdate<DistributionData>>of(),
+ ImmutableList.<MetricUpdate<GaugeData>>of()));
MetricQueryResults results = metrics.queryMetrics(
MetricsFilter.builder().addStep("Outer1").build());
@@ -161,12 +183,16 @@ public class DirectMetricsTest {
ImmutableList.of(
MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner1", NAME1), 5L),
MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner2", NAME1), 8L)),
- ImmutableList.<MetricUpdate<DistributionData>>of()));
+ ImmutableList.<MetricUpdate<DistributionData>>of(),
+ ImmutableList.<MetricUpdate<GaugeData>>of()
+ ));
metrics.updatePhysical(bundle1, MetricUpdates.create(
ImmutableList.of(
MetricUpdate.create(MetricKey.create("Top2/Outer1/Inner1", NAME1), 12L),
MetricUpdate.create(MetricKey.create("Top1/Outer2/Inner2", NAME1), 18L)),
- ImmutableList.<MetricUpdate<DistributionData>>of()));
+ ImmutableList.<MetricUpdate<DistributionData>>of(),
+ ImmutableList.<MetricUpdate<GaugeData>>of()
+ ));
MetricQueryResults results = metrics.queryMetrics(
MetricsFilter.builder().addStep("Top1/Outer1").build());
http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
index c0d1883..9d28ef6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricFiltering;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
@@ -130,6 +131,7 @@ class DataflowMetrics extends MetricResults {
ImmutableList.Builder<MetricResult<Long>> counterResults = ImmutableList.builder();
ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults =
ImmutableList.builder();
+ ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults = ImmutableList.builder();
for (MetricKey metricKey : metricHashKeys) {
String metricName = metricKey.metricName().name();
if (metricName.endsWith("[MIN]") || metricName.endsWith("[MAX]")
@@ -149,19 +151,23 @@ class DataflowMetrics extends MetricResults {
step, committed, attempted));
}
}
- return DataflowMetricQueryResults.create(counterResults.build(), distributionResults.build());
+ return DataflowMetricQueryResults.create(
+ counterResults.build(),
+ distributionResults.build(),
+ gaugeResults.build());
}
private MetricQueryResults queryServiceForMetrics(MetricsFilter filter) {
List<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates;
ImmutableList<MetricResult<Long>> counters = ImmutableList.of();
ImmutableList<MetricResult<DistributionResult>> distributions = ImmutableList.of();
+ ImmutableList<MetricResult<GaugeResult>> gauges = ImmutableList.of();
JobMetrics jobMetrics;
try {
jobMetrics = dataflowClient.getJobMetrics(dataflowPipelineJob.jobId);
} catch (IOException e) {
LOG.warn("Unable to query job metrics.\n");
- return DataflowMetricQueryResults.create(counters, distributions);
+ return DataflowMetricQueryResults.create(counters, distributions, gauges);
}
metricUpdates = jobMetrics.getMetrics();
return populateMetricQueryResults(metricUpdates, filter);
@@ -189,8 +195,10 @@ class DataflowMetrics extends MetricResults {
abstract static class DataflowMetricQueryResults implements MetricQueryResults {
public static MetricQueryResults create(
Iterable<MetricResult<Long>> counters,
- Iterable<MetricResult<DistributionResult>> distributions) {
- return new AutoValue_DataflowMetrics_DataflowMetricQueryResults(counters, distributions);
+ Iterable<MetricResult<DistributionResult>> distributions,
+ Iterable<MetricResult<GaugeResult>> gauges) {
+ return
+ new AutoValue_DataflowMetrics_DataflowMetricQueryResults(counters, distributions, gauges);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
index 8328a1a..2d445a9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
@@ -53,6 +54,9 @@ class SparkBeamMetric implements Metric {
metrics.put(renderName(metricResult) + ".max", result.max());
metrics.put(renderName(metricResult) + ".mean", result.mean());
}
+ for (MetricResult<GaugeResult> metricResult : metricQueryResults.gauges()) {
+ metrics.put(renderName(metricResult), metricResult.attempted().value());
+ }
return metrics;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
index a9651e2..c02027a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
@@ -25,6 +25,8 @@ import com.google.common.collect.FluentIterable;
import java.util.Set;
import org.apache.beam.sdk.metrics.DistributionData;
import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeData;
+import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricNameFilter;
@@ -72,6 +74,16 @@ public class SparkMetricResults extends MetricResults {
.toList();
}
+ /** Returns the gauge updates that match the query filter, converted to metric results. */
+ @Override
+ public Iterable<MetricResult<GaugeResult>> gauges() {
+   // Narrow the collected gauge updates down to the ones the filter accepts ...
+   FluentIterable<MetricUpdate<GaugeData>> matching =
+       FluentIterable.from(SparkMetricsContainer.getGauges()).filter(matchesFilter(filter));
+   // ... then convert each surviving update into its result form.
+   return matching.transform(TO_GAUGE_RESULT).toList();
+ }
+
private Predicate<MetricUpdate<?>> matchesFilter(final MetricsFilter filter) {
return new Predicate<MetricUpdate<?>>() {
@Override
@@ -146,6 +158,21 @@ public class SparkMetricResults extends MetricResults {
}
};
+ /**
+  * Converts a gauge {@link MetricUpdate} into a {@link MetricResult}; a {@code null} update is
+  * passed through as {@code null}.
+  */
+ private static final Function<MetricUpdate<GaugeData>, MetricResult<GaugeResult>>
+     TO_GAUGE_RESULT =
+         new Function<MetricUpdate<GaugeData>, MetricResult<GaugeResult>>() {
+           @Override
+           public MetricResult<GaugeResult> apply(MetricUpdate<GaugeData> update) {
+             if (update == null) {
+               return null;
+             }
+             final MetricKey key = update.getKey();
+             final MetricName name = key.metricName();
+             final String step = key.stepName();
+             final GaugeResult result = update.getUpdate().extractResult();
+             return new SparkMetricResult<>(name, step, result);
+           }
+         };
+
private static class SparkMetricResult<T> implements MetricResult<T> {
private final MetricName name;
private final String step;
http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
index d376ce3..b6aa178 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.metrics.DistributionData;
+import org.apache.beam.sdk.metrics.GaugeData;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricUpdates;
import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
@@ -47,6 +48,7 @@ public class SparkMetricsContainer implements Serializable {
private final Map<MetricKey, MetricUpdate<Long>> counters = new HashMap<>();
private final Map<MetricKey, MetricUpdate<DistributionData>> distributions = new HashMap<>();
+ private final Map<MetricKey, MetricUpdate<GaugeData>> gauges = new HashMap<>();
public MetricsContainer getContainer(String stepName) {
if (metricsContainers == null) {
@@ -76,9 +78,14 @@ public class SparkMetricsContainer implements Serializable {
return sparkMetricsContainer.distributions.values();
}
+ static Collection<MetricUpdate<GaugeData>> getGauges() {
+ return getInstance().gauges.values();
+ }
+
SparkMetricsContainer update(SparkMetricsContainer other) {
this.updateCounters(other.counters.values());
this.updateDistributions(other.distributions.values());
+ this.updateGauges(other.gauges.values());
return this;
}
@@ -101,6 +108,7 @@ public class SparkMetricsContainer implements Serializable {
MetricUpdates cumulative = container.getCumulative();
this.updateCounters(cumulative.counterUpdates());
this.updateDistributions(cumulative.distributionUpdates());
+ this.updateGauges(cumulative.gaugeUpdates());
}
}
}
@@ -123,6 +131,18 @@ public class SparkMetricsContainer implements Serializable {
}
}
+ private void updateGauges(Iterable<MetricUpdate<GaugeData>> updates) {
+ for (MetricUpdate<GaugeData> update : updates) {
+ MetricKey key = update.getKey();
+ MetricUpdate<GaugeData> current = gauges.get(key);
+ gauges.put(
+ key,
+ current != null
+ ? MetricUpdate.create(key, current.getUpdate().combine(update.getUpdate()))
+ : update);
+ }
+ }
+
private static class MetricsContainerCacheLoader extends CacheLoader<String, MetricsContainer> {
@SuppressWarnings("NullableProblems")
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Gauge.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Gauge.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Gauge.java
new file mode 100644
index 0000000..6c03c80
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Gauge.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+
+/**
+ * A metric that reports the latest value out of the values reported to it.
+ *
+ * <p>Since metrics are collected from many workers, the reported value may not be the absolute
+ * last one that was set, but it will be one of the latest values.</p>
+ */
+@Experimental(Experimental.Kind.METRICS)
+public interface Gauge extends Metric {
+  /**
+   * Sets the current value of this gauge.
+   *
+   * @param value the value to report
+   */
+  void set(long value);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
new file mode 100644
index 0000000..35ae822
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.annotations.Experimental;
+
+/**
+ * Holds the current value of a {@link Gauge} metric along with a {@link DirtyState} that is
+ * notified after every modification.
+ *
+ * <p>Using this class directly is generally discouraged. The one exception is a runner that
+ * reports a gauge for a specific step (rather than for the gauge in the current context);
+ * there, fetching the underlying cell and reporting straight to it avoids a step of indirection.
+ */
+@Experimental(Experimental.Kind.METRICS)
+public class GaugeCell implements MetricCell<Gauge, GaugeData>, Gauge {
+
+  private final DirtyState dirty = new DirtyState();
+  private final AtomicReference<GaugeData> gaugeValue = new AtomicReference<>(GaugeData.empty());
+
+  /**
+   * Records {@code value} as a new gauge reading by combining it with the stored data.
+   *
+   * <p>The update is a compare-and-set retry loop; a fresh {@link GaugeData} is created on each
+   * attempt.
+   */
+  @Override
+  public void set(long value) {
+    while (true) {
+      final GaugeData current = gaugeValue.get();
+      final GaugeData merged = current.combine(GaugeData.create(value));
+      if (gaugeValue.compareAndSet(current, merged)) {
+        break;
+      }
+    }
+    dirty.afterModification();
+  }
+
+  /** Returns the dirty-state tracker associated with this cell. */
+  @Override
+  public DirtyState getDirty() {
+    return dirty;
+  }
+
+  /** Returns the gauge data currently stored in this cell. */
+  @Override
+  public GaugeData getCumulative() {
+    return gaugeValue.get();
+  }
+
+  /** Returns this cell itself, since it implements {@link Gauge} directly. */
+  @Override
+  public Gauge getInterface() {
+    return this;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java
new file mode 100644
index 0000000..bf3401d
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.joda.time.Instant;
+
+/**
+ * Data describing the gauge. This should retain enough detail that it can be combined with
+ * other {@link GaugeData}.
+ *
+ * <p>A gauge datum is a {@code long} value paired with the {@link Instant} at which it was
+ * created; combining two data keeps the one carrying the later timestamp.
+ */
+@AutoValue
+public abstract class GaugeData implements Serializable {
+
+  /** Returns the reported gauge value ({@code -1} for {@link #empty()}). */
+  public abstract long value();
+
+  /** Returns the time at which the value was recorded (the epoch for {@link #empty()}). */
+  public abstract Instant timestamp();
+
+  /** Creates gauge data holding {@code value}, stamped with the current time. */
+  public static GaugeData create(long value) {
+    return new AutoValue_GaugeData(value, Instant.now());
+  }
+
+  /** Returns the shared placeholder that represents "no value reported". */
+  public static GaugeData empty() {
+    return EmptyGaugeData.INSTANCE;
+  }
+
+  /**
+   * Returns whichever of {@code this} and {@code other} has the later timestamp.
+   *
+   * <p>When both timestamps are equal, {@code other} is returned.
+   */
+  public GaugeData combine(GaugeData other) {
+    if (this.timestamp().isAfter(other.timestamp())) {
+      return this;
+    } else {
+      return other;
+    }
+  }
+
+  /** Converts this data into a {@link GaugeResult} with the same value and timestamp. */
+  public GaugeResult extractResult() {
+    return GaugeResult.create(value(), timestamp());
+  }
+
+  /**
+   * Empty {@link GaugeData}, representing no values reported.
+   */
+  public static class EmptyGaugeData extends GaugeData {
+
+    private static final EmptyGaugeData INSTANCE = new EmptyGaugeData();
+    private static final Instant EPOCH = new Instant(0);
+
+    private EmptyGaugeData() {
+    }
+
+    @Override
+    public long value() {
+      return -1L;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return EPOCH;
+    }
+
+    @Override
+    public GaugeResult extractResult() {
+      return GaugeResult.empty();
+    }
+
+    /**
+     * Preserves the singleton across Java serialization.
+     *
+     * <p>This class does not override {@code equals}, so equality is by identity. Without this
+     * hook a deserialized copy would be a second instance that is not equal to
+     * {@link GaugeData#empty()}.
+     */
+    private Object readResolve() {
+      return INSTANCE;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java
new file mode 100644
index 0000000..878776a
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+import org.joda.time.Instant;
+
+/**
+ * The result of a {@link Gauge} metric: a {@code long} value together with the {@link Instant}
+ * associated with it.
+ */
+@AutoValue
+public abstract class GaugeResult {
+  /** Returns the gauge value ({@code -1} for {@link #empty()}). */
+  public abstract long value();
+
+  /** Returns the timestamp attached to the value (the epoch for {@link #empty()}). */
+  public abstract Instant timestamp();
+
+  /** Creates a result carrying the given {@code value} and {@code timestamp}. */
+  public static GaugeResult create(long value, Instant timestamp) {
+    return new AutoValue_GaugeResult(value, timestamp);
+  }
+
+  /** Returns the shared result that stands for "no values reported". */
+  public static GaugeResult empty() {
+    return EmptyGaugeResult.INSTANCE;
+  }
+
+  /**
+   * Empty {@link GaugeResult}, representing no values reported.
+   */
+  public static class EmptyGaugeResult extends GaugeResult {
+
+    private static final Instant EPOCH = new Instant(0);
+    private static final EmptyGaugeResult INSTANCE = new EmptyGaugeResult();
+
+    private EmptyGaugeResult() {
+    }
+
+    /** Returns the sentinel value {@code -1}. */
+    @Override
+    public long value() {
+      return -1L;
+    }
+
+    /** Returns the epoch instant. */
+    @Override
+    public Instant timestamp() {
+      return EPOCH;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
index 2241ba8..a7838ee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
@@ -30,4 +30,7 @@ public interface MetricQueryResults {
/** Return the metric results for the distributions that matched the filter. */
Iterable<MetricResult<DistributionResult>> distributions();
+
+ /** Return the metric results for the gauges that matched the filter. */
+ Iterable<MetricResult<GaugeResult>> gauges();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
index 56466d8..9cf6a5c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
@@ -33,7 +33,8 @@ public abstract class MetricUpdates {
public static final MetricUpdates EMPTY = MetricUpdates.create(
Collections.<MetricUpdate<Long>>emptyList(),
- Collections.<MetricUpdate<DistributionData>>emptyList());
+ Collections.<MetricUpdate<DistributionData>>emptyList(),
+ Collections.<MetricUpdate<GaugeData>>emptyList());
/**
* Representation of a single metric update.
@@ -64,10 +65,14 @@ public abstract class MetricUpdates {
/** All of the distribution updates. */
public abstract Iterable<MetricUpdate<DistributionData>> distributionUpdates();
+ /** All of the gauge updates. */
+ public abstract Iterable<MetricUpdate<GaugeData>> gaugeUpdates();
+
/** Create a new {@link MetricUpdates} bundle. */
public static MetricUpdates create(
Iterable<MetricUpdate<Long>> counterUpdates,
- Iterable<MetricUpdate<DistributionData>> distributionUpdates) {
- return new AutoValue_MetricUpdates(counterUpdates, distributionUpdates);
+ Iterable<MetricUpdate<DistributionData>> distributionUpdates,
+ Iterable<MetricUpdate<GaugeData>> gaugeUpdates) {
+ return new AutoValue_MetricUpdates(counterUpdates, distributionUpdates, gaugeUpdates);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
index 045e076..121698d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
@@ -58,6 +58,22 @@ public class Metrics {
return new DelegatingDistribution(MetricName.named(namespace, name));
}
+ /**
+ * Create a {@link Gauge} named by {@code namespace} and {@code name}: a metric that can have its
+ * new value set, and is aggregated by taking the last reported value.
+ */
+ public static Gauge gauge(String namespace, String name) {
+ return new DelegatingGauge(MetricName.named(namespace, name));
+ }
+
+ /**
+ * Create a {@link Gauge} named by the {@code namespace} class and {@code name}: a metric that can
+ * have its new value set, and is aggregated by taking the last reported value.
+ */
+ public static Gauge gauge(Class<?> namespace, String name) {
+ return new DelegatingGauge(MetricName.named(namespace, name));
+ }
+
/** Implementation of {@link Counter} that delegates to the instance for the current context. */
private static class DelegatingCounter implements Counter, Serializable {
private final MetricName name;
@@ -108,4 +124,23 @@ public class Metrics {
}
}
}
+
+ /**
+ * Implementation of {@link Gauge} that delegates to the {@link GaugeCell} of the current container.
+ */
+ private static class DelegatingGauge implements Gauge, Serializable {
+ private final MetricName name;
+
+ private DelegatingGauge(MetricName name) {
+ this.name = name;
+ }
+
+ @Override
+ public void set(long value) {
+ MetricsContainer container = MetricsEnvironment.getCurrentContainer(); // looked up on every call
+ if (container != null) { // with no current container the value is silently dropped
+ container.getGauge(name).set(value);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
index ba5a343..5812ec6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
@@ -57,6 +57,14 @@ public class MetricsContainer {
}
});
+ private MetricsMap<MetricName, GaugeCell> gauges = // gauge cells, keyed by metric name
+ new MetricsMap<>(new MetricsMap.Factory<MetricName, GaugeCell>() {
+ @Override
+ public GaugeCell createInstance(MetricName unusedKey) {
+ return new GaugeCell(); // the key is ignored: every cell is built the same way
+ }
+ });
+
/**
* Create a new {@link MetricsContainer} associated with the given {@code stepName}.
*/
@@ -72,10 +80,22 @@ public class MetricsContainer {
return counters.get(metricName);
}
+ /**
+ * Return the {@link DistributionCell} that should be used for implementing the given
+ * {@code metricName} in this container.
+ */
public DistributionCell getDistribution(MetricName metricName) {
return distributions.get(metricName);
}
+ /**
+ * Return the {@link GaugeCell} that should be used for implementing the given
+ * {@code metricName} in this container, as looked up in the container's gauges map.
+ */
+ public GaugeCell getGauge(MetricName metricName) {
+ return gauges.get(metricName);
+ }
+
private <UpdateT, CellT extends MetricCell<?, UpdateT>>
ImmutableList<MetricUpdate<UpdateT>> extractUpdates(
MetricsMap<MetricName, CellT> cells) {
@@ -96,7 +116,8 @@ public class MetricsContainer {
public MetricUpdates getUpdates() {
return MetricUpdates.create(
extractUpdates(counters),
- extractUpdates(distributions));
+ extractUpdates(distributions),
+ extractUpdates(gauges));
}
private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?, ?>> cells) {
@@ -132,6 +153,7 @@ public class MetricsContainer {
public MetricUpdates getCumulative() {
return MetricUpdates.create(
extractCumulatives(counters),
- extractCumulatives(distributions));
+ extractCumulatives(distributions),
+ extractCumulatives(gauges));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java
new file mode 100644
index 0000000..d8ef928
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import org.junit.Test;
+
+/**
+ * Tests for {@link GaugeCell}.
+ */
+public class GaugeCellTest {
+ private GaugeCell cell = new GaugeCell();
+ /** Checks that the last value set wins and that the dirty flag tracks sets and commits. */
+ @Test
+ public void testDeltaAndCumulative() {
+ cell.set(5);
+ cell.set(7);
+ assertThat(cell.getCumulative().value(), equalTo(GaugeData.create(7).value()));
+ assertThat("getCumulative is idempotent",
+ cell.getCumulative().value(), equalTo(7L));
+ // The sets above should have left the cell dirty; completing a commit should clear that.
+ assertThat(cell.getDirty().beforeCommit(), equalTo(true));
+ cell.getDirty().afterCommit();
+ assertThat(cell.getDirty().beforeCommit(), equalTo(false));
+
+ cell.set(30);
+ assertThat(cell.getCumulative().value(), equalTo(30L));
+
+ assertThat("Adding a new value made the cell dirty",
+ cell.getDirty().beforeCommit(), equalTo(true));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
index 5de8894..2251c82 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
@@ -92,7 +92,7 @@ public class MetricMatchers {
return Objects.equals(namespace, item.name().namespace())
&& Objects.equals(name, item.name().name())
&& item.step().contains(step)
- && Objects.equals(attempted, item.attempted());
+ && metricResultsEqual(attempted, item.attempted());
}
@Override
@@ -135,7 +135,7 @@ public class MetricMatchers {
return Objects.equals(namespace, item.name().namespace())
&& Objects.equals(name, item.name().name())
&& item.step().contains(step)
- && Objects.equals(committed, item.committed());
+ && metricResultsEqual(committed, item.committed());
}
@Override
@@ -165,6 +165,14 @@ public class MetricMatchers {
};
}
+ /** Null-safe result comparison; gauges match on value alone, ignoring their timestamps. */
+ private static <T> boolean metricResultsEqual(T result1, T result2) {
+ if (result1 instanceof GaugeResult && result2 instanceof GaugeResult) {
+ return ((GaugeResult) result1).value() == ((GaugeResult) result2).value();
+ }
+ return Objects.equals(result1, result2);
+ }
+
static Matcher<MetricResult<DistributionResult>> distributionAttemptedMinMax(
final String namespace, final String name, final String step,
final Long attemptedMin, final Long attemptedMax) {
http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index fc9e18b..697ff5a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.hamcrest.CoreMatchers;
+import org.joda.time.Instant;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
@@ -52,6 +53,7 @@ public class MetricsTest implements Serializable {
private static final String NS = "test";
private static final String NAME = "name";
private static final MetricName METRIC_NAME = MetricName.named(NS, NAME);
+ private static final String NAMESPACE = MetricsTest.class.getName();
@Rule
public final transient TestPipeline pipeline = TestPipeline.create();
@@ -127,19 +129,22 @@ public class MetricsTest implements Serializable {
.build());
assertThat(metrics.counters(), hasItem(
- committedMetricsResult(MetricsTest.class.getName(), "count", "MyStep1", 3L)));
+ committedMetricsResult(NAMESPACE, "count", "MyStep1", 3L)));
assertThat(metrics.distributions(), hasItem(
- committedMetricsResult(MetricsTest.class.getName(), "input", "MyStep1",
+ committedMetricsResult(NAMESPACE, "input", "MyStep1",
DistributionResult.create(26L, 3L, 5L, 13L))));
assertThat(metrics.counters(), hasItem(
- committedMetricsResult(MetricsTest.class.getName(), "count", "MyStep2", 6L)));
+ committedMetricsResult(NAMESPACE, "count", "MyStep2", 6L)));
assertThat(metrics.distributions(), hasItem(
- committedMetricsResult(MetricsTest.class.getName(), "input", "MyStep2",
+ committedMetricsResult(NAMESPACE, "input", "MyStep2",
DistributionResult.create(52L, 6L, 5L, 13L))));
+ assertThat(metrics.gauges(), hasItem(
+ committedMetricsResult(NAMESPACE, "my-gauge", "MyStep2",
+ GaugeResult.create(12L, Instant.now()))));
assertThat(metrics.distributions(), hasItem(
- distributionCommittedMinMax(MetricsTest.class.getName(), "bundle", "MyStep1", 10L, 40L)));
+ distributionCommittedMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L)));
}
@@ -154,19 +159,22 @@ public class MetricsTest implements Serializable {
// TODO: BEAM-1169: Metrics shouldn't verify the physical values tightly.
assertThat(metrics.counters(), hasItem(
- attemptedMetricsResult(MetricsTest.class.getName(), "count", "MyStep1", 3L)));
+ attemptedMetricsResult(NAMESPACE, "count", "MyStep1", 3L)));
assertThat(metrics.distributions(), hasItem(
- attemptedMetricsResult(MetricsTest.class.getName(), "input", "MyStep1",
+ attemptedMetricsResult(NAMESPACE, "input", "MyStep1",
DistributionResult.create(26L, 3L, 5L, 13L))));
assertThat(metrics.counters(), hasItem(
- attemptedMetricsResult(MetricsTest.class.getName(), "count", "MyStep2", 6L)));
+ attemptedMetricsResult(NAMESPACE, "count", "MyStep2", 6L)));
assertThat(metrics.distributions(), hasItem(
- attemptedMetricsResult(MetricsTest.class.getName(), "input", "MyStep2",
+ attemptedMetricsResult(NAMESPACE, "input", "MyStep2",
DistributionResult.create(52L, 6L, 5L, 13L))));
+ assertThat(metrics.gauges(), hasItem(
+ attemptedMetricsResult(NAMESPACE, "my-gauge", "MyStep2",
+ GaugeResult.create(12L, Instant.now()))));
assertThat(metrics.distributions(), hasItem(
- distributionAttemptedMinMax(MetricsTest.class.getName(), "bundle", "MyStep1", 10L, 40L)));
+ distributionAttemptedMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L)));
}
private PipelineResult runPipelineWithMetrics() {
@@ -205,10 +213,13 @@ public class MetricsTest implements Serializable {
@ProcessElement
public void processElement(ProcessContext c) {
Distribution values = Metrics.distribution(MetricsTest.class, "input");
+ Gauge gauge = Metrics.gauge(MetricsTest.class, "my-gauge");
+ Integer element = c.element();
count.inc();
- values.update(c.element());
- c.output(c.element());
- c.sideOutput(output2, c.element());
+ values.update(element);
+ gauge.set(12L);
+ c.output(element);
+ c.sideOutput(output2, element);
}
}));
PipelineResult result = pipeline.run();
[2/2] beam git commit: This closes #2151
Posted by av...@apache.org.
This closes #2151
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b26e10b4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b26e10b4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b26e10b4
Branch: refs/heads/master
Commit: b26e10b44a0b82359dea7b96e0d49dd595fae785
Parents: 026aec8 63e953c
Author: Aviem Zur <av...@gmail.com>
Authored: Mon Mar 27 19:37:52 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Mon Mar 27 19:37:52 2017 +0300
----------------------------------------------------------------------
.../beam/runners/direct/DirectMetrics.java | 59 +++++++++++++-
.../beam/runners/direct/DirectMetricsTest.java | 42 ++++++++--
.../beam/runners/dataflow/DataflowMetrics.java | 16 +++-
.../runners/spark/metrics/SparkBeamMetric.java | 4 +
.../spark/metrics/SparkMetricResults.java | 27 +++++++
.../spark/metrics/SparkMetricsContainer.java | 20 +++++
.../java/org/apache/beam/sdk/metrics/Gauge.java | 32 ++++++++
.../org/apache/beam/sdk/metrics/GaugeCell.java | 60 +++++++++++++++
.../org/apache/beam/sdk/metrics/GaugeData.java | 81 ++++++++++++++++++++
.../apache/beam/sdk/metrics/GaugeResult.java | 61 +++++++++++++++
.../beam/sdk/metrics/MetricQueryResults.java | 3 +
.../apache/beam/sdk/metrics/MetricUpdates.java | 11 ++-
.../org/apache/beam/sdk/metrics/Metrics.java | 35 +++++++++
.../beam/sdk/metrics/MetricsContainer.java | 26 ++++++-
.../apache/beam/sdk/metrics/GaugeCellTest.java | 48 ++++++++++++
.../apache/beam/sdk/metrics/MetricMatchers.java | 12 ++-
.../apache/beam/sdk/metrics/MetricsTest.java | 37 +++++----
17 files changed, 539 insertions(+), 35 deletions(-)
----------------------------------------------------------------------