You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by av...@apache.org on 2017/03/27 16:38:33 UTC

[1/2] beam git commit: [BEAM-1617] Add Gauge metric type to Java SDK

Repository: beam
Updated Branches:
  refs/heads/master 026aec856 -> b26e10b44


[BEAM-1617] Add Gauge metric type to Java SDK


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/63e953c6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/63e953c6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/63e953c6

Branch: refs/heads/master
Commit: 63e953c6026192e5e027f0bac183b86992480127
Parents: 026aec8
Author: Aviem Zur <av...@gmail.com>
Authored: Fri Mar 3 14:42:23 2017 +0200
Committer: Aviem Zur <av...@gmail.com>
Committed: Mon Mar 27 19:01:58 2017 +0300

----------------------------------------------------------------------
 .../beam/runners/direct/DirectMetrics.java      | 59 +++++++++++++-
 .../beam/runners/direct/DirectMetricsTest.java  | 42 ++++++++--
 .../beam/runners/dataflow/DataflowMetrics.java  | 16 +++-
 .../runners/spark/metrics/SparkBeamMetric.java  |  4 +
 .../spark/metrics/SparkMetricResults.java       | 27 +++++++
 .../spark/metrics/SparkMetricsContainer.java    | 20 +++++
 .../java/org/apache/beam/sdk/metrics/Gauge.java | 32 ++++++++
 .../org/apache/beam/sdk/metrics/GaugeCell.java  | 60 +++++++++++++++
 .../org/apache/beam/sdk/metrics/GaugeData.java  | 81 ++++++++++++++++++++
 .../apache/beam/sdk/metrics/GaugeResult.java    | 61 +++++++++++++++
 .../beam/sdk/metrics/MetricQueryResults.java    |  3 +
 .../apache/beam/sdk/metrics/MetricUpdates.java  | 11 ++-
 .../org/apache/beam/sdk/metrics/Metrics.java    | 35 +++++++++
 .../beam/sdk/metrics/MetricsContainer.java      | 26 ++++++-
 .../apache/beam/sdk/metrics/GaugeCellTest.java  | 48 ++++++++++++
 .../apache/beam/sdk/metrics/MetricMatchers.java | 12 ++-
 .../apache/beam/sdk/metrics/MetricsTest.java    | 37 +++++----
 17 files changed, 539 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
index f04dc21..fb126fb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
@@ -33,6 +33,8 @@ import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.metrics.DistributionData;
 import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeData;
+import org.apache.beam.sdk.metrics.GaugeResult;
 import org.apache.beam.sdk.metrics.MetricFiltering;
 import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricName;
@@ -193,6 +195,28 @@ class DirectMetrics extends MetricResults {
         }
       };
 
+  private static final MetricAggregation<GaugeData, GaugeResult> GAUGE =
+      new MetricAggregation<GaugeData, GaugeResult>() {
+        @Override
+        public GaugeData zero() {
+          return GaugeData.empty();
+        }
+
+        @Override
+        public GaugeData combine(Iterable<GaugeData> updates) {
+          GaugeData result = GaugeData.empty();
+          for (GaugeData update : updates) {
+            result = result.combine(update);
+          }
+          return result;
+        }
+
+        @Override
+        public GaugeResult extract(GaugeData data) {
+          return data.extractResult();
+        }
+      };
+
   /** The current values of counters in memory. */
   private MetricsMap<MetricKey, DirectMetric<Long, Long>> counters =
       new MetricsMap<>(new MetricsMap.Factory<MetricKey, DirectMetric<Long, Long>>() {
@@ -210,13 +234,23 @@ class DirectMetrics extends MetricResults {
           return new DirectMetric<>(DISTRIBUTION);
         }
       });
+  private MetricsMap<MetricKey, DirectMetric<GaugeData, GaugeResult>> gauges =
+      new MetricsMap<>(
+          new MetricsMap.Factory<MetricKey, DirectMetric<GaugeData, GaugeResult>>() {
+            @Override
+            public DirectMetric<GaugeData, GaugeResult> createInstance(
+                MetricKey unusedKey) {
+              return new DirectMetric<>(GAUGE);
+            }
+          });
 
   @AutoValue
   abstract static class DirectMetricQueryResults implements MetricQueryResults {
     public static MetricQueryResults create(
         Iterable<MetricResult<Long>> counters,
-        Iterable<MetricResult<DistributionResult>> distributions) {
-      return new AutoValue_DirectMetrics_DirectMetricQueryResults(counters, distributions);
+        Iterable<MetricResult<DistributionResult>> distributions,
+        Iterable<MetricResult<GaugeResult>> gauges) {
+      return new AutoValue_DirectMetrics_DirectMetricQueryResults(counters, distributions, gauges);
     }
   }
 
@@ -248,8 +282,15 @@ class DirectMetrics extends MetricResults {
         : distributions.entries()) {
       maybeExtractResult(filter, distributionResults, distribution);
     }
+    ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults =
+        ImmutableList.builder();
+    for (Entry<MetricKey, DirectMetric<GaugeData, GaugeResult>> gauge
+        : gauges.entries()) {
+      maybeExtractResult(filter, gaugeResults, gauge);
+    }
 
-    return DirectMetricQueryResults.create(counterResults.build(), distributionResults.build());
+    return DirectMetricQueryResults.create(counterResults.build(), distributionResults.build(),
+        gaugeResults.build());
   }
 
   private <ResultT> void maybeExtractResult(
@@ -274,6 +315,10 @@ class DirectMetrics extends MetricResults {
       distributions.get(distribution.getKey())
           .updatePhysical(bundle, distribution.getUpdate());
     }
+    for (MetricUpdate<GaugeData> gauge : updates.gaugeUpdates()) {
+      gauges.get(gauge.getKey())
+          .updatePhysical(bundle, gauge.getUpdate());
+    }
   }
 
   public void commitPhysical(CommittedBundle<?> bundle, MetricUpdates updates) {
@@ -284,6 +329,10 @@ class DirectMetrics extends MetricResults {
       distributions.get(distribution.getKey())
           .commitPhysical(bundle, distribution.getUpdate());
     }
+    for (MetricUpdate<GaugeData> gauge : updates.gaugeUpdates()) {
+      gauges.get(gauge.getKey())
+          .commitPhysical(bundle, gauge.getUpdate());
+    }
   }
 
   /** Apply metric updates that represent new logical values from a bundle being committed. */
@@ -295,5 +344,9 @@ class DirectMetrics extends MetricResults {
       distributions.get(distribution.getKey())
           .commitLogical(bundle, distribution.getUpdate());
     }
+    for (MetricUpdate<GaugeData> gauge : updates.gaugeUpdates()) {
+      gauges.get(gauge.getKey())
+          .commitLogical(bundle, gauge.getUpdate());
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
index 7183124..ee51e9a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
@@ -29,12 +29,15 @@ import com.google.common.collect.ImmutableList;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.metrics.DistributionData;
 import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeData;
+import org.apache.beam.sdk.metrics.GaugeResult;
 import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricUpdates;
 import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
 import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -56,6 +59,7 @@ public class DirectMetricsTest {
   private static final MetricName NAME1 = MetricName.named("ns1", "name1");
   private static final MetricName NAME2 = MetricName.named("ns1", "name2");
   private static final MetricName NAME3 = MetricName.named("ns2", "name1");
+  private static final MetricName NAME4 = MetricName.named("ns2", "name2");
 
   private DirectMetrics metrics = new DirectMetrics();
 
@@ -73,14 +77,20 @@ public class DirectMetricsTest {
             MetricUpdate.create(MetricKey.create("step1", NAME2), 8L)),
         ImmutableList.of(
             MetricUpdate.create(MetricKey.create("step1", NAME1),
-                DistributionData.create(8, 2, 3, 5)))));
+                DistributionData.create(8, 2, 3, 5))),
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("step1", NAME4), GaugeData.create(15L)))
+        ));
     metrics.commitLogical(bundle1, MetricUpdates.create(
         ImmutableList.of(
             MetricUpdate.create(MetricKey.create("step2", NAME1), 7L),
             MetricUpdate.create(MetricKey.create("step1", NAME2), 4L)),
         ImmutableList.of(
             MetricUpdate.create(MetricKey.create("step1", NAME1),
-                DistributionData.create(4, 1, 4, 4)))));
+                DistributionData.create(4, 1, 4, 4))),
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("step1", NAME4), GaugeData.create(27L)))
+    ));
 
     MetricQueryResults results = metrics.queryMetrics(MetricsFilter.builder().build());
     assertThat(results.counters(), containsInAnyOrder(
@@ -95,6 +105,12 @@ public class DirectMetricsTest {
         attemptedMetricsResult("ns1", "name1", "step1", DistributionResult.ZERO)));
     assertThat(results.distributions(), contains(
         committedMetricsResult("ns1", "name1", "step1", DistributionResult.create(12, 3, 3, 5))));
+    assertThat(results.gauges(), contains(
+        attemptedMetricsResult("ns2", "name2", "step1", GaugeResult.empty())
+    ));
+    assertThat(results.gauges(), contains(
+        committedMetricsResult("ns2", "name2", "step1", GaugeResult.create(27L, Instant.now()))
+    ));
   }
 
   @SuppressWarnings("unchecked")
@@ -104,12 +120,16 @@ public class DirectMetricsTest {
         ImmutableList.of(
             MetricUpdate.create(MetricKey.create("step1", NAME1), 5L),
             MetricUpdate.create(MetricKey.create("step1", NAME3), 8L)),
-        ImmutableList.<MetricUpdate<DistributionData>>of()));
+        ImmutableList.<MetricUpdate<DistributionData>>of(),
+        ImmutableList.<MetricUpdate<GaugeData>>of()
+    ));
     metrics.updatePhysical(bundle1, MetricUpdates.create(
         ImmutableList.of(
             MetricUpdate.create(MetricKey.create("step2", NAME1), 7L),
             MetricUpdate.create(MetricKey.create("step1", NAME3), 4L)),
-        ImmutableList.<MetricUpdate<DistributionData>>of()));
+        ImmutableList.<MetricUpdate<DistributionData>>of(),
+        ImmutableList.<MetricUpdate<GaugeData>>of()
+    ));
 
     MetricQueryResults results = metrics.queryMetrics(
         MetricsFilter.builder().addNameFilter(inNamespace("ns1")).build());
@@ -132,12 +152,14 @@ public class DirectMetricsTest {
         ImmutableList.of(
             MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 5L),
             MetricUpdate.create(MetricKey.create("Outer1/Inner2", NAME1), 8L)),
-        ImmutableList.<MetricUpdate<DistributionData>>of()));
+        ImmutableList.<MetricUpdate<DistributionData>>of(),
+        ImmutableList.<MetricUpdate<GaugeData>>of()));
     metrics.updatePhysical(bundle1, MetricUpdates.create(
         ImmutableList.of(
             MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 12L),
             MetricUpdate.create(MetricKey.create("Outer2/Inner2", NAME1), 18L)),
-        ImmutableList.<MetricUpdate<DistributionData>>of()));
+        ImmutableList.<MetricUpdate<DistributionData>>of(),
+        ImmutableList.<MetricUpdate<GaugeData>>of()));
 
     MetricQueryResults results = metrics.queryMetrics(
         MetricsFilter.builder().addStep("Outer1").build());
@@ -161,12 +183,16 @@ public class DirectMetricsTest {
         ImmutableList.of(
             MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner1", NAME1), 5L),
             MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner2", NAME1), 8L)),
-        ImmutableList.<MetricUpdate<DistributionData>>of()));
+        ImmutableList.<MetricUpdate<DistributionData>>of(),
+        ImmutableList.<MetricUpdate<GaugeData>>of()
+    ));
     metrics.updatePhysical(bundle1, MetricUpdates.create(
         ImmutableList.of(
             MetricUpdate.create(MetricKey.create("Top2/Outer1/Inner1", NAME1), 12L),
             MetricUpdate.create(MetricKey.create("Top1/Outer2/Inner2", NAME1), 18L)),
-        ImmutableList.<MetricUpdate<DistributionData>>of()));
+        ImmutableList.<MetricUpdate<DistributionData>>of(),
+        ImmutableList.<MetricUpdate<GaugeData>>of()
+    ));
 
     MetricQueryResults results = metrics.queryMetrics(
         MetricsFilter.builder().addStep("Top1/Outer1").build());

http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
index c0d1883..9d28ef6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeResult;
 import org.apache.beam.sdk.metrics.MetricFiltering;
 import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricName;
@@ -130,6 +131,7 @@ class DataflowMetrics extends MetricResults {
     ImmutableList.Builder<MetricResult<Long>> counterResults = ImmutableList.builder();
     ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults =
         ImmutableList.builder();
+    ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults = ImmutableList.builder();
     for (MetricKey metricKey : metricHashKeys) {
       String metricName = metricKey.metricName().name();
       if (metricName.endsWith("[MIN]") || metricName.endsWith("[MAX]")
@@ -149,19 +151,23 @@ class DataflowMetrics extends MetricResults {
             step, committed, attempted));
       }
     }
-    return DataflowMetricQueryResults.create(counterResults.build(), distributionResults.build());
+    return DataflowMetricQueryResults.create(
+        counterResults.build(),
+        distributionResults.build(),
+        gaugeResults.build());
   }
 
   private MetricQueryResults queryServiceForMetrics(MetricsFilter filter) {
     List<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates;
     ImmutableList<MetricResult<Long>> counters = ImmutableList.of();
     ImmutableList<MetricResult<DistributionResult>> distributions = ImmutableList.of();
+    ImmutableList<MetricResult<GaugeResult>> gauges = ImmutableList.of();
     JobMetrics jobMetrics;
     try {
       jobMetrics = dataflowClient.getJobMetrics(dataflowPipelineJob.jobId);
     } catch (IOException e) {
       LOG.warn("Unable to query job metrics.\n");
-      return DataflowMetricQueryResults.create(counters, distributions);
+      return DataflowMetricQueryResults.create(counters, distributions, gauges);
     }
     metricUpdates = jobMetrics.getMetrics();
     return populateMetricQueryResults(metricUpdates, filter);
@@ -189,8 +195,10 @@ class DataflowMetrics extends MetricResults {
   abstract static class DataflowMetricQueryResults implements MetricQueryResults {
     public static MetricQueryResults create(
         Iterable<MetricResult<Long>> counters,
-        Iterable<MetricResult<DistributionResult>> distributions) {
-      return new AutoValue_DataflowMetrics_DataflowMetricQueryResults(counters, distributions);
+        Iterable<MetricResult<DistributionResult>> distributions,
+        Iterable<MetricResult<GaugeResult>> gauges) {
+      return
+          new AutoValue_DataflowMetrics_DataflowMetricQueryResults(counters, distributions, gauges);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
index 8328a1a..2d445a9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeResult;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricResult;
@@ -53,6 +54,9 @@ class SparkBeamMetric implements Metric {
       metrics.put(renderName(metricResult) + ".max", result.max());
       metrics.put(renderName(metricResult) + ".mean", result.mean());
     }
+    for (MetricResult<GaugeResult> metricResult : metricQueryResults.gauges()) {
+      metrics.put(renderName(metricResult), metricResult.attempted().value());
+    }
     return metrics;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
index a9651e2..c02027a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
@@ -25,6 +25,8 @@ import com.google.common.collect.FluentIterable;
 import java.util.Set;
 import org.apache.beam.sdk.metrics.DistributionData;
 import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeData;
+import org.apache.beam.sdk.metrics.GaugeResult;
 import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricNameFilter;
@@ -72,6 +74,16 @@ public class SparkMetricResults extends MetricResults {
               .toList();
     }
 
+    @Override
+    public Iterable<MetricResult<GaugeResult>> gauges() {
+      return
+          FluentIterable
+              .from(SparkMetricsContainer.getGauges())
+              .filter(matchesFilter(filter))
+              .transform(TO_GAUGE_RESULT)
+              .toList();
+    }
+
     private Predicate<MetricUpdate<?>> matchesFilter(final MetricsFilter filter) {
       return new Predicate<MetricUpdate<?>>() {
         @Override
@@ -146,6 +158,21 @@ public class SparkMetricResults extends MetricResults {
         }
       };
 
+  private static final Function<MetricUpdate<GaugeData>, MetricResult<GaugeResult>>
+      TO_GAUGE_RESULT =
+      new Function<MetricUpdate<GaugeData>, MetricResult<GaugeResult>>() {
+        @Override
+        public MetricResult<GaugeResult> apply(MetricUpdate<GaugeData> metricResult) {
+          if (metricResult != null) {
+            MetricKey key = metricResult.getKey();
+            return new SparkMetricResult<>(key.metricName(), key.stepName(),
+                metricResult.getUpdate().extractResult());
+          } else {
+            return null;
+          }
+        }
+      };
+
   private static class SparkMetricResult<T> implements MetricResult<T> {
     private final MetricName name;
     private final String step;

http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
index d376ce3..b6aa178 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import org.apache.beam.sdk.metrics.DistributionData;
+import org.apache.beam.sdk.metrics.GaugeData;
 import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricUpdates;
 import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
@@ -47,6 +48,7 @@ public class SparkMetricsContainer implements Serializable {
 
   private final Map<MetricKey, MetricUpdate<Long>> counters = new HashMap<>();
   private final Map<MetricKey, MetricUpdate<DistributionData>> distributions = new HashMap<>();
+  private final Map<MetricKey, MetricUpdate<GaugeData>> gauges = new HashMap<>();
 
   public MetricsContainer getContainer(String stepName) {
     if (metricsContainers == null) {
@@ -76,9 +78,14 @@ public class SparkMetricsContainer implements Serializable {
     return sparkMetricsContainer.distributions.values();
   }
 
+  static Collection<MetricUpdate<GaugeData>> getGauges() {
+    return getInstance().gauges.values();
+  }
+
   SparkMetricsContainer update(SparkMetricsContainer other) {
     this.updateCounters(other.counters.values());
     this.updateDistributions(other.distributions.values());
+    this.updateGauges(other.gauges.values());
     return this;
   }
 
@@ -101,6 +108,7 @@ public class SparkMetricsContainer implements Serializable {
         MetricUpdates cumulative = container.getCumulative();
         this.updateCounters(cumulative.counterUpdates());
         this.updateDistributions(cumulative.distributionUpdates());
+        this.updateGauges(cumulative.gaugeUpdates());
       }
     }
   }
@@ -123,6 +131,18 @@ public class SparkMetricsContainer implements Serializable {
     }
   }
 
+  private void updateGauges(Iterable<MetricUpdate<GaugeData>> updates) {
+    for (MetricUpdate<GaugeData> update : updates) {
+      MetricKey key = update.getKey();
+      MetricUpdate<GaugeData> current = gauges.get(key);
+      gauges.put(
+          key,
+          current != null
+              ? MetricUpdate.create(key, current.getUpdate().combine(update.getUpdate()))
+              : update);
+    }
+  }
+
   private static class MetricsContainerCacheLoader extends CacheLoader<String, MetricsContainer> {
     @SuppressWarnings("NullableProblems")
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Gauge.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Gauge.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Gauge.java
new file mode 100644
index 0000000..6c03c80
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Gauge.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+
+/**
+ * A metric that reports the latest value out of reported values.
+ *
+ * <p>Since metrics are collected from many workers the value may not be the absolute last,
+ * but one of the latest values.</p>
+ */
+@Experimental(Experimental.Kind.METRICS)
+public interface Gauge extends Metric {
+  /** Set current value for this gauge. */
+  void set(long value);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
new file mode 100644
index 0000000..35ae822
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.annotations.Experimental;
+
+/**
+ * Tracks the current value (and delta) for a {@link Gauge} metric.
+ *
+ * <p>This class generally shouldn't be used directly. The only exception is within a runner where
+ * a gauge is being reported for a specific step (rather than the gauge in the current
+ * context). In that case retrieving the underlying cell and reporting directly to it avoids a step
+ * of indirection.
+ */
+@Experimental(Experimental.Kind.METRICS)
+public class GaugeCell implements MetricCell<Gauge, GaugeData>, Gauge {
+
+  private final DirtyState dirty = new DirtyState();
+  private final AtomicReference<GaugeData> gaugeValue = new AtomicReference<>(GaugeData.empty());
+
+  @Override
+  public void set(long value) {
+    GaugeData original;
+    do {
+      original = gaugeValue.get();
+    } while (!gaugeValue.compareAndSet(original, original.combine(GaugeData.create(value))));
+    dirty.afterModification();
+  }
+
+  @Override
+  public DirtyState getDirty() {
+    return dirty;
+  }
+
+  @Override
+  public GaugeData getCumulative() {
+    return gaugeValue.get();
+  }
+
+  @Override
+  public Gauge getInterface() {
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java
new file mode 100644
index 0000000..bf3401d
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.joda.time.Instant;
+
+/**
+ * Data describing the gauge. This should retain enough detail that it can be combined with
+ * other {@link GaugeData}.
+ */
+@AutoValue
+public abstract class GaugeData implements Serializable {
+
+  public abstract long value();
+
+  public abstract Instant timestamp();
+
+  public static GaugeData create(long value) {
+    return new AutoValue_GaugeData(value, Instant.now());
+  }
+
+  public static GaugeData empty() {
+    return EmptyGaugeData.INSTANCE;
+  }
+
+  public GaugeData combine(GaugeData other) {
+    if (this.timestamp().isAfter(other.timestamp())) {
+      return this;
+    } else {
+      return other;
+    }
+  }
+
+  public GaugeResult extractResult() {
+    return GaugeResult.create(value(), timestamp());
+  }
+
+  /**
+   * Empty {@link GaugeData}, representing no values reported.
+   */
+  public static class EmptyGaugeData extends GaugeData {
+
+    private static final EmptyGaugeData INSTANCE = new EmptyGaugeData();
+    private static final Instant EPOCH = new Instant(0);
+
+    private EmptyGaugeData() {
+    }
+
+    @Override
+    public long value() {
+      return -1L;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return EPOCH;
+    }
+
+    @Override
+    public GaugeResult extractResult() {
+      return GaugeResult.empty();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java
new file mode 100644
index 0000000..878776a
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+import org.joda.time.Instant;
+
+/**
+ * The result of a {@link Gauge} metric.
+ */
+@AutoValue
+public abstract class GaugeResult {
+  public abstract long value();
+
+  public abstract Instant timestamp();
+
+  public static GaugeResult create(long value, Instant timestamp) {
+    return new AutoValue_GaugeResult(value, timestamp);
+  }
+
+  public static GaugeResult empty() {
+    return EmptyGaugeResult.INSTANCE;
+  }
+
+  /**
+   * Empty {@link GaugeResult}, representing no values reported.
+   */
+  public static class EmptyGaugeResult extends GaugeResult {
+
+    private static final EmptyGaugeResult INSTANCE = new EmptyGaugeResult();
+    private static final Instant EPOCH = new Instant(0);
+
+    private EmptyGaugeResult() {
+    }
+
+    @Override
+    public long value() {
+      return -1L;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return EPOCH;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
index 2241ba8..a7838ee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
@@ -30,4 +30,7 @@ public interface MetricQueryResults {
 
   /** Return the metric results for the distributions that matched the filter. */
   Iterable<MetricResult<DistributionResult>> distributions();
+
+  /** Return the metric results for the gauges that matched the filter. */
+  Iterable<MetricResult<GaugeResult>> gauges();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
index 56466d8..9cf6a5c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
@@ -33,7 +33,8 @@ public abstract class MetricUpdates {
 
   public static final MetricUpdates EMPTY = MetricUpdates.create(
       Collections.<MetricUpdate<Long>>emptyList(),
-      Collections.<MetricUpdate<DistributionData>>emptyList());
+      Collections.<MetricUpdate<DistributionData>>emptyList(),
+      Collections.<MetricUpdate<GaugeData>>emptyList());
 
   /**
    * Representation of a single metric update.
@@ -64,10 +65,14 @@ public abstract class MetricUpdates {
   /** All of the distribution updates. */
   public abstract Iterable<MetricUpdate<DistributionData>> distributionUpdates();
 
+  /** All of the gauges updates. */
+  public abstract Iterable<MetricUpdate<GaugeData>> gaugeUpdates();
+
   /** Create a new {@link MetricUpdates} bundle. */
   public static MetricUpdates create(
       Iterable<MetricUpdate<Long>> counterUpdates,
-      Iterable<MetricUpdate<DistributionData>> distributionUpdates) {
-    return new AutoValue_MetricUpdates(counterUpdates, distributionUpdates);
+      Iterable<MetricUpdate<DistributionData>> distributionUpdates,
+      Iterable<MetricUpdate<GaugeData>> gaugeUpdates) {
+    return new AutoValue_MetricUpdates(counterUpdates, distributionUpdates, gaugeUpdates);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
index 045e076..121698d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
@@ -58,6 +58,22 @@ public class Metrics {
     return new DelegatingDistribution(MetricName.named(namespace, name));
   }
 
+  /**
+   * Create a metric that can have its new value set, and is aggregated by taking the last reported
+   * value.
+   */
+  public static Gauge gauge(String namespace, String name) {
+    return new DelegatingGauge(MetricName.named(namespace, name));
+  }
+
+  /**
+   * Create a metric that can have its new value set, and is aggregated by taking the last reported
+   * value.
+   */
+  public static Gauge gauge(Class<?> namespace, String name) {
+    return new DelegatingGauge(MetricName.named(namespace, name));
+  }
+
   /** Implementation of {@link Counter} that delegates to the instance for the current context. */
   private static class DelegatingCounter implements Counter, Serializable {
     private final MetricName name;
@@ -108,4 +124,23 @@ public class Metrics {
       }
     }
   }
+
+  /**
+   * Implementation of {@link Gauge} that delegates to the instance for the current context.
+   */
+  private static class DelegatingGauge implements Gauge, Serializable {
+    private final MetricName name;
+
+    private DelegatingGauge(MetricName name) {
+      this.name = name;
+    }
+
+    @Override
+    public void set(long value) {
+      MetricsContainer container = MetricsEnvironment.getCurrentContainer();
+      if (container != null) {
+        container.getGauge(name).set(value);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
index ba5a343..5812ec6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
@@ -57,6 +57,14 @@ public class MetricsContainer {
         }
       });
 
+  private MetricsMap<MetricName, GaugeCell> gauges =
+      new MetricsMap<>(new MetricsMap.Factory<MetricName, GaugeCell>() {
+        @Override
+        public GaugeCell createInstance(MetricName unusedKey) {
+          return new GaugeCell();
+        }
+      });
+
   /**
    * Create a new {@link MetricsContainer} associated with the given {@code stepName}.
    */
@@ -72,10 +80,22 @@ public class MetricsContainer {
     return counters.get(metricName);
   }
 
+  /**
+   * Return the {@link DistributionCell} that should be used for implementing the given
+   * {@code metricName} in this container.
+   */
   public DistributionCell getDistribution(MetricName metricName) {
     return distributions.get(metricName);
   }
 
+  /**
+   * Return the {@link GaugeCell} that should be used for implementing the given
+   * {@code metricName} in this container.
+   */
+  public GaugeCell getGauge(MetricName metricName) {
+    return gauges.get(metricName);
+  }
+
   private <UpdateT, CellT extends MetricCell<?, UpdateT>>
   ImmutableList<MetricUpdate<UpdateT>> extractUpdates(
       MetricsMap<MetricName, CellT> cells) {
@@ -96,7 +116,8 @@ public class MetricsContainer {
   public MetricUpdates getUpdates() {
     return MetricUpdates.create(
         extractUpdates(counters),
-        extractUpdates(distributions));
+        extractUpdates(distributions),
+        extractUpdates(gauges));
   }
 
   private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?, ?>> cells) {
@@ -132,6 +153,7 @@ public class MetricsContainer {
   public MetricUpdates getCumulative() {
     return MetricUpdates.create(
         extractCumulatives(counters),
-        extractCumulatives(distributions));
+        extractCumulatives(distributions),
+        extractCumulatives(gauges));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java
new file mode 100644
index 0000000..d8ef928
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import org.junit.Test;
+
+/**
+ * Tests for {@link GaugeCell}.
+ */
+public class GaugeCellTest {
+  private GaugeCell cell = new GaugeCell();
+
+  @Test
+  public void testDeltaAndCumulative() {
+    cell.set(5);
+    cell.set(7);
+    assertThat(cell.getCumulative().value(), equalTo(GaugeData.create(7).value()));
+    assertThat("getCumulative is idempotent",
+        cell.getCumulative().value(), equalTo(7L));
+
+    assertThat(cell.getDirty().beforeCommit(), equalTo(true));
+    cell.getDirty().afterCommit();
+    assertThat(cell.getDirty().beforeCommit(), equalTo(false));
+
+    cell.set(30);
+    assertThat(cell.getCumulative().value(), equalTo(30L));
+
+    assertThat("Adding a new value made the cell dirty",
+        cell.getDirty().beforeCommit(), equalTo(true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
index 5de8894..2251c82 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
@@ -92,7 +92,7 @@ public class MetricMatchers {
         return Objects.equals(namespace, item.name().namespace())
             && Objects.equals(name, item.name().name())
             && item.step().contains(step)
-            && Objects.equals(attempted, item.attempted());
+            && metricResultsEqual(attempted, item.attempted());
       }
 
       @Override
@@ -135,7 +135,7 @@ public class MetricMatchers {
         return Objects.equals(namespace, item.name().namespace())
             && Objects.equals(name, item.name().name())
             && item.step().contains(step)
-            && Objects.equals(committed, item.committed());
+            && metricResultsEqual(committed, item.committed());
       }
 
       @Override
@@ -165,6 +165,14 @@ public class MetricMatchers {
     };
   }
 
+  private static <T> boolean metricResultsEqual(T result1, T result2) {
+    if (result1 instanceof GaugeResult) {
+      return (((GaugeResult) result1).value()) == (((GaugeResult) result2).value());
+    } else {
+      return result1.equals(result2);
+    }
+  }
+
   static Matcher<MetricResult<DistributionResult>> distributionAttemptedMinMax(
       final String namespace, final String name, final String step,
       final Long attemptedMin, final Long attemptedMax) {

http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index fc9e18b..697ff5a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.hamcrest.CoreMatchers;
+import org.joda.time.Instant;
 import org.junit.After;
 import org.junit.Rule;
 import org.junit.Test;
@@ -52,6 +53,7 @@ public class MetricsTest implements Serializable {
   private static final String NS = "test";
   private static final String NAME = "name";
   private static final MetricName METRIC_NAME = MetricName.named(NS, NAME);
+  private static final String NAMESPACE = MetricsTest.class.getName();
 
   @Rule
   public final transient TestPipeline pipeline = TestPipeline.create();
@@ -127,19 +129,22 @@ public class MetricsTest implements Serializable {
         .build());
 
     assertThat(metrics.counters(), hasItem(
-        committedMetricsResult(MetricsTest.class.getName(), "count", "MyStep1", 3L)));
+        committedMetricsResult(NAMESPACE, "count", "MyStep1", 3L)));
     assertThat(metrics.distributions(), hasItem(
-        committedMetricsResult(MetricsTest.class.getName(), "input", "MyStep1",
+        committedMetricsResult(NAMESPACE, "input", "MyStep1",
             DistributionResult.create(26L, 3L, 5L, 13L))));
 
     assertThat(metrics.counters(), hasItem(
-        committedMetricsResult(MetricsTest.class.getName(), "count", "MyStep2", 6L)));
+        committedMetricsResult(NAMESPACE, "count", "MyStep2", 6L)));
     assertThat(metrics.distributions(), hasItem(
-        committedMetricsResult(MetricsTest.class.getName(), "input", "MyStep2",
+        committedMetricsResult(NAMESPACE, "input", "MyStep2",
             DistributionResult.create(52L, 6L, 5L, 13L))));
+    assertThat(metrics.gauges(), hasItem(
+        committedMetricsResult(NAMESPACE, "my-gauge", "MyStep2",
+            GaugeResult.create(12L, Instant.now()))));
 
     assertThat(metrics.distributions(), hasItem(
-        distributionCommittedMinMax(MetricsTest.class.getName(), "bundle", "MyStep1", 10L, 40L)));
+        distributionCommittedMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L)));
   }
 
 
@@ -154,19 +159,22 @@ public class MetricsTest implements Serializable {
 
     // TODO: BEAM-1169: Metrics shouldn't verify the physical values tightly.
     assertThat(metrics.counters(), hasItem(
-        attemptedMetricsResult(MetricsTest.class.getName(), "count", "MyStep1", 3L)));
+        attemptedMetricsResult(NAMESPACE, "count", "MyStep1", 3L)));
     assertThat(metrics.distributions(), hasItem(
-        attemptedMetricsResult(MetricsTest.class.getName(), "input", "MyStep1",
+        attemptedMetricsResult(NAMESPACE, "input", "MyStep1",
             DistributionResult.create(26L, 3L, 5L, 13L))));
 
     assertThat(metrics.counters(), hasItem(
-        attemptedMetricsResult(MetricsTest.class.getName(), "count", "MyStep2", 6L)));
+        attemptedMetricsResult(NAMESPACE, "count", "MyStep2", 6L)));
     assertThat(metrics.distributions(), hasItem(
-        attemptedMetricsResult(MetricsTest.class.getName(), "input", "MyStep2",
+        attemptedMetricsResult(NAMESPACE, "input", "MyStep2",
             DistributionResult.create(52L, 6L, 5L, 13L))));
+    assertThat(metrics.gauges(), hasItem(
+        attemptedMetricsResult(NAMESPACE, "my-gauge", "MyStep2",
+            GaugeResult.create(12L, Instant.now()))));
 
     assertThat(metrics.distributions(), hasItem(
-        distributionAttemptedMinMax(MetricsTest.class.getName(), "bundle", "MyStep1", 10L, 40L)));
+        distributionAttemptedMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L)));
   }
 
   private PipelineResult runPipelineWithMetrics() {
@@ -205,10 +213,13 @@ public class MetricsTest implements Serializable {
               @ProcessElement
               public void processElement(ProcessContext c) {
                 Distribution values = Metrics.distribution(MetricsTest.class, "input");
+                Gauge gauge = Metrics.gauge(MetricsTest.class, "my-gauge");
+                Integer element = c.element();
                 count.inc();
-                values.update(c.element());
-                c.output(c.element());
-                c.sideOutput(output2, c.element());
+                values.update(element);
+                gauge.set(12L);
+                c.output(element);
+                c.sideOutput(output2, element);
               }
             }));
     PipelineResult result = pipeline.run();


[2/2] beam git commit: This closes #2151

Posted by av...@apache.org.
This closes #2151


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b26e10b4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b26e10b4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b26e10b4

Branch: refs/heads/master
Commit: b26e10b44a0b82359dea7b96e0d49dd595fae785
Parents: 026aec8 63e953c
Author: Aviem Zur <av...@gmail.com>
Authored: Mon Mar 27 19:37:52 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Mon Mar 27 19:37:52 2017 +0300

----------------------------------------------------------------------
 .../beam/runners/direct/DirectMetrics.java      | 59 +++++++++++++-
 .../beam/runners/direct/DirectMetricsTest.java  | 42 ++++++++--
 .../beam/runners/dataflow/DataflowMetrics.java  | 16 +++-
 .../runners/spark/metrics/SparkBeamMetric.java  |  4 +
 .../spark/metrics/SparkMetricResults.java       | 27 +++++++
 .../spark/metrics/SparkMetricsContainer.java    | 20 +++++
 .../java/org/apache/beam/sdk/metrics/Gauge.java | 32 ++++++++
 .../org/apache/beam/sdk/metrics/GaugeCell.java  | 60 +++++++++++++++
 .../org/apache/beam/sdk/metrics/GaugeData.java  | 81 ++++++++++++++++++++
 .../apache/beam/sdk/metrics/GaugeResult.java    | 61 +++++++++++++++
 .../beam/sdk/metrics/MetricQueryResults.java    |  3 +
 .../apache/beam/sdk/metrics/MetricUpdates.java  | 11 ++-
 .../org/apache/beam/sdk/metrics/Metrics.java    | 35 +++++++++
 .../beam/sdk/metrics/MetricsContainer.java      | 26 ++++++-
 .../apache/beam/sdk/metrics/GaugeCellTest.java  | 48 ++++++++++++
 .../apache/beam/sdk/metrics/MetricMatchers.java | 12 ++-
 .../apache/beam/sdk/metrics/MetricsTest.java    | 37 +++++----
 17 files changed, 539 insertions(+), 35 deletions(-)
----------------------------------------------------------------------