You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/07/29 01:01:57 UTC

[1/2] incubator-beam git commit: Implement Aggregators directly for direct-java

Repository: incubator-beam
Updated Branches:
  refs/heads/master 9c447510a -> 1df6f5f97


Implement Aggregators directly for direct-java

Previously, we relied on conversion to Counter rather than just
running the specified CombineFn. This aligns the direct runner
more closely with the model.

This PR also parameterizes DoFnRunner on an AggregatorFactory to
implement aggregators, allowing each runner to provide the appropriate
implementation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/15eb67bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/15eb67bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/15eb67bb

Branch: refs/heads/master
Commit: 15eb67bb06f7cf0f225249810df8b521649f4f23
Parents: 9c44751
Author: bchambers <bc...@google.com>
Authored: Tue Jul 26 10:33:40 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Jul 28 17:20:44 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/DoFnRunnerBase.java    |  28 +--
 .../org/apache/beam/sdk/util/DoFnRunners.java   |  92 +++++++++-
 .../apache/beam/sdk/util/SimpleDoFnRunner.java  |   7 +-
 .../runners/direct/AggregatorContainer.java     | 183 +++++++++++++++++++
 .../beam/runners/direct/DirectRunner.java       |  14 +-
 .../beam/runners/direct/EvaluationContext.java  |  32 ++--
 .../beam/runners/direct/ParDoEvaluator.java     |  16 +-
 .../runners/direct/StepTransformResult.java     |  15 +-
 .../beam/runners/direct/TransformResult.java    |   9 +-
 .../runners/direct/AggregatorContainerTest.java | 134 ++++++++++++++
 .../beam/runners/direct/DirectRunnerTest.java   |   4 -
 .../runners/direct/EvaluationContextTest.java   |  35 ++--
 .../beam/runners/direct/ParDoEvaluatorTest.java |  11 +-
 .../direct/ParDoMultiEvaluatorFactoryTest.java  |  31 ++--
 .../direct/ParDoSingleEvaluatorFactoryTest.java |  31 ++--
 .../apache/beam/sdk/transforms/Aggregator.java  |  24 ++-
 .../apache/beam/sdk/transforms/DoFnTester.java  |   7 +-
 .../apache/beam/sdk/util/CounterAggregator.java |  35 +++-
 .../apache/beam/sdk/transforms/DoFnTest.java    |  35 ++++
 19 files changed, 592 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15eb67bb/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
index e267a31..2696020 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
@@ -33,19 +34,15 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
 import org.apache.beam.sdk.util.ExecutionContext.StepContext;
-import org.apache.beam.sdk.util.common.CounterSet;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-
 import org.joda.time.Instant;
 import org.joda.time.format.PeriodFormat;
-
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -75,7 +72,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
       StepContext stepContext,
-      CounterSet.AddCounterMutator addCounterMutator,
+      AggregatorFactory aggregatorFactory,
       WindowingStrategy<?, ?> windowingStrategy) {
     this.fn = fn;
     this.context = new DoFnContext<>(
@@ -86,13 +83,13 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
         mainOutputTag,
         sideOutputTags,
         stepContext,
-        addCounterMutator,
+        aggregatorFactory,
         windowingStrategy == null ? null : windowingStrategy.getWindowFn());
   }
 
   /**
    * An implementation of {@code OutputManager} using simple lists, for testing and in-memory
-   * contexts such as the {@link DirectRunner}.
+   * contexts such as the {@code DirectRunner}.
    */
   public static class ListOutputManager implements OutputManager {
 
@@ -180,7 +177,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
     final OutputManager outputManager;
     final TupleTag<OutputT> mainOutputTag;
     final StepContext stepContext;
-    final CounterSet.AddCounterMutator addCounterMutator;
+    final AggregatorFactory aggregatorFactory;
     final WindowFn<?, ?> windowFn;
 
     /**
@@ -196,7 +193,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
                        TupleTag<OutputT> mainOutputTag,
                        List<TupleTag<?>> sideOutputTags,
                        StepContext stepContext,
-                       CounterSet.AddCounterMutator addCounterMutator,
+                       AggregatorFactory aggregatorFactory,
                        WindowFn<?, ?> windowFn) {
       fn.super();
       this.options = options;
@@ -212,7 +209,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
       }
 
       this.stepContext = stepContext;
-      this.addCounterMutator = addCounterMutator;
+      this.aggregatorFactory = aggregatorFactory;
       this.windowFn = windowFn;
       super.setupDelegateAggregators();
     }
@@ -344,18 +341,11 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
       sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
     }
 
-    private String generateInternalAggregatorName(String userName) {
-      boolean system = fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
-      return (system ? "" : "user-") + stepContext.getStepName() + "-" + userName;
-    }
-
     @Override
     protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
         String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      checkNotNull(combiner,
-          "Combiner passed to createAggregator cannot be null");
-      return new CounterAggregator<>(generateInternalAggregatorName(name),
-          combiner, addCounterMutator);
+      checkNotNull(combiner, "Combiner passed to createAggregatorForDoFn cannot be null");
+      return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15eb67bb/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
index 648a281..cb96da2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.util;
 
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor;
@@ -26,7 +27,6 @@ import org.apache.beam.sdk.util.common.CounterSet;
 import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
-
 import java.util.List;
 
 /**
@@ -56,7 +56,7 @@ public class DoFnRunners {
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
       StepContext stepContext,
-      CounterSet.AddCounterMutator addCounterMutator,
+      AggregatorFactory aggregatorFactory,
       WindowingStrategy<?, ?> windowingStrategy) {
     return new SimpleDoFnRunner<>(
         options,
@@ -66,7 +66,33 @@ public class DoFnRunners {
         mainOutputTag,
         sideOutputTags,
         stepContext,
-        addCounterMutator,
+        aggregatorFactory,
+        windowingStrategy);
+  }
+
+  /**
+   * Returns a basic implementation of {@link DoFnRunner} that works for most {@link DoFn DoFns}.
+   *
+   * <p>It invokes {@link DoFn#processElement} for each input.
+   */
+  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
+      PipelineOptions options,
+      DoFn<InputT, OutputT> fn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      CounterSet.AddCounterMutator addCounterMutator,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    return simpleRunner(options,
+        fn,
+        sideInputReader,
+        outputManager,
+        mainOutputTag,
+        sideOutputTags,
+        stepContext,
+        CounterAggregator.factoryFor(addCounterMutator),
         windowingStrategy);
   }
 
@@ -84,7 +110,7 @@ public class DoFnRunners {
           TupleTag<KV<K, OutputT>> mainOutputTag,
           List<TupleTag<?>> sideOutputTags,
           StepContext stepContext,
-          CounterSet.AddCounterMutator addCounterMutator,
+          AggregatorFactory aggregatorFactory,
           WindowingStrategy<?, W> windowingStrategy) {
     DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> simpleDoFnRunner =
         simpleRunner(
@@ -95,7 +121,7 @@ public class DoFnRunners {
             mainOutputTag,
             sideOutputTags,
             stepContext,
-            addCounterMutator,
+            aggregatorFactory,
             windowingStrategy);
     return new LateDataDroppingDoFnRunner<>(
         simpleDoFnRunner,
@@ -104,6 +130,34 @@ public class DoFnRunners {
         reduceFnExecutor.getDroppedDueToLatenessAggregator());
   }
 
+  /**
+   * Returns an implementation of {@link DoFnRunner} that handles late data dropping.
+   *
+   * <p>It drops elements from expired windows before they reach the underlying {@link DoFn}.
+   */
+  public static <K, InputT, OutputT, W extends BoundedWindow>
+  DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
+      PipelineOptions options,
+      ReduceFnExecutor<K, InputT, OutputT, W> reduceFnExecutor,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<KV<K, OutputT>> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      CounterSet.AddCounterMutator addCounterMutator,
+      WindowingStrategy<?, W> windowingStrategy) {
+    return lateDataDroppingRunner(
+        options,
+        reduceFnExecutor,
+        sideInputReader,
+        outputManager,
+        mainOutputTag,
+        sideOutputTags,
+        stepContext,
+        CounterAggregator.factoryFor(addCounterMutator),
+        windowingStrategy);
+  }
+
   public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
       PipelineOptions options,
       DoFn<InputT, OutputT> doFn,
@@ -112,7 +166,7 @@ public class DoFnRunners {
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
       StepContext stepContext,
-      AddCounterMutator addCounterMutator,
+      AggregatorFactory aggregatorFactory,
       WindowingStrategy<?, ?> windowingStrategy) {
     if (doFn instanceof ReduceFnExecutor) {
       @SuppressWarnings("rawtypes")
@@ -126,7 +180,7 @@ public class DoFnRunners {
           (TupleTag) mainOutputTag,
           sideOutputTags,
           stepContext,
-          addCounterMutator,
+          aggregatorFactory,
           (WindowingStrategy) windowingStrategy);
       return runner;
     }
@@ -138,7 +192,29 @@ public class DoFnRunners {
         mainOutputTag,
         sideOutputTags,
         stepContext,
-        addCounterMutator,
+        aggregatorFactory,
+        windowingStrategy);
+  }
+
+  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
+      PipelineOptions options,
+      DoFn<InputT, OutputT> doFn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      AddCounterMutator addCounterMutator,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    return createDefault(
+        options,
+        doFn,
+        sideInputReader,
+        outputManager,
+        mainOutputTag,
+        sideOutputTags,
+        stepContext,
+        CounterAggregator.factoryFor(addCounterMutator),
         windowingStrategy);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15eb67bb/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
index 78377c8..e034638 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
@@ -18,12 +18,11 @@
 package org.apache.beam.sdk.util;
 
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
 import org.apache.beam.sdk.util.ExecutionContext.StepContext;
-import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator;
 import org.apache.beam.sdk.values.TupleTag;
-
 import java.util.List;
 
 /**
@@ -38,9 +37,9 @@ public class SimpleDoFnRunner<InputT, OutputT> extends DoFnRunnerBase<InputT, Ou
       SideInputReader sideInputReader,
       OutputManager outputManager,
       TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, StepContext stepContext,
-      AddCounterMutator addCounterMutator, WindowingStrategy<?, ?> windowingStrategy) {
+      AggregatorFactory aggregatorFactory, WindowingStrategy<?, ?> windowingStrategy) {
     super(options, fn, sideInputReader, outputManager, mainOutputTag, sideOutputTags, stepContext,
-        addCounterMutator, windowingStrategy);
+        aggregatorFactory, windowingStrategy);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15eb67bb/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
new file mode 100644
index 0000000..75e6558
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.util.ExecutionContext;
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+/**
+ * AccumT container for the current values associated with {@link Aggregator Aggregators}.
+ */
+public class AggregatorContainer {
+
+  private static class AggregatorInfo<InputT, AccumT, OutputT>
+      implements Aggregator<InputT, OutputT> {
+    private final String stepName;
+    private final String name;
+    private final CombineFn<InputT, AccumT, OutputT> combiner;
+    @GuardedBy("this")
+    private AccumT accumulator = null;
+    private boolean committed = false;
+
+    private AggregatorInfo(
+        String stepName, String name, CombineFn<InputT, AccumT, OutputT> combiner) {
+      this.stepName = stepName;
+      this.name = name;
+      this.combiner = combiner;
+    }
+
+    @Override
+    public synchronized void addValue(InputT input) {
+      Preconditions.checkState(!committed, "Cannot addValue after committing");
+      if (accumulator == null) {
+        accumulator = combiner.createAccumulator();
+      }
+      accumulator = combiner.addInput(accumulator, input);
+    }
+
+    public synchronized OutputT getOutput() {
+      return accumulator == null ? null : combiner.extractOutput(accumulator);
+    }
+
+    private void merge(AggregatorInfo<?, ?, ?> other) {
+      // Aggregators are only merged if they are the same (same step, same name).
+      // As a result, they should also have the same CombineFn, so this is safe.
+      AggregatorInfo<InputT, AccumT, OutputT> otherSafe =
+          (AggregatorInfo<InputT, AccumT, OutputT>) other;
+      mergeSafe(otherSafe);
+    }
+
+    private synchronized void mergeSafe(AggregatorInfo<InputT, AccumT, OutputT> other) {
+      if (accumulator == null) {
+        accumulator = other.accumulator;
+      } else if (other.accumulator != null) {
+        accumulator = combiner.mergeAccumulators(Arrays.asList(accumulator, other.accumulator));
+      }
+    }
+
+    public String getStepName() {
+      return name;
+    }
+
+    @Override
+    public String getName() {
+      return name;
+    }
+
+    @Override
+    public CombineFn<InputT, ?, OutputT> getCombineFn() {
+      return combiner;
+    }
+  }
+
+  private final ConcurrentMap<AggregatorKey, AggregatorInfo<?, ?, ?>> accumulators =
+      new ConcurrentHashMap<>();
+
+  private AggregatorContainer() {
+  }
+
+  public static AggregatorContainer create() {
+    return new AggregatorContainer();
+  }
+
+  @Nullable
+  <OutputT> OutputT getAggregate(String stepName, String aggregatorName) {
+    AggregatorInfo<?, ?, OutputT> aggregatorInfo =
+        (AggregatorInfo<?, ?, OutputT>) accumulators.get(
+            AggregatorKey.create(stepName, aggregatorName));
+    return aggregatorInfo == null ? null : aggregatorInfo.getOutput();
+  }
+
+  public Mutator createMutator() {
+    return new Mutator(this);
+  }
+
+  /**
+   * AccumT class for mutations to the aggregator values.
+   */
+  public static class Mutator implements AggregatorFactory {
+
+    private final Map<AggregatorKey, AggregatorInfo<?, ?, ?>> accumulatorDeltas = new HashMap<>();
+    private final AggregatorContainer container;
+    private boolean committed = false;
+
+    private Mutator(AggregatorContainer container) {
+      this.container = container;
+    }
+
+    public void commit() {
+      Preconditions.checkState(!committed, "Should not be already committed");
+      committed = true;
+
+      for (Map.Entry<AggregatorKey, AggregatorInfo<?, ?, ?>> entry : accumulatorDeltas.entrySet()) {
+        AggregatorInfo<?, ?, ?> previous = container.accumulators.get(entry.getKey());
+        entry.getValue().committed = true;
+        if (previous == null) {
+          previous = container.accumulators.putIfAbsent(entry.getKey(), entry.getValue());
+        }
+        if (previous != null) {
+          previous.merge(entry.getValue());
+          previous.committed = true;
+        }
+      }
+    }
+
+    @Override
+    public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+        Class<?> fnClass, ExecutionContext.StepContext step,
+        String name, CombineFn<InputT, AccumT, OutputT> combine) {
+      Preconditions.checkState(!committed, "Cannot create aggregators after committing");
+      AggregatorKey key = AggregatorKey.create(step.getStepName(), name);
+      AggregatorInfo<?, ?, ?> aggregatorInfo = accumulatorDeltas.get(key);
+      if (aggregatorInfo != null) {
+        AggregatorInfo<InputT, ?, OutputT> typedAggregatorInfo =
+            (AggregatorInfo<InputT, ?, OutputT>) aggregatorInfo;
+        return typedAggregatorInfo;
+      } else {
+        AggregatorInfo<InputT, ?, OutputT> typedAggregatorInfo =
+            new AggregatorInfo<>(step.getStepName(), name, combine);
+        accumulatorDeltas.put(key, typedAggregatorInfo);
+        return typedAggregatorInfo;
+      }
+    }
+  }
+
+  /**
+   * Aggregators are identified by a step name and an aggregator name.
+   */
+  @AutoValue
+  public abstract static class AggregatorKey {
+    public static AggregatorKey create(String stepName, String aggregatorName)  {
+      return new AutoValue_AggregatorContainer_AggregatorKey(stepName, aggregatorName);
+    }
+
+    public abstract String getStepName();
+    public abstract String aggregatorName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15eb67bb/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 7fd38c2..72194da 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -40,8 +40,6 @@ import org.apache.beam.sdk.util.MapAggregatorValues;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.CounterSet;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -319,17 +317,15 @@ public class DirectRunner
     @Override
     public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
         throws AggregatorRetrievalException {
-      CounterSet counters = evaluationContext.getCounters();
+      AggregatorContainer aggregators = evaluationContext.getAggregatorContainer();
       Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator);
       Map<String, T> stepValues = new HashMap<>();
       for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) {
         if (steps.contains(transform.getTransform())) {
-          String stepName =
-              String.format(
-                  "user-%s-%s", evaluationContext.getStepName(transform), aggregator.getName());
-          Counter<T> counter = (Counter<T>) counters.getExistingCounter(stepName);
-          if (counter != null) {
-            stepValues.put(transform.getFullName(), counter.getAggregate());
+          T aggregate = aggregators.getAggregate(
+              evaluationContext.getStepName(transform), aggregator.getName());
+          if (aggregate != null) {
+            stepValues.put(transform.getFullName(), aggregate);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15eb67bb/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index bee878b..ea713fa 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.common.CounterSet;
 import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -64,7 +63,7 @@ import javax.annotation.Nullable;
  * <p>{@link EvaluationContext} contains shared state for an execution of the
  * {@link DirectRunner} that can be used while evaluating a {@link PTransform}. This
  * consists of views into underlying state and watermark implementations, access to read and write
- * {@link PCollectionView PCollectionViews}, and constructing {@link CounterSet CounterSets} and
+ * {@link PCollectionView PCollectionViews}, and managing the {@link AggregatorContainer} and
  * {@link ExecutionContext ExecutionContexts}. This includes executing callbacks asynchronously when
  * state changes to the appropriate point (e.g. when a {@link PCollectionView} is requested and
  * known to be empty).
@@ -94,7 +93,7 @@ class EvaluationContext {
 
   private final SideInputContainer sideInputContainer;
 
-  private final CounterSet mergedCounters;
+  private final AggregatorContainer mergedAggregators;
 
   public static EvaluationContext create(
       DirectOptions options,
@@ -128,7 +127,7 @@ class EvaluationContext {
     this.sideInputContainer = SideInputContainer.create(this, views);
 
     this.applicationStateInternals = new ConcurrentHashMap<>();
-    this.mergedCounters = new CounterSet();
+    this.mergedAggregators = AggregatorContainer.create();
 
     this.callbackExecutor =
         WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());
@@ -166,9 +165,9 @@ class EvaluationContext {
         result.getTimerUpdate().withCompletedTimers(completedTimers),
         committedResult,
         result.getWatermarkHold());
-    // Update counters
-    if (result.getCounters() != null) {
-      mergedCounters.merge(result.getCounters());
+    // Commit aggregator changes
+    if (result.getAggregatorChanges() != null) {
+      result.getAggregatorChanges().commit();
     }
     // Update state internals
     CopyOnAccessInMemoryStateInternals<?> theirState = result.getState();
@@ -340,25 +339,18 @@ class EvaluationContext {
     return sideInputContainer.createReaderForViews(sideInputs);
   }
 
-
   /**
-   * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent
-   * of all other {@link CounterSet CounterSets} created by this call.
-   *
-   * The {@link EvaluationContext} is responsible for unifying the counters present in
-   * all created {@link CounterSet CounterSets} when the transforms that call this method
-   * complete.
+   * Returns a new mutator for the {@link AggregatorContainer}.
    */
-  public CounterSet createCounterSet() {
-    return new CounterSet();
+  public AggregatorContainer.Mutator getAggregatorMutator() {
+    return mergedAggregators.createMutator();
   }
 
   /**
-   * Returns all of the counters that have been merged into this context via calls to
-   * {@link CounterSet#merge(CounterSet)}.
+   * Returns the counter container for this context.
    */
-  public CounterSet getCounters() {
-    return mergedCounters;
+  public AggregatorContainer getAggregatorContainer() {
+    return mergedAggregators;
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15eb67bb/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 485cf4b..dd1cf37 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -29,7 +29,6 @@ import org.apache.beam.sdk.util.PushbackSideInputDoFnRunner;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.CounterSet;
 import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -57,7 +56,7 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> {
     DirectExecutionContext executionContext =
         evaluationContext.getExecutionContext(application, inputBundle.getKey());
 
-    CounterSet counters = evaluationContext.createCounterSet();
+    AggregatorContainer.Mutator aggregatorChanges = evaluationContext.getAggregatorMutator();
 
     Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
     for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
@@ -77,7 +76,7 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> {
             mainOutputTag,
             sideOutputTags,
             stepContext,
-            counters.getAddCounterMutator(),
+            aggregatorChanges,
             application.getInput().getWindowingStrategy());
     PushbackSideInputDoFnRunner<InputT, OutputT> runner =
         PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
@@ -89,14 +88,14 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> {
     }
 
     return new ParDoEvaluator<>(
-        runner, application, counters, outputBundles.values(), stepContext);
+        runner, application, aggregatorChanges, outputBundles.values(), stepContext);
   }
 
   ////////////////////////////////////////////////////////////////////////////////////////////////
 
   private final PushbackSideInputDoFnRunner<T, ?> fnRunner;
   private final AppliedPTransform<PCollection<T>, ?, ?> transform;
-  private final CounterSet counters;
+  private final AggregatorContainer.Mutator aggregatorChanges;
   private final Collection<UncommittedBundle<?>> outputBundles;
   private final DirectStepContext stepContext;
 
@@ -105,15 +104,14 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> {
   private ParDoEvaluator(
       PushbackSideInputDoFnRunner<T, ?> fnRunner,
       AppliedPTransform<PCollection<T>, ?, ?> transform,
-      CounterSet counters,
+      AggregatorContainer.Mutator aggregatorChanges,
       Collection<UncommittedBundle<?>> outputBundles,
       DirectStepContext stepContext) {
     this.fnRunner = fnRunner;
     this.transform = transform;
-    this.counters = counters;
     this.outputBundles = outputBundles;
     this.stepContext = stepContext;
-
+    this.aggregatorChanges = aggregatorChanges;
     this.unprocessedElements = ImmutableList.builder();
   }
 
@@ -146,7 +144,7 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> {
     return resultBuilder
         .addOutput(outputBundles)
         .withTimerUpdate(stepContext.getTimerUpdate())
-        .withCounters(counters)
+        .withAggregatorChanges(aggregatorChanges)
         .addUnprocessedElements(unprocessedElements.build())
         .build();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15eb67bb/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index e2bacbe..176bb14 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -22,16 +22,11 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.CounterSet;
 import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableList;
-
 import org.joda.time.Instant;
-
 import java.util.Collection;
-
 import javax.annotation.Nullable;
 
 /**
@@ -50,7 +45,7 @@ public abstract class StepTransformResult implements TransformResult {
 
   @Override
   @Nullable
-  public abstract CounterSet getCounters();
+  public abstract AggregatorContainer.Mutator getAggregatorChanges();
 
   @Override
   public abstract Instant getWatermarkHold();
@@ -79,7 +74,7 @@ public abstract class StepTransformResult implements TransformResult {
     private final ImmutableList.Builder<WindowedValue<?>> unprocessedElementsBuilder;
     private CopyOnAccessInMemoryStateInternals<?> state;
     private TimerUpdate timerUpdate;
-    private CounterSet counters;
+    private AggregatorContainer.Mutator aggregatorChanges;
     private final Instant watermarkHold;
 
     private Builder(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
@@ -95,14 +90,14 @@ public abstract class StepTransformResult implements TransformResult {
           transform,
           bundlesBuilder.build(),
           unprocessedElementsBuilder.build(),
-          counters,
+          aggregatorChanges,
           watermarkHold,
           state,
           timerUpdate);
     }
 
-    public Builder withCounters(CounterSet counters) {
-      this.counters = counters;
+    public Builder withAggregatorChanges(AggregatorContainer.Mutator aggregatorChanges) {
+      this.aggregatorChanges = aggregatorChanges;
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15eb67bb/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
index c1e502d..65f9629 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
@@ -23,11 +23,8 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.CounterSet;
 import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-
 import org.joda.time.Instant;
-
 import javax.annotation.Nullable;
 
 /**
@@ -52,10 +49,10 @@ public interface TransformResult {
   Iterable<? extends WindowedValue<?>> getUnprocessedElements();
 
   /**
-   * Returns the {@link CounterSet} used by this {@link PTransform}, or null if this transform did
-   * not use a {@link CounterSet}.
+   * Returns the {@link AggregatorContainer.Mutator} used by this {@link PTransform}, or null if
+   * this transform did not use an {@link AggregatorContainer.Mutator}.
    */
-  @Nullable CounterSet getCounters();
+  @Nullable AggregatorContainer.Mutator getAggregatorChanges();
 
   /**
    * Returns the Watermark Hold for the transform at the time this result was produced.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15eb67bb/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
new file mode 100644
index 0000000..035a1b0
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Sum.SumIntegerFn;
+import org.apache.beam.sdk.util.ExecutionContext.StepContext;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link AggregatorContainer}.
+ */
+public class AggregatorContainerTest {
+
+  @Rule
+  public final ExpectedException thrown = ExpectedException.none();
+  private final AggregatorContainer container = AggregatorContainer.create();
+
+  private static final String STEP_NAME = "step";
+  private final Class<?> fn = getClass();
+
+  @Mock
+  private StepContext stepContext;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+    when(stepContext.getStepName()).thenReturn(STEP_NAME);
+  }
+
+  @Test
+  public void addsAggregatorsOnCommit() {
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn()).addValue(5);
+    mutator.commit();
+
+    assertThat((Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(5));
+
+    mutator = container.createMutator();
+    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn()).addValue(8);
+
+    assertThat("Shouldn't update value until commit",
+        (Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(5));
+    mutator.commit();
+    assertThat((Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(13));
+  }
+
+  @Test
+  public void failToCreateAfterCommit() {
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    mutator.commit();
+
+    thrown.expect(IllegalStateException.class);
+    mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn()).addValue(5);
+  }
+
+  @Test
+  public void failToAddValueAfterCommit() {
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    Aggregator<Integer, ?> aggregator =
+        mutator.createAggregatorForDoFn(fn, stepContext, "sum_int", new SumIntegerFn());
+    mutator.commit();
+
+    thrown.expect(IllegalStateException.class);
+    aggregator.addValue(5);
+  }
+
+  @Test
+  public void failToAddValueAfterCommitWithPrevious() {
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    mutator.createAggregatorForDoFn(
+        fn, stepContext, "sum_int", new SumIntegerFn()).addValue(5);
+    mutator.commit();
+
+    mutator = container.createMutator();
+    Aggregator<Integer, ?> aggregator = mutator.createAggregatorForDoFn(
+        fn, stepContext, "sum_int", new SumIntegerFn());
+    mutator.commit();
+
+    thrown.expect(IllegalStateException.class);
+    aggregator.addValue(5);
+  }
+
+  @Test
+  public void concurrentWrites() throws InterruptedException {
+    ExecutorService executor = Executors.newFixedThreadPool(20);
+    int sum = 0;
+    for (int i = 0; i < 100; i++) {
+      sum += i;
+      final int value = i;
+      final AggregatorContainer.Mutator mutator = container.createMutator();
+      executor.submit(new Runnable() {
+        @Override
+        public void run() {
+          mutator.createAggregatorForDoFn(
+              fn, stepContext, "sum_int", new SumIntegerFn()).addValue(value);
+          mutator.commit();
+        }
+      });
+    }
+    executor.shutdown();
+    assertThat("Expected all threads to complete after 5 seconds",
+        executor.awaitTermination(5, TimeUnit.SECONDS), equalTo(true));
+
+    assertThat((Integer) container.getAggregate(STEP_NAME, "sum_int"), equalTo(sum));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15eb67bb/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 1de38df..09707bd 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -45,11 +45,8 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.TypeDescriptor;
-
 import com.google.common.collect.ImmutableMap;
-
 import com.fasterxml.jackson.annotation.JsonValue;
-
 import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
@@ -57,7 +54,6 @@ import org.junit.internal.matchers.ThrowableMessageMatcher;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15eb67bb/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 1726866..cae7ffd 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Sum.SumLongFn;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -50,9 +51,6 @@ import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.Counter.AggregationKind;
-import org.apache.beam.sdk.util.common.CounterSet;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
 import org.apache.beam.sdk.util.state.StateNamespaces;
@@ -63,17 +61,14 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
-
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -243,33 +238,35 @@ public class EvaluationContextTest {
   }
 
   @Test
-  public void handleResultMergesCounters() {
-    CounterSet counters = context.createCounterSet();
-    Counter<Long> myCounter = Counter.longs("foo", AggregationKind.SUM);
-    counters.addCounter(myCounter);
+  public void handleResultCommitsAggregators() {
+    Class<?> fn = getClass();
+    DirectExecutionContext fooContext =
+        context.getExecutionContext(created.getProducingTransformInternal(), null);
+    DirectExecutionContext.StepContext stepContext = fooContext.createStepContext(
+        "STEP", created.getProducingTransformInternal().getTransform().getName());
+    AggregatorContainer container = context.getAggregatorContainer();
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    mutator.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(4L);
 
-    myCounter.addValue(4L);
     TransformResult result =
         StepTransformResult.withoutHold(created.getProducingTransformInternal())
-            .withCounters(counters)
+            .withAggregatorChanges(mutator)
             .build();
     context.handleResult(null, ImmutableList.<TimerData>of(), result);
-    assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(4L));
+    assertThat((Long) context.getAggregatorContainer().getAggregate("STEP", "foo"), equalTo(4L));
 
-    CounterSet againCounters = context.createCounterSet();
-    Counter<Long> myLongCounterAgain = Counter.longs("foo", AggregationKind.SUM);
-    againCounters.add(myLongCounterAgain);
-    myLongCounterAgain.addValue(8L);
+    AggregatorContainer.Mutator mutatorAgain = container.createMutator();
+    mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(12L);
 
     TransformResult secondResult =
         StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
-            .withCounters(againCounters)
+            .withAggregatorChanges(mutatorAgain)
             .build();
     context.handleResult(
         context.createRootBundle(created).commit(Instant.now()),
         ImmutableList.<TimerData>of(),
         secondResult);
-    assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(12L));
+    assertThat((Long) context.getAggregatorContainer().getAggregate("STEP", "foo"), equalTo(16L));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15eb67bb/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index bce37e4..07f478d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -41,16 +41,13 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.CounterSet;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
-
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
-
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Before;
@@ -60,11 +57,9 @@ import org.junit.runners.JUnit4;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-
 import javax.annotation.Nullable;
 
 /**
@@ -156,7 +151,11 @@ public class ParDoEvaluatorTest {
             evaluationContext.getExecutionContext(
                 Mockito.any(AppliedPTransform.class), Mockito.any(StructuralKey.class)))
         .thenReturn(executionContext);
-    when(evaluationContext.createCounterSet()).thenReturn(new CounterSet());
+
+    AggregatorContainer container = AggregatorContainer.create();
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
+    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
     return ParDoEvaluator.create(
         evaluationContext,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15eb67bb/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
index 6206c22..c0ab4df 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
@@ -42,7 +42,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.CounterSet;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
@@ -54,14 +53,12 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
-
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-
 import java.io.Serializable;
 
 /**
@@ -116,8 +113,10 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
         new DirectExecutionContext(null, null, null, null);
     when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
         inputBundle.getKey())).thenReturn(executionContext);
-    CounterSet counters = new CounterSet();
-    when(evaluationContext.createCounterSet()).thenReturn(counters);
+    AggregatorContainer container = AggregatorContainer.create();
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
+    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
     TransformEvaluator<String> evaluator =
         new ParDoMultiEvaluatorFactory()
@@ -136,7 +135,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
         Matchers.<UncommittedBundle<?>>containsInAnyOrder(
             lengthOutputBundle, mainOutputBundle, elementOutputBundle));
     assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    assertThat(result.getCounters(), equalTo(counters));
+    assertThat(result.getAggregatorChanges(), equalTo(mutator));
 
     assertThat(
         mainOutputBundle.commit(Instant.now()).getElements(),
@@ -201,8 +200,10 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
         new DirectExecutionContext(null, null, null, null);
     when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
         inputBundle.getKey())).thenReturn(executionContext);
-    CounterSet counters = new CounterSet();
-    when(evaluationContext.createCounterSet()).thenReturn(counters);
+    AggregatorContainer container = AggregatorContainer.create();
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
+    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
     TransformEvaluator<String> evaluator =
         new ParDoMultiEvaluatorFactory()
@@ -220,7 +221,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
         result.getOutputBundles(),
         Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle));
     assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    assertThat(result.getCounters(), equalTo(counters));
+    assertThat(result.getAggregatorChanges(), equalTo(mutator));
 
     assertThat(
         mainOutputBundle.commit(Instant.now()).getElements(),
@@ -293,8 +294,10 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
         null);
     when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
         inputBundle.getKey())).thenReturn(executionContext);
-    CounterSet counters = new CounterSet();
-    when(evaluationContext.createCounterSet()).thenReturn(counters);
+    AggregatorContainer container = AggregatorContainer.create();
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
+    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
     TransformEvaluator<String> evaluator =
         new ParDoMultiEvaluatorFactory()
@@ -404,8 +407,10 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
         null, null);
     when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
         inputBundle.getKey())).thenReturn(executionContext);
-    CounterSet counters = new CounterSet();
-    when(evaluationContext.createCounterSet()).thenReturn(counters);
+    AggregatorContainer container = AggregatorContainer.create();
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
+    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
     TransformEvaluator<String> evaluator =
         new ParDoMultiEvaluatorFactory()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15eb67bb/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
index c378cf4..d778da6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
@@ -41,7 +41,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.CounterSet;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
@@ -51,14 +50,12 @@ import org.apache.beam.sdk.util.state.WatermarkHoldState;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
-
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-
 import java.io.Serializable;
 
 /**
@@ -92,8 +89,10 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
         new DirectExecutionContext(null, null, null, null);
     when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
         inputBundle.getKey())).thenReturn(executionContext);
-    CounterSet counters = new CounterSet();
-    when(evaluationContext.createCounterSet()).thenReturn(counters);
+    AggregatorContainer container = AggregatorContainer.create();
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
+    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
     org.apache.beam.runners.direct.TransformEvaluator<String> evaluator =
         new ParDoSingleEvaluatorFactory()
@@ -109,7 +108,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     TransformResult result = evaluator.finishBundle();
     assertThat(result.getOutputBundles(), Matchers.<UncommittedBundle<?>>contains(outputBundle));
     assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    assertThat(result.getCounters(), equalTo(counters));
+    assertThat(result.getAggregatorChanges(), equalTo(mutator));
 
     assertThat(
         outputBundle.commit(Instant.now()).getElements(),
@@ -144,8 +143,10 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
         new DirectExecutionContext(null, null, null, null);
     when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
         inputBundle.getKey())).thenReturn(executionContext);
-    CounterSet counters = new CounterSet();
-    when(evaluationContext.createCounterSet()).thenReturn(counters);
+    AggregatorContainer container = AggregatorContainer.create();
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
+    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
     TransformEvaluator<String> evaluator =
         new ParDoSingleEvaluatorFactory()
@@ -162,7 +163,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     assertThat(
         result.getOutputBundles(), Matchers.<UncommittedBundle<?>>containsInAnyOrder(outputBundle));
     assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
-    assertThat(result.getCounters(), equalTo(counters));
+    assertThat(result.getAggregatorChanges(), equalTo(mutator));
   }
 
   @Test
@@ -210,8 +211,10 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
         inputBundle.getKey()))
         .thenReturn(executionContext);
-    CounterSet counters = new CounterSet();
-    when(evaluationContext.createCounterSet()).thenReturn(counters);
+    AggregatorContainer container = AggregatorContainer.create();
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
+    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
     org.apache.beam.runners.direct.TransformEvaluator<String> evaluator =
         new ParDoSingleEvaluatorFactory()
@@ -311,8 +314,10 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
         inputBundle.getKey()))
         .thenReturn(executionContext);
-    CounterSet counters = new CounterSet();
-    when(evaluationContext.createCounterSet()).thenReturn(counters);
+    AggregatorContainer container = AggregatorContainer.create();
+    AggregatorContainer.Mutator mutator = container.createMutator();
+    when(evaluationContext.getAggregatorContainer()).thenReturn(container);
+    when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
     TransformEvaluator<String> evaluator =
         new ParDoSingleEvaluatorFactory()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15eb67bb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
index 75b7d9e..c8aad78 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
@@ -18,12 +18,13 @@
 package org.apache.beam.sdk.transforms;
 
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.util.ExecutionContext;
 
 /**
  * An {@code Aggregator<InputT>} enables monitoring of values of type {@code InputT},
  * to be combined across all bundles.
  *
- * <p>Aggregators are created by calling {@link DoFn#createAggregator DoFn.createAggregator},
+ * <p>Aggregators are created by calling {@link DoFn#createAggregator DoFn.createAggregatorForDoFn},
  * typically from the {@link DoFn} constructor. Elements can be added to the
  * {@code Aggregator} by calling {@link Aggregator#addValue}.
  *
@@ -39,7 +40,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
  *   private Aggregator<Integer, Integer> myAggregator;
  *
  *   public MyDoFn() {
- *     myAggregator = createAggregator("myAggregator", new Sum.SumIntegerFn());
+ *     myAggregator = createAggregatorForDoFn("myAggregator", new Sum.SumIntegerFn());
  *   }
  *
  *   @Override
@@ -70,10 +71,25 @@ public interface Aggregator<InputT, OutputT> {
    */
   CombineFn<InputT, ?, OutputT> getCombineFn();
 
+  /**
+   * A factory for creating aggregators.
+   */
+  interface AggregatorFactory {
+    /**
+     * Create an aggregator with the given {@code name} and {@link CombineFn}.
+     *
+     *  <p>This method is called to create an aggregator for a {@link DoFn}. It receives the class
+     *  of the {@link DoFn} being executed and the context of the step it is being executed in.
+     */
+    <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+        Class<?> fnClass, ExecutionContext.StepContext stepContext,
+        String aggregatorName, CombineFn<InputT, AccumT, OutputT> combine);
+  }
+
   // TODO: Consider the following additional API conveniences:
-  // - In addition to createAggregator(), consider adding getAggregator() to
+  // - In addition to createAggregatorForDoFn(), consider adding getAggregator() to
   //   avoid the need to store the aggregator locally in a DoFn, i.e., create
   //   if not already present.
   // - Add a shortcut for the most common aggregator:
-  //   c.createAggregator("name", new Sum.SumIntegerFn()).
+  //   c.createAggregatorForDoFn("name", new Sum.SumIntegerFn()).
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15eb67bb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index c8bd5de..a136632 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -33,15 +33,12 @@ import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
-
 import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-
 import org.joda.time.Instant;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -646,8 +643,8 @@ public class DoFnTester<InputT, OutputT> {
     protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
         String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
       throw new IllegalStateException("Aggregators should not be created within ProcessContext. "
-          + "Instead, create an aggregator at DoFn construction time with createAggregator, and "
-          + "ensure they are set up by the time startBundle is called "
+          + "Instead, create an aggregator at DoFn construction time with createAggregatorForDoFn,"
+          + " and ensure they are set up by the time startBundle is called "
           + "with setupDelegateAggregators.");
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15eb67bb/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
index 5fd04f5..5bde8ef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
@@ -25,6 +25,8 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.util.common.Counter;
 import org.apache.beam.sdk.util.common.CounterProvider;
 import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * An implementation of the {@code Aggregator} interface that uses a
@@ -35,12 +37,40 @@ import org.apache.beam.sdk.util.common.CounterSet;
  * @param <AccumT> the type of accumulator values
  * @param <OutputT> the type of output value
  */
-public class CounterAggregator<InputT, AccumT, OutputT> implements Aggregator<InputT, OutputT> {
+public class CounterAggregator<InputT, AccumT, OutputT>
+    implements Aggregator<InputT, OutputT> {
+
+  private static class CounterAggregatorFactory implements AggregatorFactory {
+    private final AddCounterMutator addCounterMutator;
+
+    private CounterAggregatorFactory(CounterSet.AddCounterMutator addCounterMutator) {
+      this.addCounterMutator = addCounterMutator;
+    }
+
+    @Override
+    public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+        Class<?> fnClass, ExecutionContext.StepContext stepContext,
+        String userName, CombineFn<InputT, AccumT, OutputT> combine) {
+      boolean isSystem = fnClass.isAnnotationPresent(SystemDoFnInternal.class);
+      String mangledName = (isSystem ? "" : "user-") + stepContext.getStepName() + "-" + userName;
+
+      return new CounterAggregator<>(mangledName, combine, addCounterMutator);
+    }
+  }
 
   private final Counter<InputT> counter;
   private final CombineFn<InputT, AccumT, OutputT> combiner;
 
   /**
+   * Create a factory for producing {@link CounterAggregator CounterAggregators} backed by the given
+   * {@link CounterSet.AddCounterMutator}.
+   */
+  public static AggregatorFactory factoryFor(
+      CounterSet.AddCounterMutator addCounterMutator) {
+    return new CounterAggregatorFactory(addCounterMutator);
+  }
+
+  /**
    * Constructs a new aggregator with the given name and aggregation logic
    * specified in the CombineFn argument. The underlying counter is
    * automatically added into the provided CounterSet.
@@ -48,7 +78,8 @@ public class CounterAggregator<InputT, AccumT, OutputT> implements Aggregator<In
    *  <p>If a counter with the same name already exists, it will be reused, as
    * long as it has the same type.
    */
-  public CounterAggregator(String name, CombineFn<? super InputT, AccumT, OutputT> combiner,
+  @VisibleForTesting CounterAggregator(
+      String name, CombineFn<? super InputT, AccumT, OutputT> combiner,
       CounterSet.AddCounterMutator addCounterMutator) {
     // Safe contravariant cast
     this(constructCounter(name, combiner), addCounterMutator,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15eb67bb/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
index dbcc1fe..9242ece 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
@@ -18,18 +18,24 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
+import org.apache.beam.sdk.transforms.Sum.SumIntegerFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 
+import com.google.common.collect.ImmutableMap;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -38,6 +44,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 import java.io.Serializable;
+import java.util.Map;
 
 /**
  * Tests for DoFn.
@@ -204,4 +211,32 @@ public class DoFnTest implements Serializable {
     DisplayData data = DisplayData.from(usesDefault);
     assertThat(data.items(), empty());
   }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testAggregators() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+
+    CountOddsFn countOdds = new CountOddsFn();
+    pipeline
+        .apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100))
+        .apply(ParDo.of(countOdds));
+    PipelineResult result = pipeline.run();
+
+    AggregatorValues<Integer> values = result.getAggregatorValues(countOdds.aggregator);
+    assertThat(values.getValuesAtSteps(),
+        equalTo((Map<String, Integer>) ImmutableMap.<String, Integer>of("ParDo(CountOdds)", 4)));
+  }
+
+  private static class CountOddsFn extends DoFn<Integer, Void> {
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      if (c.element() % 2 == 1) {
+        aggregator.addValue(1);
+      }
+    }
+
+    Aggregator<Integer, Integer> aggregator =
+        createAggregator("odds", new SumIntegerFn());
+  }
 }


[2/2] incubator-beam git commit: Implement aggregators directly in the direct runner

Posted by bc...@apache.org.
Implement aggregators directly in the direct runner

This closes #735


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1df6f5f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1df6f5f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1df6f5f9

Branch: refs/heads/master
Commit: 1df6f5f976108aa584abbb8a9412578e1f6a621b
Parents: 9c44751 15eb67b
Author: bchambers <bc...@google.com>
Authored: Thu Jul 28 17:22:03 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Jul 28 17:22:03 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/DoFnRunnerBase.java    |  28 +--
 .../org/apache/beam/sdk/util/DoFnRunners.java   |  92 +++++++++-
 .../apache/beam/sdk/util/SimpleDoFnRunner.java  |   7 +-
 .../runners/direct/AggregatorContainer.java     | 183 +++++++++++++++++++
 .../beam/runners/direct/DirectRunner.java       |  14 +-
 .../beam/runners/direct/EvaluationContext.java  |  32 ++--
 .../beam/runners/direct/ParDoEvaluator.java     |  16 +-
 .../runners/direct/StepTransformResult.java     |  15 +-
 .../beam/runners/direct/TransformResult.java    |   9 +-
 .../runners/direct/AggregatorContainerTest.java | 134 ++++++++++++++
 .../beam/runners/direct/DirectRunnerTest.java   |   4 -
 .../runners/direct/EvaluationContextTest.java   |  35 ++--
 .../beam/runners/direct/ParDoEvaluatorTest.java |  11 +-
 .../direct/ParDoMultiEvaluatorFactoryTest.java  |  31 ++--
 .../direct/ParDoSingleEvaluatorFactoryTest.java |  31 ++--
 .../apache/beam/sdk/transforms/Aggregator.java  |  24 ++-
 .../apache/beam/sdk/transforms/DoFnTester.java  |   7 +-
 .../apache/beam/sdk/util/CounterAggregator.java |  35 +++-
 .../apache/beam/sdk/transforms/DoFnTest.java    |  35 ++++
 19 files changed, 592 insertions(+), 151 deletions(-)
----------------------------------------------------------------------