You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by av...@apache.org on 2017/05/06 05:46:06 UTC

[3/4] beam git commit: [BEAM-1672] Make MetricsContainers accumulable.

[BEAM-1672] Make MetricsContainers accumulable.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/46c2f935
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/46c2f935
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/46c2f935

Branch: refs/heads/master
Commit: 46c2f935a99350e18e5d50f1a996996760ebc2e3
Parents: db0ec99
Author: Aviem Zur <av...@gmail.com>
Authored: Fri May 5 23:13:24 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Sat May 6 08:27:49 2017 +0300

----------------------------------------------------------------------
 .../apache/beam/runners/core/LateDataUtils.java |   2 +-
 .../apache/beam/sdk/metrics/CounterCell.java    |  27 +-
 .../org/apache/beam/sdk/metrics/DirtyState.java |   3 +-
 .../beam/sdk/metrics/DistributionCell.java      |  16 +-
 .../org/apache/beam/sdk/metrics/GaugeCell.java  |  20 +-
 .../org/apache/beam/sdk/metrics/MetricCell.java |  14 +-
 .../org/apache/beam/sdk/metrics/Metrics.java    |   2 +-
 .../beam/sdk/metrics/MetricsContainer.java      |  29 +-
 .../sdk/metrics/MetricsContainerStepMap.java    | 487 +++++++++++++++++++
 .../org/apache/beam/sdk/metrics/MetricsMap.java |   5 +-
 .../beam/sdk/metrics/CounterCellTest.java       |   6 +-
 .../metrics/MetricsContainerStepMapTest.java    | 258 ++++++++++
 .../beam/sdk/metrics/MetricsContainerTest.java  |  14 +-
 13 files changed, 846 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
index c45387b..f7c0d31 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
@@ -71,7 +71,7 @@ public class LateDataUtils {
                         .isBefore(timerInternals.currentInputWatermarkTime());
                 if (expired) {
                   // The element is too late for this window.
-                  droppedDueToLateness.inc();
+                  droppedDueToLateness.update(1L);
                   WindowTracing.debug(
                       "GroupAlsoByWindow: Dropping element at {} for key: {}; "
                           + "window: {} since it is too far behind inputWatermark: {}",

http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
index 7ab5ebc..4b8548f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
  * indirection.
  */
 @Experimental(Kind.METRICS)
-public class CounterCell implements MetricCell<Long> {
+public class CounterCell implements MetricCell<Counter, Long> {
 
   private final DirtyState dirty = new DirtyState();
   private final AtomicLong value = new AtomicLong();
@@ -41,13 +41,26 @@ public class CounterCell implements MetricCell<Long> {
    */
   CounterCell() {}
 
-  /** Increment the counter by the given amount. */
-  private void add(long n) {
+  /**
+   * Increment the counter by the given amount.
+   * @param n value to increment by. Can be negative to decrement.
+   */
+  public void update(long n) {
     value.addAndGet(n);
     dirty.afterModification();
   }
 
   @Override
+  public void update(Long n) {
+    throw new UnsupportedOperationException("CounterCell.update(Long n) should not be used"
+    + " as it performs unnecessary boxing/unboxing. Use CounterCell.update(long n) instead.");
+  }
+
+  @Override public void update(MetricCell<Counter, Long> other) {
+    update((long) other.getCumulative());
+  }
+
+  @Override
   public DirtyState getDirty() {
     return dirty;
   }
@@ -56,12 +69,4 @@ public class CounterCell implements MetricCell<Long> {
   public Long getCumulative() {
     return value.get();
   }
-
-  public void inc() {
-    add(1);
-  }
-
-  public void inc(long n) {
-    add(n);
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
index 6706be8..4e0c15c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.sdk.metrics;
 
+import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -41,7 +42,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
  * completed.
  */
 @Experimental(Kind.METRICS)
-class DirtyState {
+class DirtyState implements Serializable {
   private enum State {
     /** Indicates that there have been changes to the MetricCell since last commit. */
     DIRTY,

http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
index 0f3f6a4..93a3649 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
  * of indirection.
  */
 @Experimental(Kind.METRICS)
-public class DistributionCell implements MetricCell<DistributionData> {
+public class DistributionCell implements MetricCell<Distribution, DistributionData> {
 
   private final DirtyState dirty = new DirtyState();
   private final AtomicReference<DistributionData> value =
@@ -42,16 +42,26 @@ public class DistributionCell implements MetricCell<DistributionData> {
    */
   DistributionCell() {}
 
-  /** Increment the counter by the given amount. */
+  /** Increment the distribution by the given amount. */
   public void update(long n) {
+    update(DistributionData.singleton(n));
+  }
+
+  @Override
+  public void update(DistributionData data) {
     DistributionData original;
     do {
       original = value.get();
-    } while (!value.compareAndSet(original, original.combine(DistributionData.singleton(n))));
+    } while (!value.compareAndSet(original, original.combine(data)));
     dirty.afterModification();
   }
 
   @Override
+  public void update(MetricCell<Distribution, DistributionData> other) {
+    update(other.getCumulative());
+  }
+
+  @Override
   public DirtyState getDirty() {
     return dirty;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
index 6f8e880..0cdd568 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
@@ -29,17 +29,33 @@ import org.apache.beam.sdk.annotations.Experimental;
  * of indirection.
  */
 @Experimental(Experimental.Kind.METRICS)
-public class GaugeCell implements MetricCell<GaugeData> {
+public class GaugeCell implements MetricCell<Gauge, GaugeData> {
 
   private final DirtyState dirty = new DirtyState();
   private final AtomicReference<GaugeData> gaugeValue = new AtomicReference<>(GaugeData.empty());
 
+  /** Set the gauge to the given value. */
   public void set(long value) {
+    update(GaugeData.create(value));
+  }
+
+  @Override
+  public void update(GaugeData data) {
+    GaugeData original;
+    do {
+      original = gaugeValue.get();
+    } while (!gaugeValue.compareAndSet(original, original.combine(data)));
+    dirty.afterModification();
+  }
+
+  @Override
+  public void update(MetricCell<Gauge, GaugeData> other) {
     GaugeData original;
     do {
       original = gaugeValue.get();
-    } while (!gaugeValue.compareAndSet(original, original.combine(GaugeData.create(value))));
+    } while (!gaugeValue.compareAndSet(original, original.combine(other.getCumulative())));
     dirty.afterModification();
+    update(other.getCumulative());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
index 82e30cb..403cac2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.metrics;
 
+import java.io.Serializable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 
@@ -24,10 +25,21 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
  * A {@link MetricCell} is used for accumulating in-memory changes to a metric. It represents a
  * specific metric name in a single context.
  *
+ * @param <UserT> The type of the user interface for reporting changes to this cell.
  * @param <DataT> The type of metric data stored (and extracted) from this cell.
  */
 @Experimental(Kind.METRICS)
-public interface MetricCell<DataT> {
+public interface MetricCell<UserT extends Metric, DataT> extends Serializable {
+
+  /**
+   * Update value of this cell.
+   */
+  void update(DataT data);
+
+  /**
+   * Update value of this cell by merging the value of another cell.
+   */
+  void update(MetricCell<UserT, DataT> other);
 
   /**
    * Return the {@link DirtyState} tracking whether this metric cell contains uncommitted changes.

http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
index 9286ea9..096d147 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
@@ -118,7 +118,7 @@ public class Metrics {
     @Override public void inc(long n) {
       MetricsContainer container = MetricsEnvironment.getCurrentContainer();
       if (container != null) {
-        container.getCounter(name).inc(n);
+        container.getCounter(name).update(n);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
index fbb0da3..48fa359 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.metrics;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
 import java.util.Map;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -37,7 +38,7 @@ import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
  * cumulative values/updates.
  */
 @Experimental(Kind.METRICS)
-public class MetricsContainer {
+public class MetricsContainer implements Serializable {
 
   private final String stepName;
 
@@ -96,7 +97,7 @@ public class MetricsContainer {
     return gauges.get(metricName);
   }
 
-  private <UpdateT, CellT extends MetricCell<UpdateT>>
+  private <UserT extends Metric, UpdateT, CellT extends MetricCell<UserT, UpdateT>>
   ImmutableList<MetricUpdate<UpdateT>> extractUpdates(
       MetricsMap<MetricName, CellT> cells) {
     ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
@@ -120,8 +121,8 @@ public class MetricsContainer {
         extractUpdates(gauges));
   }
 
-  private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?>> cells) {
-    for (MetricCell<?> cell : cells.values()) {
+  private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?, ?>> cells) {
+    for (MetricCell<?, ?> cell : cells.values()) {
       cell.getDirty().afterCommit();
     }
   }
@@ -133,9 +134,10 @@ public class MetricsContainer {
   public void commitUpdates() {
     commitUpdates(counters);
     commitUpdates(distributions);
+    commitUpdates(gauges);
   }
 
-  private <UpdateT, CellT extends MetricCell<UpdateT>>
+  private <UserT extends Metric, UpdateT, CellT extends MetricCell<UserT, UpdateT>>
   ImmutableList<MetricUpdate<UpdateT>> extractCumulatives(
       MetricsMap<MetricName, CellT> cells) {
     ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
@@ -156,4 +158,21 @@ public class MetricsContainer {
         extractCumulatives(distributions),
         extractCumulatives(gauges));
   }
+
+  /**
+   * Update values of this {@link MetricsContainer} by merging the value of another cell.
+   */
+  public void update(MetricsContainer other) {
+    updateCells(counters, other.counters);
+    updateCells(distributions, other.distributions);
+    updateCells(gauges, other.gauges);
+  }
+
+  private <UserT extends Metric, DataT, CellT extends MetricCell<UserT, DataT>> void updateCells(
+      MetricsMap<MetricName, CellT> current,
+      MetricsMap<MetricName, CellT> updates) {
+    for (Map.Entry<MetricName, CellT> counter : updates.entries()) {
+      current.get(counter.getKey()).update(counter.getValue());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java
new file mode 100644
index 0000000..d01e970
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
+
+/**
+ * Metrics containers by step.
+ *
+ * <p>This class is not thread-safe.</p>
+ */
+public class MetricsContainerStepMap implements Serializable {
+  private Map<String, MetricsContainer> metricsContainers;
+
+  public MetricsContainerStepMap() {
+    this.metricsContainers = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Returns the container for the given step name.
+   */
+  public MetricsContainer getContainer(String stepName) {
+    if (!metricsContainers.containsKey(stepName)) {
+      metricsContainers.put(stepName, new MetricsContainer(stepName));
+    }
+    return metricsContainers.get(stepName);
+  }
+
+  /**
+   * Update this {@link MetricsContainerStepMap} with all values from given
+   * {@link MetricsContainerStepMap}.
+   */
+  public void updateAll(MetricsContainerStepMap other) {
+    for (Map.Entry<String, MetricsContainer> container : other.metricsContainers.entrySet()) {
+      getContainer(container.getKey()).update(container.getValue());
+    }
+  }
+
+  /**
+   * Update {@link MetricsContainer} for given step in this map with all values from given
+   * {@link MetricsContainer}.
+   */
+  public void update(String step, MetricsContainer container) {
+    getContainer(step).update(container);
+  }
+
+  /**
+   * Returns {@link MetricResults} based on given
+   * {@link MetricsContainerStepMap MetricsContainerStepMaps} of attempted and committed metrics.
+   *
+   * <p>This constructor is intended for runners which support both attempted and committed
+   * metrics.
+   */
+  public static MetricResults asMetricResults(
+      MetricsContainerStepMap attemptedMetricsContainers,
+      MetricsContainerStepMap committedMetricsContainers) {
+    return new MetricsContainerStepMapMetricResults(
+        attemptedMetricsContainers,
+        committedMetricsContainers);
+  }
+
+  /**
+   * Returns {@link MetricResults} based on given {@link MetricsContainerStepMap} of attempted
+   * metrics.
+   *
+   * <p>This constructor is intended for runners which only support `attempted` metrics.
+   * Accessing {@link MetricResult#committed()} in the resulting {@link MetricResults} will result
+   * in an {@link UnsupportedOperationException}.</p>
+   */
+  public static MetricResults asAttemptedOnlyMetricResults(
+      MetricsContainerStepMap attemptedMetricsContainers) {
+    return new MetricsContainerStepMapMetricResults(attemptedMetricsContainers);
+  }
+
+  private Map<String, MetricsContainer> getMetricsContainers() {
+    return metricsContainers;
+  }
+
+  private static class MetricsContainerStepMapMetricResults extends MetricResults {
+    private final Map<MetricKey, AttemptedAndCommitted<Long>> counters = new HashMap<>();
+    private final Map<MetricKey, AttemptedAndCommitted<DistributionData>> distributions =
+        new HashMap<>();
+    private final Map<MetricKey, AttemptedAndCommitted<GaugeData>> gauges = new HashMap<>();
+    private final boolean isCommittedSupported;
+
+    private MetricsContainerStepMapMetricResults(
+        MetricsContainerStepMap attemptedMetricsContainers) {
+      this(attemptedMetricsContainers, new MetricsContainerStepMap(), false);
+    }
+
+    private MetricsContainerStepMapMetricResults(
+        MetricsContainerStepMap attemptedMetricsContainers,
+        MetricsContainerStepMap committedMetricsContainers) {
+      this(attemptedMetricsContainers, committedMetricsContainers, true);
+    }
+
+    private MetricsContainerStepMapMetricResults(
+        MetricsContainerStepMap attemptedMetricsContainers,
+        MetricsContainerStepMap committedMetricsContainers,
+        boolean isCommittedSupported) {
+      for (MetricsContainer container
+          : attemptedMetricsContainers.getMetricsContainers().values()) {
+        MetricUpdates cumulative = container.getCumulative();
+        mergeCounters(counters, cumulative.counterUpdates(), attemptedCounterUpdateFn());
+        mergeDistributions(distributions, cumulative.distributionUpdates(),
+            attemptedDistributionUpdateFn());
+        mergeGauges(gauges, cumulative.gaugeUpdates(), attemptedGaugeUpdateFn());
+      }
+      for (MetricsContainer container
+          : committedMetricsContainers.getMetricsContainers().values()) {
+        MetricUpdates cumulative = container.getCumulative();
+        mergeCounters(counters, cumulative.counterUpdates(), committedCounterUpdateFn());
+        mergeDistributions(distributions, cumulative.distributionUpdates(),
+            committedDistributionUpdateFn());
+        mergeGauges(gauges, cumulative.gaugeUpdates(), committedGaugeUpdateFn());
+      }
+      this.isCommittedSupported = isCommittedSupported;
+    }
+
+    private Function<MetricUpdate<DistributionData>, AttemptedAndCommitted<DistributionData>>
+    attemptedDistributionUpdateFn() {
+      return new Function<MetricUpdate<DistributionData>,
+          AttemptedAndCommitted<DistributionData>>() {
+        @Override
+        public AttemptedAndCommitted<DistributionData> apply(MetricUpdate<DistributionData> input) {
+          MetricKey key = input.getKey();
+          return new AttemptedAndCommitted<>(
+              key,
+              input,
+              MetricUpdate.create(key, DistributionData.EMPTY));
+        }
+      };
+    }
+
+    private Function<MetricUpdate<DistributionData>, AttemptedAndCommitted<DistributionData>>
+    committedDistributionUpdateFn() {
+      return new Function<MetricUpdate<DistributionData>,
+          AttemptedAndCommitted<DistributionData>>() {
+        @Override
+        public AttemptedAndCommitted<DistributionData> apply(MetricUpdate<DistributionData> input) {
+          MetricKey key = input.getKey();
+          return new AttemptedAndCommitted<>(
+              key,
+              MetricUpdate.create(key, DistributionData.EMPTY),
+              input);
+        }
+      };
+    }
+
+    private Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>
+    attemptedGaugeUpdateFn() {
+      return new Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>() {
+        @Override
+        public AttemptedAndCommitted<GaugeData> apply(MetricUpdate<GaugeData> input) {
+          MetricKey key = input.getKey();
+          return new AttemptedAndCommitted<>(
+              key,
+              input,
+              MetricUpdate.create(key, GaugeData.empty()));
+        }
+      };
+    }
+
+    private Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>
+    committedGaugeUpdateFn() {
+      return new Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>() {
+        @Override
+        public AttemptedAndCommitted<GaugeData> apply(MetricUpdate<GaugeData> input) {
+          MetricKey key = input.getKey();
+          return new AttemptedAndCommitted<>(
+              key,
+              MetricUpdate.create(key, GaugeData.empty()),
+              input);
+        }
+      };
+    }
+
+    private Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> attemptedCounterUpdateFn() {
+      return new Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>>() {
+        @Override
+        public AttemptedAndCommitted<Long> apply(MetricUpdate<Long> input) {
+          MetricKey key = input.getKey();
+          return new AttemptedAndCommitted<>(
+              key,
+              input,
+              MetricUpdate.create(key, 0L));
+        }
+      };
+    }
+
+    private Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> committedCounterUpdateFn() {
+      return new Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>>() {
+        @Override
+        public AttemptedAndCommitted<Long> apply(MetricUpdate<Long> input) {
+          MetricKey key = input.getKey();
+          return new AttemptedAndCommitted<>(
+              key,
+              MetricUpdate.create(key, 0L),
+              input);
+        }
+      };
+    }
+
+    @Override
+    public MetricQueryResults queryMetrics(MetricsFilter filter) {
+      return new QueryResults(filter);
+    }
+
+    private class QueryResults implements MetricQueryResults {
+      private final MetricsFilter filter;
+
+      private QueryResults(MetricsFilter filter) {
+        this.filter = filter;
+      }
+
+      @Override
+      public Iterable<MetricResult<Long>> counters() {
+        return
+            FluentIterable
+                .from(counters.values())
+                .filter(matchesFilter(filter))
+                .transform(counterUpdateToResult())
+                .toList();
+      }
+
+      @Override
+      public Iterable<MetricResult<DistributionResult>> distributions() {
+        return
+            FluentIterable
+                .from(distributions.values())
+                .filter(matchesFilter(filter))
+                .transform(distributionUpdateToResult())
+                .toList();
+      }
+
+      @Override
+      public Iterable<MetricResult<GaugeResult>> gauges() {
+        return
+            FluentIterable
+                .from(gauges.values())
+                .filter(matchesFilter(filter))
+                .transform(gaugeUpdateToResult())
+                .toList();
+      }
+
+      private Predicate<AttemptedAndCommitted<?>> matchesFilter(final MetricsFilter filter) {
+        return new Predicate<AttemptedAndCommitted<?>>() {
+          @Override
+          public boolean apply(AttemptedAndCommitted<?> attemptedAndCommitted) {
+            return MetricFiltering.matches(filter, attemptedAndCommitted.getKey());
+          }
+        };
+      }
+    }
+
+    private Function<AttemptedAndCommitted<Long>, MetricResult<Long>> counterUpdateToResult() {
+      return new
+          Function<AttemptedAndCommitted<Long>, MetricResult<Long>>() {
+            @Override
+            public MetricResult<Long>
+            apply(AttemptedAndCommitted<Long> metricResult) {
+              MetricKey key = metricResult.getKey();
+              return new AccumulatedMetricResult<>(
+                  key.metricName(),
+                  key.stepName(),
+                  metricResult.getAttempted().getUpdate(),
+                  isCommittedSupported
+                      ? metricResult.getCommitted().getUpdate()
+                      : null,
+                  isCommittedSupported);
+            }
+          };
+    }
+
+    private Function<AttemptedAndCommitted<DistributionData>, MetricResult<DistributionResult>>
+    distributionUpdateToResult() {
+      return new
+          Function<AttemptedAndCommitted<DistributionData>, MetricResult<DistributionResult>>() {
+            @Override
+            public MetricResult<DistributionResult>
+            apply(AttemptedAndCommitted<DistributionData> metricResult) {
+              MetricKey key = metricResult.getKey();
+              return new AccumulatedMetricResult<>(
+                  key.metricName(),
+                  key.stepName(),
+                  metricResult.getAttempted().getUpdate().extractResult(),
+                  isCommittedSupported
+                      ? metricResult.getCommitted().getUpdate().extractResult()
+                      : null,
+                  isCommittedSupported);
+            }
+          };
+    }
+
+    private Function<AttemptedAndCommitted<GaugeData>, MetricResult<GaugeResult>>
+    gaugeUpdateToResult() {
+      return new
+          Function<AttemptedAndCommitted<GaugeData>, MetricResult<GaugeResult>>() {
+            @Override
+            public MetricResult<GaugeResult>
+            apply(AttemptedAndCommitted<GaugeData> metricResult) {
+              MetricKey key = metricResult.getKey();
+              return new AccumulatedMetricResult<>(
+                  key.metricName(),
+                  key.stepName(),
+                  metricResult.getAttempted().getUpdate().extractResult(),
+                  isCommittedSupported
+                      ? metricResult.getCommitted().getUpdate().extractResult()
+                      : null,
+                  isCommittedSupported);
+            }
+          };
+    }
+
+    @SuppressWarnings("ConstantConditions")
+    private void mergeCounters(
+        Map<MetricKey, AttemptedAndCommitted<Long>> counters,
+        Iterable<MetricUpdate<Long>> updates,
+        Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> updateToAttemptedAndCommittedFn) {
+      for (MetricUpdate<Long> metricUpdate : updates) {
+        MetricKey key = metricUpdate.getKey();
+        AttemptedAndCommitted<Long> update =
+            updateToAttemptedAndCommittedFn.apply(metricUpdate);
+        if (counters.containsKey(key)) {
+          AttemptedAndCommitted<Long> current = counters.get(key);
+          update = new AttemptedAndCommitted<>(
+              key,
+              MetricUpdate.create(
+                  key,
+                  update.getAttempted().getUpdate() + current.getAttempted().getUpdate()),
+              MetricUpdate.create(
+                  key,
+                  update.getCommitted().getUpdate() + current.getCommitted().getUpdate()));
+        }
+        counters.put(key, update);
+      }
+    }
+
+    @SuppressWarnings("ConstantConditions")
+    private void mergeDistributions(
+        Map<MetricKey, AttemptedAndCommitted<DistributionData>> distributions,
+        Iterable<MetricUpdate<DistributionData>> updates,
+        Function<MetricUpdate<DistributionData>, AttemptedAndCommitted<DistributionData>>
+            updateToAttemptedAndCommittedFn) {
+      for (MetricUpdate<DistributionData> metricUpdate : updates) {
+        MetricKey key = metricUpdate.getKey();
+        AttemptedAndCommitted<DistributionData> update =
+            updateToAttemptedAndCommittedFn.apply(metricUpdate);
+        if (distributions.containsKey(key)) {
+          AttemptedAndCommitted<DistributionData> current = distributions.get(key);
+          update = new AttemptedAndCommitted<>(
+              key,
+              MetricUpdate.create(
+                  key,
+                  update.getAttempted().getUpdate().combine(current.getAttempted().getUpdate())),
+              MetricUpdate.create(
+                  key,
+                  update.getCommitted().getUpdate().combine(current.getCommitted().getUpdate())));
+        }
+        distributions.put(key, update);
+      }
+    }
+
+    @SuppressWarnings("ConstantConditions")
+    private void mergeGauges(
+        Map<MetricKey, AttemptedAndCommitted<GaugeData>> gauges,
+        Iterable<MetricUpdate<GaugeData>> updates,
+        Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>
+            updateToAttemptedAndCommittedFn) {
+      for (MetricUpdate<GaugeData> metricUpdate : updates) {
+        MetricKey key = metricUpdate.getKey();
+        AttemptedAndCommitted<GaugeData> update =
+            updateToAttemptedAndCommittedFn.apply(metricUpdate);
+        if (gauges.containsKey(key)) {
+          AttemptedAndCommitted<GaugeData> current = gauges.get(key);
+          update = new AttemptedAndCommitted<>(
+              key,
+              MetricUpdate.create(
+                  key,
+                  update.getAttempted().getUpdate().combine(current.getAttempted().getUpdate())),
+              MetricUpdate.create(
+                  key,
+                  update.getCommitted().getUpdate().combine(current.getCommitted().getUpdate())));
+        }
+        gauges.put(key, update);
+      }
+    }
+
+    /**
+     * Accumulated implementation of {@link MetricResult}.
+     */
+    private static class AccumulatedMetricResult<T> implements MetricResult<T> {
+      private final MetricName name;
+      private final String step;
+      private final T attempted;
+      private final T committed;
+      private final boolean isCommittedSupported;
+
+      private AccumulatedMetricResult(
+          MetricName name,
+          String step,
+          T attempted,
+          T committed,
+          boolean isCommittedSupported) {
+        this.name = name;
+        this.step = step;
+        this.attempted = attempted;
+        this.committed = committed;
+        this.isCommittedSupported = isCommittedSupported;
+      }
+
+      @Override
+      public MetricName name() {
+        return name;
+      }
+
+      @Override
+      public String step() {
+        return step;
+      }
+
+      @Override
+      public T committed() {
+        if (!isCommittedSupported) {
+          throw new UnsupportedOperationException("This runner does not currently support committed"
+              + " metrics results. Please use 'attempted' instead.");
+        }
+        return committed;
+      }
+
+      @Override
+      public T attempted() {
+        return attempted;
+      }
+    }
+
+    /**
+     * Attempted and committed {@link MetricUpdate MetricUpdates}.
+     */
+    private static class AttemptedAndCommitted<T> {
+      private final MetricKey key;
+      private final MetricUpdate<T> attempted;
+      private final MetricUpdate<T> committed;
+
+      private AttemptedAndCommitted(MetricKey key, MetricUpdate<T> attempted,
+          MetricUpdate<T> committed) {
+        this.key = key;
+        this.attempted = attempted;
+        this.committed = committed;
+      }
+
+      private MetricKey getKey() {
+        return key;
+      }
+
+      private MetricUpdate<T> getAttempted() {
+        return attempted;
+      }
+
+      private MetricUpdate<T> getCommitted() {
+        return committed;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
index 5a02106..8c26f18 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.metrics;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Iterables;
+import java.io.Serializable;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -31,10 +32,10 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
  * in a thread-safe manner.
  */
 @Experimental(Kind.METRICS)
-public class MetricsMap<K, T> {
+public class MetricsMap<K, T> implements Serializable {
 
   /** Interface for creating instances to populate the {@link MetricsMap}. */
-  public interface Factory<K, T> {
+  public interface Factory<K, T> extends Serializable {
     /**
      * Create an instance of {@code T} to use with the given {@code key}.
      *

http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
index 408f145..26554d4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
@@ -35,8 +35,8 @@ public class CounterCellTest {
 
   @Test
   public void testDeltaAndCumulative() {
-    cell.inc(5);
-    cell.inc(7);
+    cell.update(5);
+    cell.update(7);
     assertThat(cell.getCumulative(), equalTo(12L));
     assertThat("getCumulative is idempotent", cell.getCumulative(), equalTo(12L));
 
@@ -45,7 +45,7 @@ public class CounterCellTest {
     assertThat(cell.getDirty().beforeCommit(), equalTo(false));
     assertThat(cell.getCumulative(), equalTo(12L));
 
-    cell.inc(30);
+    cell.update(30);
     assertThat(cell.getCumulative(), equalTo(42L));
 
     assertThat(cell.getDirty().beforeCommit(), equalTo(true));

http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java
new file mode 100644
index 0000000..0428ce1
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import static org.apache.beam.sdk.metrics.MetricMatchers.metricsResult;
+import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asMetricResults;
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.hamcrest.collection.IsIterableWithSize;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Tests for {@link MetricsContainerStepMap}.
+ */
+public class MetricsContainerStepMapTest {
+
+  private static final String NAMESPACE = MetricsContainerStepMapTest.class.getName();
+  private static final String STEP1 = "myStep1";
+  private static final String STEP2 = "myStep2";
+
+  private static final long VALUE = 100;
+
+  private static final Counter counter =
+      Metrics.counter(
+          MetricsContainerStepMapTest.class,
+          "myCounter");
+  private static final Distribution distribution =
+      Metrics.distribution(
+          MetricsContainerStepMapTest.class,
+          "myDistribution");
+  private static final Gauge gauge =
+      Metrics.gauge(
+          MetricsContainerStepMapTest.class,
+          "myGauge");
+
+  private static final MetricsContainer metricsContainer;
+
+  static {
+    metricsContainer = new MetricsContainer(null);
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
+      counter.inc(VALUE);
+      distribution.update(VALUE);
+      distribution.update(VALUE * 2);
+      gauge.set(VALUE);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Rule
+  public transient ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testAttemptedAccumulatedMetricResults() {
+    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
+    attemptedMetrics.update(STEP1, metricsContainer);
+    attemptedMetrics.update(STEP2, metricsContainer);
+    attemptedMetrics.update(STEP2, metricsContainer);
+
+    MetricResults metricResults =
+        asAttemptedOnlyMetricResults(attemptedMetrics);
+
+    MetricQueryResults step1res =
+        metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build());
+
+    assertIterableSize(step1res.counters(), 1);
+    assertIterableSize(step1res.distributions(), 1);
+    assertIterableSize(step1res.gauges(), 1);
+
+    assertCounter(step1res, STEP1, VALUE, false);
+    assertDistribution(step1res, STEP1, DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2),
+        false);
+    assertGauge(step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), false);
+
+    MetricQueryResults step2res =
+        metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP2).build());
+
+    assertIterableSize(step2res.counters(), 1);
+    assertIterableSize(step2res.distributions(), 1);
+    assertIterableSize(step2res.gauges(), 1);
+
+    assertCounter(step2res, STEP2, VALUE * 2, false);
+    assertDistribution(step2res, STEP2, DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2),
+        false);
+    assertGauge(step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), false);
+
+    MetricQueryResults allres =
+        metricResults.queryMetrics(MetricsFilter.builder().build());
+
+    assertIterableSize(allres.counters(), 2);
+    assertIterableSize(allres.distributions(), 2);
+    assertIterableSize(allres.gauges(), 2);
+  }
+
+  @Test
+  public void testCounterCommittedUnsupportedInAttemptedAccumulatedMetricResults() {
+    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
+    attemptedMetrics.update(STEP1, metricsContainer);
+    MetricResults metricResults =
+        asAttemptedOnlyMetricResults(attemptedMetrics);
+
+    MetricQueryResults step1res =
+        metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build());
+
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("This runner does not currently support committed metrics results.");
+
+    assertCounter(step1res, STEP1, VALUE, true);
+  }
+
+  @Test
+  public void testDistributionCommittedUnsupportedInAttemptedAccumulatedMetricResults() {
+    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
+    attemptedMetrics.update(STEP1, metricsContainer);
+    MetricResults metricResults =
+        asAttemptedOnlyMetricResults(attemptedMetrics);
+
+    MetricQueryResults step1res =
+        metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build());
+
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("This runner does not currently support committed metrics results.");
+
+    assertDistribution(step1res, STEP1, DistributionResult.ZERO, true);
+  }
+
+  @Test
+  public void testGaugeCommittedUnsupportedInAttemptedAccumulatedMetricResults() {
+    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
+    attemptedMetrics.update(STEP1, metricsContainer);
+    MetricResults metricResults =
+        asAttemptedOnlyMetricResults(attemptedMetrics);
+
+    MetricQueryResults step1res =
+        metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build());
+
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("This runner does not currently support committed metrics results.");
+
+    assertGauge(step1res, STEP1, GaugeResult.empty(), true);
+  }
+
+  @Test
+  public void testAttemptedAndCommittedAccumulatedMetricResults() {
+    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
+    attemptedMetrics.update(STEP1, metricsContainer);
+    attemptedMetrics.update(STEP1, metricsContainer);
+    attemptedMetrics.update(STEP2, metricsContainer);
+    attemptedMetrics.update(STEP2, metricsContainer);
+    attemptedMetrics.update(STEP2, metricsContainer);
+
+    MetricsContainerStepMap committedMetrics = new MetricsContainerStepMap();
+    committedMetrics.update(STEP1, metricsContainer);
+    committedMetrics.update(STEP2, metricsContainer);
+    committedMetrics.update(STEP2, metricsContainer);
+
+    MetricResults metricResults =
+        asMetricResults(attemptedMetrics, committedMetrics);
+
+    MetricQueryResults step1res =
+        metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build());
+
+    assertIterableSize(step1res.counters(), 1);
+    assertIterableSize(step1res.distributions(), 1);
+    assertIterableSize(step1res.gauges(), 1);
+
+    assertCounter(step1res, STEP1, VALUE * 2, false);
+    assertDistribution(step1res, STEP1, DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2),
+        false);
+    assertGauge(step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), false);
+
+    assertCounter(step1res, STEP1, VALUE, true);
+    assertDistribution(step1res, STEP1, DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2),
+        true);
+    assertGauge(step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), true);
+
+    MetricQueryResults step2res =
+        metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP2).build());
+
+    assertIterableSize(step2res.counters(), 1);
+    assertIterableSize(step2res.distributions(), 1);
+    assertIterableSize(step2res.gauges(), 1);
+
+    assertCounter(step2res, STEP2, VALUE * 3, false);
+    assertDistribution(step2res, STEP2, DistributionResult.create(VALUE * 9, 6, VALUE, VALUE * 2),
+        false);
+    assertGauge(step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), false);
+
+    assertCounter(step2res, STEP2, VALUE * 2, true);
+    assertDistribution(step2res, STEP2, DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2),
+        true);
+    assertGauge(step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), true);
+
+    MetricQueryResults allres =
+        metricResults.queryMetrics(MetricsFilter.builder().build());
+
+    assertIterableSize(allres.counters(), 2);
+    assertIterableSize(allres.distributions(), 2);
+    assertIterableSize(allres.gauges(), 2);
+  }
+
+  private <T> void assertIterableSize(Iterable<T> iterable, int size) {
+    assertThat(iterable, IsIterableWithSize.<T>iterableWithSize(size));
+  }
+
+  private void assertCounter(
+      MetricQueryResults metricQueryResults,
+      String step,
+      Long expected,
+      boolean isCommitted) {
+    assertThat(
+        metricQueryResults.counters(),
+        hasItem(metricsResult(NAMESPACE, counter.getName().name(), step, expected, isCommitted)));
+  }
+
+  private void assertDistribution(
+      MetricQueryResults metricQueryResults,
+      String step,
+      DistributionResult expected,
+      boolean isCommitted) {
+    assertThat(
+        metricQueryResults.distributions(),
+        hasItem(metricsResult(NAMESPACE, distribution.getName().name(), step, expected,
+            isCommitted)));
+  }
+
+  private void assertGauge(
+      MetricQueryResults metricQueryResults,
+      String step,
+      GaugeResult expected,
+      boolean isCommitted) {
+    assertThat(
+        metricQueryResults.gauges(),
+        hasItem(metricsResult(NAMESPACE, gauge.getName().name(), step, expected, isCommitted)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
index 58797ce..38c00d3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
@@ -47,8 +47,8 @@ public class MetricsContainerTest {
     assertThat("After commit no counters should be dirty",
         container.getUpdates().counterUpdates(), emptyIterable());
 
-    c1.inc(5L);
-    c2.inc(4L);
+    c1.update(5L);
+    c2.update(4L);
 
     assertThat(container.getUpdates().counterUpdates(), containsInAnyOrder(
         metricUpdate("name1", 5L),
@@ -63,7 +63,7 @@ public class MetricsContainerTest {
     assertThat("After commit there are no updates",
         container.getUpdates().counterUpdates(), emptyIterable());
 
-    c1.inc(8L);
+    c1.update(8L);
     assertThat(container.getUpdates().counterUpdates(), contains(
         metricUpdate("name1", 13L)));
   }
@@ -73,9 +73,9 @@ public class MetricsContainerTest {
     MetricsContainer container = new MetricsContainer("step1");
     CounterCell c1 = container.getCounter(MetricName.named("ns", "name1"));
     CounterCell c2 = container.getCounter(MetricName.named("ns", "name2"));
-    c1.inc(2L);
-    c2.inc(4L);
-    c1.inc(3L);
+    c1.update(2L);
+    c2.update(4L);
+    c1.update(3L);
 
     container.getUpdates();
     container.commitUpdates();
@@ -84,7 +84,7 @@ public class MetricsContainerTest {
         metricUpdate("name1", 5L),
         metricUpdate("name2", 4L)));
 
-    c1.inc(8L);
+    c1.update(8L);
     assertThat(container.getCumulative().counterUpdates(), containsInAnyOrder(
         metricUpdate("name1", 13L),
         metricUpdate("name2", 4L)));