You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/10/13 22:39:34 UTC

[1/5] incubator-beam git commit: Implement Metrics in the DirectRunner

Repository: incubator-beam
Updated Branches:
  refs/heads/master e969f3d38 -> 3c731707b


Implement Metrics in the DirectRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/834933c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/834933c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/834933c5

Branch: refs/heads/master
Commit: 834933c520997b4f83cf8b04219c2c63dac61e61
Parents: 51fee39
Author: bchambers <bc...@google.com>
Authored: Wed Oct 12 10:55:53 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Oct 13 15:29:29 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectMetrics.java      | 331 +++++++++++++++++++
 .../beam/runners/direct/DirectRunner.java       |   8 +-
 .../beam/runners/direct/EvaluationContext.java  |  10 +
 .../direct/ExecutorServiceParallelExecutor.java |   1 +
 .../direct/ImmutableListBundleFactory.java      |  10 +
 .../runners/direct/StepTransformResult.java     |  49 ++-
 .../beam/runners/direct/TransformExecutor.java  |  35 +-
 .../beam/runners/direct/TransformResult.java    |  12 +
 .../beam/runners/direct/DirectMetricsTest.java  | 133 ++++++++
 .../beam/runners/direct/DirectRunnerTest.java   |  36 ++
 .../runners/direct/TransformExecutorTest.java   |  12 +
 11 files changed, 602 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
new file mode 100644
index 0000000..a749a76
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static java.util.Arrays.asList;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.metrics.DistributionData;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricUpdates;
+import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.metrics.MetricsMap;
+
+/**
+ * Implementation of {@link MetricResults} for the Direct Runner.
+ */
+class DirectMetrics extends MetricResults {
+
+  // TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the DirectRunner.
+  private static final ExecutorService COUNTER_COMMITTER = Executors.newCachedThreadPool();
+
+  private interface MetricAggregation<UpdateT, ResultT> {
+    UpdateT zero();
+    UpdateT combine(Iterable<UpdateT> updates);
+    ResultT extract(UpdateT data);
+  }
+
+  /**
+   * Implementation of a metric in the direct runner.
+   *
+   * @param <UpdateT> The type of raw data received and aggregated across updates.
+   * @param <ResultT> The type of result extracted from the data.
+   */
+  private static class DirectMetric<UpdateT, ResultT> {
+    private final MetricAggregation<UpdateT, ResultT> aggregation;
+
+    private final AtomicReference<UpdateT> finishedCommitted;
+
+    private final Object attemptedLock = new Object();
+    @GuardedBy("attemptedLock")
+    private volatile UpdateT finishedAttempted;
+    @GuardedBy("attemptedLock")
+    private final ConcurrentMap<CommittedBundle<?>, UpdateT> inflightAttempted =
+        new ConcurrentHashMap<>();
+
+    public DirectMetric(MetricAggregation<UpdateT, ResultT> aggregation) {
+      this.aggregation = aggregation;
+      finishedCommitted = new AtomicReference<>(aggregation.zero());
+      finishedAttempted = aggregation.zero();
+    }
+
+    /**
+     * Add the given {@code tentativeCumulative} update to the physical aggregate.
+     *
+     * @param bundle The bundle receiving an update.
+     * @param tentativeCumulative The new cumulative value for the given bundle.
+     */
+    public void updatePhysical(CommittedBundle<?> bundle, UpdateT tentativeCumulative) {
+      // Add (or update) the cumulative value for the given bundle.
+      inflightAttempted.put(bundle, tentativeCumulative);
+    }
+
+    /**
+     * Commit a physical value for the given {@code bundle}.
+     *
+     * @param bundle The bundle being committed.
+     * @param finalCumulative The final cumulative value for the given bundle.
+     */
+    public void commitPhysical(final CommittedBundle<?> bundle, final UpdateT finalCumulative) {
+      // To prevent a query from blocking the commit, we perform the commit in two steps.
+      // 1. We perform a non-blocking write to the uncommitted table to make the new value
+      //    available immediately.
+      // 2. We submit a runnable that will commit the update and remove the tentative value in
+      //    a synchronized block.
+      inflightAttempted.put(bundle, finalCumulative);
+      COUNTER_COMMITTER.submit(new Runnable() {
+        @Override
+        public void run() {
+          synchronized (attemptedLock) {
+            finishedAttempted = aggregation.combine(asList(finishedAttempted, finalCumulative));
+            inflightAttempted.remove(bundle);
+          }
+        }
+      });
+    }
+
+    /** Extract the latest values from all attempted and in-progress bundles. */
+    public ResultT extractLatestAttempted() {
+      ArrayList<UpdateT> updates = new ArrayList<>(inflightAttempted.size() + 1);
+      // Within this block we know the values will be consistent. Specifically, the only change that
+      // can happen concurrently is the addition of new (larger) values to inflightAttempted.
+      synchronized (attemptedLock) {
+        updates.add(finishedAttempted);
+        updates.addAll(inflightAttempted.values());
+      }
+      return aggregation.extract(aggregation.combine(updates));
+    }
+
+    /**
+     * Commit a logical value for the given {@code bundle}.
+     *
+     * @param bundle The bundle being committed.
+     * @param finalCumulative The final cumulative value for the given bundle.
+     */
+    public void commitLogical(final CommittedBundle<?> bundle, final UpdateT finalCumulative) {
+      UpdateT current;
+      do {
+        current = finishedCommitted.get();
+      } while (!finishedCommitted.compareAndSet(current,
+          aggregation.combine(asList(current, finalCumulative))));
+    }
+
+    /** Extract the value from all successfully committed bundles. */
+    public ResultT extractCommitted() {
+      return aggregation.extract(finishedCommitted.get());
+    }
+  }
+
+  private static final MetricAggregation<Long, Long> COUNTER =
+      new MetricAggregation<Long, Long>() {
+    @Override
+    public Long zero() {
+      return 0L;
+    }
+
+    @Override
+    public Long combine(Iterable<Long> updates) {
+      long value = 0;
+      for (long update : updates) {
+        value += update;
+      }
+      return value;
+    }
+
+    @Override
+    public Long extract(Long data) {
+      return data;
+    }
+  };
+
+  private static final MetricAggregation<DistributionData, DistributionResult> DISTRIBUTION =
+      new MetricAggregation<DistributionData, DistributionResult>() {
+        @Override
+        public DistributionData zero() {
+          return DistributionData.EMPTY;
+        }
+
+        @Override
+        public DistributionData combine(Iterable<DistributionData> updates) {
+          DistributionData result = DistributionData.EMPTY;
+          for (DistributionData update : updates) {
+            result = result.combine(update);
+          }
+          return result;
+        }
+
+        @Override
+        public DistributionResult extract(DistributionData data) {
+          return data.extractResult();
+        }
+      };
+
+  /** The current values of counters in memory. */
+  private MetricsMap<MetricKey, DirectMetric<Long, Long>> counters =
+      new MetricsMap<>(new MetricsMap.Factory<MetricKey, DirectMetric<Long, Long>>() {
+        @Override
+        public DirectMetric<Long, Long> createInstance(MetricKey unusedKey) {
+          return new DirectMetric<>(COUNTER);
+        }
+      });
+  private MetricsMap<MetricKey, DirectMetric<DistributionData, DistributionResult>> distributions =
+      new MetricsMap<>(
+          new MetricsMap.Factory<MetricKey, DirectMetric<DistributionData, DistributionResult>>() {
+        @Override
+        public DirectMetric<DistributionData, DistributionResult> createInstance(
+            MetricKey unusedKey) {
+          return new DirectMetric<>(DISTRIBUTION);
+        }
+      });
+
+  @AutoValue
+  abstract static class DirectMetricQueryResults implements MetricQueryResults {
+    public static MetricQueryResults create(
+        Iterable<MetricResult<Long>> counters,
+        Iterable<MetricResult<DistributionResult>> distributions) {
+      return new AutoValue_DirectMetrics_DirectMetricQueryResults(counters, distributions);
+    }
+  }
+
+  @AutoValue
+  abstract static class DirectMetricResult<T> implements MetricResult<T> {
+    public static <T> MetricResult<T> create(MetricName name, String scope,
+        T committed, T attempted) {
+      return new AutoValue_DirectMetrics_DirectMetricResult<T>(
+          name, scope, committed, attempted);
+    }
+  }
+
+  @Override
+  public MetricQueryResults queryMetrics(MetricsFilter filter) {
+    ImmutableList.Builder<MetricResult<Long>> counterResults = ImmutableList.builder();
+    for (Entry<MetricKey, DirectMetric<Long, Long>> counter : counters.entries()) {
+      maybeExtractResult(filter, counterResults, counter);
+    }
+    ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults =
+        ImmutableList.builder();
+    for (Entry<MetricKey, DirectMetric<DistributionData, DistributionResult>> distribution
+        : distributions.entries()) {
+      maybeExtractResult(filter, distributionResults, distribution);
+    }
+
+    return DirectMetricQueryResults.create(counterResults.build(), distributionResults.build());
+  }
+
+  private <ResultT> void maybeExtractResult(
+      MetricsFilter filter,
+      ImmutableList.Builder<MetricResult<ResultT>> resultsBuilder,
+      Map.Entry<MetricKey, ? extends DirectMetric<?, ResultT>> entry) {
+    if (matches(filter, entry.getKey())) {
+      resultsBuilder.add(DirectMetricResult.create(
+          entry.getKey().metricName(),
+          entry.getKey().stepName(),
+          entry.getValue().extractCommitted(),
+          entry.getValue().extractLatestAttempted()));
+    }
+  }
+
+  // Matching logic is implemented here rather than in MetricsFilter because we would like
+  // MetricsFilter to act as a "dumb" value-object, with the possibility of replacing it with
+  // a Proto/JSON/etc. schema object.
+  private boolean matches(MetricsFilter filter, MetricKey key) {
+    return matchesName(key.metricName(), filter.names())
+        && matchesScope(key.stepName(), filter.steps());
+  }
+
+  private boolean matchesScope(String actualScope, Set<String> scopes) {
+    if (scopes.isEmpty() || scopes.contains(actualScope)) {
+      return true;
+    }
+
+    for (String scope : scopes) {
+      if (actualScope.startsWith(scope)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  private boolean matchesName(MetricName metricName, Set<MetricNameFilter> nameFilters) {
+    if (nameFilters.isEmpty()) {
+      return true;
+    }
+
+    for (MetricNameFilter nameFilter : nameFilters) {
+      if ((nameFilter.getName() == null || nameFilter.getName().equals(metricName.name()))
+          && Objects.equal(metricName.namespace(), nameFilter.getNamespace())) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /** Apply metric updates that represent physical counter deltas to the current metric values. */
+  public void updatePhysical(CommittedBundle<?> bundle, MetricUpdates updates) {
+    for (MetricUpdate<Long> counter : updates.counterUpdates()) {
+      counters.get(counter.getKey()).updatePhysical(bundle, counter.getUpdate());
+    }
+    for (MetricUpdate<DistributionData> distribution : updates.distributionUpdates()) {
+      distributions.get(distribution.getKey())
+          .updatePhysical(bundle, distribution.getUpdate());
+    }
+  }
+
+  public void commitPhysical(CommittedBundle<?> bundle, MetricUpdates updates) {
+    for (MetricUpdate<Long> counter : updates.counterUpdates()) {
+      counters.get(counter.getKey()).commitPhysical(bundle, counter.getUpdate());
+    }
+    for (MetricUpdate<DistributionData> distribution : updates.distributionUpdates()) {
+      distributions.get(distribution.getKey())
+          .commitPhysical(bundle, distribution.getUpdate());
+    }
+  }
+
+  /** Apply metric updates that represent new logical values from a bundle being committed. */
+  public void commitLogical(CommittedBundle<?> bundle, MetricUpdates updates) {
+    for (MetricUpdate<Long> counter : updates.counterUpdates()) {
+      counters.get(counter.getKey()).commitLogical(bundle, counter.getUpdate());
+    }
+    for (MetricUpdate<DistributionData> distribution : updates.distributionUpdates()) {
+      distributions.get(distribution.getKey())
+          .commitLogical(bundle, distribution.getUpdate());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index e13046d..8941093 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.TestStream;
@@ -226,6 +227,7 @@ public class DirectRunner
 
   @Override
   public DirectPipelineResult run(Pipeline pipeline) {
+    MetricsEnvironment.setMetricsSupported(true);
     ConsumerTrackingPipelineVisitor consumerTrackingVisitor = new ConsumerTrackingPipelineVisitor();
     pipeline.traverseTopologically(consumerTrackingVisitor);
     for (PValue unfinalized : consumerTrackingVisitor.getUnfinalizedPValues()) {
@@ -268,8 +270,7 @@ public class DirectRunner
 
     Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
         pipeline.getAggregatorSteps();
-    DirectPipelineResult result =
-        new DirectPipelineResult(executor, context, aggregatorSteps);
+    DirectPipelineResult result = new DirectPipelineResult(executor, context, aggregatorSteps);
     if (options.isBlockOnRun()) {
       try {
         result.awaitCompletion();
@@ -383,8 +384,7 @@ public class DirectRunner
 
     @Override
     public MetricResults metrics() {
-      throw new UnsupportedOperationException(
-          "The DirectRunner does not currently support metrics.");
+      return evaluationContext.getMetrics();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 2901254..e5a30d4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -96,6 +96,8 @@ class EvaluationContext {
 
   private final AggregatorContainer mergedAggregators;
 
+  private final DirectMetrics metrics;
+
   public static EvaluationContext create(
       DirectOptions options,
       Clock clock,
@@ -130,6 +132,7 @@ class EvaluationContext {
 
     this.applicationStateInternals = new ConcurrentHashMap<>();
     this.mergedAggregators = AggregatorContainer.create();
+    this.metrics = new DirectMetrics();
 
     this.callbackExecutor =
         WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());
@@ -161,6 +164,8 @@ class EvaluationContext {
       TransformResult result) {
     Iterable<? extends CommittedBundle<?>> committedBundles =
         commitBundles(result.getOutputBundles());
+    metrics.commitLogical(completedBundle, result.getLogicalMetricUpdates());
+
     // Update watermarks and timers
     EnumSet<OutputType> outputTypes = EnumSet.copyOf(result.getOutputTypes());
     if (Iterables.isEmpty(committedBundles)) {
@@ -367,6 +372,11 @@ class EvaluationContext {
     return mergedAggregators;
   }
 
+  /** Returns the metrics container for this pipeline. */
+  public DirectMetrics getMetrics() {
+    return metrics;
+  }
+
   @VisibleForTesting
   void forceRefresh() {
     watermarkManager.refreshAll();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index fab6a33..3761574 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -212,6 +212,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
 
     TransformExecutor<T> callable =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             enforcements,
             bundle,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
index 4972340..db92542 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
@@ -123,5 +123,15 @@ class ImmutableListBundleFactory implements BundleFactory {
           ImmutableList.copyOf(elements),
           getSynchronizedProcessingOutputWatermark());
     }
+
+    @Override
+    public int hashCode() {
+      return System.identityHashCode(this);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return this == obj;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index 1829e4a..989109f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -22,10 +22,10 @@ import com.google.common.collect.ImmutableList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
+import org.apache.beam.sdk.metrics.MetricUpdates;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -37,31 +37,6 @@ import org.joda.time.Instant;
  */
 @AutoValue
 public abstract class StepTransformResult implements TransformResult {
-  @Override
-  public abstract AppliedPTransform<?, ?, ?> getTransform();
-
-  @Override
-  public abstract Iterable<? extends UncommittedBundle<?>> getOutputBundles();
-
-  @Override
-  public abstract Iterable<? extends WindowedValue<?>> getUnprocessedElements();
-
-  @Override
-  @Nullable
-  public abstract AggregatorContainer.Mutator getAggregatorChanges();
-
-  @Override
-  public abstract Instant getWatermarkHold();
-
-  @Nullable
-  @Override
-  public abstract CopyOnAccessInMemoryStateInternals<?> getState();
-
-  @Override
-  public abstract TimerUpdate getTimerUpdate();
-
-  @Override
-  public abstract Set<OutputType> getOutputTypes();
 
   public static Builder withHold(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
     return new Builder(transform, watermarkHold);
@@ -71,6 +46,20 @@ public abstract class StepTransformResult implements TransformResult {
     return new Builder(transform, BoundedWindow.TIMESTAMP_MAX_VALUE);
   }
 
+  @Override
+  public TransformResult withLogicalMetricUpdates(MetricUpdates metricUpdates) {
+    return new AutoValue_StepTransformResult(
+        getTransform(),
+        getOutputBundles(),
+        getUnprocessedElements(),
+        getAggregatorChanges(),
+        metricUpdates,
+        getWatermarkHold(),
+        getState(),
+        getTimerUpdate(),
+        getOutputTypes());
+  }
+
   /**
    * A builder for creating instances of {@link StepTransformResult}.
    */
@@ -78,6 +67,7 @@ public abstract class StepTransformResult implements TransformResult {
     private final AppliedPTransform<?, ?, ?> transform;
     private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder;
     private final ImmutableList.Builder<WindowedValue<?>> unprocessedElementsBuilder;
+    private MetricUpdates metricUpdates;
     private CopyOnAccessInMemoryStateInternals<?> state;
     private TimerUpdate timerUpdate;
     private AggregatorContainer.Mutator aggregatorChanges;
@@ -91,6 +81,7 @@ public abstract class StepTransformResult implements TransformResult {
       this.producedOutputs = EnumSet.noneOf(OutputType.class);
       this.unprocessedElementsBuilder = ImmutableList.builder();
       this.timerUpdate = TimerUpdate.builder(null).build();
+      this.metricUpdates = MetricUpdates.EMPTY;
     }
 
     public StepTransformResult build() {
@@ -99,6 +90,7 @@ public abstract class StepTransformResult implements TransformResult {
           bundlesBuilder.build(),
           unprocessedElementsBuilder.build(),
           aggregatorChanges,
+          metricUpdates,
           watermarkHold,
           state,
           timerUpdate,
@@ -110,6 +102,11 @@ public abstract class StepTransformResult implements TransformResult {
       return this;
     }
 
+    public Builder withMetricUpdates(MetricUpdates metricUpdates) {
+      this.metricUpdates = metricUpdates;
+      return this;
+    }
+
     public Builder withState(CopyOnAccessInMemoryStateInternals<?> state) {
       this.state = state;
       return this;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index aaee9a5..03f615b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -25,6 +25,9 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.metrics.MetricUpdates;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 
@@ -38,6 +41,7 @@ import org.apache.beam.sdk.util.WindowedValue;
  */
 class TransformExecutor<T> implements Runnable {
   public static <T> TransformExecutor<T> create(
+      EvaluationContext context,
       TransformEvaluatorFactory factory,
       Iterable<? extends ModelEnforcementFactory> modelEnforcements,
       CommittedBundle<T> inputBundle,
@@ -45,6 +49,7 @@ class TransformExecutor<T> implements Runnable {
       CompletionCallback completionCallback,
       TransformExecutorService transformEvaluationState) {
     return new TransformExecutor<>(
+        context,
         factory,
         modelEnforcements,
         inputBundle,
@@ -63,10 +68,12 @@ class TransformExecutor<T> implements Runnable {
 
   private final CompletionCallback onComplete;
   private final TransformExecutorService transformEvaluationState;
+  private final EvaluationContext context;
 
   private final AtomicReference<Thread> thread;
 
   private TransformExecutor(
+      EvaluationContext context,
       TransformEvaluatorFactory factory,
       Iterable<? extends ModelEnforcementFactory> modelEnforcements,
       CommittedBundle<T> inputBundle,
@@ -82,11 +89,14 @@ class TransformExecutor<T> implements Runnable {
     this.onComplete = completionCallback;
 
     this.transformEvaluationState = transformEvaluationState;
+    this.context = context;
     this.thread = new AtomicReference<>();
   }
 
   @Override
   public void run() {
+    MetricsContainer metricsContainer = new MetricsContainer(transform.getFullName());
+    MetricsEnvironment.setMetricsContainer(metricsContainer);
     checkState(
         thread.compareAndSet(null, Thread.currentThread()),
         "Tried to execute %s for %s on thread %s, but is already executing on thread %s",
@@ -108,9 +118,9 @@ class TransformExecutor<T> implements Runnable {
         return;
       }
 
-      processElements(evaluator, enforcements);
+      processElements(evaluator, metricsContainer, enforcements);
 
-      finishBundle(evaluator, enforcements);
+      finishBundle(evaluator, metricsContainer, enforcements);
     } catch (Throwable t) {
       onComplete.handleThrowable(inputBundle, t);
       if (t instanceof RuntimeException) {
@@ -118,6 +128,10 @@ class TransformExecutor<T> implements Runnable {
       }
       throw new RuntimeException(t);
     } finally {
+      // Report the physical metrics from the end of this step.
+      context.getMetrics().commitPhysical(inputBundle, metricsContainer.getCumulative());
+
+      MetricsEnvironment.unsetMetricsContainer();
       transformEvaluationState.complete(this);
     }
   }
@@ -127,7 +141,9 @@ class TransformExecutor<T> implements Runnable {
    * necessary {@link ModelEnforcement ModelEnforcements}.
    */
   private void processElements(
-      TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements)
+      TransformEvaluator<T> evaluator,
+      MetricsContainer metricsContainer,
+      Collection<ModelEnforcement<T>> enforcements)
       throws Exception {
     if (inputBundle != null) {
       for (WindowedValue<T> value : inputBundle.getElements()) {
@@ -137,6 +153,13 @@ class TransformExecutor<T> implements Runnable {
 
         evaluator.processElement(value);
 
+        // Report the physical metrics after each element
+        MetricUpdates deltas = metricsContainer.getUpdates();
+        if (deltas != null) {
+          context.getMetrics().updatePhysical(inputBundle, deltas);
+          metricsContainer.commitUpdates();
+        }
+
         for (ModelEnforcement<T> enforcement : enforcements) {
           enforcement.afterElement(value);
         }
@@ -152,9 +175,11 @@ class TransformExecutor<T> implements Runnable {
    *         {@link TransformEvaluator#finishBundle()}
    */
   private TransformResult finishBundle(
-      TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements)
+      TransformEvaluator<T> evaluator, MetricsContainer metricsContainer,
+      Collection<ModelEnforcement<T>> enforcements)
       throws Exception {
-    TransformResult result = evaluator.finishBundle();
+    TransformResult result = evaluator.finishBundle()
+        .withLogicalMetricUpdates(metricsContainer.getCumulative());
     CommittedResult outputs = onComplete.handleResult(inputBundle, result);
     for (ModelEnforcement<T> enforcement : enforcements) {
       enforcement.afterFinish(inputBundle, result, outputs.getOutputs());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
index ba2d48e..ac1e395 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
@@ -22,6 +22,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
+import org.apache.beam.sdk.metrics.MetricUpdates;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -57,6 +58,11 @@ public interface TransformResult {
   @Nullable AggregatorContainer.Mutator getAggregatorChanges();
 
   /**
+   * Returns the logical metric updates.
+   */
+  MetricUpdates getLogicalMetricUpdates();
+
+  /**
    * Returns the Watermark Hold for the transform at the time this result was produced.
    *
    * <p>If the transform does not set any watermark hold, returns
@@ -86,4 +92,10 @@ public interface TransformResult {
    * {@link OutputType#BUNDLE}, as empty bundles may be dropped when the transform is committed.
    */
   Set<OutputType> getOutputTypes();
+
+  /**
+   * Returns a new TransformResult based on this one but overwriting any existing logical metric
+   * updates with {@code metricUpdates}.
+   */
+  TransformResult withLogicalMetricUpdates(MetricUpdates metricUpdates);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
new file mode 100644
index 0000000..df01244
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult;
+import static org.apache.beam.sdk.metrics.MetricNameFilter.inNamespace;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.metrics.DistributionData;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricUpdates;
+import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link DirectMetrics}.
+ */
+@RunWith(JUnit4.class)
+public class DirectMetricsTest {
+
+  @Mock
+  private CommittedBundle<Object> bundle1;
+  @Mock
+  private CommittedBundle<Object> bundle2;
+
+  private static final MetricName NAME1 = MetricName.named("ns1", "name1");
+  private static final MetricName NAME2 = MetricName.named("ns1", "name2");
+  private static final MetricName NAME3 = MetricName.named("ns2", "name1");
+
+  private DirectMetrics metrics = new DirectMetrics();
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testApplyLogicalQueryNoFilter() {
+    metrics.commitLogical(bundle1, MetricUpdates.create(
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("step1", NAME1), 5L),
+            MetricUpdate.create(MetricKey.create("step1", NAME2), 8L)),
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("step1", NAME1),
+                DistributionData.create(8, 2, 3, 5)))));
+    metrics.commitLogical(bundle1, MetricUpdates.create(
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("step2", NAME1), 7L),
+            MetricUpdate.create(MetricKey.create("step1", NAME2), 4L)),
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("step1", NAME1),
+                DistributionData.create(4, 1, 4, 4)))));
+
+    MetricQueryResults results = metrics.queryMetrics(MetricsFilter.builder().build());
+    assertThat(results.counters(), containsInAnyOrder(
+        metricResult("ns1", "name1", "step1", 5L, 0L),
+        metricResult("ns1", "name2", "step1", 12L, 0L),
+        metricResult("ns1", "name1", "step2", 7L, 0L)));
+    assertThat(results.distributions(), contains(
+        metricResult("ns1", "name1", "step1",
+            DistributionResult.create(12, 3, 3, 5),
+            DistributionResult.ZERO)));
+  }
+
+  @Test
+  public void testApplyPhysicalCountersQueryOneNamespace() {
+    metrics.updatePhysical(bundle1, MetricUpdates.create(
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("step1", NAME1), 5L),
+            MetricUpdate.create(MetricKey.create("step1", NAME3), 8L)),
+        ImmutableList.<MetricUpdate<DistributionData>>of()));
+    metrics.updatePhysical(bundle1, MetricUpdates.create(
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("step2", NAME1), 7L),
+            MetricUpdate.create(MetricKey.create("step1", NAME3), 4L)),
+        ImmutableList.<MetricUpdate<DistributionData>>of()));
+
+    assertThat(metrics.queryMetrics(
+        MetricsFilter.builder().addNameFilter(inNamespace("ns1")).build()).counters(),
+        containsInAnyOrder(
+            metricResult("ns1", "name1", "step1", 0L, 5L),
+            metricResult("ns1", "name1", "step2", 0L, 7L)));
+  }
+
+  @Test
+  public void testApplyPhysicalQueryCompositeScope() {
+    metrics.updatePhysical(bundle1, MetricUpdates.create(
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 5L),
+            MetricUpdate.create(MetricKey.create("Outer1/Inner2", NAME1), 8L)),
+        ImmutableList.<MetricUpdate<DistributionData>>of()));
+    metrics.updatePhysical(bundle1, MetricUpdates.create(
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 12L),
+            MetricUpdate.create(MetricKey.create("Outer2/Inner2", NAME1), 18L)),
+        ImmutableList.<MetricUpdate<DistributionData>>of()));
+
+    assertThat(metrics.queryMetrics(
+        MetricsFilter.builder().addStep("Outer1").build()).counters(),
+        containsInAnyOrder(
+            metricResult("ns1", "name1", "Outer1/Inner1", 0L, 12L),
+            metricResult("ns1", "name1", "Outer1/Inner2", 0L, 8L)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 4768fb0..d93dd7a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
+import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isA;
@@ -35,12 +37,20 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
@@ -444,4 +454,30 @@ public class DirectRunnerTest implements Serializable {
       throw new CoderException("Cannot decode a long");
     }
   }
+
+  public void testMetrics() throws Exception {
+    Pipeline pipeline = getPipeline();
+    pipeline
+        .apply(Create.of(5, 8, 13))
+        .apply("MyStep", ParDo.of(new DoFn<Integer, Void>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            Counter count = Metrics.counter(DirectRunnerTest.class, "count");
+            Distribution values = Metrics.distribution(DirectRunnerTest.class, "input");
+
+            count.inc();
+            values.update(c.element());
+          }
+        }));
+    PipelineResult result = pipeline.run();
+    MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder()
+        .addNameFilter(MetricNameFilter.inNamespace(DirectRunnerTest.class))
+        .build());
+    assertThat(metrics.counters(), contains(
+        metricResult(DirectRunnerTest.class.getName(), "count", "MyStep", 3L, 3L)));
+    assertThat(metrics.distributions(), contains(
+        metricResult(DirectRunnerTest.class.getName(), "input", "MyStep",
+            DistributionResult.create(26L, 3L, 5L, 13L),
+            DistributionResult.create(26L, 3L, 5L, 13L))));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index c63e9bd..5015e5a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -72,6 +72,7 @@ public class TransformExecutorTest {
   private RegisteringCompletionCallback completionCallback;
   private TransformExecutorService transformEvaluationState;
   private BundleFactory bundleFactory;
+  @Mock private DirectMetrics metrics;
   @Mock private EvaluationContext evaluationContext;
   @Mock private TransformEvaluatorRegistry registry;
 
@@ -90,6 +91,8 @@ public class TransformExecutorTest {
     TestPipeline p = TestPipeline.create();
     created = p.apply(Create.of("foo", "spam", "third"));
     downstream = created.apply(WithKeys.<Integer, String>of(3));
+
+    when(evaluationContext.getMetrics()).thenReturn(metrics);
   }
 
   @Test
@@ -116,6 +119,7 @@ public class TransformExecutorTest {
 
     TransformExecutor<Object> executor =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
             null,
@@ -135,6 +139,7 @@ public class TransformExecutorTest {
 
     TransformExecutor<Object> executor =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
             null,
@@ -177,6 +182,7 @@ public class TransformExecutorTest {
 
     TransformExecutor<String> executor =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
             inputBundle,
@@ -219,6 +225,7 @@ public class TransformExecutorTest {
 
     TransformExecutor<String> executor =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
             inputBundle,
@@ -254,6 +261,7 @@ public class TransformExecutorTest {
 
     TransformExecutor<String> executor =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
             inputBundle,
@@ -294,6 +302,7 @@ public class TransformExecutorTest {
 
     TransformExecutor<String> executor =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
             null,
@@ -335,6 +344,7 @@ public class TransformExecutorTest {
     TestEnforcementFactory enforcement = new TestEnforcementFactory();
     TransformExecutor<String> executor =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             Collections.<ModelEnforcementFactory>singleton(enforcement),
             inputBundle,
@@ -392,6 +402,7 @@ public class TransformExecutorTest {
 
     TransformExecutor<byte[]> executor =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()),
             inputBundle,
@@ -448,6 +459,7 @@ public class TransformExecutorTest {
 
     TransformExecutor<byte[]> executor =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()),
             inputBundle,



[4/5] incubator-beam git commit: Initial Metrics API for Beam Java

Posted by bc...@apache.org.
Initial Metrics API for Beam Java

This includes a simple Counter metric and a Distribution metric that
reports the SUM, COUNT, MIN, MAX and MEAN of the reported values.

The API is labeled @Experimental since metrics will only be reported
and queryable with the DirectRunner, and the API may change as it is
implemented on other runners.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8524ed95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8524ed95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8524ed95

Branch: refs/heads/master
Commit: 8524ed9545f5af4bdeb54601f333549b34eb35aa
Parents: e969f3d
Author: bchambers <bc...@google.com>
Authored: Wed Oct 12 10:29:50 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Oct 13 15:29:29 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/annotations/Experimental.java      |   3 +
 .../org/apache/beam/sdk/metrics/Counter.java    |  40 +++++
 .../apache/beam/sdk/metrics/CounterCell.java    |  76 ++++++++++
 .../org/apache/beam/sdk/metrics/DirtyState.java |  98 ++++++++++++
 .../apache/beam/sdk/metrics/Distribution.java   |  30 ++++
 .../beam/sdk/metrics/DistributionCell.java      |  58 +++++++
 .../beam/sdk/metrics/DistributionData.java      |  59 ++++++++
 .../beam/sdk/metrics/DistributionResult.java    |  42 ++++++
 .../org/apache/beam/sdk/metrics/Metric.java     |  24 +++
 .../org/apache/beam/sdk/metrics/MetricCell.java |  47 ++++++
 .../org/apache/beam/sdk/metrics/MetricKey.java  |  40 +++++
 .../org/apache/beam/sdk/metrics/MetricName.java |  46 ++++++
 .../beam/sdk/metrics/MetricNameFilter.java      |  60 ++++++++
 .../beam/sdk/metrics/MetricQueryResults.java    |  33 ++++
 .../apache/beam/sdk/metrics/MetricResult.java   |  45 ++++++
 .../apache/beam/sdk/metrics/MetricResults.java  |  34 +++++
 .../apache/beam/sdk/metrics/MetricUpdates.java  |  72 +++++++++
 .../org/apache/beam/sdk/metrics/Metrics.java    | 110 ++++++++++++++
 .../beam/sdk/metrics/MetricsContainer.java      | 150 +++++++++++++++++++
 .../beam/sdk/metrics/MetricsEnvironment.java    |  85 +++++++++++
 .../apache/beam/sdk/metrics/MetricsFilter.java  |  86 +++++++++++
 .../org/apache/beam/sdk/metrics/MetricsMap.java |  86 +++++++++++
 .../apache/beam/sdk/metrics/package-info.java   |  28 ++++
 .../beam/sdk/metrics/CounterCellTest.java       |  55 +++++++
 .../apache/beam/sdk/metrics/DirtyStateTest.java |  56 +++++++
 .../beam/sdk/metrics/DistributionCellTest.java  |  53 +++++++
 .../apache/beam/sdk/metrics/MetricMatchers.java |  99 ++++++++++++
 .../beam/sdk/metrics/MetricsContainerTest.java  | 129 ++++++++++++++++
 .../sdk/metrics/MetricsEnvironmentTest.java     |  63 ++++++++
 .../apache/beam/sdk/metrics/MetricsMapTest.java | 103 +++++++++++++
 .../apache/beam/sdk/metrics/MetricsTest.java    |  98 ++++++++++++
 31 files changed, 2008 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index 789f4b2..14d2358 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -83,5 +83,8 @@ public @interface Experimental {
      * Do not use: API is unstable and runner support is incomplete.
      */
     SPLITTABLE_DO_FN,
+
+    /** Metrics-related experimental APIs. */
+    METRICS
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java
new file mode 100644
index 0000000..9f48016
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A metric that reports a single long value and can be incremented or decremented.
+ */
+@Experimental(Kind.METRICS)
+public interface Counter extends Metric {
+
+  /** Increment the counter. */
+  void inc();
+
+  /** Increment the counter by the given amount. */
+  void inc(long n);
+
+  /** Decrement the counter. */
+  void dec();
+
+  /** Decrement the counter by the given amount. */
+  void dec(long n);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
new file mode 100644
index 0000000..bb65833
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Tracks the current value (and delta) for a Counter metric for a specific context and bundle.
+ *
+ * <p>This class generally shouldn't be used directly. The only exception is within a runner where
+ * a counter is being reported for a specific step (rather than the counter in the current context).
+ */
+@Experimental(Kind.METRICS)
+class CounterCell implements MetricCell<Counter, Long>, Counter {
+
+  private final DirtyState dirty = new DirtyState();
+  private final AtomicLong value = new AtomicLong();
+
+  /** Increment the counter by the given amount. */
+  private void add(long n) {
+    value.addAndGet(n);
+    dirty.afterModification();
+  }
+
+  @Override
+  public DirtyState getDirty() {
+    return dirty;
+  }
+
+  @Override
+  public Long getCumulative() {
+    return value.get();
+  }
+
+  @Override
+  public Counter getInterface() {
+    return this;
+  }
+
+  @Override
+  public void inc() {
+    add(1);
+  }
+
+  @Override
+  public void inc(long n) {
+    add(n);
+  }
+
+  @Override
+  public void dec() {
+    add(-1);
+  }
+
+  @Override
+  public void dec(long n) {
+    add(-n);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
new file mode 100644
index 0000000..6706be8
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Atomically tracks the dirty-state of a metric.
+ *
+ * <p>Reporting an update is split into two parts such that only changes made before the call to
+ * {@link #beforeCommit()} are committed when {@link #afterCommit()} is invoked. This allows for
+ * a two-step commit process of gathering all the dirty updates (calling {@link #beforeCommit()})
+ * followed by committing and calling {@link #afterCommit()}.
+ *
+ * <p>The tracking of dirty states is done conservatively -- sometimes {@link #beforeCommit()}
+ * will return true (indicating a dirty metric) even if there have been no changes since the last
+ * commit.
+ *
+ * <p>There is also a possible race when the underlying metric is modified but the call to
+ * {@link #afterModification()} hasn't happened before the call to {@link #beforeCommit()}. In this
+ * case the next round of metric updating will see the changes. If this was for the final commit,
+ * then the metric updates shouldn't be extracted until all possible user modifications have
+ * completed.
+ */
+@Experimental(Kind.METRICS)
+class DirtyState {
+  private enum State {
+    /** Indicates that there have been changes to the MetricCell since last commit. */
+    DIRTY,
+    /** Indicates that there have been no changes to the MetricCell since last commit. */
+    CLEAN,
+    /** Indicates that a commit of the current value is in progress. */
+    COMMITTING
+  }
+
+  private final AtomicReference<State> dirty = new AtomicReference<>(State.DIRTY);
+
+  /**
+   * Indicate that changes have been made to the metric being tracked by this {@link DirtyState}.
+   *
+   * <p>Should be called <b>after</b> modification of the value.
+   */
+  public void afterModification() {
+    dirty.set(State.DIRTY);
+  }
+
+  /**
+   * Check the dirty state and mark the metric as committing.
+   *
+   * <p>If the state was {@code CLEAN}, this returns {@code false}. If the state was {@code DIRTY}
+   * or {@code COMMITTING} this returns {@code true} and sets the state to {@code COMMITTING}.
+   *
+   * @return {@code false} if the state is clean and {@code true} otherwise.
+   */
+  public boolean beforeCommit() {
+    // After this loop, we want the state to be either CLEAN or COMMITTING.
+    // If the state was CLEAN, we don't need to do anything (and exit the loop early)
+    // If the state was DIRTY, we will attempt to do a CAS(DIRTY, COMMITTING). This will only
+    // fail if another thread is getting updates which generally shouldn't be the case.
+    // If the state was COMMITTING, we will attempt to do a CAS(COMMITTING, COMMITTING). This will
+    // fail if another thread commits updates (which shouldn't be the case) or if the user code
+    // updates the metric, in which case it will transition to DIRTY and the next iteration will
+    // successfully update it.
+    State state;
+    do {
+      state = dirty.get();
+    } while (state != State.CLEAN && !dirty.compareAndSet(state, State.COMMITTING));
+
+    return state != State.CLEAN;
+  }
+
+  /**
+   * Mark any changes up to the most recent call to {@link #beforeCommit()} as committed.
+   * The next call to {@link #beforeCommit()} will return {@code false} unless there have
+   * been changes made since the previous call to {@link #beforeCommit()}.
+   */
+  public void afterCommit() {
+    dirty.compareAndSet(State.COMMITTING, State.CLEAN);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
new file mode 100644
index 0000000..b789020
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A metric that reports information about the distribution of reported values.
+ */
+@Experimental(Kind.METRICS)
+public interface Distribution extends Metric {
+  /** Add an observation to this distribution. */
+  void update(long value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
new file mode 100644
index 0000000..f0074a9
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Tracks the cumulative value (and dirty state) of a {@link Distribution} metric.
+ */
+@Experimental(Kind.METRICS)
+class DistributionCell implements MetricCell<Distribution, DistributionData>, Distribution {
+
+  private final DirtyState dirty = new DirtyState();
+  private final AtomicReference<DistributionData> value =
+      new AtomicReference<DistributionData>(DistributionData.EMPTY);
+
+  /** Add the observation {@code n} to this distribution, then notify the {@link DirtyState}. */
+  @Override
+  public void update(long n) {
+    DistributionData original;
+    do {
+      original = value.get();
+    } while (!value.compareAndSet(original, original.combine(DistributionData.singleton(n))));
+    dirty.afterModification();
+  }
+
+  @Override
+  public DirtyState getDirty() {
+    return dirty;
+  }
+
+  @Override
+  public DistributionData getCumulative() {
+    return value.get();
+  }
+
+  @Override
+  public Distribution getInterface() {
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
new file mode 100644
index 0000000..59c7fbd
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+
+/**
+ * Data describing the distribution. This should retain enough detail that it can be combined
+ * with other {@link DistributionData}.
+ *
+ * <p>This is kept distinct from {@link DistributionResult} since this may be extended to include
+ * data necessary to approximate quantiles, etc. while {@link DistributionResult} would just include
+ * the approximate value of those quantiles.
+ */
+@AutoValue
+public abstract class DistributionData {
+
+  public abstract long sum();
+  public abstract long count();
+  public abstract long min();
+  public abstract long max();
+
+  public static final DistributionData EMPTY = create(0, 0, Long.MAX_VALUE, Long.MIN_VALUE);
+
+  public static DistributionData create(long sum, long count, long min, long max) {
+    return new AutoValue_DistributionData(sum, count, min, max);
+  }
+
+  public static DistributionData singleton(long value) {
+    return create(value, 1, value, value);
+  }
+
+  public DistributionData combine(DistributionData value) {
+    return create(
+        sum() + value.sum(),
+        count() + value.count(),
+        Math.min(value.min(), min()),
+        Math.max(value.max(), max()));
+  }
+
+  public DistributionResult extractResult() {
+    return DistributionResult.create(sum(), count(), min(), max());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java
new file mode 100644
index 0000000..27c242c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+
+/**
+ * The result of a {@link Distribution} metric.
+ */
+@AutoValue
+public abstract class DistributionResult {
+
+  public abstract long sum();
+  public abstract long count();
+  public abstract long min();
+  public abstract long max();
+
+  public double mean() {
+    return (1.0 * sum()) / count();
+  }
+
+  public static final DistributionResult ZERO = create(0, 0, Long.MAX_VALUE, Long.MIN_VALUE);
+
+  public static DistributionResult create(long sum, long count, long min, long max) {
+    return new AutoValue_DistributionResult(sum, count, min, max);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java
new file mode 100644
index 0000000..37a5f65
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+/**
+ * Marker interface for all user-facing metrics.
+ */
+public interface Metric { }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
new file mode 100644
index 0000000..211b2dd
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A {@link MetricCell} is used for accumulating in-memory changes to a metric. It represents a
+ * specific metric name in a single context.
+ *
+ * @param <UserT> The type of the user interface for reporting changes to this cell.
+ * @param <DataT> The type of metric data stored (and extracted) from this cell.
+ */
+@Experimental(Kind.METRICS)
+interface MetricCell<UserT extends Metric, DataT> {
+
+  /**
+   * Return the {@link DirtyState} tracking whether this metric cell contains uncommitted changes.
+   */
+  DirtyState getDirty();
+
+  /**
+   * Return the cumulative value of this metric.
+   */
+  DataT getCumulative();
+
+  /**
+   * Return the user-facing mutator for this cell.
+   */
+  UserT getInterface();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
new file mode 100644
index 0000000..bfa4df5
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Metrics are keyed by the step name they are associated with and the name of the metric.
+ */
+@Experimental(Kind.METRICS)
+@AutoValue
+public abstract class MetricKey {
+
+  /** The step name that is associated with this metric. */
+  public abstract String stepName();
+
+  /** The name of the metric. */
+  public abstract MetricName metricName();
+
+  public static MetricKey create(String stepName, MetricName metricName) {
+    return new AutoValue_MetricKey(stepName, metricName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java
new file mode 100644
index 0000000..843a885
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * The name of a metric consists of a {@link #namespace} and a {@link #name}. The {@link #namespace}
+ * allows grouping related metrics together and also prevents collisions between multiple metrics
+ * with the same name.
+ */
+@Experimental(Kind.METRICS)
+@AutoValue
+public abstract class MetricName {
+
+  /** The namespace associated with this metric. */
+  public abstract String namespace();
+
+  /** The name of this metric. */
+  public abstract String name();
+
+  public static MetricName named(String namespace, String name) {
+    return new AutoValue_MetricName(namespace, name);
+  }
+
+  public static MetricName named(Class<?> namespace, String name) {
+    return new AutoValue_MetricName(namespace.getName(), name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java
new file mode 100644
index 0000000..a2c3798
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A filter that selects metrics by namespace and, optionally, by name.
+ */
+@Experimental(Kind.METRICS)
+@AutoValue
+public abstract class MetricNameFilter {
+
+  /** The namespace that a metric must be in to match this {@link MetricNameFilter}. */
+  public abstract String getNamespace();
+
+  /** If set, the metric must have this name to match this {@link MetricNameFilter}. */
+  @Nullable
+  public abstract String getName();
+
+  public static MetricNameFilter inNamespace(String namespace) {
+    return new AutoValue_MetricNameFilter(namespace, null);
+  }
+
+  public static MetricNameFilter inNamespace(Class<?> namespace) {
+    return new AutoValue_MetricNameFilter(namespace.getName(), null);
+  }
+
+  public static MetricNameFilter named(String namespace, String name) {
+    checkNotNull(name, "Must specify a name");
+    return new AutoValue_MetricNameFilter(namespace, name);
+  }
+
+  public static MetricNameFilter named(Class<?> namespace, String name) {
+    checkNotNull(namespace, "Must specify a namespace");
+    checkNotNull(name, "Must specify a name");
+    return new AutoValue_MetricNameFilter(namespace.getName(), name);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
new file mode 100644
index 0000000..2241ba8
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * The results of a query for metrics. Allows accessing all of the metrics that matched the filter.
+ */
+@Experimental(Kind.METRICS)
+public interface MetricQueryResults {
+  /** Return the metric results for the counters that matched the filter. */
+  Iterable<MetricResult<Long>> counters();
+
+  /** Return the metric results for the distributions that matched the filter. */
+  Iterable<MetricResult<DistributionResult>> distributions();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java
new file mode 100644
index 0000000..9a3971a
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * The current result of a single metric.
+ */
+@Experimental(Kind.METRICS)
+public interface MetricResult<T> {
+  /** Return the name of the metric. */
+  MetricName name();
+  /** Return the step context to which this metric result applies. */
+  String step();
+
+  /**
+   * Return the value of this metric across all successfully completed parts of the pipeline.
+   *
+   * <p>Not all runners will support committed metrics. If they are not supported, the runner will
+   * throw an {@link UnsupportedOperationException}.
+   */
+  T committed();
+
+  /**
+   * Return the value of this metric across all attempts of executing all parts of the pipeline.
+   */
+  T attempted();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java
new file mode 100644
index 0000000..dab65ea
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Methods for interacting with the metrics of a pipeline that has been executed. Accessed via
+ * {@link PipelineResult#metrics()}.
+ */
+@Experimental(Kind.METRICS)
+public abstract class MetricResults {
+  /**
+   * Query for all metrics that match the filter.
+   */
+  public abstract MetricQueryResults queryMetrics(MetricsFilter filter);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
new file mode 100644
index 0000000..e84dc66
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Iterables;
+import java.util.Collections;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Representation of multiple metric updates.
+ */
+@Experimental(Kind.METRICS)
+@AutoValue
+public abstract class MetricUpdates {
+
+  public static final MetricUpdates EMPTY = MetricUpdates.create(
+      Collections.<MetricUpdate<Long>>emptyList(),
+      Collections.<MetricUpdate<DistributionData>>emptyList());
+
+  /**
+   * Representation of a single metric update.
+   * @param <T> The type of value representing the update.
+   */
+  @AutoValue
+  public abstract static class MetricUpdate<T> {
+
+    /** The key being updated. */
+    public abstract MetricKey getKey();
+    /** The value of the update. */
+    public abstract T getUpdate();
+
+    public static <T> MetricUpdate<T> create(MetricKey key, T update) {
+      return new AutoValue_MetricUpdates_MetricUpdate<>(key, update);
+    }
+  }
+
+  /** Returns true if there are no updates in this MetricUpdates object. */
+  public boolean isEmpty() {
+    return Iterables.isEmpty(counterUpdates())
+        && Iterables.isEmpty(distributionUpdates());
+  }
+
+  /** All of the counter updates. */
+  public abstract Iterable<MetricUpdate<Long>> counterUpdates();
+
+  /** All of the distribution updates. */
+  public abstract Iterable<MetricUpdate<DistributionData>> distributionUpdates();
+
+  /** Create a new {@link MetricUpdates} bundle. */
+  public static MetricUpdates create(
+      Iterable<MetricUpdate<Long>> counterUpdates,
+      Iterable<MetricUpdate<DistributionData>> distributionUpdates) {
+    return new AutoValue_MetricUpdates(counterUpdates, distributionUpdates);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
new file mode 100644
index 0000000..b72a0b2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * {@code Metrics} is a utility class for producing various kinds of metrics for
+ * reporting properties of an executing pipeline.
+ */
+@Experimental(Kind.METRICS)
+public class Metrics {
+
+  private Metrics() {}
+
+  /**
+   * Create a metric that can be incremented and decremented, and is aggregated by taking the sum.
+   */
+  public static Counter counter(String namespace, String name) {
+    return new DelegatingCounter(MetricName.named(namespace, name));
+  }
+
+  /**
+   * Create a metric that can be incremented and decremented, and is aggregated by taking the sum.
+   */
+  public static Counter counter(Class<?> namespace, String name) {
+    return new DelegatingCounter(MetricName.named(namespace, name));
+  }
+
+  /**
+   * Create a metric that records various statistics about the distribution of reported values.
+   */
+  public static Distribution distribution(String namespace, String name) {
+    return new DelegatingDistribution(MetricName.named(namespace, name));
+  }
+
+  /**
+   * Create a metric that records various statistics about the distribution of reported values.
+   */
+  public static Distribution distribution(Class<?> namespace, String name) {
+    return new DelegatingDistribution(MetricName.named(namespace, name));
+  }
+
+  /** Implementation of {@link Counter} that delegates to the instance for the current context. */
+  private static class DelegatingCounter implements Counter {
+    private final MetricName name;
+
+    private DelegatingCounter(MetricName name) {
+      this.name = name;
+    }
+
+    /** Increment the counter. */
+    @Override public void inc() {
+      inc(1);
+    }
+
+    /** Increment the counter by the given amount. */
+    @Override public void inc(long n) {
+      MetricsContainer container = MetricsEnvironment.getCurrentContainer();
+      if (container != null) {
+        container.getCounter(name).inc(n);
+      }
+    }
+
+    /** Decrement the counter. */
+    @Override public void dec() {
+      inc(-1);
+    }
+
+    /** Decrement the counter by the given amount. */
+    @Override public void dec(long n) {
+      inc(-1 * n);
+    }
+  }
+
+  /**
+   * Implementation of {@link Distribution} that delegates to the instance for the current context.
+   */
+  private static class DelegatingDistribution implements Distribution {
+    private final MetricName name;
+
+    private DelegatingDistribution(MetricName name) {
+      this.name = name;
+    }
+
+    @Override
+    public void update(long value) {
+      MetricsContainer container = MetricsEnvironment.getCurrentContainer();
+      if (container != null) {
+        container.getDistribution(name).update(value);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
new file mode 100644
index 0000000..10032a2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
+
+/**
+ * Holds the metrics for a single step and unit-of-commit (bundle).
+ *
+ * <p>This class is thread-safe. It is intended to be used when 1 (or more) threads are updating
+ * metrics and at most 1 thread is extracting updates by calling {@link #getUpdates} and
+ * {@link #commitUpdates}. Outside of this it is still safe. Although races in the update extraction
+ * may cause updates that don't actually have any changes, it will never lose an update.
+ *
+ * <p>For consistency, all threads that update metrics should finish before getting the final
+ * cumulative values/updates.
+ */
+@Experimental(Kind.METRICS)
+public class MetricsContainer {
+
+  private final String stepName;
+
+  private final MetricsMap<MetricName, CounterCell> counters =
+      new MetricsMap<>(new MetricsMap.Factory<MetricName, CounterCell>() {
+        @Override
+        public CounterCell createInstance(MetricName unusedKey) {
+          return new CounterCell();
+        }
+      });
+
+  private final MetricsMap<MetricName, DistributionCell> distributions =
+      new MetricsMap<>(new MetricsMap.Factory<MetricName, DistributionCell>() {
+        @Override
+        public DistributionCell createInstance(MetricName unusedKey) {
+          return new DistributionCell();
+        }
+      });
+
+  /**
+   * Create a new {@link MetricsContainer} associated with the given {@code stepName}.
+   */
+  public MetricsContainer(String stepName) {
+    this.stepName = stepName;
+  }
+
+  /**
+   * Return the {@link CounterCell} that should be used for implementing the given
+   * {@code metricName} in this container.
+   */
+  public CounterCell getCounter(MetricName metricName) {
+    return counters.get(metricName);
+  }
+
+  /**
+   * Return the {@link DistributionCell} that should be used for implementing the given
+   * {@code metricName} in this container.
+   */
+  public DistributionCell getDistribution(MetricName metricName) {
+    return distributions.get(metricName);
+  }
+
+  /**
+   * Collect the cumulative value of every cell in {@code cells} for which
+   * {@code getDirty().beforeCommit()} returns true, keyed by this container's step name.
+   */
+  private <UpdateT, CellT extends MetricCell<?, UpdateT>>
+  ImmutableList<MetricUpdate<UpdateT>> extractUpdates(
+      MetricsMap<MetricName, CellT> cells) {
+    ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
+    for (Map.Entry<MetricName, CellT> cell : cells.entries()) {
+      if (cell.getValue().getDirty().beforeCommit()) {
+        updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()),
+            cell.getValue().getCumulative()));
+      }
+    }
+    return updates.build();
+  }
+
+  /**
+   * Return the cumulative values for any metrics that have changed since the last time updates were
+   * committed.
+   */
+  public MetricUpdates getUpdates() {
+    return MetricUpdates.create(
+        extractUpdates(counters),
+        extractUpdates(distributions));
+  }
+
+  /** Invoke {@code afterCommit()} on the dirty state of every cell in {@code cells}. */
+  private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?, ?>> cells) {
+    for (MetricCell<?, ?> cell : cells.values()) {
+      cell.getDirty().afterCommit();
+    }
+  }
+
+  /**
+   * Mark all of the updates that were retrieved with the latest call to {@link #getUpdates()} as
+   * committed.
+   */
+  public void commitUpdates() {
+    commitUpdates(counters);
+    commitUpdates(distributions);
+  }
+
+  /**
+   * Collect the (non-null) cumulative value of every cell in {@code cells}, dirty or not.
+   */
+  private <UpdateT, CellT extends MetricCell<?, UpdateT>>
+  ImmutableList<MetricUpdate<UpdateT>> extractCumulatives(
+      MetricsMap<MetricName, CellT> cells) {
+    ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
+    for (Map.Entry<MetricName, CellT> cell : cells.entries()) {
+      UpdateT update = checkNotNull(cell.getValue().getCumulative());
+      updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()), update));
+    }
+    return updates.build();
+  }
+
+  /**
+   * Return the {@link MetricUpdates} representing the cumulative values of all metrics in this
+   * container.
+   */
+  public MetricUpdates getCumulative() {
+    return MetricUpdates.create(
+        extractCumulatives(counters),
+        extractCumulatives(distributions));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
new file mode 100644
index 0000000..ef2660a8
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages and provides the metrics container associated with each thread.
+ *
+ * <p>Users should not interact directly with this class. Instead, use {@link Metrics} and the
+ * returned objects to create and modify metrics.
+ *
+ * <p>The runner should create {@link MetricsContainer} for each context in which metrics are
+ * reported (by step and name) and call {@link #setMetricsContainer} before invoking any code that
+ * may update metrics within that step.
+ *
+ * <p>The runner should call {@link #unsetMetricsContainer} (or {@link #setMetricsContainer} back to
+ * the previous value) when exiting code that set the metrics container.
+ */
+public class MetricsEnvironment {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MetricsEnvironment.class);
+
+  private static final AtomicBoolean METRICS_SUPPORTED = new AtomicBoolean(false);
+  private static final AtomicBoolean REPORTED_MISSING_CONTAINER = new AtomicBoolean(false);
+
+  private static final ThreadLocal<MetricsContainer> CONTAINER_FOR_THREAD =
+      new ThreadLocal<MetricsContainer>();
+
+  /** Set the {@link MetricsContainer} for the current thread. */
+  public static void setMetricsContainer(MetricsContainer container) {
+    CONTAINER_FOR_THREAD.set(container);
+  }
+
+
+  /** Clear the {@link MetricsContainer} for the current thread. */
+  public static void unsetMetricsContainer() {
+    CONTAINER_FOR_THREAD.remove();
+  }
+
+  /** Called by the runner to indicate whether metrics reporting is supported. */
+  public static void setMetricsSupported(boolean supported) {
+    METRICS_SUPPORTED.set(supported);
+  }
+
+  /**
+   * Return the {@link MetricsContainer} for the current thread.
+   *
+   * <p>May return null if metrics are not supported by the current runner or if the current thread
+   * is not a work-execution thread. The first time this happens (across all threads, not once per
+   * thread) it will log a diagnostic message.
+   */
+  @Nullable
+  public static MetricsContainer getCurrentContainer() {
+    MetricsContainer container = CONTAINER_FOR_THREAD.get();
+    if (container == null && REPORTED_MISSING_CONTAINER.compareAndSet(false, true)) {
+      if (METRICS_SUPPORTED.get()) {
+        LOGGER.error(
+            "Unable to update metrics on the current thread. "
+                + "Most likely caused by using metrics outside the managed work-execution thread.");
+      } else {
+        LOGGER.warn("Reporting metrics are not supported in the current execution environment.");
+      }
+    }
+    return container;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java
new file mode 100644
index 0000000..ec81251
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Simple POJO representing a filter for querying metrics.
+ */
+@Experimental(Kind.METRICS)
+@AutoValue
+public abstract class MetricsFilter {
+
+  public Set<String> steps() {
+    return immutableSteps();
+  }
+
+  public Set<MetricNameFilter> names() {
+    return immutableNames();
+  }
+
+  protected abstract ImmutableSet<String> immutableSteps();
+  protected abstract ImmutableSet<MetricNameFilter> immutableNames();
+
+  public static Builder builder() {
+    return new AutoValue_MetricsFilter.Builder();
+  }
+
+  /**
+   * Builder for creating a {@link MetricsFilter}.
+   */
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    protected abstract ImmutableSet.Builder<MetricNameFilter> immutableNamesBuilder();
+    protected abstract ImmutableSet.Builder<String> immutableStepsBuilder();
+
+    /**
+     * Add a {@link MetricNameFilter}.
+     *
+     * <p>If no name filters are specified then all metric names will be included.
+     *
+     *
+     * <p>If one or more name filters are specified, then only metrics that match one or more of the
+     * filters will be included.
+     */
+    public Builder addNameFilter(MetricNameFilter nameFilter) {
+      immutableNamesBuilder().add(nameFilter);
+      return this;
+    }
+
+    /**
+     * Add a step filter.
+     *
+     * <p>If no steps are specified then metrics will be included for all steps.
+     *
+     * <p>If one or more steps are specified, then metrics will be included if they are part of
+     * any of the specified steps.
+     */
+    public Builder addStep(String step) {
+      immutableStepsBuilder().add(step);
+      return this;
+    }
+
+    public abstract MetricsFilter build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
new file mode 100644
index 0000000..5a02106
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Iterables;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A map from {@code K} to {@code T} that supports getting or creating values associated with a key
+ * in a thread-safe manner.
+ */
+@Experimental(Kind.METRICS)
+public class MetricsMap<K, T> {
+
+  /** Interface for creating instances to populate the {@link MetricsMap}. */
+  public interface Factory<K, T> {
+    /**
+     * Create an instance of {@code T} to use with the given {@code key}.
+     *
+     * <p>It must be safe to call this from multiple threads.
+     */
+    T createInstance(K key);
+  }
+
+  private final Factory<K, T> factory;
+  private final ConcurrentMap<K, T> metrics = new ConcurrentHashMap<>();
+
+  public MetricsMap(Factory<K, T> factory) {
+    this.factory = factory;
+  }
+
+  /**
+   * Get or create the value associated with the given key.
+   */
+  public T get(K key) {
+    T metric = metrics.get(key);
+    if (metric == null) {
+      metric = factory.createInstance(key);
+      metric = MoreObjects.firstNonNull(metrics.putIfAbsent(key, metric), metric);
+    }
+    return metric;
+  }
+
+  /**
+   * Get the value associated with the given key, if it exists.
+   */
+  @Nullable
+  public T tryGet(K key) {
+    return metrics.get(key);
+  }
+
+  /**
+   * Return an iterable over the entries in the current {@link  MetricsMap}.
+   */
+  public Iterable<Map.Entry<K, T>> entries() {
+    return Iterables.unmodifiableIterable(metrics.entrySet());
+  }
+
+  /**
+   * Return an iterable over the values in the current {@link MetricsMap}.
+   */
+  public Iterable<T> values() {
+    return Iterables.unmodifiableIterable(metrics.values());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java
new file mode 100644
index 0000000..f71dc7a
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Metrics allow exporting information about the execution of a pipeline.
+ * They are intended to be used for monitoring and understanding the
+ * execution.
+ *
+ * <p>Metrics may also be queried from the {@link org.apache.beam.sdk.PipelineResult} object.
+ *
+ * <p>Runners should look at {@link org.apache.beam.sdk.metrics.MetricsContainer} for details on
+ * how to support metrics.
+ */
+package org.apache.beam.sdk.metrics;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
new file mode 100644
index 0000000..408f145
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link CounterCell}.
+ */
+@RunWith(JUnit4.class)
+public class CounterCellTest {
+
+  private CounterCell cell = new CounterCell();
+
+  @Test
+  public void testDeltaAndCumulative() {
+    cell.inc(5);
+    cell.inc(7);
+    assertThat(cell.getCumulative(), equalTo(12L));
+    assertThat("getCumulative is idempotent", cell.getCumulative(), equalTo(12L));
+
+    assertThat(cell.getDirty().beforeCommit(), equalTo(true));
+    cell.getDirty().afterCommit();
+    assertThat(cell.getDirty().beforeCommit(), equalTo(false));
+    assertThat(cell.getCumulative(), equalTo(12L));
+
+    cell.inc(30);
+    assertThat(cell.getCumulative(), equalTo(42L));
+
+    assertThat(cell.getDirty().beforeCommit(), equalTo(true));
+    cell.getDirty().afterCommit();
+    assertThat(cell.getDirty().beforeCommit(), equalTo(false));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java
new file mode 100644
index 0000000..d00f8cd
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DirtyState}.
+ */
+@RunWith(JUnit4.class)
+public class DirtyStateTest {
+
+  private final DirtyState dirty = new DirtyState();
+
+  @Test
+  public void basicPath() {
+    assertThat("Should start dirty", dirty.beforeCommit(), is(true));
+    dirty.afterCommit();
+    assertThat("Should be clean after commit", dirty.beforeCommit(), is(false));
+
+    dirty.afterModification();
+    assertThat("Should be dirty after change", dirty.beforeCommit(), is(true));
+    dirty.afterCommit();
+    assertThat("Should be clean after commit", dirty.beforeCommit(), is(false));
+  }
+
+  @Test
+  public void changeAfterBeforeCommit() {
+    assertThat("Should start dirty", dirty.beforeCommit(), is(true));
+    dirty.afterModification();
+    dirty.afterCommit();
+    assertThat("Changes after beforeCommit should be dirty after afterCommit",
+        dirty.beforeCommit(), is(true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java
new file mode 100644
index 0000000..07e0b26
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DistributionCell}.
+ */
+@RunWith(JUnit4.class)
+public class DistributionCellTest {
+  private DistributionCell cell = new DistributionCell();
+
+  @Test
+  public void testDeltaAndCumulative() {
+    cell.update(5);
+    cell.update(7);
+    assertThat(cell.getCumulative(), equalTo(DistributionData.create(12, 2, 5, 7)));
+    assertThat("getCumulative is idempotent",
+        cell.getCumulative(), equalTo(DistributionData.create(12, 2, 5, 7)));
+
+    assertThat(cell.getDirty().beforeCommit(), equalTo(true));
+    cell.getDirty().afterCommit();
+    assertThat(cell.getDirty().beforeCommit(), equalTo(false));
+
+    cell.update(30);
+    assertThat(cell.getCumulative(), equalTo(DistributionData.create(42, 3, 5, 30)));
+
+    assertThat("Adding a new value made the cell dirty",
+        cell.getDirty().beforeCommit(), equalTo(true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
new file mode 100644
index 0000000..bdcb94f
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import java.util.Objects;
+import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * Matchers for metrics.
+ */
+public class MetricMatchers {
+
+  public static <T> Matcher<MetricUpdate<T>> metricUpdate(final String name, final T update) {
+    return new TypeSafeMatcher<MetricUpdate<T>>() {
+      @Override
+      protected boolean matchesSafely(MetricUpdate<T> item) {
+        return Objects.equals(name, item.getKey().metricName().name())
+            && Objects.equals(update, item.getUpdate());
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description
+            .appendText("MetricUpdate{name=").appendValue(name)
+            .appendText(", update=").appendValue(update)
+            .appendText("}");
+      }
+    };
+  }
+
+  public static <T> Matcher<MetricUpdate<T>> metricUpdate(
+      final String namespace, final String name, final String step, final T update) {
+    return new TypeSafeMatcher<MetricUpdate<T>>() {
+      @Override
+      protected boolean matchesSafely(MetricUpdate<T> item) {
+        return Objects.equals(namespace, item.getKey().metricName().namespace())
+            && Objects.equals(name, item.getKey().metricName().name())
+            && Objects.equals(step, item.getKey().stepName())
+            && Objects.equals(update, item.getUpdate());
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description
+            .appendText("MetricUpdate{inNamespace=").appendValue(namespace)
+            .appendText(", name=").appendValue(name)
+            .appendText(", step=").appendValue(step)
+            .appendText(", update=").appendValue(update)
+            .appendText("}");
+      }
+    };
+  }
+
+  public static <T> Matcher<MetricResult<T>> metricResult(
+      final String namespace, final String name, final String step,
+      final T logical, final T physical) {
+    return new TypeSafeMatcher<MetricResult<T>>() {
+      @Override
+      protected boolean matchesSafely(MetricResult<T> item) {
+        return Objects.equals(namespace, item.name().namespace())
+            && Objects.equals(name, item.name().name())
+            && Objects.equals(step, item.step())
+            && Objects.equals(logical, item.committed())
+            && Objects.equals(physical, item.attempted());
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description
+            .appendText("MetricResult{inNamespace=").appendValue(namespace)
+            .appendText(", name=").appendValue(name)
+            .appendText(", step=").appendValue(step)
+            .appendText(", logical=").appendValue(logical)
+            .appendText(", physical=").appendValue(physical)
+            .appendText("}");
+      }
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
new file mode 100644
index 0000000..58797ce
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import static org.apache.beam.sdk.metrics.MetricMatchers.metricUpdate;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link MetricsContainer}.
+ *
+ * <p>Exercises {@code getUpdates()}, {@code commitUpdates()} and {@code getCumulative()} for both
+ * counter and distribution cells of a single container.
+ */
+@RunWith(JUnit4.class)
+public class MetricsContainerTest {
+
+  /**
+   * Checks which counters {@code getUpdates()} reports before and after
+   * {@code commitUpdates()}, and which values those updates carry.
+   */
+  @Test
+  public void testCounterDeltas() {
+    MetricsContainer container = new MetricsContainer("step1");
+    CounterCell c1 = container.getCounter(MetricName.named("ns", "name1"));
+    CounterCell c2 = container.getCounter(MetricName.named("ns", "name2"));
+    assertThat("All counters should start out dirty",
+        container.getUpdates().counterUpdates(), containsInAnyOrder(
+        metricUpdate("name1", 0L),
+        metricUpdate("name2", 0L)));
+    container.commitUpdates();
+    assertThat("After commit no counters should be dirty",
+        container.getUpdates().counterUpdates(), emptyIterable());
+
+    c1.inc(5L);
+    c2.inc(4L);
+
+    assertThat(container.getUpdates().counterUpdates(), containsInAnyOrder(
+        metricUpdate("name1", 5L),
+        metricUpdate("name2", 4L)));
+
+    // Same expectation a second time: reading the updates must not clear them.
+    assertThat("Since we haven't committed, updates are still included",
+        container.getUpdates().counterUpdates(), containsInAnyOrder(
+        metricUpdate("name1", 5L),
+        metricUpdate("name2", 4L)));
+
+    container.commitUpdates();
+    assertThat("After commit there are no updates",
+        container.getUpdates().counterUpdates(), emptyIterable());
+
+    // Only c1 changes after the commit, so only name1 is expected; its update carries the
+    // running total (5 + 8) rather than just the latest increment.
+    c1.inc(8L);
+    assertThat(container.getUpdates().counterUpdates(), contains(
+        metricUpdate("name1", 13L)));
+  }
+
+  /**
+   * Checks that {@code getCumulative()} keeps reporting every counter's total regardless of
+   * {@code commitUpdates()} calls.
+   */
+  @Test
+  public void testCounterCumulatives() {
+    MetricsContainer container = new MetricsContainer("step1");
+    CounterCell c1 = container.getCounter(MetricName.named("ns", "name1"));
+    CounterCell c2 = container.getCounter(MetricName.named("ns", "name2"));
+    c1.inc(2L);
+    c2.inc(4L);
+    c1.inc(3L);
+
+    // The result of getUpdates() is deliberately ignored; it is only read before committing.
+    container.getUpdates();
+    container.commitUpdates();
+    assertThat("Committing updates shouldn't affect cumulative counter values",
+        container.getCumulative().counterUpdates(), containsInAnyOrder(
+        metricUpdate("name1", 5L),
+        metricUpdate("name2", 4L)));
+
+    // The untouched counter (name2) is still reported alongside the changed one.
+    c1.inc(8L);
+    assertThat(container.getCumulative().counterUpdates(), containsInAnyOrder(
+        metricUpdate("name1", 13L),
+        metricUpdate("name2", 4L)));
+  }
+
+  /**
+   * Distribution counterpart of {@link #testCounterDeltas()}: checks which distributions
+   * {@code getUpdates()} reports before and after {@code commitUpdates()}.
+   */
+  @Test
+  public void testDistributionDeltas() {
+    MetricsContainer container = new MetricsContainer("step1");
+    DistributionCell c1 = container.getDistribution(MetricName.named("ns", "name1"));
+    DistributionCell c2 = container.getDistribution(MetricName.named("ns", "name2"));
+
+    assertThat("Initial update includes initial zero-values",
+        container.getUpdates().distributionUpdates(), containsInAnyOrder(
+        metricUpdate("name1", DistributionData.EMPTY),
+        metricUpdate("name2", DistributionData.EMPTY)));
+
+    container.commitUpdates();
+    assertThat("No updates after commit",
+        container.getUpdates().distributionUpdates(), emptyIterable());
+
+    c1.update(5L);
+    c2.update(4L);
+
+    // The expected values below are consistent with create(sum, count, min, max).
+    assertThat(container.getUpdates().distributionUpdates(), containsInAnyOrder(
+        metricUpdate("name1", DistributionData.create(5, 1, 5, 5)),
+        metricUpdate("name2", DistributionData.create(4, 1, 4, 4))));
+    assertThat("Updates stay the same without commit",
+        container.getUpdates().distributionUpdates(), containsInAnyOrder(
+        metricUpdate("name1", DistributionData.create(5, 1, 5, 5)),
+        metricUpdate("name2", DistributionData.create(4, 1, 4, 4))));
+
+    container.commitUpdates();
+    assertThat("No updates after commit",
+        container.getUpdates().distributionUpdates(), emptyIterable());
+
+    // c1 has now seen 5, 8 and 4: sum 17, count 3, min 4, max 8. c2 is unchanged since the
+    // commit, so only name1 is expected.
+    c1.update(8L);
+    c1.update(4L);
+    assertThat(container.getUpdates().distributionUpdates(), contains(
+        metricUpdate("name1", DistributionData.create(17, 3, 4, 8))));
+    container.commitUpdates();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
new file mode 100644
index 0000000..4200a20
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import static org.apache.beam.sdk.metrics.MetricMatchers.metricUpdate;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link MetricsEnvironment}: switching the current container and the state
+ * when no container is set.
+ */
+@RunWith(JUnit4.class)
+public class MetricsEnvironmentTest {
+  /** Unsets the metrics container after every test. */
+  @After
+  public void teardown() {
+    MetricsEnvironment.unsetMetricsContainer();
+  }
+
+  /**
+   * A single {@link Counter} handle should record into whichever container is current at the
+   * time of each call.
+   */
+  @Test
+  public void testUsesAppropriateMetricsContainer() {
+    Counter sharedCounter = Metrics.counter("ns", "name");
+    MetricsContainer stepOneContainer = new MetricsContainer("step1");
+    MetricsContainer stepTwoContainer = new MetricsContainer("step2");
+
+    // Increment while step1's container is current, decrement while step2's is current.
+    MetricsEnvironment.setMetricsContainer(stepOneContainer);
+    sharedCounter.inc();
+    MetricsEnvironment.setMetricsContainer(stepTwoContainer);
+    sharedCounter.dec();
+    MetricsEnvironment.unsetMetricsContainer();
+
+    MetricUpdates stepOneUpdates = stepOneContainer.getUpdates();
+    MetricUpdates stepTwoUpdates = stepTwoContainer.getUpdates();
+    assertThat(
+        stepOneUpdates.counterUpdates(),
+        contains(metricUpdate("ns", "name", "step1", 1L)));
+    assertThat(
+        stepTwoUpdates.counterUpdates(),
+        contains(metricUpdate("ns", "name", "step2", -1L)));
+  }
+
+  /** With nothing set, the current container is expected to be {@code null}. */
+  @Test
+  public void testBehavesWithoutMetricsContainer() {
+    assertNull(MetricsEnvironment.getCurrentContainer());
+  }
+}


[2/5] incubator-beam git commit: Add the ability to query metrics on PipelineResult

Posted by bc...@apache.org.
Add the ability to query metrics on PipelineResult

All runners currently implement this by throwing an
UnsupportedOperationException.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51fee39b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51fee39b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51fee39b

Branch: refs/heads/master
Commit: 51fee39b7bc66d7f60ea2e0ce31e3cb516a89305
Parents: 8524ed9
Author: bchambers <bc...@google.com>
Authored: Wed Oct 12 10:55:05 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Oct 13 15:29:29 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/runners/direct/DirectRunner.java     |  7 +++++++
 .../org/apache/beam/runners/flink/FlinkRunnerResult.java |  6 ++++++
 .../beam/runners/dataflow/DataflowPipelineJob.java       |  7 +++++++
 .../runners/spark/translation/EvaluationContext.java     |  6 ++++++
 .../main/java/org/apache/beam/sdk/PipelineResult.java    | 11 +++++++++++
 5 files changed, 37 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index a72f7ae..e13046d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.TestStream;
@@ -380,6 +381,12 @@ public class DirectRunner
       };
     }
 
+    @Override
+    public MetricResults metrics() {
+      throw new UnsupportedOperationException(
+          "The DirectRunner does not currently support metrics.");
+    }
+
     /**
      * Blocks until the {@link Pipeline} execution represented by this
      * {@link DirectPipelineResult} is complete, returning the terminal state.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index 90bb64d..6b15485 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.joda.time.Duration;
 
@@ -86,4 +87,9 @@ public class FlinkRunnerResult implements PipelineResult {
   public State waitUntilFinish(Duration duration) {
     throw new UnsupportedOperationException("FlinkRunnerResult does not support waitUntilFinish.");
   }
+
+  @Override
+  public MetricResults metrics() {
+    throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 269b824..bbcf11f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -42,6 +42,7 @@ import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.joda.time.Duration;
@@ -426,6 +427,12 @@ public class DataflowPipelineJob implements PipelineResult {
     }
   }
 
+  @Override
+  public MetricResults metrics() {
+    throw new UnsupportedOperationException(
+        "The DataflowRunner does not currently support metrics.");
+  }
+
   private <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator)
       throws IOException {
     if (aggregatorTransforms.contains(aggregator)) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 2397276..1944b6b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -263,6 +264,11 @@ public class EvaluationContext implements EvaluationResult {
   }
 
   @Override
+  public MetricResults metrics() {
+    throw new UnsupportedOperationException("The SparkRunner does not currently support metrics.");
+  }
+
+  @Override
   public <T> Iterable<T> get(PCollection<T> pcollection) {
     @SuppressWarnings("unchecked")
     RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51fee39b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
index d9cdc16..d7774bb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
@@ -18,6 +18,9 @@
 package org.apache.beam.sdk;
 
 import java.io.IOException;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.joda.time.Duration;
 
@@ -127,4 +130,12 @@ public interface PipelineResult {
       return hasReplacement;
     }
   }
+
+  /**
+   * Returns the object used to access metrics from the pipeline.
+   *
+   * @return the {@link MetricResults} for this pipeline
+   * @throws UnsupportedOperationException if the runner doesn't support retrieving metrics.
+   */
+  @Experimental(Kind.METRICS)
+  MetricResults metrics();
 }


[5/5] incubator-beam git commit: Closes #1024

Posted by bc...@apache.org.
Closes #1024


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3c731707
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3c731707
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3c731707

Branch: refs/heads/master
Commit: 3c731707b2d986e7a460907c8f64bbebf1dff714
Parents: e969f3d 834933c
Author: bchambers <bc...@google.com>
Authored: Thu Oct 13 15:29:30 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Oct 13 15:29:30 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectMetrics.java      | 331 +++++++++++++++++++
 .../beam/runners/direct/DirectRunner.java       |  11 +-
 .../beam/runners/direct/EvaluationContext.java  |  10 +
 .../direct/ExecutorServiceParallelExecutor.java |   1 +
 .../direct/ImmutableListBundleFactory.java      |  10 +
 .../runners/direct/StepTransformResult.java     |  49 ++-
 .../beam/runners/direct/TransformExecutor.java  |  35 +-
 .../beam/runners/direct/TransformResult.java    |  12 +
 .../beam/runners/direct/DirectMetricsTest.java  | 133 ++++++++
 .../beam/runners/direct/DirectRunnerTest.java   |  36 ++
 .../runners/direct/TransformExecutorTest.java   |  12 +
 .../beam/runners/flink/FlinkRunnerResult.java   |   6 +
 .../runners/dataflow/DataflowPipelineJob.java   |   7 +
 .../spark/translation/EvaluationContext.java    |   6 +
 .../org/apache/beam/sdk/PipelineResult.java     |  11 +
 .../beam/sdk/annotations/Experimental.java      |   3 +
 .../org/apache/beam/sdk/metrics/Counter.java    |  40 +++
 .../apache/beam/sdk/metrics/CounterCell.java    |  76 +++++
 .../org/apache/beam/sdk/metrics/DirtyState.java |  98 ++++++
 .../apache/beam/sdk/metrics/Distribution.java   |  30 ++
 .../beam/sdk/metrics/DistributionCell.java      |  58 ++++
 .../beam/sdk/metrics/DistributionData.java      |  59 ++++
 .../beam/sdk/metrics/DistributionResult.java    |  42 +++
 .../org/apache/beam/sdk/metrics/Metric.java     |  24 ++
 .../org/apache/beam/sdk/metrics/MetricCell.java |  47 +++
 .../org/apache/beam/sdk/metrics/MetricKey.java  |  40 +++
 .../org/apache/beam/sdk/metrics/MetricName.java |  46 +++
 .../beam/sdk/metrics/MetricNameFilter.java      |  60 ++++
 .../beam/sdk/metrics/MetricQueryResults.java    |  33 ++
 .../apache/beam/sdk/metrics/MetricResult.java   |  45 +++
 .../apache/beam/sdk/metrics/MetricResults.java  |  34 ++
 .../apache/beam/sdk/metrics/MetricUpdates.java  |  72 ++++
 .../org/apache/beam/sdk/metrics/Metrics.java    | 110 ++++++
 .../beam/sdk/metrics/MetricsContainer.java      | 150 +++++++++
 .../beam/sdk/metrics/MetricsEnvironment.java    |  85 +++++
 .../apache/beam/sdk/metrics/MetricsFilter.java  |  86 +++++
 .../org/apache/beam/sdk/metrics/MetricsMap.java |  86 +++++
 .../apache/beam/sdk/metrics/package-info.java   |  28 ++
 .../beam/sdk/metrics/CounterCellTest.java       |  55 +++
 .../apache/beam/sdk/metrics/DirtyStateTest.java |  56 ++++
 .../beam/sdk/metrics/DistributionCellTest.java  |  53 +++
 .../apache/beam/sdk/metrics/MetricMatchers.java |  99 ++++++
 .../beam/sdk/metrics/MetricsContainerTest.java  | 129 ++++++++
 .../sdk/metrics/MetricsEnvironmentTest.java     |  63 ++++
 .../apache/beam/sdk/metrics/MetricsMapTest.java | 103 ++++++
 .../apache/beam/sdk/metrics/MetricsTest.java    |  98 ++++++
 46 files changed, 2645 insertions(+), 33 deletions(-)
----------------------------------------------------------------------



[3/5] incubator-beam git commit: Initial Metrics API for Beam Java

Posted by bc...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsMapTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsMapTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsMapTest.java
new file mode 100644
index 0000000..4104f8d
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsMapTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.Assert.assertThat;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+/**
+ * Unit tests for {@link MetricsMap}: instance creation, instance reuse, lookup without
+ * creation, and enumeration of entries.
+ */
+@RunWith(JUnit4.class)
+public class MetricsMapTest {
+
+  // Map under test; its factory returns a new AtomicLong each time it is invoked.
+  public MetricsMap<String, AtomicLong> metricsMap =
+      new MetricsMap<>(new MetricsMap.Factory<String, AtomicLong>() {
+    @Override
+    public AtomicLong createInstance(String unusedKey) {
+      return new AtomicLong();
+    }
+  });
+
+  /** Different keys must map to different value instances. */
+  @Test
+  public void testCreateSeparateInstances() {
+    AtomicLong forFoo = metricsMap.get("foo");
+    AtomicLong forBar = metricsMap.get("bar");
+
+    assertThat(forFoo, not(sameInstance(forBar)));
+  }
+
+  /** Asking for the same key twice must return the same value instance. */
+  @Test
+  public void testReuseInstances() {
+    AtomicLong firstLookup = metricsMap.get("foo");
+    AtomicLong secondLookup = metricsMap.get("foo");
+
+    assertThat(firstLookup, sameInstance(secondLookup));
+  }
+
+  /**
+   * {@code tryGet} yields {@code null} before the key has been requested through {@code get},
+   * and the instance returned by {@code get} afterwards.
+   */
+  @Test
+  public void testGet() {
+    assertThat(metricsMap.tryGet("foo"), nullValue(AtomicLong.class));
+
+    AtomicLong created = metricsMap.get("foo");
+    assertThat(metricsMap.tryGet("foo"), sameInstance(created));
+  }
+
+  /** {@code entries()} exposes exactly the key/value pairs created so far, in any order. */
+  @Test
+  public void testGetEntries() {
+    AtomicLong forFoo = metricsMap.get("foo");
+    AtomicLong forBar = metricsMap.get("bar");
+    assertThat(metricsMap.entries(), containsInAnyOrder(
+        hasEntry("foo", forFoo),
+        hasEntry("bar", forBar)));
+  }
+
+  /**
+   * Matches a map entry whose key and value both equal the given ones according to
+   * {@link Objects#equals(Object, Object)}.
+   */
+  private static Matcher<Map.Entry<String, AtomicLong>> hasEntry(
+      final String key, final AtomicLong value) {
+    return new TypeSafeMatcher<Entry<String, AtomicLong>>() {
+
+      @Override
+      protected boolean matchesSafely(Entry<String, AtomicLong> item) {
+        // The key is checked first; the value is compared only when the key matches.
+        if (!Objects.equals(key, item.getKey())) {
+          return false;
+        }
+        return Objects.equals(value, item.getValue());
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description
+            .appendText("Map.Entry{key=")
+            .appendValue(key)
+            .appendText(", value=")
+            .appendValue(value)
+            .appendText("}");
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
new file mode 100644
index 0000000..d11b44d
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Unit tests for the static {@link Metrics} entry points, both with and without a
+ * {@link MetricsContainer} set on the {@link MetricsEnvironment}.
+ */
+public class MetricsTest {
+
+  private static final String NS = "test";
+  private static final String NAME = "name";
+  private static final MetricName METRIC_NAME = MetricName.named(NS, NAME);
+
+  /** Unsets the metrics container after every test. */
+  @After
+  public void tearDown() {
+    MetricsEnvironment.unsetMetricsContainer();
+  }
+
+  /** Updating a distribution while no container is set must not throw. */
+  @Test
+  public void distributionWithoutContainer() {
+    assertNull(MetricsEnvironment.getCurrentContainer());
+    // Should not fail even though there is no metrics container.
+    Metrics.distribution(NS, NAME).update(5L);
+  }
+
+  /** Incrementing and decrementing a counter while no container is set must not throw. */
+  @Test
+  public void counterWithoutContainer() {
+    assertNull(MetricsEnvironment.getCurrentContainer());
+    // Should not fail even though there is no metrics container.
+    Counter detachedCounter = Metrics.counter(NS, NAME);
+    detachedCounter.inc();
+    detachedCounter.inc(5L);
+    detachedCounter.dec();
+    detachedCounter.dec(5L);
+  }
+
+  /** Updates made through {@link Metrics#distribution} must show up in the container's cell. */
+  @Test
+  public void distributionToCell() {
+    MetricsContainer stepContainer = new MetricsContainer("step");
+    MetricsEnvironment.setMetricsContainer(stepContainer);
+
+    Distribution reported = Metrics.distribution(NS, NAME);
+
+    reported.update(5L);
+
+    // The expected values below are consistent with create(sum, count, min, max).
+    DistributionCell backingCell = stepContainer.getDistribution(METRIC_NAME);
+    assertThat(backingCell.getCumulative(), equalTo(DistributionData.create(5, 1, 5, 5)));
+
+    reported.update(36L);
+    assertThat(backingCell.getCumulative(), equalTo(DistributionData.create(41, 2, 5, 36)));
+
+    reported.update(1L);
+    assertThat(backingCell.getCumulative(), equalTo(DistributionData.create(42, 3, 1, 36)));
+  }
+
+  /** Changes made through {@link Metrics#counter} must show up in the container's cell. */
+  @Test
+  public void counterToCell() {
+    MetricsContainer stepContainer = new MetricsContainer("step");
+    MetricsEnvironment.setMetricsContainer(stepContainer);
+    Counter reported = Metrics.counter(NS, NAME);
+    CounterCell backingCell = stepContainer.getCounter(METRIC_NAME);
+
+    // Running total after each step: 1, 48, 43, 42.
+    reported.inc();
+    assertThat(backingCell.getCumulative(), CoreMatchers.equalTo(1L));
+
+    reported.inc(47L);
+    assertThat(backingCell.getCumulative(), CoreMatchers.equalTo(48L));
+
+    reported.dec(5L);
+    assertThat(backingCell.getCumulative(), CoreMatchers.equalTo(43L));
+
+    reported.dec();
+    assertThat(backingCell.getCumulative(), CoreMatchers.equalTo(42L));
+  }
+}