You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/08/11 18:41:18 UTC

[2/3] incubator-beam git commit: Remove Counter and associated code

Remove Counter and associated code

Aggregator is the model-level concept. Counter was specific to the
Dataflow Runner and is no longer needed as part of Beam.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d20a7ada
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d20a7ada
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d20a7ada

Branch: refs/heads/master
Commit: d20a7ada7eb3ee714917e7c334e1b15ecc2c3b03
Parents: 2a1055d
Author: bchambers <bc...@google.com>
Authored: Fri Jul 29 09:41:17 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Aug 11 10:26:04 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/util/DoFnRunners.java   |   78 --
 .../beam/runners/dataflow/DataflowRunner.java   |    4 +-
 .../org/apache/beam/sdk/transforms/Combine.java |   13 -
 .../org/apache/beam/sdk/transforms/Max.java     |   27 +-
 .../org/apache/beam/sdk/transforms/Min.java     |   28 +-
 .../org/apache/beam/sdk/transforms/Sum.java     |   27 +-
 .../apache/beam/sdk/util/CounterAggregator.java |  128 --
 .../apache/beam/sdk/util/common/Counter.java    | 1287 ------------------
 .../beam/sdk/util/common/CounterName.java       |  153 ---
 .../beam/sdk/util/common/CounterProvider.java   |   27 -
 .../apache/beam/sdk/util/common/CounterSet.java |  179 ---
 .../util/common/ElementByteSizeObserver.java    |   24 +-
 .../beam/sdk/util/CounterAggregatorTest.java    |  256 ----
 .../beam/sdk/util/common/CounterSetTest.java    |  227 ---
 .../beam/sdk/util/common/CounterTest.java       |  736 ----------
 15 files changed, 15 insertions(+), 3179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
index a9f3cf4..6089228 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
@@ -23,8 +23,6 @@ import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor;
 import org.apache.beam.sdk.util.ExecutionContext.StepContext;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -72,33 +70,6 @@ public class DoFnRunners {
   }
 
   /**
-   * Returns a basic implementation of {@link DoFnRunner} that works for most
-   * {@link OldDoFn OldDoFns}.
-   *
-   * <p>It invokes {@link OldDoFn#processElement} for each input.
-   */
-  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
-      PipelineOptions options,
-      OldDoFn<InputT, OutputT> fn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      CounterSet.AddCounterMutator addCounterMutator,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    return simpleRunner(options,
-        fn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        CounterAggregator.factoryFor(addCounterMutator),
-        windowingStrategy);
-  }
-
-  /**
    * Returns an implementation of {@link DoFnRunner} that handles late data dropping.
    *
    * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}.
@@ -132,33 +103,6 @@ public class DoFnRunners {
         reduceFnExecutor.getDroppedDueToLatenessAggregator());
   }
 
-  /**
-   * Returns an implementation of {@link DoFnRunner} that handles late data dropping.
-   *
-   * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}.
-   */
-  public static <K, InputT, OutputT, W extends BoundedWindow>
-  DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
-      PipelineOptions options,
-      ReduceFnExecutor<K, InputT, OutputT, W> reduceFnExecutor,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<KV<K, OutputT>> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      CounterSet.AddCounterMutator addCounterMutator,
-      WindowingStrategy<?, W> windowingStrategy) {
-    return lateDataDroppingRunner(
-        options,
-        reduceFnExecutor,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        CounterAggregator.factoryFor(addCounterMutator),
-        windowingStrategy);
-  }
 
   public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
       PipelineOptions options,
@@ -197,26 +141,4 @@ public class DoFnRunners {
         aggregatorFactory,
         windowingStrategy);
   }
-
-  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
-      PipelineOptions options,
-      OldDoFn<InputT, OutputT> doFn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      AddCounterMutator addCounterMutator,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    return createDefault(
-        options,
-        doFn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        CounterAggregator.factoryFor(addCounterMutator),
-        windowingStrategy);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index bea6264..667a63b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -212,9 +212,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   // Default Docker container images that execute Dataflow worker harness, residing in Google
   // Container Registry, separately for Batch and Streaming.
   public static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE =
-      "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20160804-dofn";
+      "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20160810";
   public static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE =
-      "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20160804-dofn";
+      "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20160810";
 
   // The limit of CreateJob request size.
   private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index a825800..6ba3f8a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -49,7 +49,6 @@ import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.common.Counter;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -736,10 +735,6 @@ public class Combine {
       return new int[] { value };
     }
 
-    public Counter<Integer> getCounter(@SuppressWarnings("unused") String name) {
-      throw new UnsupportedOperationException("BinaryCombineIntegerFn does not support getCounter");
-    }
-
     private static final class ToIntegerCodingFunction
         implements DelegateCoder.CodingFunction<int[], Integer> {
       @Override
@@ -839,10 +834,6 @@ public class Combine {
       return new long[] { value };
     }
 
-    public Counter<Long> getCounter(@SuppressWarnings("unused") String name) {
-      throw new UnsupportedOperationException("BinaryCombineLongFn does not support getCounter");
-    }
-
     private static final class ToLongCodingFunction
         implements DelegateCoder.CodingFunction<long[], Long> {
       @Override
@@ -944,10 +935,6 @@ public class Combine {
       return new double[] { value };
     }
 
-    public Counter<Double> getCounter(@SuppressWarnings("unused") String name) {
-      throw new UnsupportedOperationException("BinaryCombineDoubleFn does not support getCounter");
-    }
-
     private static final class ToDoubleCodingFunction
         implements DelegateCoder.CodingFunction<double[], Double> {
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index 52617b6..eed13fb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -19,9 +19,6 @@ package org.apache.beam.sdk.transforms;
 
 import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.Counter.AggregationKind;
-import org.apache.beam.sdk.util.common.CounterProvider;
 
 import java.io.Serializable;
 import java.util.Comparator;
@@ -218,8 +215,7 @@ public class Max {
    * A {@code CombineFn} that computes the maximum of a collection of {@code Integer}s, useful as an
    * argument to {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn
-      implements CounterProvider<Integer> {
+  public static class MaxIntegerFn extends Combine.BinaryCombineIntegerFn {
     @Override
     public int apply(int left, int right) {
       return left >= right ? left : right;
@@ -229,19 +225,13 @@ public class Max {
     public int identity() {
       return Integer.MIN_VALUE;
     }
-
-    @Override
-    public Counter<Integer> getCounter(String name) {
-      return Counter.ints(name, AggregationKind.MAX);
-    }
   }
 
   /**
    * A {@code CombineFn} that computes the maximum of a collection of {@code Long}s, useful as an
    * argument to {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class MaxLongFn extends Combine.BinaryCombineLongFn
-      implements CounterProvider<Long> {
+  public static class MaxLongFn extends Combine.BinaryCombineLongFn {
     @Override
     public long apply(long left, long right) {
       return left >= right ? left : right;
@@ -251,19 +241,13 @@ public class Max {
     public long identity() {
       return Long.MIN_VALUE;
     }
-
-    @Override
-    public Counter<Long> getCounter(String name) {
-      return Counter.longs(name, AggregationKind.MAX);
-    }
   }
 
   /**
    * A {@code CombineFn} that computes the maximum of a collection of {@code Double}s, useful as an
    * argument to {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn
-      implements CounterProvider<Double> {
+  public static class MaxDoubleFn extends Combine.BinaryCombineDoubleFn {
     @Override
     public double apply(double left, double right) {
       return left >= right ? left : right;
@@ -273,10 +257,5 @@ public class Max {
     public double identity() {
       return Double.NEGATIVE_INFINITY;
     }
-
-    @Override
-    public Counter<Double> getCounter(String name) {
-      return Counter.doubles(name, AggregationKind.MAX);
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index 3159134..9c9d14f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -19,9 +19,6 @@ package org.apache.beam.sdk.transforms;
 
 import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.Counter.AggregationKind;
-import org.apache.beam.sdk.util.common.CounterProvider;
 
 import java.io.Serializable;
 import java.util.Comparator;
@@ -218,8 +215,7 @@ public class Min {
    * A {@code CombineFn} that computes the minimum of a collection of {@code Integer}s, useful as an
    * argument to {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class MinIntegerFn extends Combine.BinaryCombineIntegerFn
-      implements CounterProvider<Integer> {
+  public static class MinIntegerFn extends Combine.BinaryCombineIntegerFn {
 
     @Override
     public int apply(int left, int right) {
@@ -230,20 +226,13 @@ public class Min {
     public int identity() {
       return Integer.MAX_VALUE;
     }
-
-    @Override
-    public Counter<Integer> getCounter(String name) {
-      return Counter.ints(name, AggregationKind.MIN);
-    }
   }
 
   /**
    * A {@code CombineFn} that computes the minimum of a collection of {@code Long}s, useful as an
    * argument to {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class MinLongFn extends Combine.BinaryCombineLongFn
-      implements CounterProvider<Long> {
-
+  public static class MinLongFn extends Combine.BinaryCombineLongFn {
     @Override
     public long apply(long left, long right) {
       return left <= right ? left : right;
@@ -253,19 +242,13 @@ public class Min {
     public long identity() {
       return Long.MAX_VALUE;
     }
-
-    @Override
-    public Counter<Long> getCounter(String name) {
-      return Counter.longs(name, AggregationKind.MIN);
-    }
   }
 
   /**
    * A {@code CombineFn} that computes the minimum of a collection of {@code Double}s, useful as an
    * argument to {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class MinDoubleFn extends Combine.BinaryCombineDoubleFn
-      implements CounterProvider<Double> {
+  public static class MinDoubleFn extends Combine.BinaryCombineDoubleFn {
 
     @Override
     public double apply(double left, double right) {
@@ -276,10 +259,5 @@ public class Min {
     public double identity() {
       return Double.POSITIVE_INFINITY;
     }
-
-    @Override
-    public Counter<Double> getCounter(String name) {
-      return Counter.doubles(name, AggregationKind.MIN);
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
index 07f78b5..27c5ced 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
@@ -17,10 +17,6 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.Counter.AggregationKind;
-import org.apache.beam.sdk.util.common.CounterProvider;
-
 /**
  * {@code PTransform}s for computing the sum of the elements in a
  * {@code PCollection}, or the sum of the values associated with
@@ -123,8 +119,7 @@ public class Sum {
    * {@code Iterable} of {@code Integer}s, useful as an argument to
    * {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class SumIntegerFn
-      extends Combine.BinaryCombineIntegerFn implements CounterProvider<Integer> {
+  public static class SumIntegerFn extends Combine.BinaryCombineIntegerFn {
     @Override
     public int apply(int a, int b) {
       return a + b;
@@ -134,11 +129,6 @@ public class Sum {
     public int identity() {
       return 0;
     }
-
-    @Override
-    public Counter<Integer> getCounter(String name) {
-      return Counter.ints(name, AggregationKind.SUM);
-    }
   }
 
   /**
@@ -147,7 +137,7 @@ public class Sum {
    * {@link Combine#globally} or {@link Combine#perKey}.
    */
   public static class SumLongFn
-      extends Combine.BinaryCombineLongFn implements CounterProvider<Long> {
+      extends Combine.BinaryCombineLongFn {
     @Override
     public long apply(long a, long b) {
       return a + b;
@@ -157,11 +147,6 @@ public class Sum {
     public long identity() {
       return 0;
     }
-
-    @Override
-    public Counter<Long> getCounter(String name) {
-      return Counter.longs(name, AggregationKind.SUM);
-    }
   }
 
   /**
@@ -169,8 +154,7 @@ public class Sum {
    * {@code Iterable} of {@code Double}s, useful as an argument to
    * {@link Combine#globally} or {@link Combine#perKey}.
    */
-  public static class SumDoubleFn
-      extends Combine.BinaryCombineDoubleFn implements CounterProvider<Double> {
+  public static class SumDoubleFn extends Combine.BinaryCombineDoubleFn {
     @Override
     public double apply(double a, double b) {
       return a + b;
@@ -180,10 +164,5 @@ public class Sum {
     public double identity() {
       return 0;
     }
-
-    @Override
-    public Counter<Double> getCounter(String name) {
-      return Counter.doubles(name, AggregationKind.SUM);
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
deleted file mode 100644
index 5bde8ef..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.CounterProvider;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator;
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * An implementation of the {@code Aggregator} interface that uses a
- * {@link Counter} as the underlying representation. Supports {@link CombineFn}s
- * from the {@link Sum}, {@link Min} and {@link Max} classes.
- *
- * @param <InputT> the type of input values
- * @param <AccumT> the type of accumulator values
- * @param <OutputT> the type of output value
- */
-public class CounterAggregator<InputT, AccumT, OutputT>
-    implements Aggregator<InputT, OutputT> {
-
-  private static class CounterAggregatorFactory implements AggregatorFactory {
-    private final AddCounterMutator addCounterMutator;
-
-    private CounterAggregatorFactory(CounterSet.AddCounterMutator addCounterMutator) {
-      this.addCounterMutator = addCounterMutator;
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
-        Class<?> fnClass, ExecutionContext.StepContext stepContext,
-        String userName, CombineFn<InputT, AccumT, OutputT> combine) {
-      boolean isSystem = fnClass.isAnnotationPresent(SystemDoFnInternal.class);
-      String mangledName = (isSystem ? "" : "user-") + stepContext.getStepName() + "-" + userName;
-
-      return new CounterAggregator<>(mangledName, combine, addCounterMutator);
-    }
-  }
-
-  private final Counter<InputT> counter;
-  private final CombineFn<InputT, AccumT, OutputT> combiner;
-
-  /**
-   * Create a factory for producing {@link CounterAggregator CounterAggregators} backed by the given
-   * {@link CounterSet.AddCounterMutator}.
-   */
-  public static AggregatorFactory factoryFor(
-      CounterSet.AddCounterMutator addCounterMutator) {
-    return new CounterAggregatorFactory(addCounterMutator);
-  }
-
-  /**
-   * Constructs a new aggregator with the given name and aggregation logic
-   * specified in the CombineFn argument. The underlying counter is
-   * automatically added into the provided CounterSet.
-   *
-   *  <p>If a counter with the same name already exists, it will be reused, as
-   * long as it has the same type.
-   */
-  @VisibleForTesting CounterAggregator(
-      String name, CombineFn<? super InputT, AccumT, OutputT> combiner,
-      CounterSet.AddCounterMutator addCounterMutator) {
-    // Safe contravariant cast
-    this(constructCounter(name, combiner), addCounterMutator,
-        (CombineFn<InputT, AccumT, OutputT>) combiner);
-  }
-
-  private CounterAggregator(Counter<InputT> counter,
-      CounterSet.AddCounterMutator addCounterMutator,
-      CombineFn<InputT, AccumT, OutputT> combiner) {
-    try {
-      this.counter = addCounterMutator.addCounter(counter);
-    } catch (IllegalArgumentException ex) {
-      throw new IllegalArgumentException(
-          "aggregator's name collides with an existing aggregator "
-          + "or system-provided counter of an incompatible type");
-    }
-    this.combiner = combiner;
-  }
-
-  private static <T> Counter<T> constructCounter(String name,
-      CombineFn<? super T, ?, ?> combiner) {
-    if (combiner instanceof CounterProvider) {
-      @SuppressWarnings("unchecked")
-      CounterProvider<T> counterProvider = (CounterProvider<T>) combiner;
-      return counterProvider.getCounter(name);
-    } else {
-      throw new IllegalArgumentException("unsupported combiner in Aggregator: "
-        + combiner.getClass().getName());
-    }
-  }
-
-  @Override
-  public void addValue(InputT value) {
-    counter.addValue(value);
-  }
-
-  @Override
-  public String getName() {
-    return counter.getFlatName();
-  }
-
-  @Override
-  public CombineFn<InputT, ?, OutputT> getCombineFn() {
-    return combiner;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
deleted file mode 100644
index 550c648..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
+++ /dev/null
@@ -1,1287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.common;
-
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.AND;
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.MEAN;
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.OR;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.AtomicDouble;
-
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.Nullable;
-
-/**
- * A Counter enables the aggregation of a stream of values over time.  The
- * cumulative aggregate value is updated as new values are added, or it can be
- * reset to a new value.  Multiple kinds of aggregation are supported depending
- * on the type of the counter.
- *
- * <p>Counters compare using value equality of their name, kind, and
- * cumulative value.  Equal counters should have equal toString()s.
- *
- * <p>After all possible mutations have completed, the reader should check
- * {@link #isDirty} for every counter, otherwise updates may be lost.
- *
- * <p>A counter may become dirty without a corresponding update to the value.
- * This generally will occur when the calls to {@code addValue()}, {@code committing()},
- * and {@code committed()} are interleaved such that the value is updated
- * between the calls to committing and the read of the value.
- *
- * @param <T> the type of values aggregated by this counter
- */
-public abstract class Counter<T> {
-  /**
-   * Possible kinds of counter aggregation.
-   */
-  public static enum AggregationKind {
-
-    /**
-     * Computes the sum of all added values.
-     * Applicable to {@link Integer}, {@link Long}, and {@link Double} values.
-     */
-    SUM,
-
-    /**
-     * Computes the maximum value of all added values.
-     * Applicable to {@link Integer}, {@link Long}, and {@link Double} values.
-     */
-    MAX,
-
-    /**
-     * Computes the minimum value of all added values.
-     * Applicable to {@link Integer}, {@link Long}, and {@link Double} values.
-     */
-    MIN,
-
-    /**
-     * Computes the arithmetic mean of all added values.  Applicable to
-     * {@link Integer}, {@link Long}, and {@link Double} values.
-     */
-    MEAN,
-
-    /**
-     * Computes boolean AND over all added values.
-     * Applicable only to {@link Boolean} values.
-     */
-    AND,
-
-    /**
-     * Computes boolean OR over all added values. Applicable only to
-     * {@link Boolean} values.
-     */
-    OR
-    // TODO: consider adding VECTOR_SUM, HISTOGRAM, KV_SET, PRODUCT, TOP.
-  }
-
-  /**
-   * Constructs a new {@link Counter} that aggregates {@link Integer} values
-   * according to the desired aggregation kind. The supported aggregation kinds
-   * are {@link AggregationKind#SUM}, {@link AggregationKind#MIN},
-   * {@link AggregationKind#MAX}, and {@link AggregationKind#MEAN}.
-   * This is a convenience wrapper over a
-   * {@link Counter} implementation that aggregates {@link Long} values. This is
-   * useful when the application handles (boxed) {@link Integer} values that
-   * are not readily convertible to the (boxed) {@link Long} values otherwise
-   * expected by the {@link Counter} implementation aggregating {@link Long}
-   * values.
-   *
-   * @param name the name of the new counter
-   * @param kind the new counter's aggregation kind
-   * @return the newly constructed Counter
-   * @throws IllegalArgumentException if the aggregation kind is not supported
-   */
-  public static Counter<Integer> ints(CounterName name, AggregationKind kind) {
-    return new IntegerCounter(name, kind);
-  }
-
-  /**
-   * Constructs a new {@code Counter<Integer>} with an unstructured name.
-   *
-   * @deprecated use {@link #ints(CounterName, AggregationKind)}.
-   */
-  @Deprecated
-  public static Counter<Integer> ints(String name, AggregationKind kind) {
-    return new IntegerCounter(CounterName.named(name), kind);
-  }
-
-  /**
-   * Constructs a new {@link Counter} that aggregates {@link Long} values
-   * according to the desired aggregation kind. The supported aggregation kinds
-   * are {@link AggregationKind#SUM}, {@link AggregationKind#MIN},
-   * {@link AggregationKind#MAX}, and {@link AggregationKind#MEAN}.
-   *
-   * @param name the name of the new counter
-   * @param kind the new counter's aggregation kind
-   * @return the newly constructed Counter
-   * @throws IllegalArgumentException if the aggregation kind is not supported
-   */
-  public static Counter<Long> longs(CounterName name, AggregationKind kind) {
-    return new LongCounter(name, kind);
-  }
-
-  /**
-   * Constructs a new {@code Counter<Long>} with an unstructured name.
-   *
-   * @deprecated use {@link #longs(CounterName, AggregationKind)}.
-   */
-  @Deprecated
-  public static Counter<Long> longs(String name, AggregationKind kind) {
-    return new LongCounter(CounterName.named(name), kind);
-  }
-
-  /**
-   * Constructs a new {@link Counter} that aggregates {@link Double} values
-   * according to the desired aggregation kind. The supported aggregation kinds
-   * are {@link AggregationKind#SUM}, {@link AggregationKind#MIN},
-   * {@link AggregationKind#MAX}, and {@link AggregationKind#MEAN}.
-   *
-   * @param name the name of the new counter
-   * @param kind the new counter's aggregation kind
-   * @return the newly constructed Counter
-   * @throws IllegalArgumentException if the aggregation kind is not supported
-   */
-  public static Counter<Double> doubles(CounterName name, AggregationKind kind) {
-    return new DoubleCounter(name, kind);
-  }
-
-  /**
-   * Constructs a new {@code Counter<Double>} with an unstructured name.
-   *
-   * @deprecated use {@link #doubles(CounterName, AggregationKind)}.
-   */
-  @Deprecated
-  public static Counter<Double> doubles(String name, AggregationKind kind) {
-    return new DoubleCounter(CounterName.named(name), kind);
-  }
-
-  /**
-   * Constructs a new {@link Counter} that aggregates {@link Boolean} values
-   * according to the desired aggregation kind. The only supported aggregation
-   * kinds are {@link AggregationKind#AND} and {@link AggregationKind#OR}.
-   *
-   * @param name the name of the new counter
-   * @param kind the new counter's aggregation kind
-   * @return the newly constructed Counter
-   * @throws IllegalArgumentException if the aggregation kind is not supported
-   */
-  public static Counter<Boolean> booleans(CounterName name, AggregationKind kind) {
-    return new BooleanCounter(name, kind);
-  }
-
-  /**
-   * Constructs a new {@code Counter<Boolean>} with an unstructured name.
-   *
-   * @deprecated use {@link #booleans(CounterName, AggregationKind)}.
-   */
-  @Deprecated
-  public static Counter<Boolean> booleans(String name, AggregationKind kind) {
-    return new BooleanCounter(CounterName.named(name), kind);
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Adds a new value to the aggregation stream. Returns this (to allow method
-   * chaining).
-   */
-  public abstract Counter<T> addValue(T value);
-
-  /**
-   * Resets the aggregation stream to this new value. This aggregator must not
-   * be a MEAN aggregator. Returns this (to allow method chaining).
-   */
-  public abstract Counter<T> resetToValue(T value);
-
-  /**
-   * Resets the aggregation stream to this new value. Returns this (to allow
-   * method chaining). The value of elementCount must be non-negative, and this
-   * aggregator must be a MEAN aggregator.
-   */
-  public abstract Counter<T> resetMeanToValue(long elementCount, T value);
-
-  /**
-   * Resets the counter's delta value to have no values accumulated and returns
-   * the value of the delta prior to the reset.
-   *
-   * @return the aggregate delta at the time this method is called
-   */
-  public abstract T getAndResetDelta();
-
-  /**
-   * Resets the counter's delta value to have no values accumulated and returns
-   * the value of the delta prior to the reset, for a MEAN counter.
-   *
-   * @return the mean delta at the time this method is called
-   */
-  public abstract CounterMean<T> getAndResetMeanDelta();
-
-  /**
-   * Returns the counter's flat name.
-   */
-  public String getFlatName() {
-    return name.getFlatName();
-  }
-
-  /**
-   * Returns the counter's name.
-   *
-   * @deprecated use {@link #getFlatName}.
-   */
-  @Deprecated
-  public String getName() {
-    return name.getFlatName();
-  }
-
-  /**
-   * Returns the counter's aggregation kind.
-   */
-  public AggregationKind getKind() {
-    return kind;
-  }
-
-  /**
-   * Returns the counter's type.
-   */
-  public Class<?> getType() {
-    return new TypeDescriptor<T>(getClass()) {}.getRawType();
-  }
-
-  /**
-   * Returns the aggregated value, or the sum for MEAN aggregation, either
-   * total or, if delta, since the last update extraction or resetDelta.
-   */
-  public abstract T getAggregate();
-
-  /**
-   * The mean value of a {@code Counter}, represented as an aggregate value and
-   * a count.
-   *
-   * @param <T> the type of the aggregate
-   */
-  public static interface CounterMean<T> {
-    /**
-     * Gets the aggregate value of this {@code CounterMean}.
-     */
-    T getAggregate();
-
-    /**
-     * Gets the count of this {@code CounterMean}.
-     */
-    long getCount();
-  }
-
-  /**
-   * Returns the mean in the form of a CounterMean, or null if this is not a
-   * MEAN counter.
-   */
-  @Nullable
-  public abstract CounterMean<T> getMean();
-
-  /**
-   * Represents whether counters' values have been committed to the backend.
-   *
-   * <p>Runners can use this information to optimize counters updates.
-   * For example, if counters are committed, runners may choose to skip the updates.
-   *
-   * <p>Counters' state transition table:
-   * {@code
-   * Action\Current State         COMMITTED        DIRTY        COMMITTING
-   * addValue()                   DIRTY            DIRTY        DIRTY
-   * committing()                 None             COMMITTING   None
-   * committed()                  None             None         COMMITTED
-   * }
-   */
-  @VisibleForTesting
-  enum CommitState {
-    /**
-     * There are no local updates that haven't been committed to the backend.
-     */
-    COMMITTED,
-    /**
-     * There are local updates that haven't been committed to the backend.
-     */
-    DIRTY,
-    /**
-     * Local updates are committing to the backend, but are still pending.
-     */
-    COMMITTING,
-  }
-
-  /**
-   * Returns whether the counter contains a non-committed aggregate.
-   */
-  public boolean isDirty() {
-    return commitState.get() != CommitState.COMMITTED;
-  }
-
-  /**
-   * Changes the counter from {@code CommitState.DIRTY} to {@code CommitState.COMMITTING}.
-   *
-   * @return true if successful. False return indicates that the commit state
-   * was not in {@code CommitState.DIRTY}.
-   */
-  public boolean committing() {
-    return commitState.compareAndSet(CommitState.DIRTY, CommitState.COMMITTING);
-  }
-
-  /**
-   * Changes the counter from {@code CommitState.COMMITTING} to {@code CommitState.COMMITTED}.
-   *
-   * @return true if successful.
-   *
-   * <p>False return indicates that the counter was updated while the commit was pending.
-   * That counter update might or might not have been committed. The {@code commitState} has to
-   * stay in {@code CommitState.DIRTY}.
-   */
-  public boolean committed() {
-    return commitState.compareAndSet(CommitState.COMMITTING, CommitState.COMMITTED);
-  }
-
-  /**
-   * Sets the counter to {@code CommitState.DIRTY}.
-   *
-   * <p>Must be called at the end of {@link #addValue}, {@link #resetToValue},
-   * {@link #resetMeanToValue}, and {@link #merge}.
-   */
-  protected void setDirty() {
-    commitState.set(CommitState.DIRTY);
-  }
-
-  /**
-   * Returns a string representation of the Counter. Useful for debugging logs.
-   * Example return value: "ElementCount:SUM(15)".
-   */
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append(getFlatName());
-    sb.append(":");
-    sb.append(getKind());
-    sb.append("(");
-    switch (kind) {
-      case SUM:
-      case MAX:
-      case MIN:
-      case AND:
-      case OR:
-        sb.append(getAggregate());
-        break;
-      case MEAN:
-        sb.append(getMean());
-        break;
-      default:
-        throw illegalArgumentException();
-    }
-    sb.append(")");
-
-    return sb.toString();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    } else if (o instanceof Counter) {
-      Counter<?> that = (Counter<?>) o;
-      if (this.name.equals(that.name) && this.kind == that.kind
-          && this.getClass().equals(that.getClass())) {
-        if (kind == MEAN) {
-          CounterMean<T> thisMean = this.getMean();
-          CounterMean<?> thatMean = that.getMean();
-          return thisMean == thatMean
-              || (Objects.equals(thisMean.getAggregate(), thatMean.getAggregate())
-                     && thisMean.getCount() == thatMean.getCount());
-        } else {
-          return Objects.equals(this.getAggregate(), that.getAggregate());
-        }
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    if (kind == MEAN) {
-      CounterMean<T> mean = getMean();
-      return Objects.hash(getClass(), name, kind, mean.getAggregate(), mean.getCount());
-    } else {
-      return Objects.hash(getClass(), name, kind, getAggregate());
-    }
-  }
-
-  /**
-   * Returns whether this Counter is compatible with that Counter.  If
-   * so, they can be merged into a single Counter.
-   */
-  public boolean isCompatibleWith(Counter<?> that) {
-    return this.name.equals(that.name)
-        && this.kind == that.kind
-        && this.getClass().equals(that.getClass());
-  }
-
-  /**
-   * Merges this counter with the provided counter, returning this counter with the combined value
-   * of both counters. This may reset the delta of this counter.
-   *
-   * @throws IllegalArgumentException if the provided Counter is not compatible with this Counter
-   */
-  public abstract Counter<T> merge(Counter<T> that);
-
-  //////////////////////////////////////////////////////////////////////////////
-
-  /** The name and metadata of this counter. */
-  protected final CounterName name;
-
-  /** The kind of aggregation function to apply to this counter. */
-  protected final AggregationKind kind;
-
-  /** The commit state of this counter. */
-  protected final AtomicReference<CommitState> commitState;
-
-  protected Counter(CounterName name, AggregationKind kind) {
-    this.name = name;
-    this.kind = kind;
-    this.commitState = new AtomicReference<>(CommitState.COMMITTED);
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Implements a {@link Counter} for {@link Long} values.
-   */
-  private static class LongCounter extends Counter<Long> {
-    private final AtomicLong aggregate;
-    private final AtomicLong deltaAggregate;
-    private final AtomicReference<LongCounterMean> mean;
-    private final AtomicReference<LongCounterMean> deltaMean;
-
-    /** Initializes a new {@link Counter} for {@link Long} values. */
-    private LongCounter(CounterName name, AggregationKind kind) {
-      super(name, kind);
-      switch (kind) {
-        case MEAN:
-          mean = new AtomicReference<>();
-          deltaMean = new AtomicReference<>();
-          getAndResetMeanDelta();
-          mean.set(deltaMean.get());
-          aggregate = deltaAggregate = null;
-          break;
-        case SUM:
-        case MAX:
-        case MIN:
-          aggregate = new AtomicLong();
-          deltaAggregate = new AtomicLong();
-          getAndResetDelta();
-          aggregate.set(deltaAggregate.get());
-          mean = deltaMean = null;
-          break;
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public LongCounter addValue(Long value) {
-      try {
-        switch (kind) {
-          case SUM:
-            aggregate.addAndGet(value);
-            deltaAggregate.addAndGet(value);
-            break;
-          case MEAN:
-            addToMeanAndSet(value, mean);
-            addToMeanAndSet(value, deltaMean);
-            break;
-          case MAX:
-            maxAndSet(value, aggregate);
-            maxAndSet(value, deltaAggregate);
-            break;
-          case MIN:
-            minAndSet(value, aggregate);
-            minAndSet(value, deltaAggregate);
-            break;
-          default:
-            throw illegalArgumentException();
-        }
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    private void minAndSet(Long value, AtomicLong target) {
-      long current;
-      long update;
-      do {
-        current = target.get();
-        update = Math.min(value, current);
-      } while (update < current && !target.compareAndSet(current, update));
-    }
-
-    private void maxAndSet(Long value, AtomicLong target) {
-      long current;
-      long update;
-      do {
-        current = target.get();
-        update = Math.max(value, current);
-      } while (update > current && !target.compareAndSet(current, update));
-    }
-
-    private void addToMeanAndSet(Long value, AtomicReference<LongCounterMean> target) {
-      LongCounterMean current;
-      LongCounterMean update;
-      do {
-        current = target.get();
-        update = new LongCounterMean(current.getAggregate() + value, current.getCount() + 1L);
-      } while (!target.compareAndSet(current, update));
-    }
-
-    @Override
-    public Long getAggregate() {
-      if (kind != MEAN) {
-        return aggregate.get();
-      } else {
-        return getMean().getAggregate();
-      }
-    }
-
-    @Override
-    public Long getAndResetDelta() {
-      switch (kind) {
-        case SUM:
-          return deltaAggregate.getAndSet(0L);
-        case MAX:
-          return deltaAggregate.getAndSet(Long.MIN_VALUE);
-        case MIN:
-          return deltaAggregate.getAndSet(Long.MAX_VALUE);
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<Long> resetToValue(Long value) {
-      try {
-        if (kind == MEAN) {
-          throw illegalArgumentException();
-        }
-        aggregate.set(value);
-        deltaAggregate.set(value);
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    @Override
-    public Counter<Long> resetMeanToValue(long elementCount, Long value) {
-      try {
-        if (kind != MEAN) {
-          throw illegalArgumentException();
-        }
-        if (elementCount < 0) {
-          throw new IllegalArgumentException("elementCount must be non-negative");
-        }
-        LongCounterMean counterMean = new LongCounterMean(value, elementCount);
-        mean.set(counterMean);
-        deltaMean.set(counterMean);
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    @Override
-    public CounterMean<Long> getAndResetMeanDelta() {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      return deltaMean.getAndSet(new LongCounterMean(0L, 0L));
-    }
-
-    @Override
-    @Nullable
-    public CounterMean<Long> getMean() {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      return mean.get();
-    }
-
-    @Override
-    public Counter<Long> merge(Counter<Long> that) {
-      try {
-        checkArgument(
-            this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
-        switch (kind) {
-          case SUM:
-          case MIN:
-          case MAX:
-            return addValue(that.getAggregate());
-          case MEAN:
-            CounterMean<Long> thisCounterMean = this.getMean();
-            CounterMean<Long> thatCounterMean = that.getMean();
-            return resetMeanToValue(
-                thisCounterMean.getCount() + thatCounterMean.getCount(),
-                thisCounterMean.getAggregate() + thatCounterMean.getAggregate());
-          default:
-            throw illegalArgumentException();
-        }
-      } finally {
-        setDirty();
-      }
-    }
-
-    private static class LongCounterMean implements CounterMean<Long> {
-      private final long aggregate;
-      private final long count;
-
-      public LongCounterMean(long aggregate, long count) {
-        this.aggregate = aggregate;
-        this.count = count;
-      }
-
-      @Override
-      public Long getAggregate() {
-        return aggregate;
-      }
-
-      @Override
-      public long getCount() {
-        return count;
-      }
-
-      @Override
-      public String toString() {
-        return aggregate + "/" + count;
-      }
-    }
-  }
-
-  /**
-   * Implements a {@link Counter} for {@link Double} values.
-   */
-  private static class DoubleCounter extends Counter<Double> {
-    AtomicDouble aggregate;
-    AtomicDouble deltaAggregate;
-    AtomicReference<DoubleCounterMean> mean;
-    AtomicReference<DoubleCounterMean> deltaMean;
-
-    /** Initializes a new {@link Counter} for {@link Double} values. */
-    private DoubleCounter(CounterName name, AggregationKind kind) {
-      super(name, kind);
-      switch (kind) {
-        case MEAN:
-          aggregate = deltaAggregate = null;
-          mean = new AtomicReference<>();
-          deltaMean = new AtomicReference<>();
-          getAndResetMeanDelta();
-          mean.set(deltaMean.get());
-          break;
-        case SUM:
-        case MAX:
-        case MIN:
-          mean = deltaMean = null;
-          aggregate = new AtomicDouble();
-          deltaAggregate = new AtomicDouble();
-          getAndResetDelta();
-          aggregate.set(deltaAggregate.get());
-          break;
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public DoubleCounter addValue(Double value) {
-      try {
-        switch (kind) {
-          case SUM:
-            aggregate.addAndGet(value);
-            deltaAggregate.addAndGet(value);
-            break;
-          case MEAN:
-            addToMeanAndSet(value, mean);
-            addToMeanAndSet(value, deltaMean);
-            break;
-          case MAX:
-            maxAndSet(value, aggregate);
-            maxAndSet(value, deltaAggregate);
-            break;
-          case MIN:
-            minAndSet(value, aggregate);
-            minAndSet(value, deltaAggregate);
-            break;
-          default:
-            throw illegalArgumentException();
-        }
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    private void addToMeanAndSet(Double value, AtomicReference<DoubleCounterMean> target) {
-      DoubleCounterMean current;
-      DoubleCounterMean update;
-      do {
-        current = target.get();
-        update = new DoubleCounterMean(current.getAggregate() + value, current.getCount() + 1);
-      } while (!target.compareAndSet(current, update));
-    }
-
-    private void maxAndSet(Double value, AtomicDouble target) {
-      double current;
-      double update;
-      do {
-        current = target.get();
-        update = Math.max(current, value);
-      } while (update > current && !target.compareAndSet(current, update));
-    }
-
-    private void minAndSet(Double value, AtomicDouble target) {
-      double current;
-      double update;
-      do {
-        current = target.get();
-        update = Math.min(current, value);
-      } while (update < current && !target.compareAndSet(current, update));
-    }
-
-    @Override
-    public Double getAndResetDelta() {
-      switch (kind) {
-        case SUM:
-          return deltaAggregate.getAndSet(0.0);
-        case MAX:
-          return deltaAggregate.getAndSet(Double.NEGATIVE_INFINITY);
-        case MIN:
-          return deltaAggregate.getAndSet(Double.POSITIVE_INFINITY);
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<Double> resetToValue(Double value) {
-      try {
-        if (kind == MEAN) {
-          throw illegalArgumentException();
-        }
-        aggregate.set(value);
-        deltaAggregate.set(value);
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    @Override
-    public Counter<Double> resetMeanToValue(long elementCount, Double value) {
-      try {
-        if (kind != MEAN) {
-          throw illegalArgumentException();
-        }
-        if (elementCount < 0) {
-          throw new IllegalArgumentException("elementCount must be non-negative");
-        }
-        DoubleCounterMean counterMean = new DoubleCounterMean(value, elementCount);
-        mean.set(counterMean);
-        deltaMean.set(counterMean);
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    @Override
-    public CounterMean<Double> getAndResetMeanDelta() {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      return deltaMean.getAndSet(new DoubleCounterMean(0.0, 0L));
-    }
-
-    @Override
-    public Double getAggregate() {
-      if (kind != MEAN) {
-        return aggregate.get();
-      } else {
-        return getMean().getAggregate();
-      }
-    }
-
-    @Override
-    @Nullable
-    public CounterMean<Double> getMean() {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      return mean.get();
-    }
-
-    @Override
-    public Counter<Double> merge(Counter<Double> that) {
-      try {
-        checkArgument(
-            this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
-        switch (kind) {
-          case SUM:
-          case MIN:
-          case MAX:
-            return addValue(that.getAggregate());
-          case MEAN:
-            CounterMean<Double> thisCounterMean = this.getMean();
-            CounterMean<Double> thatCounterMean = that.getMean();
-            return resetMeanToValue(
-                thisCounterMean.getCount() + thatCounterMean.getCount(),
-                thisCounterMean.getAggregate() + thatCounterMean.getAggregate());
-          default:
-            throw illegalArgumentException();
-        }
-      } finally {
-        setDirty();
-      }
-    }
-
-    private static class DoubleCounterMean implements CounterMean<Double> {
-      private final double aggregate;
-      private final long count;
-
-      public DoubleCounterMean(double aggregate, long count) {
-        this.aggregate = aggregate;
-        this.count = count;
-      }
-
-      @Override
-      public Double getAggregate() {
-        return aggregate;
-      }
-
-      @Override
-      public long getCount() {
-        return count;
-      }
-
-      @Override
-      public String toString() {
-        return aggregate + "/" + count;
-      }
-    }
-  }
-
-  /**
-   * Implements a {@link Counter} for {@link Boolean} values.
-   */
-  private static class BooleanCounter extends Counter<Boolean> {
-    private final AtomicBoolean aggregate;
-    private final AtomicBoolean deltaAggregate;
-
-    /** Initializes a new {@link Counter} for {@link Boolean} values. */
-    private BooleanCounter(CounterName name, AggregationKind kind) {
-      super(name, kind);
-      aggregate = new AtomicBoolean();
-      deltaAggregate = new AtomicBoolean();
-      getAndResetDelta();
-      aggregate.set(deltaAggregate.get());
-    }
-
-    @Override
-    public BooleanCounter addValue(Boolean value) {
-      try {
-        if (kind.equals(AND) && !value) {
-          aggregate.set(value);
-          deltaAggregate.set(value);
-        } else if (kind.equals(OR) && value) {
-          aggregate.set(value);
-          deltaAggregate.set(value);
-        }
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    @Override
-    public Boolean getAndResetDelta() {
-      switch (kind) {
-        case AND:
-          return deltaAggregate.getAndSet(true);
-        case OR:
-          return deltaAggregate.getAndSet(false);
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<Boolean> resetToValue(Boolean value) {
-      try {
-        aggregate.set(value);
-        deltaAggregate.set(value);
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    @Override
-    public Counter<Boolean> resetMeanToValue(long elementCount, Boolean value) {
-      throw illegalArgumentException();
-    }
-
-    @Override
-    public CounterMean<Boolean> getAndResetMeanDelta() {
-      throw illegalArgumentException();
-    }
-
-    @Override
-    public Boolean getAggregate() {
-      return aggregate.get();
-    }
-
-    @Override
-    @Nullable
-    public CounterMean<Boolean> getMean() {
-      throw illegalArgumentException();
-    }
-
-    @Override
-    public Counter<Boolean> merge(Counter<Boolean> that) {
-      try {
-        checkArgument(
-            this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
-        return addValue(that.getAggregate());
-      } finally {
-        setDirty();
-      }
-    }
-  }
-
-  /**
-   * Implements a {@link Counter} for {@link String} values.
-   */
-  private static class StringCounter extends Counter<String> {
-    /** Initializes a new {@link Counter} for {@link String} values. */
-    private StringCounter(CounterName name, AggregationKind kind) {
-      super(name, kind);
-      // TODO: Support MIN, MAX of Strings.
-      throw illegalArgumentException();
-    }
-
-    @Override
-    public StringCounter addValue(String value) {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<String> resetToValue(String value) {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<String> resetMeanToValue(long elementCount, String value) {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public String getAndResetDelta() {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public CounterMean<String> getAndResetMeanDelta() {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public String getAggregate() {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    @Nullable
-    public CounterMean<String> getMean() {
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<String> merge(Counter<String> that) {
-      checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
-      switch (kind) {
-        default:
-          throw illegalArgumentException();
-      }
-    }
-  }
-
-  /**
-   * Implements a {@link Counter} for {@link Integer} values.
-   */
-  private static class IntegerCounter extends Counter<Integer> {
-    private final AtomicInteger aggregate;
-    private final AtomicInteger deltaAggregate;
-    private final AtomicReference<IntegerCounterMean> mean;
-    private final AtomicReference<IntegerCounterMean> deltaMean;
-
-    /** Initializes a new {@link Counter} for {@link Integer} values. */
-    private IntegerCounter(CounterName name, AggregationKind kind) {
-      super(name, kind);
-      switch (kind) {
-        case MEAN:
-          aggregate = deltaAggregate = null;
-          mean = new AtomicReference<>();
-          deltaMean = new AtomicReference<>();
-          getAndResetMeanDelta();
-          mean.set(deltaMean.get());
-          break;
-        case SUM:
-        case MAX:
-        case MIN:
-          mean = deltaMean = null;
-          aggregate = new AtomicInteger();
-          deltaAggregate = new AtomicInteger();
-          getAndResetDelta();
-          aggregate.set(deltaAggregate.get());
-          break;
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public IntegerCounter addValue(Integer value) {
-      try {
-        switch (kind) {
-          case SUM:
-            aggregate.getAndAdd(value);
-            deltaAggregate.getAndAdd(value);
-            break;
-          case MEAN:
-            addToMeanAndSet(value, mean);
-            addToMeanAndSet(value, deltaMean);
-            break;
-          case MAX:
-            maxAndSet(value, aggregate);
-            maxAndSet(value, deltaAggregate);
-            break;
-          case MIN:
-            minAndSet(value, aggregate);
-            minAndSet(value, deltaAggregate);
-            break;
-          default:
-            throw illegalArgumentException();
-        }
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    private void addToMeanAndSet(int value, AtomicReference<IntegerCounterMean> target) {
-      IntegerCounterMean current;
-      IntegerCounterMean update;
-      do {
-        current = target.get();
-        update = new IntegerCounterMean(current.getAggregate() + value, current.getCount() + 1);
-      } while (!target.compareAndSet(current, update));
-    }
-
-    private void maxAndSet(int value, AtomicInteger target) {
-      int current;
-      int update;
-      do {
-        current = target.get();
-        update = Math.max(value, current);
-      } while (update > current && !target.compareAndSet(current, update));
-    }
-
-    private void minAndSet(int value, AtomicInteger target) {
-      int current;
-      int update;
-      do {
-        current = target.get();
-        update = Math.min(value, current);
-      } while (update < current && !target.compareAndSet(current, update));
-    }
-
-    @Override
-    public Integer getAndResetDelta() {
-      switch (kind) {
-        case SUM:
-          return deltaAggregate.getAndSet(0);
-        case MAX:
-          return deltaAggregate.getAndSet(Integer.MIN_VALUE);
-        case MIN:
-          return deltaAggregate.getAndSet(Integer.MAX_VALUE);
-        default:
-          throw illegalArgumentException();
-      }
-    }
-
-    @Override
-    public Counter<Integer> resetToValue(Integer value) {
-      try {
-        if (kind == MEAN) {
-          throw illegalArgumentException();
-        }
-        aggregate.set(value);
-        deltaAggregate.set(value);
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    @Override
-    public Counter<Integer> resetMeanToValue(long elementCount, Integer value) {
-      try {
-        if (kind != MEAN) {
-          throw illegalArgumentException();
-        }
-        if (elementCount < 0) {
-          throw new IllegalArgumentException("elementCount must be non-negative");
-        }
-        IntegerCounterMean counterMean = new IntegerCounterMean(value, elementCount);
-        mean.set(counterMean);
-        deltaMean.set(counterMean);
-        return this;
-      } finally {
-        setDirty();
-      }
-    }
-
-    @Override
-    public CounterMean<Integer> getAndResetMeanDelta() {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      return deltaMean.getAndSet(new IntegerCounterMean(0, 0L));
-    }
-
-    @Override
-    public Integer getAggregate() {
-      if (kind != MEAN) {
-        return aggregate.get();
-      } else {
-        return getMean().getAggregate();
-      }
-    }
-
-    @Override
-    @Nullable
-    public CounterMean<Integer> getMean() {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      return mean.get();
-    }
-
-    @Override
-    public Counter<Integer> merge(Counter<Integer> that) {
-      try {
-        checkArgument(
-            this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that);
-        switch (kind) {
-          case SUM:
-          case MIN:
-          case MAX:
-            return addValue(that.getAggregate());
-          case MEAN:
-            CounterMean<Integer> thisCounterMean = this.getMean();
-            CounterMean<Integer> thatCounterMean = that.getMean();
-            return resetMeanToValue(
-                thisCounterMean.getCount() + thatCounterMean.getCount(),
-                thisCounterMean.getAggregate() + thatCounterMean.getAggregate());
-          default:
-            throw illegalArgumentException();
-        }
-      } finally {
-        setDirty();
-      }
-    }
-
-    private static class IntegerCounterMean implements CounterMean<Integer> {
-      private final int aggregate;
-      private final long count;
-
-      public IntegerCounterMean(int aggregate, long count) {
-        this.aggregate = aggregate;
-        this.count = count;
-      }
-
-      @Override
-      public Integer getAggregate() {
-        return aggregate;
-      }
-
-      @Override
-      public long getCount() {
-        return count;
-      }
-
-      @Override
-      public String toString() {
-        return aggregate + "/" + count;
-      }
-    }
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Constructs an {@link IllegalArgumentException} explaining that this
-   * {@link Counter}'s aggregation kind is not supported by its value type.
-   */
-  protected IllegalArgumentException illegalArgumentException() {
-    return new IllegalArgumentException("Cannot compute " + kind
-        + " aggregation over " + getType().getSimpleName() + " values.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java
deleted file mode 100644
index b46be98..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.common;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.base.Strings;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * The name of a counter identifies the user-specified name, as well as the origin,
- * the step the counter is associated with, and a prefix to add to the name.
- *
- * <p>For backwards compatibility, the {@link CounterName} will be converted to
- * a flat name (string) during the migration.
- */
-public class CounterName {
-  /**
-   * Returns a {@link CounterName} with the given name.
-   */
-  public static CounterName named(String name) {
-    return new CounterName(name, "", "", "");
-  }
-
-  /**
-   * Returns a msecs {@link CounterName}.
-   */
-  public static CounterName msecs(String name) {
-    return named(name + "-msecs");
-  }
-
-  /**
-   * Returns a {@link CounterName} identical to this, but with the given origin.
-   */
-  public CounterName withOrigin(String origin) {
-    return new CounterName(this.name, origin, this.stepName, this.prefix);
-  }
-
-  /**
-   * Returns a {@link CounterName} identical to this, but with the given step name.
-   */
-  public CounterName withStepName(String stepName) {
-    return new CounterName(this.name, this.origin, stepName, this.prefix);
-  }
-
-  /**
-   * Returns a {@link CounterName} identical to this, but with the given prefix.
-   */
-  public CounterName withPrefix(String prefix) {
-    return new CounterName(this.name, this.origin, this.stepName, prefix);
-  }
-
-  /**
-   * Name of the counter.
-   *
-   * <p>For example, process-msecs, ElementCount.
-   */
-  private final String name;
-
-  /**
-   * Origin (namespace) of counter name.
-   *
-   * <p>For example, "user" for user-defined counters.
-   * It is empty for counters defined by the SDK or the runner.
-   */
-  private final String origin;
-
-  /**
-   * System defined step name or the named-output of a step.
-   *
-   * <p>For example, {@code s1} or {@code s2.out}.
-   * It may be empty when counters don't associate with step names.
-   */
-  private final String stepName;
-
-  /**
-   * Prefix of group of counters.
-   *
-   * <p>It is empty when counters don't have general prefixes.
-   */
-  private final String prefix;
-
-  /**
-   * Flat name is the equivalent unstructured name.
-   *
-   * <p>It is null before {@link #getFlatName()} is called.
-   */
-  private AtomicReference<String> flatName;
-
-  private CounterName(String name, String origin, String stepName, String prefix) {
-    this.name = checkNotNull(name, "name");
-    this.origin = checkNotNull(origin, "origin");
-    this.stepName = checkNotNull(stepName, "stepName");
-    this.prefix = checkNotNull(prefix, "prefix");
-    this.flatName = new AtomicReference<String>();
-  }
-
-  /**
-   * Returns the flat name of a structured counter.
-   */
-  public String getFlatName() {
-    String ret = flatName.get();
-    if (ret == null) {
-      StringBuilder sb = new StringBuilder();
-      if (!Strings.isNullOrEmpty(prefix)) {
-        // Not all runner versions use "-" to concatenate prefix, it may already have it in it.
-        sb.append(prefix);
-      }
-      if (!Strings.isNullOrEmpty(origin)) {
-        sb.append(origin + "-");
-      }
-      if (!Strings.isNullOrEmpty(stepName)) {
-        sb.append(stepName + "-");
-      }
-      sb.append(name);
-      flatName.compareAndSet(null, sb.toString());
-      ret = flatName.get();
-    }
-    return ret;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    } else if (o instanceof CounterName) {
-      CounterName that = (CounterName) o;
-      return this.getFlatName().equals(that.getFlatName());
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    return getFlatName().hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterProvider.java
deleted file mode 100644
index c2550cd..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterProvider.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.common;
-
-/**
- * A counter provider can provide {@link Counter} instances.
- *
- * @param <T> the input type of the counter.
- */
-public interface CounterProvider<T> {
-  Counter<T> getCounter(String name);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java
deleted file mode 100644
index cb0ffe5..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.common;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.util.AbstractSet;
-import java.util.HashMap;
-import java.util.Iterator;
-
-/**
- * A CounterSet maintains a set of {@link Counter}s.
- *
- * <p>Thread-safe.
- */
-public class CounterSet extends AbstractSet<Counter<?>> {
-
-  /** Registered counters. */
-  private final HashMap<String, Counter<?>> counters = new HashMap<>();
-
-  private final AddCounterMutator addCounterMutator = new AddCounterMutator();
-
-  /**
-   * Constructs a CounterSet containing the given Counters.
-   */
-  public CounterSet(Counter<?>... counters) {
-    for (Counter<?> counter : counters) {
-      addNewCounter(counter);
-    }
-  }
-
-  /**
-   * Returns an object that supports adding additional counters into
-   * this CounterSet.
-   */
-  public AddCounterMutator getAddCounterMutator() {
-    return addCounterMutator;
-  }
-
-  /**
-   * Adds a new counter, throwing an exception if a counter of the
-   * same name already exists.
-   */
-  public void addNewCounter(Counter<?> counter) {
-    if (!addCounter(counter)) {
-      throw new IllegalArgumentException(
-          "Counter " + counter + " duplicates an existing counter in " + this);
-    }
-  }
-
-  /**
-   * Adds the given Counter to this CounterSet.
-   *
-   * <p>If a counter with the same name already exists, it will be
-   * reused, as long as it is compatible.
-   *
-   * @return the Counter that was reused, or added
-   * @throws IllegalArgumentException if a counter with the same
-   * name but an incompatible kind had already been added
-   */
-  public synchronized <T> Counter<T> addOrReuseCounter(Counter<T> counter) {
-    String flatName = counter.getFlatName();
-    Counter<?> oldCounter = counters.get(flatName);
-    if (oldCounter == null) {
-      // A new counter.
-      counters.put(flatName, counter);
-      return counter;
-    }
-    if (counter.isCompatibleWith(oldCounter)) {
-      // Return the counter to reuse.
-      @SuppressWarnings("unchecked")
-      Counter<T> compatibleCounter = (Counter<T>) oldCounter;
-      return compatibleCounter;
-    }
-    throw new IllegalArgumentException(
-        "Counter " + counter + " duplicates incompatible counter "
-        + oldCounter + " in " + this);
-  }
-
-  /**
-   * Adds a counter. Returns {@code true} if the counter was added to the set
-   * and false if the given counter was {@code null} or it already existed in
-   * the set.
-   *
-   * @param counter to register
-   */
-  public boolean addCounter(Counter<?> counter) {
-    return add(counter);
-  }
-
-  /**
-   * Returns the Counter with the given name in this CounterSet;
-   * returns null if no such Counter exists.
-   */
-  public synchronized Counter<?> getExistingCounter(String name) {
-    return counters.get(name);
-  }
-
-  @Override
-  public synchronized Iterator<Counter<?>> iterator() {
-    return counters.values().iterator();
-  }
-
-  @Override
-  public synchronized int size() {
-    return counters.size();
-  }
-
-  @Override
-  public synchronized boolean add(Counter<?> e) {
-    if (null == e) {
-      return false;
-    }
-    if (counters.containsKey(e.getFlatName())) {
-      return false;
-    }
-    counters.put(e.getFlatName(), e);
-    return true;
-  }
-
-  public synchronized void merge(CounterSet that) {
-    for (Counter<?> theirCounter : that) {
-      Counter<?> myCounter = counters.get(theirCounter.getFlatName());
-      if (myCounter != null) {
-        mergeCounters(myCounter, theirCounter);
-      } else {
-        addCounter(theirCounter);
-      }
-    }
-  }
-
-  private <T> void mergeCounters(Counter<T> mine, Counter<?> theirCounter) {
-    checkArgument(
-        mine.isCompatibleWith(theirCounter),
-        "Can't merge CounterSets containing incompatible counters with the same name: "
-            + "%s (existing) and %s (merged)",
-        mine,
-        theirCounter);
-    @SuppressWarnings("unchecked")
-    Counter<T> theirs = (Counter<T>) theirCounter;
-    mine.merge(theirs);
-  }
-
-  /**
-   * A nested class that supports adding additional counters into the
-   * enclosing CounterSet. This is useful as a mutator, hiding other
-   * public methods of the CounterSet.
-   */
-  public class AddCounterMutator {
-    /**
-     * Adds the given Counter into the enclosing CounterSet.
-     *
-     * <p>If a counter with the same name already exists, it will be
-     * reused, as long as it has the same type.
-     *
-     * @return the Counter that was reused, or added
-     * @throws IllegalArgumentException if a counter with the same
-     * name but an incompatible kind had already been added
-     */
-    public <T> Counter<T> addCounter(Counter<T> counter) {
-      return addOrReuseCounter(counter);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java
index 3e7011b..388355e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java
@@ -19,37 +19,21 @@ package org.apache.beam.sdk.util.common;
 
 import java.util.Observable;
 import java.util.Observer;
-import javax.annotation.Nullable;
 
 /**
- * An observer that gets notified when additional bytes are read
- * and/or used. It adds all bytes into a local counter. When the
- * observer gets advanced via the next() call, it adds the total byte
- * count to the specified counter, and prepares for the next element.
+ * An observer that gets notified when additional bytes are read and/or used.
  */
-public class ElementByteSizeObserver implements Observer {
-  @Nullable
-  private final Counter<Long> counter;
+public abstract class ElementByteSizeObserver implements Observer {
   private boolean isLazy = false;
   private long totalSize = 0;
   private double scalingFactor = 1.0;
 
-  public ElementByteSizeObserver() {
-    this.counter = null;
-  }
-
-  public ElementByteSizeObserver(Counter<Long> counter) {
-    this.counter = counter;
-  }
+  public ElementByteSizeObserver() {}
 
   /**
    * Called to report element byte size.
    */
-  protected void reportElementSize(long elementByteSize) {
-    if (counter != null) {
-      counter.addValue(elementByteSize);
-    }
-  }
+  protected abstract void reportElementSize(long elementByteSize);
 
   /**
    * Sets byte counting for the current element as lazy. That is, the

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d20a7ada/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CounterAggregatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CounterAggregatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CounterAggregatorTest.java
deleted file mode 100644
index 3f96cf2..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CounterAggregatorTest.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.MAX;
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.MIN;
-import static org.apache.beam.sdk.util.common.Counter.AggregationKind.SUM;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.withSettings;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.IterableCombineFn;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.CounterProvider;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator;
-
-import com.google.common.collect.Iterables;
-
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Unit tests for the {@link Aggregator} API.
- */
-@RunWith(JUnit4.class)
-public class CounterAggregatorTest {
-  @Rule
-  public final ExpectedException expectedEx = ExpectedException.none();
-
-  private static final String AGGREGATOR_NAME = "aggregator_name";
-
-  @SuppressWarnings("rawtypes")
-  private <V, AccumT> void testAggregator(List<V> items,
-                                      Combine.CombineFn<V, AccumT, V> combiner,
-                                      Counter expectedCounter) {
-    CounterSet counters = new CounterSet();
-    Aggregator<V, V> aggregator = new CounterAggregator<>(
-        AGGREGATOR_NAME, combiner, counters.getAddCounterMutator());
-    for (V item : items) {
-      aggregator.addValue(item);
-    }
-
-    assertEquals(Iterables.getOnlyElement(counters), expectedCounter);
-  }
-
-  @Test
-  public void testGetName() {
-    String name = "testAgg";
-    CounterAggregator<Long, long[], Long> aggregator = new CounterAggregator<>(
-        name, new Sum.SumLongFn(),
-        new CounterSet().getAddCounterMutator());
-
-    assertEquals(name, aggregator.getName());
-  }
-
-  @Test
-  public void testGetCombineFn() {
-    CombineFn<Long, ?, Long> combineFn = new Min.MinLongFn();
-
-    CounterAggregator<Long, ?, Long> aggregator = new CounterAggregator<>("foo",
-        combineFn, new CounterSet().getAddCounterMutator());
-
-    assertEquals(combineFn, aggregator.getCombineFn());
-  }
-
-  @Test
-
-  public void testSumInteger() throws Exception {
-    testAggregator(Arrays.asList(2, 4, 1, 3), new Sum.SumIntegerFn(),
-                   Counter.ints(AGGREGATOR_NAME, SUM).resetToValue(10));
-  }
-
-  @Test
-  public void testSumLong() throws Exception {
-    testAggregator(Arrays.asList(2L, 4L, 1L, 3L), new Sum.SumLongFn(),
-                   Counter.longs(AGGREGATOR_NAME, SUM).resetToValue(10L));
-  }
-
-  @Test
-  public void testSumDouble() throws Exception {
-    testAggregator(Arrays.asList(2.0, 4.1, 1.0, 3.1), new Sum.SumDoubleFn(),
-                   Counter.doubles(AGGREGATOR_NAME, SUM).resetToValue(10.2));
-  }
-
-  @Test
-  public void testMinInteger() throws Exception {
-    testAggregator(Arrays.asList(2, 4, 1, 3), new Min.MinIntegerFn(),
-                   Counter.ints(AGGREGATOR_NAME, MIN).resetToValue(1));
-  }
-
-  @Test
-  public void testMinLong() throws Exception {
-    testAggregator(Arrays.asList(2L, 4L, 1L, 3L), new Min.MinLongFn(),
-                   Counter.longs(AGGREGATOR_NAME, MIN).resetToValue(1L));
-  }
-
-  @Test
-  public void testMinDouble() throws Exception {
-    testAggregator(Arrays.asList(2.0, 4.1, 1.0, 3.1), new Min.MinDoubleFn(),
-                   Counter.doubles(AGGREGATOR_NAME, MIN).resetToValue(1.0));
-  }
-
-  @Test
-  public void testMaxInteger() throws Exception {
-    testAggregator(Arrays.asList(2, 4, 1, 3), new Max.MaxIntegerFn(),
-                   Counter.ints(AGGREGATOR_NAME, MAX).resetToValue(4));
-  }
-
-  @Test
-  public void testMaxLong() throws Exception {
-    testAggregator(Arrays.asList(2L, 4L, 1L, 3L), new Max.MaxLongFn(),
-                   Counter.longs(AGGREGATOR_NAME, MAX).resetToValue(4L));
-  }
-
-  @Test
-  public void testMaxDouble() throws Exception {
-    testAggregator(Arrays.asList(2.0, 4.1, 1.0, 3.1), new Max.MaxDoubleFn(),
-                   Counter.doubles(AGGREGATOR_NAME, MAX).resetToValue(4.1));
-  }
-
-  @Test
-  public void testCounterProviderCallsProvidedCounterAddValue() {
-    @SuppressWarnings("unchecked")
-    CombineFn<String, ?, String> combiner = mock(CombineFn.class,
-        withSettings().extraInterfaces(CounterProvider.class));
-    @SuppressWarnings("unchecked")
-    CounterProvider<String> provider = (CounterProvider<String>) combiner;
-
-    @SuppressWarnings("unchecked")
-    Counter<String> mockCounter = mock(Counter.class);
-    String name = "foo";
-    when(provider.getCounter(name)).thenReturn(mockCounter);
-
-    AddCounterMutator addCounterMutator = mock(AddCounterMutator.class);
-    when(addCounterMutator.addCounter(mockCounter)).thenReturn(mockCounter);
-
-    Aggregator<String, String> aggregator =
-        new CounterAggregator<>(name, combiner, addCounterMutator);
-
-    aggregator.addValue("bar_baz");
-
-    verify(mockCounter).addValue("bar_baz");
-    verify(addCounterMutator).addCounter(mockCounter);
-  }
-
-
-  @Test
-  public void testCompatibleDuplicateNames() throws Exception {
-    CounterSet counters = new CounterSet();
-    Aggregator<Integer, Integer> aggregator1 = new CounterAggregator<>(
-        AGGREGATOR_NAME, new Sum.SumIntegerFn(),
-        counters.getAddCounterMutator());
-
-    Aggregator<Integer, Integer> aggregator2 = new CounterAggregator<>(
-        AGGREGATOR_NAME, new Sum.SumIntegerFn(),
-        counters.getAddCounterMutator());
-
-    // The duplicate aggregators should update the same counter.
-    aggregator1.addValue(3);
-    aggregator2.addValue(4);
-    Assert.assertEquals(
-        new CounterSet(Counter.ints(AGGREGATOR_NAME, SUM).resetToValue(7)),
-        counters);
-  }
-
-  @Test
-  public void testIncompatibleDuplicateNames() throws Exception {
-    CounterSet counters = new CounterSet();
-    new CounterAggregator<>(
-        AGGREGATOR_NAME, new Sum.SumIntegerFn(),
-        counters.getAddCounterMutator());
-
-    expectedEx.expect(IllegalArgumentException.class);
-    expectedEx.expectMessage(Matchers.containsString(
-        "aggregator's name collides with an existing aggregator or "
-        + "system-provided counter of an incompatible type"));
-    new CounterAggregator<>(
-        AGGREGATOR_NAME, new Sum.SumLongFn(),
-        counters.getAddCounterMutator());
-    }
-
-  @Test
-  public void testUnsupportedCombineFn() throws Exception {
-    expectedEx.expect(IllegalArgumentException.class);
-    expectedEx.expectMessage(Matchers.containsString("unsupported combiner"));
-    new CounterAggregator<>(
-        AGGREGATOR_NAME,
-        new Combine.CombineFn<Integer, List<Integer>, Integer>() {
-          @Override
-          public List<Integer> createAccumulator() {
-            return null;
-          }
-          @Override
-          public List<Integer> addInput(List<Integer> accumulator, Integer input) {
-            return null;
-          }
-          @Override
-          public List<Integer> mergeAccumulators(Iterable<List<Integer>> accumulators) {
-            return null;
-          }
-          @Override
-          public Integer extractOutput(List<Integer> accumulator) {
-            return null;
-          }
-        }, (new CounterSet()).getAddCounterMutator());
-  }
-
-  @Test
-  public void testUnsupportedSerializableFunction() throws Exception {
-    expectedEx.expect(IllegalArgumentException.class);
-    expectedEx.expectMessage(Matchers.containsString("unsupported combiner"));
-    CombineFn<Integer, List<Integer>, Integer> combiner = IterableCombineFn
-        .<Integer>of(new SerializableFunction<Iterable<Integer>, Integer>() {
-          @Override
-          public Integer apply(Iterable<Integer> input) {
-            return null;
-          }
-        });
-    new CounterAggregator<>(AGGREGATOR_NAME, combiner,
-        (new CounterSet()).getAddCounterMutator());
-  }
-}