You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2017/01/03 21:28:01 UTC

[3/4] beam git commit: Remove .named from Combine

Remove .named from Combine

Introduces a NameOverride interface (nested in NameUtils) whose
getNameOverride() method lets a class supply its own name in place of
the one derived from its class. This is necessary for parameterized
CombineFns to expose details about their parameter values.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/16b26673
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/16b26673
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/16b26673

Branch: refs/heads/master
Commit: 16b266738ed46966400bf0ad1807359a5f763419
Parents: e5a3f75
Author: bchambers <bc...@google.com>
Authored: Thu Dec 29 13:10:13 2016 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Jan 3 13:15:45 2017 -0800

----------------------------------------------------------------------
 .../core/UnboundedReadFromBoundedSource.java    |   2 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   4 +-
 .../DataflowUnboundedReadFromBoundedSource.java |   9 +-
 .../sdk/io/BoundedReadFromUnboundedSource.java  |   2 +-
 .../main/java/org/apache/beam/sdk/io/Read.java  |   4 +-
 .../org/apache/beam/sdk/transforms/Combine.java | 175 ++++++++++---------
 .../org/apache/beam/sdk/transforms/Count.java   |   4 +-
 .../org/apache/beam/sdk/transforms/Max.java     |  20 +--
 .../org/apache/beam/sdk/transforms/Mean.java    |   4 +-
 .../org/apache/beam/sdk/transforms/Min.java     |  20 +--
 .../org/apache/beam/sdk/transforms/ParDo.java   |  14 +-
 .../org/apache/beam/sdk/transforms/Sum.java     |  12 +-
 .../org/apache/beam/sdk/transforms/Top.java     |  27 +--
 .../org/apache/beam/sdk/util/NameUtils.java     |  40 ++++-
 .../apache/beam/sdk/transforms/CombineTest.java |  23 +--
 .../apache/beam/sdk/transforms/CountTest.java   |   2 +-
 .../org/apache/beam/sdk/transforms/MaxTest.java |  14 +-
 .../apache/beam/sdk/transforms/MeanTest.java    |   5 +-
 .../org/apache/beam/sdk/transforms/MinTest.java |  15 +-
 .../org/apache/beam/sdk/transforms/SumTest.java |  12 +-
 .../org/apache/beam/sdk/transforms/TopTest.java |  13 +-
 .../org/apache/beam/sdk/util/NameUtilsTest.java |  33 ++--
 22 files changed, 239 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index 645a411..3073076 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -100,7 +100,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
 
   @Override
   public String getKindString() {
-    return String.format("Read(%s)", NameUtils.approximateSimpleName(source.getClass()));
+    return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 69c9c18..03e5dfc 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -2308,7 +2308,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     @Override
     public String getKindString() {
-      return String.format("Read(%s)", NameUtils.approximateSimpleName(source.getClass()));
+      return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
     }
 
     static {
@@ -2784,7 +2784,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
           ? "streaming" : "batch";
       String name =
           transform == null
-              ? NameUtils.approximateSimpleName(doFn.getClass())
+              ? NameUtils.approximateSimpleName(doFn)
               : NameUtils.approximatePTransformName(transform.getClass());
       throw new UnsupportedOperationException(
           String.format("The DataflowRunner in %s mode does not support %s.", mode, name));

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
index db87e21..a2ae799 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
@@ -105,14 +105,7 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin
 
   @Override
   public String getKindString() {
-    String sourceName;
-    if (source.getClass().isAnonymousClass()) {
-      sourceName = "AnonymousSource";
-    } else {
-      sourceName = NameUtils.approximateSimpleName(source.getClass());
-    }
-
-    return String.format("Read(%s)", sourceName);
+    return String.format("Read(%s)", NameUtils.approximateSimpleName(source, "AnonymousSource"));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index ac84c5e..8b63bfd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -117,7 +117,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
 
   @Override
   public String getKindString() {
-    return "Read(" + NameUtils.approximateSimpleName(source.getClass()) + ")";
+    return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 7404cba..0e269a2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -117,7 +117,7 @@ public class Read {
 
     @Override
     public String getKindString() {
-      return "Read(" + NameUtils.approximateSimpleName(source.getClass()) + ")";
+      return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
     }
 
     @Override
@@ -184,7 +184,7 @@ public class Read {
 
     @Override
     public String getKindString() {
-      return String.format("Read(%s)", NameUtils.approximateSimpleName(source.getClass()));
+      return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 3b07260..92c04ca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -58,6 +58,8 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.AppliedCombineFn;
+import org.apache.beam.sdk.util.NameUtils;
+import org.apache.beam.sdk.util.NameUtils.NameOverride;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -473,56 +475,72 @@ public class Combine {
     @Override
     public <K> KeyedCombineFn<K, InputT, AccumT, OutputT> asKeyedFn() {
       // The key, an object, is never even looked at.
-      return new KeyedCombineFn<K, InputT, AccumT, OutputT>() {
-        @Override
-        public AccumT createAccumulator(K key) {
-          return CombineFn.this.createAccumulator();
-        }
+      return new KeyIgnoringCombineFn<>(this);
+    }
 
-        @Override
-        public AccumT addInput(K key, AccumT accumulator, InputT input) {
-          return CombineFn.this.addInput(accumulator, input);
-        }
+    private static class KeyIgnoringCombineFn<K, InputT, AccumT, OutputT>
+        extends KeyedCombineFn<K, InputT, AccumT, OutputT>
+        implements NameOverride {
 
-        @Override
-        public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators) {
-          return CombineFn.this.mergeAccumulators(accumulators);
-        }
+      private final CombineFn<InputT, AccumT, OutputT> fn;
 
-        @Override
-        public OutputT extractOutput(K key, AccumT accumulator) {
-          return CombineFn.this.extractOutput(accumulator);
-        }
+      private KeyIgnoringCombineFn(CombineFn<InputT, AccumT, OutputT> fn) {
+        this.fn = fn;
+      }
 
-        @Override
-        public AccumT compact(K key, AccumT accumulator) {
-          return CombineFn.this.compact(accumulator);
-        }
+      @Override
+      public AccumT createAccumulator(K key) {
+        return fn.createAccumulator();
+      }
 
-        @Override
-        public Coder<AccumT> getAccumulatorCoder(
-            CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder)
-            throws CannotProvideCoderException {
-          return CombineFn.this.getAccumulatorCoder(registry, inputCoder);
-        }
+      @Override
+      public AccumT addInput(K key, AccumT accumulator, InputT input) {
+        return fn.addInput(accumulator, input);
+      }
 
-        @Override
-        public Coder<OutputT> getDefaultOutputCoder(
-            CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder)
-            throws CannotProvideCoderException {
-          return CombineFn.this.getDefaultOutputCoder(registry, inputCoder);
-        }
+      @Override
+      public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators) {
+        return fn.mergeAccumulators(accumulators);
+      }
 
-        @Override
-        public CombineFn<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) {
-          return CombineFn.this;
-        }
+      @Override
+      public OutputT extractOutput(K key, AccumT accumulator) {
+        return fn.extractOutput(accumulator);
+      }
 
-        @Override
-        public void populateDisplayData(DisplayData.Builder builder) {
-          builder.delegate(CombineFn.this);
-        }
-      };
+      @Override
+      public AccumT compact(K key, AccumT accumulator) {
+        return fn.compact(accumulator);
+      }
+
+      @Override
+      public Coder<AccumT> getAccumulatorCoder(
+          CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder)
+          throws CannotProvideCoderException {
+        return fn.getAccumulatorCoder(registry, inputCoder);
+      }
+
+      @Override
+      public Coder<OutputT> getDefaultOutputCoder(
+          CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder)
+          throws CannotProvideCoderException {
+        return fn.getDefaultOutputCoder(registry, inputCoder);
+      }
+
+      @Override
+      public CombineFn<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) {
+        return fn;
+      }
+
+      @Override
+      public void populateDisplayData(Builder builder) {
+        builder.delegate(fn);
+      }
+
+      @Override
+      public String getNameOverride() {
+        return NameUtils.approximateSimpleName(fn);
+      }
     }
   }
 
@@ -1338,20 +1356,9 @@ public class Combine {
       this.sideInputs = ImmutableList.of();
     }
 
-    private Globally(String name, GlobalCombineFn<? super InputT, ?, OutputT> fn,
-        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
-      super(name);
-      this.fn = fn;
-      this.fnDisplayData = fnDisplayData;
-      this.insertDefault = insertDefault;
-      this.fanout = fanout;
-      this.sideInputs = ImmutableList.of();
-    }
-
-    private Globally(String name, GlobalCombineFn<? super InputT, ?, OutputT> fn,
+    private Globally(GlobalCombineFn<? super InputT, ?, OutputT> fn,
         DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout,
         List<PCollectionView<?>> sideInputs) {
-      super(name);
       this.fn = fn;
       this.fnDisplayData = fnDisplayData;
       this.insertDefault = insertDefault;
@@ -1359,12 +1366,9 @@ public class Combine {
       this.sideInputs = sideInputs;
     }
 
-    /**
-     * Return a new {@code Globally} transform that's like this transform but with the
-     * specified name. Does not modify this transform.
-     */
-    public Globally<InputT, OutputT> named(String name) {
-      return new Globally<>(name, fn, fnDisplayData, insertDefault, fanout);
+    @Override
+    protected String getKindString() {
+      return String.format("Combine.globally(%s)", NameUtils.approximateSimpleName(fn));
     }
 
     /**
@@ -1384,7 +1388,7 @@ public class Combine {
      * is not globally windowed and the output is not being used as a side input.
      */
     public Globally<InputT, OutputT> withoutDefaults() {
-      return new Globally<>(name, fn, fnDisplayData, false, fanout);
+      return new Globally<>(fn, fnDisplayData, false, fanout);
     }
 
     /**
@@ -1395,7 +1399,7 @@ public class Combine {
      * that will be used.
      */
     public Globally<InputT, OutputT> withFanout(int fanout) {
-      return new Globally<>(name, fn, fnDisplayData, insertDefault, fanout);
+      return new Globally<>(fn, fnDisplayData, insertDefault, fanout);
     }
 
     /**
@@ -1405,7 +1409,7 @@ public class Combine {
     public Globally<InputT, OutputT> withSideInputs(
         Iterable<? extends PCollectionView<?>> sideInputs) {
       checkState(fn instanceof RequiresContextInternal);
-      return new Globally<>(name, fn, fnDisplayData, insertDefault, fanout,
+      return new Globally<>(fn, fnDisplayData, insertDefault, fanout,
           ImmutableList.copyOf(sideInputs));
     }
 
@@ -1613,7 +1617,9 @@ public class Combine {
    * {@link #perKey(SerializableFunction)}, and
    * {@link #groupedValues(SerializableFunction)}.
    */
-  public static class IterableCombineFn<V> extends CombineFn<V, List<V>, V> {
+  public static class IterableCombineFn<V>
+      extends CombineFn<V, List<V>, V>
+      implements NameOverride {
     /**
      * Returns a {@code CombineFn} that uses the given
      * {@code SerializableFunction} to combine values.
@@ -1693,6 +1699,11 @@ public class Combine {
       singleton.add(combiner.apply(values));
       return singleton;
     }
+
+    @Override
+    public String getNameOverride() {
+      return NameUtils.approximateSimpleName(combiner);
+    }
   }
 
   /**
@@ -1774,33 +1785,19 @@ public class Combine {
       this.sideInputs = ImmutableList.of();
     }
 
-    private PerKey(String name,
+    private PerKey(
         PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
         DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
         boolean fewKeys, List<PCollectionView<?>> sideInputs) {
-      super(name);
       this.fn = fn;
       this.fnDisplayData = fnDisplayData;
       this.fewKeys = fewKeys;
       this.sideInputs = sideInputs;
     }
 
-    private PerKey(
-        String name, PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean fewKeys) {
-      super(name);
-      this.fn = fn;
-      this.fnDisplayData = fnDisplayData;
-      this.fewKeys = fewKeys;
-      this.sideInputs = ImmutableList.of();
-    }
-
-    /**
-     * Return a new {@code Globally} transform that's like this transform but with the
-     * specified name. Does not modify this transform.
-     */
-    public PerKey<K, InputT, OutputT> named(String name) {
-      return new PerKey<>(name, fn, fnDisplayData, fewKeys);
+    @Override
+    protected String getKindString() {
+      return String.format("Combine.perKey(%s)", NameUtils.approximateSimpleName(fn));
     }
 
     /**
@@ -1810,7 +1807,7 @@ public class Combine {
     public PerKey<K, InputT, OutputT> withSideInputs(
         Iterable<? extends PCollectionView<?>> sideInputs) {
       checkState(fn instanceof RequiresContextInternal);
-      return new PerKey<>(name, fn, fnDisplayData, fewKeys,
+      return new PerKey<>(fn, fnDisplayData, fewKeys,
           ImmutableList.copyOf(sideInputs));
     }
 
@@ -1827,7 +1824,7 @@ public class Combine {
      */
     public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(
         SerializableFunction<? super K, Integer> hotKeyFanout) {
-      return new PerKeyWithHotKeyFanout<>(name, fn, fnDisplayData, hotKeyFanout);
+      return new PerKeyWithHotKeyFanout<>(fn, fnDisplayData, hotKeyFanout);
     }
 
     /**
@@ -1835,7 +1832,7 @@ public class Combine {
      * constant value for every key.
      */
     public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(final int hotKeyFanout) {
-      return new PerKeyWithHotKeyFanout<>(name, fn, fnDisplayData,
+      return new PerKeyWithHotKeyFanout<>(fn, fnDisplayData,
           new SimpleFunction<K, Integer>() {
             @Override
             public void populateDisplayData(Builder builder) {
@@ -1890,17 +1887,21 @@ public class Combine {
     private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
     private final SerializableFunction<? super K, Integer> hotKeyFanout;
 
-    private PerKeyWithHotKeyFanout(String name,
+    private PerKeyWithHotKeyFanout(
         PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
         DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
         SerializableFunction<? super K, Integer> hotKeyFanout) {
-      super(name);
       this.fn = fn;
       this.fnDisplayData = fnDisplayData;
       this.hotKeyFanout = hotKeyFanout;
     }
 
     @Override
+    protected String getKindString() {
+      return String.format("Combine.perKeyWithFanout(%s)", NameUtils.approximateSimpleName(fn));
+    }
+
+    @Override
     public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
       return applyHelper(input);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index 9101996..d164978 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -51,7 +51,7 @@ public class Count {
    * its input {@link PCollection}.
    */
   public static <T> Combine.Globally<T, Long> globally() {
-    return Combine.globally(new CountFn<T>()).named("Count.Globally");
+    return Combine.globally(new CountFn<T>());
   }
 
   /**
@@ -59,7 +59,7 @@ public class Count {
    * associated with each key of its input {@link PCollection}.
    */
   public static <K, V> Combine.PerKey<K, V, Long> perKey() {
-    return Combine.<K, V, Long>perKey(new CountFn<V>()).named("Count.PerKey");
+    return Combine.<K, V, Long>perKey(new CountFn<V>());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index c44d9b6..0990ca4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -52,7 +52,7 @@ public class Max {
    * elements, or {@code Integer.MIN_VALUE} if there are no elements.
    */
   public static Combine.Globally<Integer, Integer> integersGlobally() {
-    return Combine.globally(new MaxIntegerFn()).named("Max.Globally");
+    return Combine.globally(new MaxIntegerFn());
   }
 
   /**
@@ -64,7 +64,7 @@ public class Max {
    * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
    */
   public static <K> Combine.PerKey<K, Integer, Integer> integersPerKey() {
-    return Combine.<K, Integer, Integer>perKey(new MaxIntegerFn()).named("Max.PerKey");
+    return Combine.<K, Integer, Integer>perKey(new MaxIntegerFn());
   }
 
   /**
@@ -73,7 +73,7 @@ public class Max {
    * or {@code Long.MIN_VALUE} if there are no elements.
    */
   public static Combine.Globally<Long, Long> longsGlobally() {
-    return Combine.globally(new MaxLongFn()).named("Max.Globally");
+    return Combine.globally(new MaxLongFn());
   }
 
   /**
@@ -85,7 +85,7 @@ public class Max {
    * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
    */
   public static <K> Combine.PerKey<K, Long, Long> longsPerKey() {
-    return Combine.<K, Long, Long>perKey(new MaxLongFn()).named("Max.PerKey");
+    return Combine.<K, Long, Long>perKey(new MaxLongFn());
   }
 
   /**
@@ -94,7 +94,7 @@ public class Max {
    * elements, or {@code Double.NEGATIVE_INFINITY} if there are no elements.
    */
   public static Combine.Globally<Double, Double> doublesGlobally() {
-    return Combine.globally(new MaxDoubleFn()).named("Max.Globally");
+    return Combine.globally(new MaxDoubleFn());
   }
 
   /**
@@ -106,7 +106,7 @@ public class Max {
    * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
    */
   public static <K> Combine.PerKey<K, Double, Double> doublesPerKey() {
-    return Combine.<K, Double, Double>perKey(new MaxDoubleFn()).named("Max.PerKey");
+    return Combine.<K, Double, Double>perKey(new MaxDoubleFn());
   }
 
   /**
@@ -116,7 +116,7 @@ public class Max {
    */
   public static <T extends Comparable<? super T>>
   Combine.Globally<T, T> globally() {
-    return Combine.<T, T>globally(MaxFn.<T>naturalOrder()).named("Max.Globally");
+    return Combine.<T, T>globally(MaxFn.<T>naturalOrder());
   }
 
   /**
@@ -129,7 +129,7 @@ public class Max {
    */
   public static <K, T extends Comparable<? super T>>
   Combine.PerKey<K, T, T> perKey() {
-    return Combine.<K, T, T>perKey(MaxFn.<T>naturalOrder()).named("Max.PerKey");
+    return Combine.<K, T, T>perKey(MaxFn.<T>naturalOrder());
   }
 
   /**
@@ -139,7 +139,7 @@ public class Max {
    */
   public static <T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.Globally<T, T> globally(ComparatorT comparator) {
-    return Combine.<T, T>globally(MaxFn.of(comparator)).named("Max.Globally");
+    return Combine.<T, T>globally(MaxFn.of(comparator));
   }
 
   /**
@@ -151,7 +151,7 @@ public class Max {
    */
   public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
-    return Combine.<K, T, T>perKey(MaxFn.of(comparator)).named("Max.PerKey");
+    return Combine.<K, T, T>perKey(MaxFn.of(comparator));
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index 9eea3a0..cb77ba3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -64,7 +64,7 @@ public class Mean {
    * @param <NumT> the type of the {@code Number}s being combined
    */
   public static <NumT extends Number> Combine.Globally<NumT, Double> globally() {
-    return Combine.<NumT, Double>globally(new MeanFn<>()).named("Mean.Globally");
+    return Combine.<NumT, Double>globally(new MeanFn<>());
   }
 
   /**
@@ -81,7 +81,7 @@ public class Mean {
    * @param <NumT> the type of the {@code Number}s being combined
    */
   public static <K, NumT extends Number> Combine.PerKey<K, NumT, Double> perKey() {
-    return Combine.<K, NumT, Double>perKey(new MeanFn<>()).named("Mean.PerKey");
+    return Combine.<K, NumT, Double>perKey(new MeanFn<>());
   }
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index f046779..5003594 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -52,7 +52,7 @@ public class Min {
    * {@code PCollection}'s elements, or {@code Integer.MAX_VALUE} if there are no elements.
    */
   public static Combine.Globally<Integer, Integer> integersGlobally() {
-    return Combine.globally(new MinIntegerFn()).named("Min.Globally");
+    return Combine.globally(new MinIntegerFn());
   }
 
   /**
@@ -64,7 +64,7 @@ public class Min {
    * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
    */
   public static <K> Combine.PerKey<K, Integer, Integer> integersPerKey() {
-    return Combine.<K, Integer, Integer>perKey(new MinIntegerFn()).named("Min.PerKey");
+    return Combine.<K, Integer, Integer>perKey(new MinIntegerFn());
   }
 
   /**
@@ -73,7 +73,7 @@ public class Min {
    * or {@code Long.MAX_VALUE} if there are no elements.
    */
   public static Combine.Globally<Long, Long> longsGlobally() {
-    return Combine.globally(new MinLongFn()).named("Min.Globally");
+    return Combine.globally(new MinLongFn());
   }
 
   /**
@@ -85,7 +85,7 @@ public class Min {
    * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
    */
   public static <K> Combine.PerKey<K, Long, Long> longsPerKey() {
-   return Combine.<K, Long, Long>perKey(new MinLongFn()).named("Min.PerKey");
+   return Combine.<K, Long, Long>perKey(new MinLongFn());
   }
 
   /**
@@ -94,7 +94,7 @@ public class Min {
    * elements, or {@code Double.POSITIVE_INFINITY} if there are no elements.
    */
   public static Combine.Globally<Double, Double> doublesGlobally() {
-    return Combine.globally(new MinDoubleFn()).named("Min.Globally");
+    return Combine.globally(new MinDoubleFn());
   }
 
   /**
@@ -106,7 +106,7 @@ public class Min {
    * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
    */
   public static <K> Combine.PerKey<K, Double, Double> doublesPerKey() {
-    return Combine.<K, Double, Double>perKey(new MinDoubleFn()).named("Min.PerKey");
+    return Combine.<K, Double, Double>perKey(new MinDoubleFn());
   }
 
   /**
@@ -116,7 +116,7 @@ public class Min {
    */
   public static <T extends Comparable<? super T>>
   Combine.Globally<T, T> globally() {
-    return Combine.<T, T>globally(MinFn.<T>naturalOrder()).named("Min.Globally");
+    return Combine.<T, T>globally(MinFn.<T>naturalOrder());
   }
 
   /**
@@ -129,7 +129,7 @@ public class Min {
    */
   public static <K, T extends Comparable<? super T>>
   Combine.PerKey<K, T, T> perKey() {
-    return Combine.<K, T, T>perKey(MinFn.<T>naturalOrder()).named("Min.PerKey");
+    return Combine.<K, T, T>perKey(MinFn.<T>naturalOrder());
   }
 
   /**
@@ -139,7 +139,7 @@ public class Min {
    */
   public static <T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.Globally<T, T> globally(ComparatorT comparator) {
-    return Combine.<T, T>globally(MinFn.of(comparator)).named("Min.Globally");
+    return Combine.<T, T>globally(MinFn.of(comparator));
   }
 
   /**
@@ -151,7 +151,7 @@ public class Min {
    */
   public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
-    return Combine.<K, T, T>perKey(MinFn.of(comparator)).named("Min.PerKey");
+    return Combine.<K, T, T>perKey(MinFn.of(comparator));
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 7e54a54..5b4fa19 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -762,12 +762,7 @@ public class ParDo {
 
     @Override
     protected String getKindString() {
-      Class<?> clazz = getFn().getClass();
-      if (clazz.isAnonymousClass()) {
-        return "AnonymousParDo";
-      } else {
-        return String.format("ParDo(%s)", NameUtils.approximateSimpleName(clazz));
-      }
+      return String.format("ParDo(%s)", NameUtils.approximateSimpleName(getFn()));
     }
 
     /**
@@ -976,12 +971,7 @@ public class ParDo {
 
     @Override
     protected String getKindString() {
-      Class<?> clazz = getFn().getClass();
-      if (clazz.isAnonymousClass()) {
-        return "AnonymousParMultiDo";
-      } else {
-        return String.format("ParMultiDo(%s)", NameUtils.approximateSimpleName(clazz));
-      }
+      return String.format("ParMultiDo(%s)", NameUtils.approximateSimpleName(getFn()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
index 27c5ced..48eafc3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
@@ -50,7 +50,7 @@ public class Sum {
    * {@code 0} if there are no elements.
    */
   public static Combine.Globally<Integer, Integer> integersGlobally() {
-    return Combine.globally(new SumIntegerFn()).named("Sum.Globally");
+    return Combine.globally(new SumIntegerFn());
   }
 
   /**
@@ -62,7 +62,7 @@ public class Sum {
    * that key in the input {@code PCollection}.
    */
   public static <K> Combine.PerKey<K, Integer, Integer> integersPerKey() {
-    return Combine.<K, Integer, Integer>perKey(new SumIntegerFn()).named("Sum.PerKey");
+    return Combine.<K, Integer, Integer>perKey(new SumIntegerFn());
   }
 
   /**
@@ -73,7 +73,7 @@ public class Sum {
    * {@code 0} if there are no elements.
    */
   public static Combine.Globally<Long, Long> longsGlobally() {
-    return Combine.globally(new SumLongFn()).named("Sum.Globally");
+    return Combine.globally(new SumLongFn());
   }
 
   /**
@@ -85,7 +85,7 @@ public class Sum {
    * that key in the input {@code PCollection}.
    */
   public static <K> Combine.PerKey<K, Long, Long> longsPerKey() {
-    return Combine.<K, Long, Long>perKey(new SumLongFn()).named("Sum.PerKey");
+    return Combine.<K, Long, Long>perKey(new SumLongFn());
   }
 
   /**
@@ -96,7 +96,7 @@ public class Sum {
    * {@code 0} if there are no elements.
    */
   public static Combine.Globally<Double, Double> doublesGlobally() {
-    return Combine.globally(new SumDoubleFn()).named("Sum.Globally");
+    return Combine.globally(new SumDoubleFn());
   }
 
   /**
@@ -108,7 +108,7 @@ public class Sum {
    * that key in the input {@code PCollection}.
    */
   public static <K> Combine.PerKey<K, Double, Double> doublesPerKey() {
-    return Combine.<K, Double, Double>perKey(new SumDoubleFn()).named("Sum.PerKey");
+    return Combine.<K, Double, Double>perKey(new SumDoubleFn());
   }
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index 992a341..47be9b9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -39,6 +39,8 @@ import org.apache.beam.sdk.transforms.Combine.PerKey;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.util.NameUtils;
+import org.apache.beam.sdk.util.NameUtils.NameOverride;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -97,7 +99,7 @@ public class Top {
    */
   public static <T, ComparatorT extends Comparator<T> & Serializable>
       Combine.Globally<T, List<T>> of(int count, ComparatorT compareFn) {
-    return Combine.globally(new TopCombineFn<>(count, compareFn)).named("Top.Globally");
+    return Combine.globally(new TopCombineFn<>(count, compareFn));
   }
 
   /**
@@ -141,8 +143,7 @@ public class Top {
    * {@code KV}s and return the top values associated with each key.
    */
   public static <T extends Comparable<T>> Combine.Globally<T, List<T>> smallest(int count) {
-    return Combine.globally(new TopCombineFn<>(count, new Smallest<T>()))
-        .named("Smallest.Globally");
+    return Combine.globally(new TopCombineFn<>(count, new Smallest<T>()));
   }
 
   /**
@@ -186,7 +187,7 @@ public class Top {
    * {@code KV}s and return the top values associated with each key.
    */
   public static <T extends Comparable<T>> Combine.Globally<T, List<T>> largest(int count) {
-    return Combine.globally(new TopCombineFn<>(count, new Largest<T>())).named("Largest.Globally");
+    return Combine.globally(new TopCombineFn<>(count, new Largest<T>()));
   }
 
   /**
@@ -233,8 +234,7 @@ public class Top {
   public static <K, V, ComparatorT extends Comparator<V> & Serializable>
       PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>>
       perKey(int count, ComparatorT compareFn) {
-    return Combine.perKey(
-        new TopCombineFn<>(count, compareFn).<K>asKeyedFn()).named("Top.PerKey");
+    return Combine.perKey(new TopCombineFn<>(count, compareFn).<K>asKeyedFn());
   }
 
   /**
@@ -280,8 +280,7 @@ public class Top {
   public static <K, V extends Comparable<V>>
       PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>>
       smallestPerKey(int count) {
-    return Combine.perKey(new TopCombineFn<>(count, new Smallest<V>()).<K>asKeyedFn())
-        .named("Smallest.PerKey");
+    return Combine.perKey(new TopCombineFn<>(count, new Smallest<V>()).<K>asKeyedFn());
   }
 
   /**
@@ -327,9 +326,7 @@ public class Top {
   public static <K, V extends Comparable<V>>
       PerKey<K, V, List<V>>
       largestPerKey(int count) {
-    return Combine.perKey(
-new TopCombineFn<>(count, new Largest<V>()).<K>asKeyedFn())
-        .named("Largest.PerKey");
+    return Combine.perKey(new TopCombineFn<>(count, new Largest<V>()).<K>asKeyedFn());
   }
 
   /**
@@ -368,7 +365,8 @@ new TopCombineFn<>(count, new Largest<V>()).<K>asKeyedFn())
    * @param <T> type of element being compared
    */
   public static class TopCombineFn<T, ComparatorT extends Comparator<T> & Serializable>
-      extends AccumulatingCombineFn<T, BoundedHeap<T, ComparatorT>, List<T>> {
+      extends AccumulatingCombineFn<T, BoundedHeap<T, ComparatorT>, List<T>>
+      implements NameOverride {
 
     private final int count;
     private final ComparatorT compareFn;
@@ -380,6 +378,11 @@ new TopCombineFn<>(count, new Largest<V>()).<K>asKeyedFn())
     }
 
     @Override
+    public String getNameOverride() {
+      return String.format("Top(%s)", NameUtils.approximateSimpleName(compareFn));
+    }
+
+    @Override
     public BoundedHeap<T, ComparatorT> createAccumulator() {
       return new BoundedHeap<>(count, compareFn, new ArrayList<T>());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
index 60a0e41..1c59af7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
@@ -29,12 +29,18 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 
 /**
- * Helpers for extracting the name of objects (most commonly {@link DoFn} and {@link CombineFn}).
+ * Helpers for extracting the name of objects and classes.
  */
 public class NameUtils {
 
+  /** Classes may implement this interface to change how names are generated for their instances. */
+  public interface NameOverride {
+    /** Return the name to use for this instance. */
+    String getNameOverride();
+  }
+
   private static final String[] STANDARD_NAME_SUFFIXES =
-      new String[]{"OldDoFn", "DoFn", "Fn"};
+      new String[]{"OldDoFn", "DoFn", "CombineFn", "Fn"};
 
   /**
    * Pattern to match a non-anonymous inner class.
@@ -87,7 +93,16 @@ public class NameUtils {
   }
 
   /**
-   * Returns a simple name for a class.
+   * As {@link #approximateSimpleName(Object, String)} but returning {@code "Anonymous"} when
+   * {@code object} is an instance of an anonymous class.
+   */
+  public static String approximateSimpleName(Object object) {
+    return approximateSimpleName(object, "Anonymous");
+  }
+
+  /**
+   * Returns a simple name describing a class that is being used as a function (e.g., a
+   * {@link DoFn} or {@link CombineFn}).
    *
    * <p>Note: this is non-invertible - the name may be simplified to an
    * extent that it cannot be mapped back to the original class.
@@ -96,15 +111,28 @@ public class NameUtils {
    * removes the package and outer classes from the name,
    * and removes common suffixes.
    *
+   * <p>If the object is an instance of {@link NameOverride}, the result of
+   * {@link NameOverride#getNameOverride()} is returned. This allows classes that act as wrappers to
+   * override the handling of names by delegating to the objects they wrap.
+   *
+   * <p>If the class is anonymous, the string {@code anonymousValue} is returned.
+   *
    * <p>Examples:
    * <ul>
    *   <li>{@code some.package.Word.SummaryDoFn} becomes "Summary"
    *   <li>{@code another.package.PairingFn} becomes "Pairing"
    * </ul>
-   *
-   * @throws IllegalArgumentException if the class is anonymous
    */
-  public static String approximateSimpleName(Class<?> clazz) {
+  public static String approximateSimpleName(Object object, String anonymousValue) {
+    if (object instanceof NameOverride) {
+      return ((NameOverride) object).getNameOverride();
+    }
+
+    Class<?> clazz = object.getClass();
+    if (clazz.isAnonymousClass()) {
+      return anonymousValue;
+    }
+
     return approximateSimpleName(clazz, /* dropOuterClassNames */ true);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index f783928..fef47fb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -24,6 +24,7 @@ import static org.apache.beam.sdk.TestUtils.checkCombineFn;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -470,8 +471,7 @@ public class CombineTest implements Serializable {
     runTestAccumulatingCombine(EMPTY_TABLE, 0.0, Collections.<KV<String, Double>>emptyList());
   }
 
-  // Checks that Min, Max, Mean, Sum (operations that pass-through to Combine),
-  // provide their own top-level name.
+  // Checks that Min, Max, Mean, Sum (operations that pass-through to Combine) have good names.
   @Test
   public void testCombinerNames() {
     Combine.PerKey<String, Integer, Integer> min = Min.integersPerKey();
@@ -479,10 +479,10 @@ public class CombineTest implements Serializable {
     Combine.PerKey<String, Integer, Double> mean = Mean.perKey();
     Combine.PerKey<String, Integer, Integer> sum = Sum.integersPerKey();
 
-    assertThat(min.getName(), Matchers.startsWith("Min"));
-    assertThat(max.getName(), Matchers.startsWith("Max"));
-    assertThat(mean.getName(), Matchers.startsWith("Mean"));
-    assertThat(sum.getName(), Matchers.startsWith("Sum"));
+    assertThat(min.getName(), equalTo("Combine.perKey(MinInteger)"));
+    assertThat(max.getName(), equalTo("Combine.perKey(MaxInteger)"));
+    assertThat(mean.getName(), equalTo("Combine.perKey(Mean)"));
+    assertThat(sum.getName(), equalTo("Combine.perKey(SumInteger)"));
   }
 
   private static final SerializableFunction<String, Integer> hotKeyFanout =
@@ -635,18 +635,13 @@ public class CombineTest implements Serializable {
 
   @Test
   public void testCombineGetName() {
-    assertEquals("Combine.Globally", Combine.globally(new SumInts()).getName());
-    assertEquals(
-        "MyCombineGlobally", Combine.globally(new SumInts()).named("MyCombineGlobally").getName());
+    assertEquals("Combine.globally(SumInts)", Combine.globally(new SumInts()).getName());
     assertEquals(
         "Combine.GloballyAsSingletonView",
         Combine.globally(new SumInts()).asSingletonView().getName());
-    assertEquals("Combine.PerKey", Combine.perKey(new TestKeyedCombineFn()).getName());
-    assertEquals(
-        "MyCombinePerKey",
-        Combine.perKey(new TestKeyedCombineFn()).named("MyCombinePerKey").getName());
+    assertEquals("Combine.perKey(TestKeyed)", Combine.perKey(new TestKeyedCombineFn()).getName());
     assertEquals(
-        "Combine.PerKeyWithHotKeyFanout",
+        "Combine.perKeyWithFanout(TestKeyed)",
         Combine.perKey(new TestKeyedCombineFn()).withHotKeyFanout(10).getName());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
index eafb12d..dca0542 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
@@ -110,6 +110,6 @@ public class CountTest {
   @Test
   public void testCountGetName() {
     assertEquals("Count.PerElement", Count.perElement().getName());
-    assertEquals("Count.Globally", Count.globally().getName());
+    assertEquals("Combine.globally(Count)", Count.globally().getName());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
index 5c78b3f..4aa39a3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
@@ -34,13 +34,13 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class MaxTest {
   @Test
-  public void testMeanGetNames() {
-    assertEquals("Max.Globally", Max.integersGlobally().getName());
-    assertEquals("Max.Globally", Max.doublesGlobally().getName());
-    assertEquals("Max.Globally", Max.longsGlobally().getName());
-    assertEquals("Max.PerKey", Max.integersPerKey().getName());
-    assertEquals("Max.PerKey", Max.doublesPerKey().getName());
-    assertEquals("Max.PerKey", Max.longsPerKey().getName());
+  public void testMaxGetNames() {
+    assertEquals("Combine.globally(MaxInteger)", Max.integersGlobally().getName());
+    assertEquals("Combine.globally(MaxDouble)", Max.doublesGlobally().getName());
+    assertEquals("Combine.globally(MaxLong)", Max.longsGlobally().getName());
+    assertEquals("Combine.perKey(MaxInteger)", Max.integersPerKey().getName());
+    assertEquals("Combine.perKey(MaxDouble)", Max.doublesPerKey().getName());
+    assertEquals("Combine.perKey(MaxLong)", Max.longsPerKey().getName());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
index 1c94e35..84741ee 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
@@ -36,10 +36,11 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class MeanTest {
+
   @Test
   public void testMeanGetNames() {
-    assertEquals("Mean.Globally", Mean.globally().getName());
-    assertEquals("Mean.PerKey", Mean.perKey().getName());
+    assertEquals("Combine.globally(Mean)", Mean.globally().getName());
+    assertEquals("Combine.perKey(Mean)", Mean.perKey().getName());
   }
 
   private static final Coder<CountSum<Number>> TEST_CODER = new CountSumCoder<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
index a0eca07..4334ed9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
@@ -35,15 +35,14 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class MinTest {
   @Test
-  public void testMeanGetNames() {
-    assertEquals("Min.Globally", Min.integersGlobally().getName());
-    assertEquals("Min.Globally", Min.doublesGlobally().getName());
-    assertEquals("Min.Globally", Min.longsGlobally().getName());
-    assertEquals("Min.PerKey", Min.integersPerKey().getName());
-    assertEquals("Min.PerKey", Min.doublesPerKey().getName());
-    assertEquals("Min.PerKey", Min.longsPerKey().getName());
+  public void testMinGetNames() {
+    assertEquals("Combine.globally(MinInteger)", Min.integersGlobally().getName());
+    assertEquals("Combine.globally(MinDouble)", Min.doublesGlobally().getName());
+    assertEquals("Combine.globally(MinLong)", Min.longsGlobally().getName());
+    assertEquals("Combine.perKey(MinInteger)", Min.integersPerKey().getName());
+    assertEquals("Combine.perKey(MinDouble)", Min.doublesPerKey().getName());
+    assertEquals("Combine.perKey(MinLong)", Min.longsPerKey().getName());
   }
-
   @Test
   public void testMinIntegerFn() {
     checkCombineFn(

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
index b4f723d..04c0186 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
@@ -41,12 +41,12 @@ public class SumTest {
 
   @Test
   public void testSumGetNames() {
-    assertEquals("Sum.Globally", Sum.integersGlobally().getName());
-    assertEquals("Sum.Globally", Sum.doublesGlobally().getName());
-    assertEquals("Sum.Globally", Sum.longsGlobally().getName());
-    assertEquals("Sum.PerKey", Sum.integersPerKey().getName());
-    assertEquals("Sum.PerKey", Sum.doublesPerKey().getName());
-    assertEquals("Sum.PerKey", Sum.longsPerKey().getName());
+    assertEquals("Combine.globally(SumInteger)", Sum.integersGlobally().getName());
+    assertEquals("Combine.globally(SumDouble)", Sum.doublesGlobally().getName());
+    assertEquals("Combine.globally(SumLong)", Sum.longsGlobally().getName());
+    assertEquals("Combine.perKey(SumInteger)", Sum.integersPerKey().getName());
+    assertEquals("Combine.perKey(SumDouble)", Sum.doublesPerKey().getName());
+    assertEquals("Combine.perKey(SumLong)", Sum.longsPerKey().getName());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
index d011197..89e0076 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
@@ -234,12 +234,13 @@ public class TopTest {
 
   @Test
   public void testTopGetNames() {
-    assertEquals("Top.Globally", Top.of(1, new OrderByLength()).getName());
-    assertEquals("Smallest.Globally", Top.smallest(1).getName());
-    assertEquals("Largest.Globally", Top.largest(2).getName());
-    assertEquals("Top.PerKey", Top.perKey(1, new IntegerComparator()).getName());
-    assertEquals("Smallest.PerKey", Top.<String, Integer>smallestPerKey(1).getName());
-    assertEquals("Largest.PerKey", Top.<String, Integer>largestPerKey(2).getName());
+    assertEquals("Combine.globally(Top(OrderByLength))", Top.of(1, new OrderByLength()).getName());
+    assertEquals("Combine.globally(Top(Smallest))", Top.smallest(1).getName());
+    assertEquals("Combine.globally(Top(Largest))", Top.largest(2).getName());
+    assertEquals("Combine.perKey(Top(IntegerComparator))",
+        Top.perKey(1, new IntegerComparator()).getName());
+    assertEquals("Combine.perKey(Top(Smallest))", Top.<String, Integer>smallestPerKey(1).getName());
+    assertEquals("Combine.perKey(Top(Largest))", Top.<String, Integer>largestPerKey(2).getName());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
index b35e942..b81aa36 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.NameUtils.NameOverride;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PDone;
 import org.junit.Rule;
@@ -111,16 +112,12 @@ public class NameUtilsTest {
 
   @Test
   public void testSimpleName() {
-    assertEquals("Embedded", NameUtils.approximateSimpleName(EmbeddedOldDoFn.class));
+    assertEquals("Embedded", NameUtils.approximateSimpleName(new EmbeddedOldDoFn()));
   }
 
   @Test
   public void testAnonSimpleName() throws Exception {
-    thrown.expect(IllegalArgumentException.class);
-
-    EmbeddedOldDoFn anon = new EmbeddedOldDoFn(){};
-
-    NameUtils.approximateSimpleName(anon.getClass());
+    assertEquals("Anonymous", NameUtils.approximateSimpleName(new EmbeddedOldDoFn() {}));
   }
 
   @Test
@@ -128,7 +125,7 @@ public class NameUtilsTest {
     EmbeddedOldDoFn fn = new EmbeddedOldDoFn();
     EmbeddedOldDoFn inner = fn.getEmbedded();
 
-    assertEquals("DeeperEmbedded", NameUtils.approximateSimpleName(inner.getClass()));
+    assertEquals("DeeperEmbedded", NameUtils.approximateSimpleName(inner));
   }
 
   @Test
@@ -160,9 +157,25 @@ public class NameUtilsTest {
     };
 
     assertEquals("NamedInnerClass",
-        NameUtils.approximateSimpleName(anonymousClassObj.getInnerClassInstance().getClass()));
+        NameUtils.approximateSimpleName(anonymousClassObj.getInnerClassInstance()));
     assertEquals("NameUtilsTest.NamedInnerClass",
-        NameUtils.approximatePTransformName(
-            anonymousClassObj.getInnerClassInstance().getClass()));
+        NameUtils.approximatePTransformName(anonymousClassObj.getInnerClassInstance().getClass()));
+  }
+
+  @Test
+  public void testApproximateSimpleNameOverride() {
+    Object overriddenName = new NameOverride() {
+      @Override
+      public String getNameOverride() {
+        return "CUSTOM_NAME";
+      }
+    };
+    assertEquals("CUSTOM_NAME", NameUtils.approximateSimpleName(overriddenName));
+  }
+
+  @Test
+  public void testApproximateSimpleNameCustomAnonymous() {
+    Object overriddenName = new Object() {};
+    assertEquals("CUSTOM_NAME", NameUtils.approximateSimpleName(overriddenName, "CUSTOM_NAME"));
   }
 }