You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/10/13 22:39:37 UTC
[4/5] incubator-beam git commit: Initial Metrics API for Beam Java
Initial Metrics API for Beam Java
This includes a simple Counter metric and a Distribution metric that
reports the SUM, COUNT, MIN, MAX and MEAN of the reported values.
The API is labeled @Experimental since metrics will only be reported
and queryable with the DirectRunner, and the API may change as it is
implemented on other runners.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8524ed95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8524ed95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8524ed95
Branch: refs/heads/master
Commit: 8524ed9545f5af4bdeb54601f333549b34eb35aa
Parents: e969f3d
Author: bchambers <bc...@google.com>
Authored: Wed Oct 12 10:29:50 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Oct 13 15:29:29 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/annotations/Experimental.java | 3 +
.../org/apache/beam/sdk/metrics/Counter.java | 40 +++++
.../apache/beam/sdk/metrics/CounterCell.java | 76 ++++++++++
.../org/apache/beam/sdk/metrics/DirtyState.java | 98 ++++++++++++
.../apache/beam/sdk/metrics/Distribution.java | 30 ++++
.../beam/sdk/metrics/DistributionCell.java | 58 +++++++
.../beam/sdk/metrics/DistributionData.java | 59 ++++++++
.../beam/sdk/metrics/DistributionResult.java | 42 ++++++
.../org/apache/beam/sdk/metrics/Metric.java | 24 +++
.../org/apache/beam/sdk/metrics/MetricCell.java | 47 ++++++
.../org/apache/beam/sdk/metrics/MetricKey.java | 40 +++++
.../org/apache/beam/sdk/metrics/MetricName.java | 46 ++++++
.../beam/sdk/metrics/MetricNameFilter.java | 60 ++++++++
.../beam/sdk/metrics/MetricQueryResults.java | 33 ++++
.../apache/beam/sdk/metrics/MetricResult.java | 45 ++++++
.../apache/beam/sdk/metrics/MetricResults.java | 34 +++++
.../apache/beam/sdk/metrics/MetricUpdates.java | 72 +++++++++
.../org/apache/beam/sdk/metrics/Metrics.java | 110 ++++++++++++++
.../beam/sdk/metrics/MetricsContainer.java | 150 +++++++++++++++++++
.../beam/sdk/metrics/MetricsEnvironment.java | 85 +++++++++++
.../apache/beam/sdk/metrics/MetricsFilter.java | 86 +++++++++++
.../org/apache/beam/sdk/metrics/MetricsMap.java | 86 +++++++++++
.../apache/beam/sdk/metrics/package-info.java | 28 ++++
.../beam/sdk/metrics/CounterCellTest.java | 55 +++++++
.../apache/beam/sdk/metrics/DirtyStateTest.java | 56 +++++++
.../beam/sdk/metrics/DistributionCellTest.java | 53 +++++++
.../apache/beam/sdk/metrics/MetricMatchers.java | 99 ++++++++++++
.../beam/sdk/metrics/MetricsContainerTest.java | 129 ++++++++++++++++
.../sdk/metrics/MetricsEnvironmentTest.java | 63 ++++++++
.../apache/beam/sdk/metrics/MetricsMapTest.java | 103 +++++++++++++
.../apache/beam/sdk/metrics/MetricsTest.java | 98 ++++++++++++
31 files changed, 2008 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index 789f4b2..14d2358 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -83,5 +83,8 @@ public @interface Experimental {
* Do not use: API is unstable and runner support is incomplete.
*/
SPLITTABLE_DO_FN,
+
+ /** Metrics-related experimental APIs. */
+ METRICS
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java
new file mode 100644
index 0000000..9f48016
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A metric that reports a single long value and can be incremented or decremented.
+ */
+@Experimental(Kind.METRICS)
+public interface Counter extends Metric {
+
+ /** Increment the counter. */
+ void inc();
+
+ /** Increment the counter by the given amount. */
+ void inc(long n);
+
+ /** Decrement the counter. */
+ void dec();
+
+ /** Decrement the counter by the given amount. */
+ void dec(long n);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
new file mode 100644
index 0000000..bb65833
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Tracks the current value (and delta) for a Counter metric for a specific context and bundle.
+ *
+ * <p>This class generally shouldn't be used directly. The only exception is within a runner where
+ * a counter is being reported for a specific step (rather than the counter in the current context).
+ */
+@Experimental(Kind.METRICS)
+class CounterCell implements MetricCell<Counter, Long>, Counter {
+
+ private final DirtyState dirty = new DirtyState();
+ private final AtomicLong value = new AtomicLong();
+
+ /** Increment the counter by the given amount. */
+ private void add(long n) {
+ value.addAndGet(n);
+ dirty.afterModification();
+ }
+
+ @Override
+ public DirtyState getDirty() {
+ return dirty;
+ }
+
+ @Override
+ public Long getCumulative() {
+ return value.get();
+ }
+
+ @Override
+ public Counter getInterface() {
+ return this;
+ }
+
+ @Override
+ public void inc() {
+ add(1);
+ }
+
+ @Override
+ public void inc(long n) {
+ add(n);
+ }
+
+ @Override
+ public void dec() {
+ add(-1);
+ }
+
+ @Override
+ public void dec(long n) {
+ add(-n);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
new file mode 100644
index 0000000..6706be8
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Atomically tracks the dirty-state of a metric.
+ *
+ * <p>Reporting an update is split into two parts such that only changes made before the call to
+ * {@link #beforeCommit()} are committed when {@link #afterCommit()} is invoked. This allows for
+ * a two-step commit process of gathering all the dirty updates (calling {@link #beforeCommit()})
+ * followed by committing and calling {@link #afterCommit()}.
+ *
+ * <p>The tracking of dirty states is done conservatively -- sometimes {@link #beforeCommit()}
+ * will return true (indicating a dirty metric) even if there have been no changes since the last
+ * commit.
+ *
+ * <p>There is also a possible race when the underlying metric is modified but the call to
+ * {@link #afterModification()} hasn't happened before the call to {@link #beforeCommit()}. In this
+ * case the next round of metric updating will see the changes. If this was for the final commit,
+ * then the metric updates shouldn't be extracted until all possible user modifications have
+ * completed.
+ */
+@Experimental(Kind.METRICS)
+class DirtyState {
+ private enum State {
+ /** Indicates that there have been changes to the MetricCell since last commit. */
+ DIRTY,
+ /** Indicates that there have been no changes to the MetricCell since last commit. */
+ CLEAN,
+ /** Indicates that a commit of the current value is in progress. */
+ COMMITTING
+ }
+
+ private final AtomicReference<State> dirty = new AtomicReference<>(State.DIRTY);
+
+ /**
+ * Indicate that changes have been made to the metric being tracked by this {@link DirtyState}.
+ *
+ * <p>Should be called <b>after</b> modification of the value.
+ */
+ public void afterModification() {
+ dirty.set(State.DIRTY);
+ }
+
+ /**
+ * Check the dirty state and mark the metric as committing.
+ *
+ * <p>If the state was {@code CLEAN}, this returns {@code false}. If the state was {@code DIRTY}
+ * or {@code COMMITTING} this returns {@code true} and sets the state to {@code COMMITTING}.
+ *
+ * @return {@code false} if the state is clean and {@code true} otherwise.
+ */
+ public boolean beforeCommit() {
+ // After this loop, we want the state to be either CLEAN or COMMITTING.
+ // If the state was CLEAN, we don't need to do anything (and exit the loop early)
+ // If the state was DIRTY, we will attempt to do a CAS(DIRTY, COMMITTING). This will only
+ // fail if another thread is getting updates which generally shouldn't be the case.
+ // If the state was COMMITTING, we will attempt to do a CAS(COMMITTING, COMMITTING). This will
+ // fail if another thread commits updates (which shouldn't be the case) or if the user code
+ // updates the metric, in which case it will transition to DIRTY and the next iteration will
+ // successfully update it.
+ State state;
+ do {
+ state = dirty.get();
+ } while (state != State.CLEAN && !dirty.compareAndSet(state, State.COMMITTING));
+
+ return state != State.CLEAN;
+ }
+
+ /**
+ * Mark any changes up to the most recent call to {@link #beforeCommit()} as committed.
+ * The next call to {@link #beforeCommit()} will return {@code false} unless there have
+ * been changes made since the previous call to {@link #beforeCommit()}.
+ */
+ public void afterCommit() {
+ dirty.compareAndSet(State.COMMITTING, State.CLEAN);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
new file mode 100644
index 0000000..b789020
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A metric that reports information about the distribution of reported values.
+ */
+@Experimental(Kind.METRICS)
+public interface Distribution extends Metric {
+ /** Add an observation to this distribution. */
+ void update(long value);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
new file mode 100644
index 0000000..f0074a9
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Tracks the current value (and delta) for a Distribution metric.
+ */
+@Experimental(Kind.METRICS)
+class DistributionCell implements MetricCell<Distribution, DistributionData>, Distribution {
+
+ private final DirtyState dirty = new DirtyState();
+ private final AtomicReference<DistributionData> value =
+ new AtomicReference<DistributionData>(DistributionData.EMPTY);
+
+ /** Add the given value as a new observation to this distribution. */
+ @Override
+ public void update(long n) {
+ DistributionData original;
+ do {
+ original = value.get();
+ } while (!value.compareAndSet(original, original.combine(DistributionData.singleton(n))));
+ dirty.afterModification();
+ }
+
+ @Override
+ public DirtyState getDirty() {
+ return dirty;
+ }
+
+ @Override
+ public DistributionData getCumulative() {
+ return value.get();
+ }
+
+ @Override
+ public Distribution getInterface() {
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
new file mode 100644
index 0000000..59c7fbd
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+
+/**
+ * Data describing the distribution. This should retain enough detail that it can be combined
+ * with other {@link DistributionData}.
+ *
+ * <p>This is kept distinct from {@link DistributionResult} since this may be extended to include
+ * data necessary to approximate quantiles, etc. while {@link DistributionResult} would just include
+ * the approximate value of those quantiles.
+ */
+@AutoValue
+public abstract class DistributionData {
+
+ public abstract long sum();
+ public abstract long count();
+ public abstract long min();
+ public abstract long max();
+
+ public static final DistributionData EMPTY = create(0, 0, Long.MAX_VALUE, Long.MIN_VALUE);
+
+ public static DistributionData create(long sum, long count, long min, long max) {
+ return new AutoValue_DistributionData(sum, count, min, max);
+ }
+
+ public static DistributionData singleton(long value) {
+ return create(value, 1, value, value);
+ }
+
+ public DistributionData combine(DistributionData value) {
+ return create(
+ sum() + value.sum(),
+ count() + value.count(),
+ Math.min(value.min(), min()),
+ Math.max(value.max(), max()));
+ }
+
+ public DistributionResult extractResult() {
+ return DistributionResult.create(sum(), count(), min(), max());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java
new file mode 100644
index 0000000..27c242c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+
+/**
+ * The result of a {@link Distribution} metric.
+ */
+@AutoValue
+public abstract class DistributionResult {
+
+ public abstract long sum();
+ public abstract long count();
+ public abstract long min();
+ public abstract long max();
+
+ public double mean() {
+ return (1.0 * sum()) / count();
+ }
+
+ public static final DistributionResult ZERO = create(0, 0, Long.MAX_VALUE, Long.MIN_VALUE);
+
+ public static DistributionResult create(long sum, long count, long min, long max) {
+ return new AutoValue_DistributionResult(sum, count, min, max);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java
new file mode 100644
index 0000000..37a5f65
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+/**
+ * Marker interface for all user-facing metrics.
+ */
+public interface Metric { }
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
new file mode 100644
index 0000000..211b2dd
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A {@link MetricCell} is used for accumulating in-memory changes to a metric. It represents a
+ * specific metric name in a single context.
+ *
+ * @param <UserT> The type of the user interface for reporting changes to this cell.
+ * @param <DataT> The type of metric data stored (and extracted) from this cell.
+ */
+@Experimental(Kind.METRICS)
+interface MetricCell<UserT extends Metric, DataT> {
+
+ /**
+ * Return the {@link DirtyState} tracking whether this metric cell contains uncommitted changes.
+ */
+ DirtyState getDirty();
+
+ /**
+ * Return the cumulative value of this metric.
+ */
+ DataT getCumulative();
+
+ /**
+ * Return the user-facing mutator for this cell.
+ */
+ UserT getInterface();
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
new file mode 100644
index 0000000..bfa4df5
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Metrics are keyed by the step name they are associated with and the name of the metric.
+ */
+@Experimental(Kind.METRICS)
+@AutoValue
+public abstract class MetricKey {
+
+ /** The step name that is associated with this metric. */
+ public abstract String stepName();
+
+ /** The name of the metric. */
+ public abstract MetricName metricName();
+
+ public static MetricKey create(String stepName, MetricName metricName) {
+ return new AutoValue_MetricKey(stepName, metricName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java
new file mode 100644
index 0000000..843a885
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * The name of a metric consists of a {@link #namespace} and a {@link #name}. The {@link #namespace}
+ * allows grouping related metrics together and also prevents collisions between multiple metrics
+ * with the same name.
+ */
+@Experimental(Kind.METRICS)
+@AutoValue
+public abstract class MetricName {
+
+ /** The namespace associated with this metric. */
+ public abstract String namespace();
+
+ /** The name of this metric. */
+ public abstract String name();
+
+ public static MetricName named(String namespace, String name) {
+ return new AutoValue_MetricName(namespace, name);
+ }
+
+ public static MetricName named(Class<?> namespace, String name) {
+ return new AutoValue_MetricName(namespace.getName(), name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java
new file mode 100644
index 0000000..a2c3798
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A filter over metric names. A metric matches when it is in the filter's namespace and, if the
+ * filter also specifies a name, has exactly that name.
+ *
+ * <p>Namespaces derived from a {@link Class} always use {@link Class#getName()}, the same value
+ * that {@link MetricName#named(Class, String)} stores, so that a filter built from a class can
+ * match the metrics that were created from that class.
+ */
+@Experimental(Kind.METRICS)
+@AutoValue
+public abstract class MetricNameFilter {
+
+  /** The namespace that a metric must be in to match this {@link MetricNameFilter}. */
+  public abstract String getNamespace();
+
+  /** If set, the metric must have this name to match this {@link MetricNameFilter}. */
+  @Nullable
+  public abstract String getName();
+
+  /**
+   * Creates a filter that matches every metric in the given namespace, whatever its name.
+   *
+   * @param namespace the namespace to match; must not be null
+   * @throws NullPointerException if {@code namespace} is null
+   */
+  public static MetricNameFilter inNamespace(String namespace) {
+    checkNotNull(namespace, "Must specify a namespace");
+    return new AutoValue_MetricNameFilter(namespace, null);
+  }
+
+  /**
+   * Creates a filter that matches every metric whose namespace is the name of the given class.
+   *
+   * @param namespace the class whose {@link Class#getName()} is the namespace to match
+   * @throws NullPointerException if {@code namespace} is null
+   */
+  public static MetricNameFilter inNamespace(Class<?> namespace) {
+    checkNotNull(namespace, "Must specify a namespace");
+    return inNamespace(namespace.getName());
+  }
+
+  /**
+   * Creates a filter that matches only the metric with the given namespace and name.
+   *
+   * @param namespace the namespace to match; must not be null
+   * @param name the metric name to match; must not be null
+   * @throws NullPointerException if {@code namespace} or {@code name} is null
+   */
+  public static MetricNameFilter named(String namespace, String name) {
+    checkNotNull(namespace, "Must specify a namespace");
+    checkNotNull(name, "Must specify a name");
+    return new AutoValue_MetricNameFilter(namespace, name);
+  }
+
+  /**
+   * Creates a filter that matches only the metric with the given name in the namespace of the
+   * given class.
+   *
+   * @param namespace the class whose {@link Class#getName()} is the namespace to match
+   * @param name the metric name to match; must not be null
+   * @throws NullPointerException if {@code namespace} or {@code name} is null
+   */
+  public static MetricNameFilter named(Class<?> namespace, String name) {
+    checkNotNull(namespace, "Must specify a namespace");
+    // Use getName(), not getSimpleName(): MetricName.named(Class, String) and
+    // inNamespace(Class) both use the fully-qualified name, and the filter must agree with them.
+    return named(namespace.getName(), name);
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
new file mode 100644
index 0000000..2241ba8
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * The results of a query for metrics, as returned by {@link MetricResults#queryMetrics}. Allows
+ * accessing all of the metrics that matched the filter, grouped by the kind of metric.
+ */
+@Experimental(Kind.METRICS)
+public interface MetricQueryResults {
+ /**
+ * Return the metric results for the counters that matched the filter.
+ *
+ * @return one {@link MetricResult} per matching counter, carrying {@code Long} values
+ */
+ Iterable<MetricResult<Long>> counters();
+
+ /**
+ * Return the metric results for the distributions that matched the filter.
+ *
+ * @return one {@link MetricResult} per matching distribution, carrying
+ * {@link DistributionResult} values
+ */
+ Iterable<MetricResult<DistributionResult>> distributions();
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java
new file mode 100644
index 0000000..9a3971a
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * The result of querying a single metric, identified by its {@link MetricName} and the step it
+ * applies to.
+ *
+ * @param <T> the type of the reported value; {@link MetricQueryResults} uses {@code Long} for
+ * counters and {@link DistributionResult} for distributions
+ */
+@Experimental(Kind.METRICS)
+public interface MetricResult<T> {
+ /** Return the name of the metric. */
+ MetricName name();
+ /** Return the step context to which this metric result applies. */
+ String step();
+
+ /**
+ * Return the value of this metric across all successfully completed parts of the pipeline.
+ *
+ * <p>Not all runners will support committed metrics. If they are not supported, the runner will
+ * throw an {@link UnsupportedOperationException}.
+ */
+ T committed();
+
+ /**
+ * Return the value of this metric across all attempts of executing all parts of the pipeline.
+ */
+ T attempted();
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java
new file mode 100644
index 0000000..dab65ea
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Methods for interacting with the metrics of a pipeline that has been executed. Accessed via
+ * {@link PipelineResult#metrics()}.
+ */
+@Experimental(Kind.METRICS)
+public abstract class MetricResults {
+ /**
+ * Query for all metrics that match the filter.
+ *
+ * @param filter the {@link MetricsFilter} selecting which metrics to return
+ * @return the counters and distributions that matched {@code filter}
+ */
+ public abstract MetricQueryResults queryMetrics(MetricsFilter filter);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
new file mode 100644
index 0000000..e84dc66
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Iterables;
+import java.util.Collections;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Representation of multiple metric updates: one collection of counter updates and one collection
+ * of distribution updates.
+ */
+@Experimental(Kind.METRICS)
+@AutoValue
+public abstract class MetricUpdates {
+
+  /** A {@link MetricUpdates} that contains no counter updates and no distribution updates. */
+  public static final MetricUpdates EMPTY = MetricUpdates.create(
+      Collections.<MetricUpdate<Long>>emptyList(),
+      Collections.<MetricUpdate<DistributionData>>emptyList());
+
+  /**
+   * Representation of a single metric update.
+   * @param <T> The type of value representing the update.
+   */
+  @AutoValue
+  public abstract static class MetricUpdate<T> {
+
+    /** The key being updated. */
+    public abstract MetricKey getKey();
+    /** The value of the update. */
+    public abstract T getUpdate();
+
+    /**
+     * Create a {@link MetricUpdate} pairing the given key with the given update value.
+     *
+     * @param key the key of the metric being updated
+     * @param update the value of the update
+     */
+    public static <T> MetricUpdate<T> create(MetricKey key, T update) {
+      // Diamond instead of the raw generated type, so the result is a checked MetricUpdate<T>.
+      return new AutoValue_MetricUpdates_MetricUpdate<>(key, update);
+    }
+  }
+
+  /** Returns true if there are no updates in this MetricUpdates object. */
+  public boolean isEmpty() {
+    return Iterables.isEmpty(counterUpdates())
+        && Iterables.isEmpty(distributionUpdates());
+  }
+
+  /** All of the counter updates. */
+  public abstract Iterable<MetricUpdate<Long>> counterUpdates();
+
+  /** All of the distribution updates. */
+  public abstract Iterable<MetricUpdate<DistributionData>> distributionUpdates();
+
+  /**
+   * Create a new {@link MetricUpdates} bundle.
+   *
+   * @param counterUpdates the counter updates to include
+   * @param distributionUpdates the distribution updates to include
+   */
+  public static MetricUpdates create(
+      Iterable<MetricUpdate<Long>> counterUpdates,
+      Iterable<MetricUpdate<DistributionData>> distributionUpdates) {
+    return new AutoValue_MetricUpdates(counterUpdates, distributionUpdates);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
new file mode 100644
index 0000000..b72a0b2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Static factory methods for the metrics a pipeline can report while it is executing.
+ *
+ * <p>The returned metrics look up the {@link MetricsContainer} of the calling thread through
+ * {@link MetricsEnvironment#getCurrentContainer()} on every update. An update made while that
+ * lookup returns {@code null} is dropped.
+ */
+@Experimental(Kind.METRICS)
+public class Metrics {
+
+  private Metrics() {}
+
+  /**
+   * Create a sum-aggregated metric that supports both increments and decrements, named by an
+   * explicit namespace string.
+   */
+  public static Counter counter(String namespace, String name) {
+    MetricName metricName = MetricName.named(namespace, name);
+    return new DelegatingCounter(metricName);
+  }
+
+  /**
+   * Create a sum-aggregated metric that supports both increments and decrements, using the given
+   * class as the namespace.
+   */
+  public static Counter counter(Class<?> namespace, String name) {
+    MetricName metricName = MetricName.named(namespace, name);
+    return new DelegatingCounter(metricName);
+  }
+
+  /**
+   * Create a metric that tracks statistics about the distribution of the values reported to it,
+   * named by an explicit namespace string.
+   */
+  public static Distribution distribution(String namespace, String name) {
+    MetricName metricName = MetricName.named(namespace, name);
+    return new DelegatingDistribution(metricName);
+  }
+
+  /**
+   * Create a metric that tracks statistics about the distribution of the values reported to it,
+   * using the given class as the namespace.
+   */
+  public static Distribution distribution(Class<?> namespace, String name) {
+    MetricName metricName = MetricName.named(namespace, name);
+    return new DelegatingDistribution(metricName);
+  }
+
+  /** A {@link Counter} that forwards each change to the counter cell of the current container. */
+  private static class DelegatingCounter implements Counter {
+    private final MetricName name;
+
+    private DelegatingCounter(MetricName name) {
+      this.name = name;
+    }
+
+    /** Add one to the counter. */
+    @Override
+    public void inc() {
+      inc(1);
+    }
+
+    /** Add {@code n} to the counter; does nothing when there is no current container. */
+    @Override
+    public void inc(long n) {
+      MetricsContainer current = MetricsEnvironment.getCurrentContainer();
+      if (current == null) {
+        return;
+      }
+      current.getCounter(name).inc(n);
+    }
+
+    /** Subtract one from the counter. */
+    @Override
+    public void dec() {
+      inc(-1);
+    }
+
+    /** Subtract {@code n} from the counter. */
+    @Override
+    public void dec(long n) {
+      inc(-n);
+    }
+  }
+
+  /**
+   * A {@link Distribution} that forwards each reported value to the distribution cell of the
+   * current container.
+   */
+  private static class DelegatingDistribution implements Distribution {
+    private final MetricName name;
+
+    private DelegatingDistribution(MetricName name) {
+      this.name = name;
+    }
+
+    /** Report {@code value}; does nothing when there is no current container. */
+    @Override
+    public void update(long value) {
+      MetricsContainer current = MetricsEnvironment.getCurrentContainer();
+      if (current == null) {
+        return;
+      }
+      current.getDistribution(name).update(value);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
new file mode 100644
index 0000000..10032a2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
+
+/**
+ * Holds the metrics for a single step and unit-of-commit (bundle).
+ *
+ * <p>This class is thread-safe. It is intended to be used with 1 (or more) threads updating
+ * metrics and at most 1 thread extracting updates by calling {@link #getUpdates} and
+ * {@link #commitUpdates}. Outside of this it is still safe. Although races in the update extraction
+ * may cause updates that don't actually have any changes, it will never lose an update.
+ *
+ * <p>For consistency, all threads that update metrics should finish before getting the final
+ * cumulative values/updates.
+ */
+@Experimental(Kind.METRICS)
+public class MetricsContainer {
+
+  private final String stepName;
+
+  /** Counter cells by metric name; cells are created on first access. */
+  private final MetricsMap<MetricName, CounterCell> counters =
+      new MetricsMap<>(new MetricsMap.Factory<MetricName, CounterCell>() {
+        @Override
+        public CounterCell createInstance(MetricName unusedKey) {
+          return new CounterCell();
+        }
+      });
+
+  /** Distribution cells by metric name; cells are created on first access. */
+  private final MetricsMap<MetricName, DistributionCell> distributions =
+      new MetricsMap<>(new MetricsMap.Factory<MetricName, DistributionCell>() {
+        @Override
+        public DistributionCell createInstance(MetricName unusedKey) {
+          return new DistributionCell();
+        }
+      });
+
+  /**
+   * Create a new {@link MetricsContainer} associated with the given {@code stepName}.
+   */
+  public MetricsContainer(String stepName) {
+    this.stepName = stepName;
+  }
+
+  /**
+   * Return the {@link CounterCell} that should be used for implementing the given
+   * {@code metricName} in this container.
+   */
+  public CounterCell getCounter(MetricName metricName) {
+    return counters.get(metricName);
+  }
+
+  /**
+   * Return the {@link DistributionCell} that should be used for implementing the given
+   * {@code metricName} in this container.
+   */
+  public DistributionCell getDistribution(MetricName metricName) {
+    return distributions.get(metricName);
+  }
+
+  /**
+   * Build an update, keyed by this container's step and the metric name, for every cell whose
+   * dirty state returns true from {@code beforeCommit()}.
+   */
+  private <UpdateT, CellT extends MetricCell<?, UpdateT>>
+      ImmutableList<MetricUpdate<UpdateT>> extractUpdates(
+          MetricsMap<MetricName, CellT> cells) {
+    ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
+    for (Map.Entry<MetricName, CellT> cell : cells.entries()) {
+      if (cell.getValue().getDirty().beforeCommit()) {
+        updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()),
+            cell.getValue().getCumulative()));
+      }
+    }
+    return updates.build();
+  }
+
+  /**
+   * Return the cumulative values for any metrics that have changed since the last time updates were
+   * committed.
+   */
+  public MetricUpdates getUpdates() {
+    return MetricUpdates.create(
+        extractUpdates(counters),
+        extractUpdates(distributions));
+  }
+
+  /** Call {@code afterCommit()} on the dirty state of every cell in {@code cells}. */
+  private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?, ?>> cells) {
+    for (MetricCell<?, ?> cell : cells.values()) {
+      cell.getDirty().afterCommit();
+    }
+  }
+
+  /**
+   * Mark all of the updates that were retrieved with the latest call to {@link #getUpdates()} as
+   * committed.
+   */
+  public void commitUpdates() {
+    commitUpdates(counters);
+    commitUpdates(distributions);
+  }
+
+  /**
+   * Build an update, keyed by this container's step and the metric name, holding the cumulative
+   * value of every cell in {@code cells}, regardless of its dirty state.
+   */
+  private <UpdateT, CellT extends MetricCell<?, UpdateT>>
+      ImmutableList<MetricUpdate<UpdateT>> extractCumulatives(
+          MetricsMap<MetricName, CellT> cells) {
+    ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
+    for (Map.Entry<MetricName, CellT> cell : cells.entries()) {
+      UpdateT update = checkNotNull(cell.getValue().getCumulative());
+      updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()), update));
+    }
+    return updates.build();
+  }
+
+  /**
+   * Return the {@link MetricUpdates} representing the cumulative values of all metrics in this
+   * container.
+   */
+  public MetricUpdates getCumulative() {
+    // The previous implementation also built two local lists here that were never used;
+    // extractCumulatives already produces exactly the same entries.
+    return MetricUpdates.create(
+        extractCumulatives(counters),
+        extractCumulatives(distributions));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
new file mode 100644
index 0000000..ef2660a8
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages and provides the metrics container associated with each thread.
+ *
+ * <p>Users should not interact directly with this class. Instead, use {@link Metrics} and the
+ * returned objects to create and modify metrics.
+ *
+ * <p>The runner should create {@link MetricsContainer} for each context in which metrics are
+ * reported (by step and name) and call {@link #setMetricsContainer} before invoking any code that
+ * may update metrics within that step.
+ *
+ * <p>The runner should call {@link #unsetMetricsContainer} (or {@link #setMetricsContainer} back to
+ * the previous value) when exiting code that set the metrics container.
+ */
+public class MetricsEnvironment {
+
+  // Log under this class's own name, since this is the class that emits the messages.
+  private static final Logger LOGGER = LoggerFactory.getLogger(MetricsEnvironment.class);
+
+  /** Whether the runner has declared, via {@link #setMetricsSupported}, that it reports metrics. */
+  private static final AtomicBoolean METRICS_SUPPORTED = new AtomicBoolean(false);
+
+  /** Set once the missing-container diagnostic has been logged; shared by all threads. */
+  private static final AtomicBoolean REPORTED_MISSING_CONTAINER = new AtomicBoolean(false);
+
+  private static final ThreadLocal<MetricsContainer> CONTAINER_FOR_THREAD = new ThreadLocal<>();
+
+  /** Set the {@link MetricsContainer} for the current thread. */
+  public static void setMetricsContainer(MetricsContainer container) {
+    CONTAINER_FOR_THREAD.set(container);
+  }
+
+  /** Clear the {@link MetricsContainer} for the current thread. */
+  public static void unsetMetricsContainer() {
+    CONTAINER_FOR_THREAD.remove();
+  }
+
+  /** Called by the runner to indicate whether metrics reporting is supported. */
+  public static void setMetricsSupported(boolean supported) {
+    METRICS_SUPPORTED.set(supported);
+  }
+
+  /**
+   * Return the {@link MetricsContainer} for the current thread.
+   *
+   * <p>May return null if metrics are not supported by the current runner or if the current thread
+   * is not a work-execution thread. The first time this happens (across all threads, since the
+   * guard flag is static) a diagnostic message is logged; later occurrences are silent.
+   */
+  @Nullable
+  public static MetricsContainer getCurrentContainer() {
+    MetricsContainer container = CONTAINER_FOR_THREAD.get();
+    // compareAndSet ensures that only the first caller to observe a missing container logs.
+    if (container == null && REPORTED_MISSING_CONTAINER.compareAndSet(false, true)) {
+      if (METRICS_SUPPORTED.get()) {
+        LOGGER.error(
+            "Unable to update metrics on the current thread. "
+                + "Most likely caused by using metrics outside the managed work-execution thread.");
+      } else {
+        LOGGER.warn("Reporting metrics are not supported in the current execution environment.");
+      }
+    }
+    return container;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java
new file mode 100644
index 0000000..ec81251
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Simple POJO representing a filter for querying metrics. A filter is made of a set of step names
+ * and a set of {@link MetricNameFilter}s, and is created through {@link #builder()}.
+ */
+@Experimental(Kind.METRICS)
+@AutoValue
+public abstract class MetricsFilter {
+
+ /** The step filters added with {@link Builder#addStep}. */
+ public Set<String> steps() {
+ return immutableSteps();
+ }
+
+ /** The name filters added with {@link Builder#addNameFilter}. */
+ public Set<MetricNameFilter> names() {
+ return immutableNames();
+ }
+
+ // The properties are declared as ImmutableSet and exposed above through the Set interface.
+ protected abstract ImmutableSet<String> immutableSteps();
+ protected abstract ImmutableSet<MetricNameFilter> immutableNames();
+
+ /** Return a new, empty {@link Builder}. */
+ public static Builder builder() {
+ return new AutoValue_MetricsFilter.Builder();
+ }
+
+ /**
+ * Builder for creating a {@link MetricsFilter}.
+ */
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ // Accumulating builders for the two set-valued properties of MetricsFilter.
+ protected abstract ImmutableSet.Builder<MetricNameFilter> immutableNamesBuilder();
+ protected abstract ImmutableSet.Builder<String> immutableStepsBuilder();
+
+ /**
+ * Add a {@link MetricNameFilter}.
+ *
+ * <p>If no name filters are specified then all metric names will be included.
+ *
+ * <p>If one or more name filters are specified, then only metrics that match one or more of the
+ * filters will be included.
+ */
+ public Builder addNameFilter(MetricNameFilter nameFilter) {
+ immutableNamesBuilder().add(nameFilter);
+ return this;
+ }
+
+ /**
+ * Add a step filter.
+ *
+ * <p>If no steps are specified then metrics will be included for all steps.
+ *
+ * <p>If one or more steps are specified, then metrics will be included if they are part of
+ * any of the specified steps.
+ */
+ public Builder addStep(String step) {
+ immutableStepsBuilder().add(step);
+ return this;
+ }
+
+ /** Build the {@link MetricsFilter} from the name and step filters added so far. */
+ public abstract MetricsFilter build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
new file mode 100644
index 0000000..5a02106
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Iterables;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A thread-safe map from {@code K} to {@code T} in which looking up a missing key creates the
+ * value on demand through a {@link Factory}.
+ */
+@Experimental(Kind.METRICS)
+public class MetricsMap<K, T> {
+
+  /** Creates the instances that populate a {@link MetricsMap}. */
+  public interface Factory<K, T> {
+    /**
+     * Create the instance of {@code T} to associate with the given {@code key}.
+     *
+     * <p>Implementations must be safe to call from multiple threads.
+     */
+    T createInstance(K key);
+  }
+
+  private final Factory<K, T> factory;
+  private final ConcurrentMap<K, T> metrics = new ConcurrentHashMap<>();
+
+  public MetricsMap(Factory<K, T> factory) {
+    this.factory = factory;
+  }
+
+  /**
+   * Return the value associated with the given key, creating it first if there is none.
+   *
+   * <p>When several threads race to create the value for the same key, every caller receives the
+   * single instance that was actually stored in the map.
+   */
+  public T get(K key) {
+    T existing = metrics.get(key);
+    if (existing != null) {
+      return existing;
+    }
+    T created = factory.createInstance(key);
+    // putIfAbsent returns the value another thread stored first, or null if ours was stored.
+    T raced = metrics.putIfAbsent(key, created);
+    return MoreObjects.firstNonNull(raced, created);
+  }
+
+  /**
+   * Return the value associated with the given key, or {@code null} if there is none. Never
+   * creates a value.
+   */
+  @Nullable
+  public T tryGet(K key) {
+    return metrics.get(key);
+  }
+
+  /**
+   * Return an unmodifiable iterable over the entries currently in this {@link MetricsMap}.
+   */
+  public Iterable<Map.Entry<K, T>> entries() {
+    return Iterables.unmodifiableIterable(metrics.entrySet());
+  }
+
+  /**
+   * Return an unmodifiable iterable over the values currently in this {@link MetricsMap}.
+   */
+  public Iterable<T> values() {
+    return Iterables.unmodifiableIterable(metrics.values());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java
new file mode 100644
index 0000000..f71dc7a
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Metrics allow exporting information about the execution of a pipeline.
+ * They are intended to be used for monitoring and understanding the
+ * execution.
+ *
+ * <p>Metrics may also be queried from the {@link org.apache.beam.sdk.PipelineResult} object.
+ *
+ * <p>Runners should look at {@link org.apache.beam.sdk.metrics.MetricsContainer} for details on
+ * how to support metrics.
+ */
+package org.apache.beam.sdk.metrics;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
new file mode 100644
index 0000000..408f145
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link CounterCell}.
+ */
+@RunWith(JUnit4.class)
+public class CounterCellTest {
+
+  // Never reassigned; declared final to match the fixture style used in DirtyStateTest.
+  private final CounterCell cell = new CounterCell();
+
+  /**
+   * Checks that increments accumulate in {@code getCumulative()} and that committing clears the
+   * dirty flag without resetting the cumulative value.
+   */
+  @Test
+  public void testDeltaAndCumulative() {
+    cell.inc(5);
+    cell.inc(7);
+    assertThat(cell.getCumulative(), equalTo(12L));
+    assertThat("getCumulative is idempotent", cell.getCumulative(), equalTo(12L));
+
+    // Uncommitted increments leave the cell dirty until afterCommit() is called.
+    assertThat(cell.getDirty().beforeCommit(), equalTo(true));
+    cell.getDirty().afterCommit();
+    assertThat(cell.getDirty().beforeCommit(), equalTo(false));
+    // Committing must not reset the running total.
+    assertThat(cell.getCumulative(), equalTo(12L));
+
+    cell.inc(30);
+    assertThat(cell.getCumulative(), equalTo(42L));
+
+    // A new increment after a commit makes the cell dirty again.
+    assertThat(cell.getDirty().beforeCommit(), equalTo(true));
+    cell.getDirty().afterCommit();
+    assertThat(cell.getDirty().beforeCommit(), equalTo(false));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java
new file mode 100644
index 0000000..d00f8cd
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DirtyState}.
+ */
+@RunWith(JUnit4.class)
+public class DirtyStateTest {
+
+  private final DirtyState dirty = new DirtyState();
+
+  /** Walks the plain commit cycle: dirty, committed (clean), modified (dirty), committed. */
+  @Test
+  public void basicPath() {
+    assertThat("Should start dirty", dirty.beforeCommit(), is(true));
+    dirty.afterCommit();
+    assertThat("Should be clean after commit", dirty.beforeCommit(), is(false));
+
+    dirty.afterModification();
+    assertThat("Should be dirty after change", dirty.beforeCommit(), is(true));
+    dirty.afterCommit();
+    assertThat("Should be clean after commit", dirty.beforeCommit(), is(false));
+  }
+
+  /**
+   * A modification that lands between {@code beforeCommit()} and {@code afterCommit()} must not
+   * be lost: the state has to report dirty again on the next {@code beforeCommit()}.
+   */
+  @Test
+  public void changeAfterBeforeCommit() {
+    assertThat("Should start dirty", dirty.beforeCommit(), is(true));
+    dirty.afterModification();
+    dirty.afterCommit();
+    assertThat("Changes after beforeCommit should be dirty after afterCommit",
+        dirty.beforeCommit(), is(true));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java
new file mode 100644
index 0000000..07e0b26
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DistributionCell}.
+ */
+@RunWith(JUnit4.class)
+public class DistributionCellTest {
+  // Never reassigned; declared final to match the fixture style used in DirtyStateTest.
+  private final DistributionCell cell = new DistributionCell();
+
+  /**
+   * Checks that updates are folded into the cumulative {@link DistributionData} and that the
+   * dirty flag follows the commit cycle.
+   */
+  @Test
+  public void testDeltaAndCumulative() {
+    cell.update(5);
+    cell.update(7);
+    // Expected data after reporting 5 and 7: sum 12, count 2, min 5, max 7.
+    assertThat(cell.getCumulative(), equalTo(DistributionData.create(12, 2, 5, 7)));
+    assertThat("getCumulative is idempotent",
+        cell.getCumulative(), equalTo(DistributionData.create(12, 2, 5, 7)));
+
+    assertThat(cell.getDirty().beforeCommit(), equalTo(true));
+    cell.getDirty().afterCommit();
+    assertThat(cell.getDirty().beforeCommit(), equalTo(false));
+
+    cell.update(30);
+    // The commit above must not reset the data: 30 is folded into the earlier values.
+    assertThat(cell.getCumulative(), equalTo(DistributionData.create(42, 3, 5, 30)));
+
+    assertThat("Adding a new value made the cell dirty",
+        cell.getDirty().beforeCommit(), equalTo(true));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
new file mode 100644
index 0000000..bdcb94f
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import java.util.Objects;
+import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * Hamcrest matchers for {@link MetricUpdate} and {@link MetricResult} values.
+ */
+public class MetricMatchers {
+
+  // Static-only helper; not meant to be instantiated.
+  private MetricMatchers() {}
+
+  /**
+   * Matches a {@link MetricUpdate} by metric name and update value only; the namespace and step
+   * of the update are not checked.
+   *
+   * @param name expected {@code metricName().name()} of the update's key
+   * @param update expected value of {@code getUpdate()}
+   */
+  public static <T> Matcher<MetricUpdate<T>> metricUpdate(final String name, final T update) {
+    return new TypeSafeMatcher<MetricUpdate<T>>() {
+      @Override
+      protected boolean matchesSafely(MetricUpdate<T> item) {
+        return Objects.equals(name, item.getKey().metricName().name())
+            && Objects.equals(update, item.getUpdate());
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description
+            .appendText("MetricUpdate{name=").appendValue(name)
+            .appendText(", update=").appendValue(update)
+            .appendText("}");
+      }
+    };
+  }
+
+  /**
+   * Matches a {@link MetricUpdate} whose key has the given namespace, name and step and whose
+   * update equals {@code update}.
+   *
+   * @param namespace expected {@code metricName().namespace()} of the update's key
+   * @param name expected {@code metricName().name()} of the update's key
+   * @param step expected {@code stepName()} of the update's key
+   * @param update expected value of {@code getUpdate()}
+   */
+  public static <T> Matcher<MetricUpdate<T>> metricUpdate(
+      final String namespace, final String name, final String step, final T update) {
+    return new TypeSafeMatcher<MetricUpdate<T>>() {
+      @Override
+      protected boolean matchesSafely(MetricUpdate<T> item) {
+        return Objects.equals(namespace, item.getKey().metricName().namespace())
+            && Objects.equals(name, item.getKey().metricName().name())
+            && Objects.equals(step, item.getKey().stepName())
+            && Objects.equals(update, item.getUpdate());
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description
+            .appendText("MetricUpdate{inNamespace=").appendValue(namespace)
+            .appendText(", name=").appendValue(name)
+            .appendText(", step=").appendValue(step)
+            .appendText(", update=").appendValue(update)
+            .appendText("}");
+      }
+    };
+  }
+
+  /**
+   * Matches a {@link MetricResult} with the given namespace, name and step whose committed and
+   * attempted values equal {@code logical} and {@code physical} respectively.
+   *
+   * @param namespace expected {@code name().namespace()} of the result
+   * @param name expected {@code name().name()} of the result
+   * @param step expected {@code step()} of the result
+   * @param logical expected value of {@code committed()}
+   * @param physical expected value of {@code attempted()}
+   */
+  public static <T> Matcher<MetricResult<T>> metricResult(
+      final String namespace, final String name, final String step,
+      final T logical, final T physical) {
+    return new TypeSafeMatcher<MetricResult<T>>() {
+      @Override
+      protected boolean matchesSafely(MetricResult<T> item) {
+        return Objects.equals(namespace, item.name().namespace())
+            && Objects.equals(name, item.name().name())
+            && Objects.equals(step, item.step())
+            && Objects.equals(logical, item.committed())
+            && Objects.equals(physical, item.attempted());
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        // Label the values after the accessors they are compared against, so a failure message
+        // points at the MetricResult method that disagreed.
+        description
+            .appendText("MetricResult{inNamespace=").appendValue(namespace)
+            .appendText(", name=").appendValue(name)
+            .appendText(", step=").appendValue(step)
+            .appendText(", committed=").appendValue(logical)
+            .appendText(", attempted=").appendValue(physical)
+            .appendText("}");
+      }
+    };
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
new file mode 100644
index 0000000..58797ce
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import static org.apache.beam.sdk.metrics.MetricMatchers.metricUpdate;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link MetricsContainer}.
+ */
+@RunWith(JUnit4.class)
+public class MetricsContainerTest {
+
+  /**
+   * Checks which counters {@code getUpdates()} reports across commits: new counters are reported
+   * immediately, committed counters are not reported until they change again.
+   */
+  @Test
+  public void testCounterDeltas() {
+    MetricsContainer container = new MetricsContainer("step1");
+    CounterCell c1 = container.getCounter(MetricName.named("ns", "name1"));
+    CounterCell c2 = container.getCounter(MetricName.named("ns", "name2"));
+    assertThat("All counters should start out dirty",
+        container.getUpdates().counterUpdates(), containsInAnyOrder(
+            metricUpdate("name1", 0L),
+            metricUpdate("name2", 0L)));
+    container.commitUpdates();
+    assertThat("After commit no counters should be dirty",
+        container.getUpdates().counterUpdates(), emptyIterable());
+
+    c1.inc(5L);
+    c2.inc(4L);
+
+    assertThat(container.getUpdates().counterUpdates(), containsInAnyOrder(
+        metricUpdate("name1", 5L),
+        metricUpdate("name2", 4L)));
+
+    assertThat("Since we haven't committed, updates are still included",
+        container.getUpdates().counterUpdates(), containsInAnyOrder(
+            metricUpdate("name1", 5L),
+            metricUpdate("name2", 4L)));
+
+    container.commitUpdates();
+    assertThat("After commit there are no updates",
+        container.getUpdates().counterUpdates(), emptyIterable());
+
+    c1.inc(8L);
+    // Only the changed counter is reported, and with its running total (5 + 8), not the delta.
+    assertThat(container.getUpdates().counterUpdates(), contains(
+        metricUpdate("name1", 13L)));
+  }
+
+  /**
+   * Checks that {@code getCumulative()} reports every counter's running total regardless of
+   * whether its updates have been committed.
+   */
+  @Test
+  public void testCounterCumulatives() {
+    MetricsContainer container = new MetricsContainer("step1");
+    CounterCell c1 = container.getCounter(MetricName.named("ns", "name1"));
+    CounterCell c2 = container.getCounter(MetricName.named("ns", "name2"));
+    c1.inc(2L);
+    c2.inc(4L);
+    c1.inc(3L);
+
+    container.getUpdates();
+    container.commitUpdates();
+    assertThat("Committing updates shouldn't affect cumulative counter values",
+        container.getCumulative().counterUpdates(), containsInAnyOrder(
+            metricUpdate("name1", 5L),
+            metricUpdate("name2", 4L)));
+
+    c1.inc(8L);
+    assertThat(container.getCumulative().counterUpdates(), containsInAnyOrder(
+        metricUpdate("name1", 13L),
+        metricUpdate("name2", 4L)));
+  }
+
+  /**
+   * Distribution counterpart of {@link #testCounterDeltas()}: checks which distributions
+   * {@code getUpdates()} reports across commits.
+   */
+  @Test
+  public void testDistributionDeltas() {
+    MetricsContainer container = new MetricsContainer("step1");
+    DistributionCell c1 = container.getDistribution(MetricName.named("ns", "name1"));
+    DistributionCell c2 = container.getDistribution(MetricName.named("ns", "name2"));
+
+    assertThat("Initial update includes initial zero-values",
+        container.getUpdates().distributionUpdates(), containsInAnyOrder(
+            metricUpdate("name1", DistributionData.EMPTY),
+            metricUpdate("name2", DistributionData.EMPTY)));
+
+    container.commitUpdates();
+    assertThat("No updates after commit",
+        container.getUpdates().distributionUpdates(), emptyIterable());
+
+    c1.update(5L);
+    c2.update(4L);
+
+    assertThat(container.getUpdates().distributionUpdates(), containsInAnyOrder(
+        metricUpdate("name1", DistributionData.create(5, 1, 5, 5)),
+        metricUpdate("name2", DistributionData.create(4, 1, 4, 4))));
+    assertThat("Updates stay the same without commit",
+        container.getUpdates().distributionUpdates(), containsInAnyOrder(
+            metricUpdate("name1", DistributionData.create(5, 1, 5, 5)),
+            metricUpdate("name2", DistributionData.create(4, 1, 4, 4))));
+
+    container.commitUpdates();
+    assertThat("No updates after commit",
+        container.getUpdates().distributionUpdates(), emptyIterable());
+
+    c1.update(8L);
+    c1.update(4L);
+    // Only name1 changed; its data folds in the earlier 5: sum 17, count 3, min 4, max 8.
+    assertThat(container.getUpdates().distributionUpdates(), contains(
+        metricUpdate("name1", DistributionData.create(17, 3, 4, 8))));
+    container.commitUpdates();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
new file mode 100644
index 0000000..4200a20
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import static org.apache.beam.sdk.metrics.MetricMatchers.metricUpdate;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link MetricsEnvironment}.
+ */
+@RunWith(JUnit4.class)
+public class MetricsEnvironmentTest {
+  /** Clears the current container so one test cannot leak its container into the next. */
+  @After
+  public void teardown() {
+    MetricsEnvironment.unsetMetricsContainer();
+  }
+
+  /**
+   * A single {@link Counter} must report into whichever container is current at the time of the
+   * call.
+   */
+  @Test
+  public void testUsesAppropriateMetricsContainer() {
+    Counter sharedCounter = Metrics.counter("ns", "name");
+    MetricsContainer step1Container = new MetricsContainer("step1");
+    MetricsContainer step2Container = new MetricsContainer("step2");
+
+    // Increment while step1 is current, decrement while step2 is current.
+    MetricsEnvironment.setMetricsContainer(step1Container);
+    sharedCounter.inc();
+    MetricsEnvironment.setMetricsContainer(step2Container);
+    sharedCounter.dec();
+    MetricsEnvironment.unsetMetricsContainer();
+
+    MetricUpdates step1Updates = step1Container.getUpdates();
+    MetricUpdates step2Updates = step2Container.getUpdates();
+    assertThat(
+        step1Updates.counterUpdates(), contains(metricUpdate("ns", "name", "step1", 1L)));
+    assertThat(
+        step2Updates.counterUpdates(), contains(metricUpdate("ns", "name", "step2", -1L)));
+  }
+
+  /** With no container set, the environment reports {@code null} as the current container. */
+  @Test
+  public void testBehavesWithoutMetricsContainer() {
+    assertNull(MetricsEnvironment.getCurrentContainer());
+  }
+}