You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by av...@apache.org on 2017/05/06 05:46:06 UTC
[3/4] beam git commit: [BEAM-1672] Make MetricsContainers accumulable.
[BEAM-1672] Make MetricsContainers accumulable.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/46c2f935
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/46c2f935
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/46c2f935
Branch: refs/heads/master
Commit: 46c2f935a99350e18e5d50f1a996996760ebc2e3
Parents: db0ec99
Author: Aviem Zur <av...@gmail.com>
Authored: Fri May 5 23:13:24 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Sat May 6 08:27:49 2017 +0300
----------------------------------------------------------------------
.../apache/beam/runners/core/LateDataUtils.java | 2 +-
.../apache/beam/sdk/metrics/CounterCell.java | 27 +-
.../org/apache/beam/sdk/metrics/DirtyState.java | 3 +-
.../beam/sdk/metrics/DistributionCell.java | 16 +-
.../org/apache/beam/sdk/metrics/GaugeCell.java | 20 +-
.../org/apache/beam/sdk/metrics/MetricCell.java | 14 +-
.../org/apache/beam/sdk/metrics/Metrics.java | 2 +-
.../beam/sdk/metrics/MetricsContainer.java | 29 +-
.../sdk/metrics/MetricsContainerStepMap.java | 487 +++++++++++++++++++
.../org/apache/beam/sdk/metrics/MetricsMap.java | 5 +-
.../beam/sdk/metrics/CounterCellTest.java | 6 +-
.../metrics/MetricsContainerStepMapTest.java | 258 ++++++++++
.../beam/sdk/metrics/MetricsContainerTest.java | 14 +-
13 files changed, 846 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
index c45387b..f7c0d31 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
@@ -71,7 +71,7 @@ public class LateDataUtils {
.isBefore(timerInternals.currentInputWatermarkTime());
if (expired) {
// The element is too late for this window.
- droppedDueToLateness.inc();
+ droppedDueToLateness.update(1L);
WindowTracing.debug(
"GroupAlsoByWindow: Dropping element at {} for key: {}; "
+ "window: {} since it is too far behind inputWatermark: {}",
http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
index 7ab5ebc..4b8548f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
* indirection.
*/
@Experimental(Kind.METRICS)
-public class CounterCell implements MetricCell<Long> {
+public class CounterCell implements MetricCell<Counter, Long> {
private final DirtyState dirty = new DirtyState();
private final AtomicLong value = new AtomicLong();
@@ -41,13 +41,26 @@ public class CounterCell implements MetricCell<Long> {
*/
CounterCell() {}
- /** Increment the counter by the given amount. */
- private void add(long n) {
+ /**
+ * Increment the counter by the given amount.
+ * @param n value to increment by. Can be negative to decrement.
+ */
+ public void update(long n) {
value.addAndGet(n);
dirty.afterModification();
}
@Override
+ public void update(Long n) {
+ throw new UnsupportedOperationException("CounterCell.update(Long n) should not be used"
+ + " as it performs unnecessary boxing/unboxing. Use CounterCell.update(long n) instead.");
+ }
+
+ @Override public void update(MetricCell<Counter, Long> other) {
+ update((long) other.getCumulative());
+ }
+
+ @Override
public DirtyState getDirty() {
return dirty;
}
@@ -56,12 +69,4 @@ public class CounterCell implements MetricCell<Long> {
public Long getCumulative() {
return value.get();
}
-
- public void inc() {
- add(1);
- }
-
- public void inc(long n) {
- add(n);
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
index 6706be8..4e0c15c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.metrics;
+import java.io.Serializable;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -41,7 +42,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
* completed.
*/
@Experimental(Kind.METRICS)
-class DirtyState {
+class DirtyState implements Serializable {
private enum State {
/** Indicates that there have been changes to the MetricCell since last commit. */
DIRTY,
http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
index 0f3f6a4..93a3649 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
* of indirection.
*/
@Experimental(Kind.METRICS)
-public class DistributionCell implements MetricCell<DistributionData> {
+public class DistributionCell implements MetricCell<Distribution, DistributionData> {
private final DirtyState dirty = new DirtyState();
private final AtomicReference<DistributionData> value =
@@ -42,16 +42,26 @@ public class DistributionCell implements MetricCell<DistributionData> {
*/
DistributionCell() {}
- /** Increment the counter by the given amount. */
+ /** Update the distribution with the given value. */
public void update(long n) {
+ update(DistributionData.singleton(n));
+ }
+
+ @Override
+ public void update(DistributionData data) {
DistributionData original;
do {
original = value.get();
- } while (!value.compareAndSet(original, original.combine(DistributionData.singleton(n))));
+ } while (!value.compareAndSet(original, original.combine(data)));
dirty.afterModification();
}
@Override
+ public void update(MetricCell<Distribution, DistributionData> other) {
+ update(other.getCumulative());
+ }
+
+ @Override
public DirtyState getDirty() {
return dirty;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
index 6f8e880..0cdd568 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java
@@ -29,17 +29,33 @@ import org.apache.beam.sdk.annotations.Experimental;
* of indirection.
*/
@Experimental(Experimental.Kind.METRICS)
-public class GaugeCell implements MetricCell<GaugeData> {
+public class GaugeCell implements MetricCell<Gauge, GaugeData> {
private final DirtyState dirty = new DirtyState();
private final AtomicReference<GaugeData> gaugeValue = new AtomicReference<>(GaugeData.empty());
+ /** Set the gauge to the given value. */
public void set(long value) {
+ update(GaugeData.create(value));
+ }
+
+ @Override
+ public void update(GaugeData data) {
+ GaugeData original;
+ do {
+ original = gaugeValue.get();
+ } while (!gaugeValue.compareAndSet(original, original.combine(data)));
+ dirty.afterModification();
+ }
+
+ @Override
+ public void update(MetricCell<Gauge, GaugeData> other) {
GaugeData original;
do {
original = gaugeValue.get();
- } while (!gaugeValue.compareAndSet(original, original.combine(GaugeData.create(value))));
+ } while (!gaugeValue.compareAndSet(original, original.combine(other.getCumulative())));
dirty.afterModification();
+ update(other.getCumulative());
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
index 82e30cb..403cac2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.metrics;
+import java.io.Serializable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -24,10 +25,21 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
* A {@link MetricCell} is used for accumulating in-memory changes to a metric. It represents a
* specific metric name in a single context.
*
+ * @param <UserT> The type of the user interface for reporting changes to this cell.
* @param <DataT> The type of metric data stored (and extracted) from this cell.
*/
@Experimental(Kind.METRICS)
-public interface MetricCell<DataT> {
+public interface MetricCell<UserT extends Metric, DataT> extends Serializable {
+
+ /**
+ * Update the value of this cell with the given data.
+ */
+ void update(DataT data);
+
+ /**
+ * Update value of this cell by merging the value of another cell.
+ */
+ void update(MetricCell<UserT, DataT> other);
/**
* Return the {@link DirtyState} tracking whether this metric cell contains uncommitted changes.
http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
index 9286ea9..096d147 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
@@ -118,7 +118,7 @@ public class Metrics {
@Override public void inc(long n) {
MetricsContainer container = MetricsEnvironment.getCurrentContainer();
if (container != null) {
- container.getCounter(name).inc(n);
+ container.getCounter(name).update(n);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
index fbb0da3..48fa359 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.metrics;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
import java.util.Map;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -37,7 +38,7 @@ import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
* cumulative values/updates.
*/
@Experimental(Kind.METRICS)
-public class MetricsContainer {
+public class MetricsContainer implements Serializable {
private final String stepName;
@@ -96,7 +97,7 @@ public class MetricsContainer {
return gauges.get(metricName);
}
- private <UpdateT, CellT extends MetricCell<UpdateT>>
+ private <UserT extends Metric, UpdateT, CellT extends MetricCell<UserT, UpdateT>>
ImmutableList<MetricUpdate<UpdateT>> extractUpdates(
MetricsMap<MetricName, CellT> cells) {
ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
@@ -120,8 +121,8 @@ public class MetricsContainer {
extractUpdates(gauges));
}
- private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?>> cells) {
- for (MetricCell<?> cell : cells.values()) {
+ private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?, ?>> cells) {
+ for (MetricCell<?, ?> cell : cells.values()) {
cell.getDirty().afterCommit();
}
}
@@ -133,9 +134,10 @@ public class MetricsContainer {
public void commitUpdates() {
commitUpdates(counters);
commitUpdates(distributions);
+ commitUpdates(gauges);
}
- private <UpdateT, CellT extends MetricCell<UpdateT>>
+ private <UserT extends Metric, UpdateT, CellT extends MetricCell<UserT, UpdateT>>
ImmutableList<MetricUpdate<UpdateT>> extractCumulatives(
MetricsMap<MetricName, CellT> cells) {
ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
@@ -156,4 +158,21 @@ public class MetricsContainer {
extractCumulatives(distributions),
extractCumulatives(gauges));
}
+
+ /**
+ * Update values of this {@link MetricsContainer} by merging the values of another container.
+ */
+ public void update(MetricsContainer other) {
+ updateCells(counters, other.counters);
+ updateCells(distributions, other.distributions);
+ updateCells(gauges, other.gauges);
+ }
+
+ private <UserT extends Metric, DataT, CellT extends MetricCell<UserT, DataT>> void updateCells(
+ MetricsMap<MetricName, CellT> current,
+ MetricsMap<MetricName, CellT> updates) {
+ for (Map.Entry<MetricName, CellT> counter : updates.entries()) {
+ current.get(counter.getKey()).update(counter.getValue());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java
new file mode 100644
index 0000000..d01e970
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
+
+/**
+ * Metrics containers by step.
+ *
+ * <p>This class is not thread-safe.</p>
+ */
+public class MetricsContainerStepMap implements Serializable {
+ private Map<String, MetricsContainer> metricsContainers;
+
+ public MetricsContainerStepMap() {
+ this.metricsContainers = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Returns the container for the given step name.
+ */
+ public MetricsContainer getContainer(String stepName) {
+ if (!metricsContainers.containsKey(stepName)) {
+ metricsContainers.put(stepName, new MetricsContainer(stepName));
+ }
+ return metricsContainers.get(stepName);
+ }
+
+ /**
+ * Update this {@link MetricsContainerStepMap} with all values from given
+ * {@link MetricsContainerStepMap}.
+ */
+ public void updateAll(MetricsContainerStepMap other) {
+ for (Map.Entry<String, MetricsContainer> container : other.metricsContainers.entrySet()) {
+ getContainer(container.getKey()).update(container.getValue());
+ }
+ }
+
+ /**
+ * Update {@link MetricsContainer} for given step in this map with all values from given
+ * {@link MetricsContainer}.
+ */
+ public void update(String step, MetricsContainer container) {
+ getContainer(step).update(container);
+ }
+
+ /**
+ * Returns {@link MetricResults} based on given
+ * {@link MetricsContainerStepMap MetricsContainerStepMaps} of attempted and committed metrics.
+ *
+ * <p>This factory method is intended for runners which support both attempted and committed
+ * metrics.
+ */
+ public static MetricResults asMetricResults(
+ MetricsContainerStepMap attemptedMetricsContainers,
+ MetricsContainerStepMap committedMetricsContainers) {
+ return new MetricsContainerStepMapMetricResults(
+ attemptedMetricsContainers,
+ committedMetricsContainers);
+ }
+
+ /**
+ * Returns {@link MetricResults} based on given {@link MetricsContainerStepMap} of attempted
+ * metrics.
+ *
+ * <p>This factory method is intended for runners which only support {@code attempted} metrics.
+ * Accessing {@link MetricResult#committed()} in the resulting {@link MetricResults} will result
+ * in an {@link UnsupportedOperationException}.</p>
+ */
+ public static MetricResults asAttemptedOnlyMetricResults(
+ MetricsContainerStepMap attemptedMetricsContainers) {
+ return new MetricsContainerStepMapMetricResults(attemptedMetricsContainers);
+ }
+
+ private Map<String, MetricsContainer> getMetricsContainers() {
+ return metricsContainers;
+ }
+
+ private static class MetricsContainerStepMapMetricResults extends MetricResults {
+ private final Map<MetricKey, AttemptedAndCommitted<Long>> counters = new HashMap<>();
+ private final Map<MetricKey, AttemptedAndCommitted<DistributionData>> distributions =
+ new HashMap<>();
+ private final Map<MetricKey, AttemptedAndCommitted<GaugeData>> gauges = new HashMap<>();
+ private final boolean isCommittedSupported;
+
+ private MetricsContainerStepMapMetricResults(
+ MetricsContainerStepMap attemptedMetricsContainers) {
+ this(attemptedMetricsContainers, new MetricsContainerStepMap(), false);
+ }
+
+ private MetricsContainerStepMapMetricResults(
+ MetricsContainerStepMap attemptedMetricsContainers,
+ MetricsContainerStepMap committedMetricsContainers) {
+ this(attemptedMetricsContainers, committedMetricsContainers, true);
+ }
+
+ private MetricsContainerStepMapMetricResults(
+ MetricsContainerStepMap attemptedMetricsContainers,
+ MetricsContainerStepMap committedMetricsContainers,
+ boolean isCommittedSupported) {
+ for (MetricsContainer container
+ : attemptedMetricsContainers.getMetricsContainers().values()) {
+ MetricUpdates cumulative = container.getCumulative();
+ mergeCounters(counters, cumulative.counterUpdates(), attemptedCounterUpdateFn());
+ mergeDistributions(distributions, cumulative.distributionUpdates(),
+ attemptedDistributionUpdateFn());
+ mergeGauges(gauges, cumulative.gaugeUpdates(), attemptedGaugeUpdateFn());
+ }
+ for (MetricsContainer container
+ : committedMetricsContainers.getMetricsContainers().values()) {
+ MetricUpdates cumulative = container.getCumulative();
+ mergeCounters(counters, cumulative.counterUpdates(), committedCounterUpdateFn());
+ mergeDistributions(distributions, cumulative.distributionUpdates(),
+ committedDistributionUpdateFn());
+ mergeGauges(gauges, cumulative.gaugeUpdates(), committedGaugeUpdateFn());
+ }
+ this.isCommittedSupported = isCommittedSupported;
+ }
+
+ private Function<MetricUpdate<DistributionData>, AttemptedAndCommitted<DistributionData>>
+ attemptedDistributionUpdateFn() {
+ return new Function<MetricUpdate<DistributionData>,
+ AttemptedAndCommitted<DistributionData>>() {
+ @Override
+ public AttemptedAndCommitted<DistributionData> apply(MetricUpdate<DistributionData> input) {
+ MetricKey key = input.getKey();
+ return new AttemptedAndCommitted<>(
+ key,
+ input,
+ MetricUpdate.create(key, DistributionData.EMPTY));
+ }
+ };
+ }
+
+ private Function<MetricUpdate<DistributionData>, AttemptedAndCommitted<DistributionData>>
+ committedDistributionUpdateFn() {
+ return new Function<MetricUpdate<DistributionData>,
+ AttemptedAndCommitted<DistributionData>>() {
+ @Override
+ public AttemptedAndCommitted<DistributionData> apply(MetricUpdate<DistributionData> input) {
+ MetricKey key = input.getKey();
+ return new AttemptedAndCommitted<>(
+ key,
+ MetricUpdate.create(key, DistributionData.EMPTY),
+ input);
+ }
+ };
+ }
+
+ private Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>
+ attemptedGaugeUpdateFn() {
+ return new Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>() {
+ @Override
+ public AttemptedAndCommitted<GaugeData> apply(MetricUpdate<GaugeData> input) {
+ MetricKey key = input.getKey();
+ return new AttemptedAndCommitted<>(
+ key,
+ input,
+ MetricUpdate.create(key, GaugeData.empty()));
+ }
+ };
+ }
+
+ private Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>
+ committedGaugeUpdateFn() {
+ return new Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>() {
+ @Override
+ public AttemptedAndCommitted<GaugeData> apply(MetricUpdate<GaugeData> input) {
+ MetricKey key = input.getKey();
+ return new AttemptedAndCommitted<>(
+ key,
+ MetricUpdate.create(key, GaugeData.empty()),
+ input);
+ }
+ };
+ }
+
+ private Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> attemptedCounterUpdateFn() {
+ return new Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>>() {
+ @Override
+ public AttemptedAndCommitted<Long> apply(MetricUpdate<Long> input) {
+ MetricKey key = input.getKey();
+ return new AttemptedAndCommitted<>(
+ key,
+ input,
+ MetricUpdate.create(key, 0L));
+ }
+ };
+ }
+
+ private Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> committedCounterUpdateFn() {
+ return new Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>>() {
+ @Override
+ public AttemptedAndCommitted<Long> apply(MetricUpdate<Long> input) {
+ MetricKey key = input.getKey();
+ return new AttemptedAndCommitted<>(
+ key,
+ MetricUpdate.create(key, 0L),
+ input);
+ }
+ };
+ }
+
+ @Override
+ public MetricQueryResults queryMetrics(MetricsFilter filter) {
+ return new QueryResults(filter);
+ }
+
+ private class QueryResults implements MetricQueryResults {
+ private final MetricsFilter filter;
+
+ private QueryResults(MetricsFilter filter) {
+ this.filter = filter;
+ }
+
+ @Override
+ public Iterable<MetricResult<Long>> counters() {
+ return
+ FluentIterable
+ .from(counters.values())
+ .filter(matchesFilter(filter))
+ .transform(counterUpdateToResult())
+ .toList();
+ }
+
+ @Override
+ public Iterable<MetricResult<DistributionResult>> distributions() {
+ return
+ FluentIterable
+ .from(distributions.values())
+ .filter(matchesFilter(filter))
+ .transform(distributionUpdateToResult())
+ .toList();
+ }
+
+ @Override
+ public Iterable<MetricResult<GaugeResult>> gauges() {
+ return
+ FluentIterable
+ .from(gauges.values())
+ .filter(matchesFilter(filter))
+ .transform(gaugeUpdateToResult())
+ .toList();
+ }
+
+ private Predicate<AttemptedAndCommitted<?>> matchesFilter(final MetricsFilter filter) {
+ return new Predicate<AttemptedAndCommitted<?>>() {
+ @Override
+ public boolean apply(AttemptedAndCommitted<?> attemptedAndCommitted) {
+ return MetricFiltering.matches(filter, attemptedAndCommitted.getKey());
+ }
+ };
+ }
+ }
+
+ private Function<AttemptedAndCommitted<Long>, MetricResult<Long>> counterUpdateToResult() {
+ return new
+ Function<AttemptedAndCommitted<Long>, MetricResult<Long>>() {
+ @Override
+ public MetricResult<Long>
+ apply(AttemptedAndCommitted<Long> metricResult) {
+ MetricKey key = metricResult.getKey();
+ return new AccumulatedMetricResult<>(
+ key.metricName(),
+ key.stepName(),
+ metricResult.getAttempted().getUpdate(),
+ isCommittedSupported
+ ? metricResult.getCommitted().getUpdate()
+ : null,
+ isCommittedSupported);
+ }
+ };
+ }
+
+ private Function<AttemptedAndCommitted<DistributionData>, MetricResult<DistributionResult>>
+ distributionUpdateToResult() {
+ return new
+ Function<AttemptedAndCommitted<DistributionData>, MetricResult<DistributionResult>>() {
+ @Override
+ public MetricResult<DistributionResult>
+ apply(AttemptedAndCommitted<DistributionData> metricResult) {
+ MetricKey key = metricResult.getKey();
+ return new AccumulatedMetricResult<>(
+ key.metricName(),
+ key.stepName(),
+ metricResult.getAttempted().getUpdate().extractResult(),
+ isCommittedSupported
+ ? metricResult.getCommitted().getUpdate().extractResult()
+ : null,
+ isCommittedSupported);
+ }
+ };
+ }
+
+ private Function<AttemptedAndCommitted<GaugeData>, MetricResult<GaugeResult>>
+ gaugeUpdateToResult() {
+ return new
+ Function<AttemptedAndCommitted<GaugeData>, MetricResult<GaugeResult>>() {
+ @Override
+ public MetricResult<GaugeResult>
+ apply(AttemptedAndCommitted<GaugeData> metricResult) {
+ MetricKey key = metricResult.getKey();
+ return new AccumulatedMetricResult<>(
+ key.metricName(),
+ key.stepName(),
+ metricResult.getAttempted().getUpdate().extractResult(),
+ isCommittedSupported
+ ? metricResult.getCommitted().getUpdate().extractResult()
+ : null,
+ isCommittedSupported);
+ }
+ };
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private void mergeCounters(
+ Map<MetricKey, AttemptedAndCommitted<Long>> counters,
+ Iterable<MetricUpdate<Long>> updates,
+ Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> updateToAttemptedAndCommittedFn) {
+ for (MetricUpdate<Long> metricUpdate : updates) {
+ MetricKey key = metricUpdate.getKey();
+ AttemptedAndCommitted<Long> update =
+ updateToAttemptedAndCommittedFn.apply(metricUpdate);
+ if (counters.containsKey(key)) {
+ AttemptedAndCommitted<Long> current = counters.get(key);
+ update = new AttemptedAndCommitted<>(
+ key,
+ MetricUpdate.create(
+ key,
+ update.getAttempted().getUpdate() + current.getAttempted().getUpdate()),
+ MetricUpdate.create(
+ key,
+ update.getCommitted().getUpdate() + current.getCommitted().getUpdate()));
+ }
+ counters.put(key, update);
+ }
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private void mergeDistributions(
+ Map<MetricKey, AttemptedAndCommitted<DistributionData>> distributions,
+ Iterable<MetricUpdate<DistributionData>> updates,
+ Function<MetricUpdate<DistributionData>, AttemptedAndCommitted<DistributionData>>
+ updateToAttemptedAndCommittedFn) {
+ for (MetricUpdate<DistributionData> metricUpdate : updates) {
+ MetricKey key = metricUpdate.getKey();
+ AttemptedAndCommitted<DistributionData> update =
+ updateToAttemptedAndCommittedFn.apply(metricUpdate);
+ if (distributions.containsKey(key)) {
+ AttemptedAndCommitted<DistributionData> current = distributions.get(key);
+ update = new AttemptedAndCommitted<>(
+ key,
+ MetricUpdate.create(
+ key,
+ update.getAttempted().getUpdate().combine(current.getAttempted().getUpdate())),
+ MetricUpdate.create(
+ key,
+ update.getCommitted().getUpdate().combine(current.getCommitted().getUpdate())));
+ }
+ distributions.put(key, update);
+ }
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private void mergeGauges(
+ Map<MetricKey, AttemptedAndCommitted<GaugeData>> gauges,
+ Iterable<MetricUpdate<GaugeData>> updates,
+ Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>
+ updateToAttemptedAndCommittedFn) {
+ for (MetricUpdate<GaugeData> metricUpdate : updates) {
+ MetricKey key = metricUpdate.getKey();
+ AttemptedAndCommitted<GaugeData> update =
+ updateToAttemptedAndCommittedFn.apply(metricUpdate);
+ if (gauges.containsKey(key)) {
+ AttemptedAndCommitted<GaugeData> current = gauges.get(key);
+ update = new AttemptedAndCommitted<>(
+ key,
+ MetricUpdate.create(
+ key,
+ update.getAttempted().getUpdate().combine(current.getAttempted().getUpdate())),
+ MetricUpdate.create(
+ key,
+ update.getCommitted().getUpdate().combine(current.getCommitted().getUpdate())));
+ }
+ gauges.put(key, update);
+ }
+ }
+
+ /**
+ * Accumulated implementation of {@link MetricResult}.
+ */
+ private static class AccumulatedMetricResult<T> implements MetricResult<T> {
+ private final MetricName name;
+ private final String step;
+ private final T attempted;
+ private final T committed;
+ private final boolean isCommittedSupported;
+
+ private AccumulatedMetricResult(
+ MetricName name,
+ String step,
+ T attempted,
+ T committed,
+ boolean isCommittedSupported) {
+ this.name = name;
+ this.step = step;
+ this.attempted = attempted;
+ this.committed = committed;
+ this.isCommittedSupported = isCommittedSupported;
+ }
+
+ @Override
+ public MetricName name() {
+ return name;
+ }
+
+ @Override
+ public String step() {
+ return step;
+ }
+
+ @Override
+ public T committed() {
+ if (!isCommittedSupported) {
+ throw new UnsupportedOperationException("This runner does not currently support committed"
+ + " metrics results. Please use 'attempted' instead.");
+ }
+ return committed;
+ }
+
+ @Override
+ public T attempted() {
+ return attempted;
+ }
+ }
+
+ /**
+ * Attempted and committed {@link MetricUpdate MetricUpdates}.
+ */
+ private static class AttemptedAndCommitted<T> {
+ private final MetricKey key;
+ private final MetricUpdate<T> attempted;
+ private final MetricUpdate<T> committed;
+
+ private AttemptedAndCommitted(MetricKey key, MetricUpdate<T> attempted,
+ MetricUpdate<T> committed) {
+ this.key = key;
+ this.attempted = attempted;
+ this.committed = committed;
+ }
+
+ private MetricKey getKey() {
+ return key;
+ }
+
+ private MetricUpdate<T> getAttempted() {
+ return attempted;
+ }
+
+ private MetricUpdate<T> getCommitted() {
+ return committed;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
index 5a02106..8c26f18 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.metrics;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Iterables;
+import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -31,10 +32,10 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
* in a thread-safe manner.
*/
@Experimental(Kind.METRICS)
-public class MetricsMap<K, T> {
+public class MetricsMap<K, T> implements Serializable {
/** Interface for creating instances to populate the {@link MetricsMap}. */
- public interface Factory<K, T> {
+ public interface Factory<K, T> extends Serializable {
/**
* Create an instance of {@code T} to use with the given {@code key}.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
index 408f145..26554d4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
@@ -35,8 +35,8 @@ public class CounterCellTest {
@Test
public void testDeltaAndCumulative() {
- cell.inc(5);
- cell.inc(7);
+ cell.update(5);
+ cell.update(7);
assertThat(cell.getCumulative(), equalTo(12L));
assertThat("getCumulative is idempotent", cell.getCumulative(), equalTo(12L));
@@ -45,7 +45,7 @@ public class CounterCellTest {
assertThat(cell.getDirty().beforeCommit(), equalTo(false));
assertThat(cell.getCumulative(), equalTo(12L));
- cell.inc(30);
+ cell.update(30);
assertThat(cell.getCumulative(), equalTo(42L));
assertThat(cell.getDirty().beforeCommit(), equalTo(true));
http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java
new file mode 100644
index 0000000..0428ce1
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import static org.apache.beam.sdk.metrics.MetricMatchers.metricsResult;
+import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asMetricResults;
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.hamcrest.collection.IsIterableWithSize;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Tests for {@link MetricsContainerStepMap}.
+ */
+public class MetricsContainerStepMapTest {
+
+ // Name of this test class; the assert helpers pass it to the matcher as the namespace.
+ private static final String NAMESPACE = MetricsContainerStepMapTest.class.getName();
+ // Step names used both when updating a step map and when filtering query results.
+ private static final String STEP1 = "myStep1";
+ private static final String STEP2 = "myStep2";
+
+ // Base value that the static initializer records into each of the three metrics.
+ private static final long VALUE = 100;
+
+ // Metrics declared against this test class; the static initializer updates each of them.
+ private static final Counter counter =
+ Metrics.counter(
+ MetricsContainerStepMapTest.class,
+ "myCounter");
+ private static final Distribution distribution =
+ Metrics.distribution(
+ MetricsContainerStepMapTest.class,
+ "myDistribution");
+ private static final Gauge gauge =
+ Metrics.gauge(
+ MetricsContainerStepMapTest.class,
+ "myGauge");
+
+ /**
+ * Container passed to every {@code update} call in the tests. The static initializer below
+ * creates it and, while it is the scoped container, increments the counter once, updates the
+ * distribution twice and sets the gauge once.
+ */
+ private static final MetricsContainer metricsContainer;
+
+ static {
+ metricsContainer = new MetricsContainer(null);
+ try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
+ counter.inc(VALUE);
+ distribution.update(VALUE);
+ distribution.update(VALUE * 2);
+ gauge.set(VALUE);
+ } catch (IOException e) {
+ // Fail class initialization rather than printing the trace and letting the tests run
+ // after the scope failed to close cleanly.
+ throw new IllegalStateException("Failed to close the scoped metrics container", e);
+ }
+ }
+
+ /**
+ * JUnit rule through which individual tests declare the exception type and message
+ * they expect to be raised.
+ */
+ @Rule
+ public transient ExpectedException thrown = ExpectedException.none();
+
+ /**
+ * Feeds the shared container into an attempted-only step map once for {@code STEP1} and
+ * twice for {@code STEP2}, then verifies the per-step results and the unfiltered counts.
+ */
+ @Test
+ public void testAttemptedAccumulatedMetricResults() {
+ MetricsContainerStepMap stepMap = new MetricsContainerStepMap();
+ stepMap.update(STEP1, metricsContainer);
+ stepMap.update(STEP2, metricsContainer);
+ stepMap.update(STEP2, metricsContainer);
+
+ MetricResults results = asAttemptedOnlyMetricResults(stepMap);
+
+ // STEP1 received a single update.
+ MetricsFilter onlyStep1 = MetricsFilter.builder().addStep(STEP1).build();
+ MetricQueryResults forStep1 = results.queryMetrics(onlyStep1);
+
+ assertIterableSize(forStep1.counters(), 1);
+ assertIterableSize(forStep1.distributions(), 1);
+ assertIterableSize(forStep1.gauges(), 1);
+
+ assertCounter(forStep1, STEP1, VALUE, false);
+ DistributionResult singleUpdate = DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2);
+ assertDistribution(forStep1, STEP1, singleUpdate, false);
+ assertGauge(forStep1, STEP1, GaugeResult.create(VALUE, Instant.now()), false);
+
+ // STEP2 received two updates.
+ MetricsFilter onlyStep2 = MetricsFilter.builder().addStep(STEP2).build();
+ MetricQueryResults forStep2 = results.queryMetrics(onlyStep2);
+
+ assertIterableSize(forStep2.counters(), 1);
+ assertIterableSize(forStep2.distributions(), 1);
+ assertIterableSize(forStep2.gauges(), 1);
+
+ assertCounter(forStep2, STEP2, VALUE * 2, false);
+ DistributionResult doubleUpdate = DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2);
+ assertDistribution(forStep2, STEP2, doubleUpdate, false);
+ assertGauge(forStep2, STEP2, GaugeResult.create(VALUE, Instant.now()), false);
+
+ // With an empty filter one result per metric kind is expected for each of the two steps.
+ MetricQueryResults unfiltered = results.queryMetrics(MetricsFilter.builder().build());
+
+ assertIterableSize(unfiltered.counters(), 2);
+ assertIterableSize(unfiltered.distributions(), 2);
+ assertIterableSize(unfiltered.gauges(), 2);
+ }
+
+ /**
+ * Checking a committed counter value on attempted-only results is expected to raise an
+ * {@link UnsupportedOperationException}.
+ */
+ @Test
+ public void testCounterCommittedUnsupportedInAttemptedAccumulatedMetricResults() {
+ MetricsContainerStepMap stepMap = new MetricsContainerStepMap();
+ stepMap.update(STEP1, metricsContainer);
+
+ MetricQueryResults forStep1 =
+ asAttemptedOnlyMetricResults(stepMap)
+ .queryMetrics(MetricsFilter.builder().addStep(STEP1).build());
+
+ // Expectations must be registered before the assertion that triggers the exception.
+ thrown.expect(UnsupportedOperationException.class);
+ thrown.expectMessage("This runner does not currently support committed metrics results.");
+
+ assertCounter(forStep1, STEP1, VALUE, true);
+ }
+
+ /**
+ * Checking a committed distribution value on attempted-only results is expected to raise an
+ * {@link UnsupportedOperationException}.
+ */
+ @Test
+ public void testDistributionCommittedUnsupportedInAttemptedAccumulatedMetricResults() {
+ MetricsContainerStepMap stepMap = new MetricsContainerStepMap();
+ stepMap.update(STEP1, metricsContainer);
+
+ MetricQueryResults forStep1 =
+ asAttemptedOnlyMetricResults(stepMap)
+ .queryMetrics(MetricsFilter.builder().addStep(STEP1).build());
+
+ // Expectations must be registered before the assertion that triggers the exception.
+ thrown.expect(UnsupportedOperationException.class);
+ thrown.expectMessage("This runner does not currently support committed metrics results.");
+
+ assertDistribution(forStep1, STEP1, DistributionResult.ZERO, true);
+ }
+
+ /**
+ * Checking a committed gauge value on attempted-only results is expected to raise an
+ * {@link UnsupportedOperationException}.
+ */
+ @Test
+ public void testGaugeCommittedUnsupportedInAttemptedAccumulatedMetricResults() {
+ MetricsContainerStepMap stepMap = new MetricsContainerStepMap();
+ stepMap.update(STEP1, metricsContainer);
+
+ MetricQueryResults forStep1 =
+ asAttemptedOnlyMetricResults(stepMap)
+ .queryMetrics(MetricsFilter.builder().addStep(STEP1).build());
+
+ // Expectations must be registered before the assertion that triggers the exception.
+ thrown.expect(UnsupportedOperationException.class);
+ thrown.expectMessage("This runner does not currently support committed metrics results.");
+
+ assertGauge(forStep1, STEP1, GaugeResult.empty(), true);
+ }
+
+ /**
+ * Builds separate attempted and committed step maps from the shared container (attempted:
+ * two updates for {@code STEP1}, three for {@code STEP2}; committed: one and two), then
+ * verifies both views per step and the unfiltered result counts.
+ */
+ @Test
+ public void testAttemptedAndCommittedAccumulatedMetricResults() {
+ MetricsContainerStepMap attemptedMap = new MetricsContainerStepMap();
+ attemptedMap.update(STEP1, metricsContainer);
+ attemptedMap.update(STEP1, metricsContainer);
+ attemptedMap.update(STEP2, metricsContainer);
+ attemptedMap.update(STEP2, metricsContainer);
+ attemptedMap.update(STEP2, metricsContainer);
+
+ MetricsContainerStepMap committedMap = new MetricsContainerStepMap();
+ committedMap.update(STEP1, metricsContainer);
+ committedMap.update(STEP2, metricsContainer);
+ committedMap.update(STEP2, metricsContainer);
+
+ MetricResults results = asMetricResults(attemptedMap, committedMap);
+
+ // STEP1: two attempted updates, one committed update.
+ MetricsFilter onlyStep1 = MetricsFilter.builder().addStep(STEP1).build();
+ MetricQueryResults forStep1 = results.queryMetrics(onlyStep1);
+
+ assertIterableSize(forStep1.counters(), 1);
+ assertIterableSize(forStep1.distributions(), 1);
+ assertIterableSize(forStep1.gauges(), 1);
+
+ assertCounter(forStep1, STEP1, VALUE * 2, false);
+ DistributionResult step1Attempted = DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2);
+ assertDistribution(forStep1, STEP1, step1Attempted, false);
+ assertGauge(forStep1, STEP1, GaugeResult.create(VALUE, Instant.now()), false);
+
+ assertCounter(forStep1, STEP1, VALUE, true);
+ DistributionResult step1Committed = DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2);
+ assertDistribution(forStep1, STEP1, step1Committed, true);
+ assertGauge(forStep1, STEP1, GaugeResult.create(VALUE, Instant.now()), true);
+
+ // STEP2: three attempted updates, two committed updates.
+ MetricsFilter onlyStep2 = MetricsFilter.builder().addStep(STEP2).build();
+ MetricQueryResults forStep2 = results.queryMetrics(onlyStep2);
+
+ assertIterableSize(forStep2.counters(), 1);
+ assertIterableSize(forStep2.distributions(), 1);
+ assertIterableSize(forStep2.gauges(), 1);
+
+ assertCounter(forStep2, STEP2, VALUE * 3, false);
+ DistributionResult step2Attempted = DistributionResult.create(VALUE * 9, 6, VALUE, VALUE * 2);
+ assertDistribution(forStep2, STEP2, step2Attempted, false);
+ assertGauge(forStep2, STEP2, GaugeResult.create(VALUE, Instant.now()), false);
+
+ assertCounter(forStep2, STEP2, VALUE * 2, true);
+ DistributionResult step2Committed = DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2);
+ assertDistribution(forStep2, STEP2, step2Committed, true);
+ assertGauge(forStep2, STEP2, GaugeResult.create(VALUE, Instant.now()), true);
+
+ // With an empty filter one result per metric kind is expected for each of the two steps.
+ MetricQueryResults unfiltered = results.queryMetrics(MetricsFilter.builder().build());
+
+ assertIterableSize(unfiltered.counters(), 2);
+ assertIterableSize(unfiltered.distributions(), 2);
+ assertIterableSize(unfiltered.gauges(), 2);
+ }
+
+ /** Asserts that {@code iterable} matches Hamcrest's {@code iterableWithSize(size)}. */
+ private <T> void assertIterableSize(Iterable<T> iterable, int size) {
+ assertThat(iterable, IsIterableWithSize.<T>iterableWithSize(size));
+ }
+
+ /**
+ * Asserts that the counter results contain an item accepted by the {@code metricsResult}
+ * matcher built from this test's namespace, the counter's name, {@code step},
+ * {@code expected} and {@code isCommitted}.
+ */
+ private void assertCounter(
+ MetricQueryResults metricQueryResults, String step, Long expected, boolean isCommitted) {
+ String counterName = counter.getName().name();
+ assertThat(
+ metricQueryResults.counters(),
+ hasItem(metricsResult(NAMESPACE, counterName, step, expected, isCommitted)));
+ }
+
+ /**
+ * Asserts that the distribution results contain an item accepted by the
+ * {@code metricsResult} matcher built from this test's namespace, the distribution's name,
+ * {@code step}, {@code expected} and {@code isCommitted}.
+ */
+ private void assertDistribution(
+ MetricQueryResults metricQueryResults,
+ String step,
+ DistributionResult expected,
+ boolean isCommitted) {
+ String distributionName = distribution.getName().name();
+ assertThat(
+ metricQueryResults.distributions(),
+ hasItem(metricsResult(NAMESPACE, distributionName, step, expected, isCommitted)));
+ }
+
+ /**
+ * Asserts that the gauge results contain an item accepted by the {@code metricsResult}
+ * matcher built from this test's namespace, the gauge's name, {@code step},
+ * {@code expected} and {@code isCommitted}.
+ */
+ private void assertGauge(
+ MetricQueryResults metricQueryResults,
+ String step,
+ GaugeResult expected,
+ boolean isCommitted) {
+ String gaugeName = gauge.getName().name();
+ assertThat(
+ metricQueryResults.gauges(),
+ hasItem(metricsResult(NAMESPACE, gaugeName, step, expected, isCommitted)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/46c2f935/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
index 58797ce..38c00d3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
@@ -47,8 +47,8 @@ public class MetricsContainerTest {
assertThat("After commit no counters should be dirty",
container.getUpdates().counterUpdates(), emptyIterable());
- c1.inc(5L);
- c2.inc(4L);
+ c1.update(5L);
+ c2.update(4L);
assertThat(container.getUpdates().counterUpdates(), containsInAnyOrder(
metricUpdate("name1", 5L),
@@ -63,7 +63,7 @@ public class MetricsContainerTest {
assertThat("After commit there are no updates",
container.getUpdates().counterUpdates(), emptyIterable());
- c1.inc(8L);
+ c1.update(8L);
assertThat(container.getUpdates().counterUpdates(), contains(
metricUpdate("name1", 13L)));
}
@@ -73,9 +73,9 @@ public class MetricsContainerTest {
MetricsContainer container = new MetricsContainer("step1");
CounterCell c1 = container.getCounter(MetricName.named("ns", "name1"));
CounterCell c2 = container.getCounter(MetricName.named("ns", "name2"));
- c1.inc(2L);
- c2.inc(4L);
- c1.inc(3L);
+ c1.update(2L);
+ c2.update(4L);
+ c1.update(3L);
container.getUpdates();
container.commitUpdates();
@@ -84,7 +84,7 @@ public class MetricsContainerTest {
metricUpdate("name1", 5L),
metricUpdate("name2", 4L)));
- c1.inc(8L);
+ c1.update(8L);
assertThat(container.getCumulative().counterUpdates(), containsInAnyOrder(
metricUpdate("name1", 13L),
metricUpdate("name2", 4L)));