You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2017/01/03 21:27:59 UTC

[1/4] beam git commit: Move name utilities from StringUtils to NameUtils

Repository: beam
Updated Branches:
  refs/heads/master 0616245e6 -> 6c1e46976


Move name utilities from StringUtils to NameUtils

Add more exhaustive tests for NameUtils


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/38d902be
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/38d902be
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/38d902be

Branch: refs/heads/master
Commit: 38d902be61f2dd05ebb2391c86629d29ed5596f8
Parents: 0616245
Author: bchambers <bc...@google.com>
Authored: Thu Dec 29 13:01:24 2016 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Jan 3 13:15:26 2017 -0800

----------------------------------------------------------------------
 .../core/UnboundedReadFromBoundedSource.java    |   4 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   9 +-
 .../DataflowUnboundedReadFromBoundedSource.java |   6 +-
 .../sdk/io/BoundedReadFromUnboundedSource.java  |   5 +-
 .../main/java/org/apache/beam/sdk/io/Read.java  |   7 +-
 .../apache/beam/sdk/transforms/PTransform.java  |   4 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |   6 +-
 .../org/apache/beam/sdk/util/NameUtils.java     | 134 +++++++++++++++
 .../org/apache/beam/sdk/util/StringUtils.java   | 100 -----------
 .../org/apache/beam/sdk/values/PValueBase.java  |   4 +-
 .../org/apache/beam/sdk/util/NameUtilsTest.java | 168 +++++++++++++++++++
 .../apache/beam/sdk/util/StringUtilsTest.java   | 100 -----------
 12 files changed, 323 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/38d902be/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index be1793c..645a411 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.core;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -50,6 +49,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -100,7 +100,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
 
   @Override
   public String getKindString() {
-    return "Read(" + approximateSimpleName(source.getClass()) + ")";
+    return String.format("Read(%s)", NameUtils.approximateSimpleName(source.getClass()));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/38d902be/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 29c0058..69c9c18 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -21,8 +21,6 @@ import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Strings.isNullOrEmpty;
-import static org.apache.beam.sdk.util.StringUtils.approximatePTransformName;
-import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName;
 import static org.apache.beam.sdk.util.WindowedValue.valueInEmptyWindows;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -143,6 +141,7 @@ import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.PathValidator;
 import org.apache.beam.sdk.util.PropertyNames;
@@ -2309,7 +2308,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     @Override
     public String getKindString() {
-      return "Read(" + approximateSimpleName(source.getClass()) + ")";
+      return String.format("Read(%s)", NameUtils.approximateSimpleName(source.getClass()));
     }
 
     static {
@@ -2785,8 +2784,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
           ? "streaming" : "batch";
       String name =
           transform == null
-              ? approximateSimpleName(doFn.getClass())
-              : approximatePTransformName(transform.getClass());
+              ? NameUtils.approximateSimpleName(doFn.getClass())
+              : NameUtils.approximatePTransformName(transform.getClass());
       throw new UnsupportedOperationException(
           String.format("The DataflowRunner in %s mode does not support %s.", mode, name));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/38d902be/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
index 65db817..db87e21 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.dataflow.internal;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -50,6 +49,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -109,10 +109,10 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin
     if (source.getClass().isAnonymousClass()) {
       sourceName = "AnonymousSource";
     } else {
-      sourceName = approximateSimpleName(source.getClass());
+      sourceName = NameUtils.approximateSimpleName(source.getClass());
     }
 
-    return "Read(" + sourceName + ")";
+    return String.format("Read(%s)", sourceName);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/38d902be/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index f568d86..ac84c5e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.io;
 
-import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName;
-
 import com.google.api.client.util.BackOff;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
@@ -36,6 +34,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.util.ValueWithRecordId;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -118,7 +117,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
 
   @Override
   public String getKindString() {
-    return "Read(" + approximateSimpleName(source.getClass()) + ")";
+    return "Read(" + NameUtils.approximateSimpleName(source.getClass()) + ")";
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/38d902be/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 7ec3b0e..7404cba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -17,12 +17,11 @@
  */
 package org.apache.beam.sdk.io;
 
-import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName;
-
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PBegin;
@@ -118,7 +117,7 @@ public class Read {
 
     @Override
     public String getKindString() {
-      return "Read(" + approximateSimpleName(source.getClass()) + ")";
+      return "Read(" + NameUtils.approximateSimpleName(source.getClass()) + ")";
     }
 
     @Override
@@ -185,7 +184,7 @@ public class Read {
 
     @Override
     public String getKindString() {
-      return "Read(" + approximateSimpleName(source.getClass()) + ")";
+      return String.format("Read(%s)", NameUtils.approximateSimpleName(source.getClass()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/38d902be/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index ce4891d..efe4339 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.util.StringUtils;
+import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.TypedPValue;
@@ -243,7 +243,7 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
     if (getClass().isAnonymousClass()) {
       return "AnonymousTransform";
     } else {
-      return StringUtils.approximatePTransformName(getClass());
+      return NameUtils.approximatePTransformName(getClass());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/38d902be/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index f897f82..059effd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.StringUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -772,7 +772,7 @@ public class ParDo {
       if (clazz.isAnonymousClass()) {
         return "AnonymousParDo";
       } else {
-        return String.format("ParDo(%s)", StringUtils.approximateSimpleName(clazz));
+        return String.format("ParDo(%s)", NameUtils.approximateSimpleName(clazz));
       }
     }
 
@@ -994,7 +994,7 @@ public class ParDo {
       if (clazz.isAnonymousClass()) {
         return "AnonymousParMultiDo";
       } else {
-        return String.format("ParMultiDo(%s)", StringUtils.approximateSimpleName(clazz));
+        return String.format("ParMultiDo(%s)", NameUtils.approximateSimpleName(clazz));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/38d902be/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
new file mode 100644
index 0000000..60a0e41
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Helpers for extracting the name of objects (most commonly {@link DoFn} and {@link CombineFn}).
+ */
+public class NameUtils {
+
+  private static final String[] STANDARD_NAME_SUFFIXES =
+      new String[]{"OldDoFn", "DoFn", "Fn"};
+
+  /**
+   * Pattern to match a non-anonymous inner class.
+   * E.g., matches "Foo$Bar", or even "Foo$1$Bar", but not "Foo$1" or "Foo$1$2".
+   */
+  private static final Pattern NAMED_INNER_CLASS =
+      Pattern.compile(".+\\$(?<INNER>[^0-9].*)");
+
+  private static final String ANONYMOUS_CLASS_REGEX = "\\$[0-9]+\\$";
+
+  private static String approximateSimpleName(Class<?> clazz, boolean dropOuterClassNames) {
+    checkArgument(!clazz.isAnonymousClass(),
+        "Attempted to get simple name of anonymous class");
+    return approximateSimpleName(clazz.getName(), dropOuterClassNames);
+  }
+
+  @VisibleForTesting
+  static String approximateSimpleName(String fullName, boolean dropOuterClassNames) {
+    String shortName = fullName.substring(fullName.lastIndexOf('.') + 1);
+
+    // Drop common suffixes for each named component.
+    String[] names = shortName.split("\\$");
+    for (int i = 0; i < names.length; i++) {
+      names[i] = simplifyNameComponent(names[i]);
+    }
+    shortName = Joiner.on('$').join(names);
+
+    if (dropOuterClassNames) {
+      // Simplify inner class name by dropping outer class prefixes.
+      Matcher m = NAMED_INNER_CLASS.matcher(shortName);
+      if (m.matches()) {
+        shortName = m.group("INNER");
+      }
+    } else {
+      // Dropping anonymous outer classes
+      shortName = shortName.replaceAll(ANONYMOUS_CLASS_REGEX, ".");
+      shortName = shortName.replaceAll("\\$", ".");
+    }
+    return shortName;
+  }
+
+
+  private static String simplifyNameComponent(String name) {
+    for (String suffix : STANDARD_NAME_SUFFIXES) {
+      if (name.endsWith(suffix) && name.length() > suffix.length()) {
+        return name.substring(0, name.length() - suffix.length());
+      }
+    }
+    return name;
+  }
+
+  /**
+   * Returns a simple name for a class.
+   *
+   * <p>Note: this is non-invertible - the name may be simplified to an
+   * extent that it cannot be mapped back to the original class.
+   *
+   * <p>This can be used to generate human-readable names. It
+   * removes the package and outer classes from the name,
+   * and removes common suffixes.
+   *
+   * <p>Examples:
+   * <ul>
+   *   <li>{@code some.package.Word.SummaryDoFn} becomes "Summary"
+   *   <li>{@code another.package.PairingFn} becomes "Pairing"
+   * </ul>
+   *
+   * @throws IllegalArgumentException if the class is anonymous
+   */
+  public static String approximateSimpleName(Class<?> clazz) {
+    return approximateSimpleName(clazz, /* dropOuterClassNames */ true);
+  }
+
+  /**
+   * Returns a name for a PTransform class.
+   *
+   * <p>This can be used to generate human-readable transform names. It
+   * removes the package from the name, and removes common suffixes.
+   *
+   * <p>It is different than approximateSimpleName:
+   * <ul>
+   *   <li>1. It keeps the outer class names.
+   *   <li>2. It removes the common transform inner class: "Bound".
+   * </ul>
+   *
+   * <p>Examples:
+   * <ul>
+   *   <li>{@code some.package.Word.Summary} becomes "Word.Summary"
+   *   <li>{@code another.package.Pairing.Bound} becomes "Pairing"
+   * </ul>
+   */
+  public static String approximatePTransformName(Class<?> clazz) {
+    checkArgument(PTransform.class.isAssignableFrom(clazz));
+    return approximateSimpleName(clazz, /* dropOuterClassNames */ false)
+        .replaceFirst("\\.Bound$", "");
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/38d902be/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
index 3ff8448..993353d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
@@ -17,15 +17,10 @@
  */
 package org.apache.beam.sdk.util;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.common.base.Joiner;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.beam.sdk.transforms.PTransform;
 
 /**
  * Utilities for working with JSON and other human-readable string formats.
@@ -95,64 +90,6 @@ public class StringUtils {
     return byteArray;
   }
 
-  private static final String[] STANDARD_NAME_SUFFIXES =
-      new String[]{"OldDoFn", "DoFn", "Fn"};
-
-  /**
-   * Pattern to match a non-anonymous inner class.
-   * Eg, matches "Foo$Bar", or even "Foo$1$Bar", but not "Foo$1" or "Foo$1$2".
-   */
-  private static final Pattern NAMED_INNER_CLASS =
-      Pattern.compile(".+\\$(?<INNER>[^0-9].*)");
-
-  private static final String ANONYMOUS_CLASS_REGEX = "\\$[0-9]+\\$";
-
-  /**
-   * Returns a simple name for a class.
-   *
-   * <p>Note: this is non-invertible - the name may be simplified to an
-   * extent that it cannot be mapped back to the original class.
-   *
-   * <p>This can be used to generate human-readable names. It
-   * removes the package and outer classes from the name,
-   * and removes common suffixes.
-   *
-   * <p>Examples:
-   * <ul>
-   *   <li>{@code some.package.Word.SummaryDoFn} becomes "Summary"
-   *   <li>{@code another.package.PairingFn} becomes "Pairing"
-   * </ul>
-   *
-   * @throws IllegalArgumentException if the class is anonymous
-   */
-  public static String approximateSimpleName(Class<?> clazz) {
-    return approximateSimpleName(clazz, /* dropOuterClassNames */ true);
-  }
-
-  /**
-   * Returns a name for a PTransform class.
-   *
-   * <p>This can be used to generate human-readable transform names. It
-   * removes the package from the name, and removes common suffixes.
-   *
-   * <p>It is different than approximateSimpleName:
-   * <ul>
-   *   <li>1. It keeps the outer classes names.
-   *   <li>2. It removes the common transform inner class: "Bound".
-   * </ul>
-   *
-   * <p>Examples:
-   * <ul>
-   *   <li>{@code some.package.Word.Summary} becomes "Word.Summary"
-   *   <li>{@code another.package.Pairing.Bound} becomes "Pairing"
-   * </ul>
-   */
-  public static String approximatePTransformName(Class<?> clazz) {
-    checkArgument(PTransform.class.isAssignableFrom(clazz));
-    return approximateSimpleName(clazz, /* dropOuterClassNames */ false)
-        .replaceFirst("\\.Bound$", "");
-  }
-
   /**
    * Calculate the Levenshtein distance between two strings.
    *
@@ -204,41 +141,4 @@ public class StringUtils {
 
     return v1[t.length()];
   }
-
-  private static String approximateSimpleName(Class<?> clazz, boolean dropOuterClassNames) {
-    checkArgument(!clazz.isAnonymousClass(),
-        "Attempted to get simple name of anonymous class");
-
-    String fullName = clazz.getName();
-    String shortName = fullName.substring(fullName.lastIndexOf('.') + 1);
-
-    // Drop common suffixes for each named component.
-    String[] names = shortName.split("\\$");
-    for (int i = 0; i < names.length; i++) {
-      names[i] = simplifyNameComponent(names[i]);
-    }
-    shortName = Joiner.on('$').join(names);
-
-    if (dropOuterClassNames) {
-      // Simplify inner class name by dropping outer class prefixes.
-      Matcher m = NAMED_INNER_CLASS.matcher(shortName);
-      if (m.matches()) {
-        shortName = m.group("INNER");
-      }
-    } else {
-      // Dropping anonymous outer classes
-      shortName = shortName.replaceAll(ANONYMOUS_CLASS_REGEX, ".");
-      shortName = shortName.replaceAll("\\$", ".");
-    }
-    return shortName;
-  }
-
-  private static String simplifyNameComponent(String name) {
-    for (String suffix : STANDARD_NAME_SUFFIXES) {
-      if (name.endsWith(suffix) && name.length() > suffix.length()) {
-        return name.substring(0, name.length() - suffix.length());
-      }
-    }
-    return name;
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/38d902be/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
index 7b44737..8778597 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
@@ -22,7 +22,7 @@ import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.StringUtils;
+import org.apache.beam.sdk.util.NameUtils;
 
 /**
  * A {@link PValueBase} is an abstract base class that provides
@@ -155,6 +155,6 @@ public abstract class PValueBase extends POutputValueBase implements PValue {
    * <p>By default, uses the base name of the current class as its kind string.
    */
   protected String getKindString() {
-    return StringUtils.approximateSimpleName(getClass());
+    return NameUtils.approximateSimpleName(getClass());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/38d902be/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
new file mode 100644
index 0000000..b35e942
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PDone;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link NameUtils}.
+ */
+@RunWith(JUnit4.class)
+public class NameUtilsTest {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testDropsStandardSuffixes() {
+    assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedOldDoFn", true));
+    assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedDoFn", true));
+    assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedFn", true));
+
+    assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedOldDoFn", false));
+    assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedDoFn", false));
+    assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedFn", false));
+  }
+
+  @Test
+  public void testDropsStandardSuffixesInAllComponents() {
+    assertEquals("Embedded", NameUtils.approximateSimpleName("SomeOldDoFn$EmbeddedFn", true));
+    assertEquals("Embedded", NameUtils.approximateSimpleName("SomeDoFn$EmbeddedDoFn", true));
+    assertEquals("Embedded", NameUtils.approximateSimpleName("SomeFn$EmbeddedFn", true));
+
+    assertEquals("Some.Embedded", NameUtils.approximateSimpleName("SomeOldDoFn$EmbeddedFn", false));
+    assertEquals("Some.Embedded", NameUtils.approximateSimpleName("SomeDoFn$EmbeddedDoFn", false));
+    assertEquals("Some.Embedded", NameUtils.approximateSimpleName("SomeFn$EmbeddedFn", false));
+  }
+
+  @Test
+  public void testDropsOuterClassNamesTrue() {
+    assertEquals("Bar", NameUtils.approximateSimpleName("Foo$1$Bar", true));
+    assertEquals("Foo$1", NameUtils.approximateSimpleName("Foo$1", true));
+    assertEquals("Foo$1$2", NameUtils.approximateSimpleName("Foo$1$2", true));
+  }
+
+  @Test
+  public void testDropsOuterClassNamesFalse() {
+    assertEquals("Foo.Bar", NameUtils.approximateSimpleName("Foo$1$Bar", false));
+    assertEquals("Foo.1", NameUtils.approximateSimpleName("Foo$1", false));
+    assertEquals("Foo.2", NameUtils.approximateSimpleName("Foo$1$2", false));
+  }
+
+  /**
+   * Inner class for simple name test.
+   */
+  private class EmbeddedOldDoFn {
+
+    private class DeeperEmbeddedOldDoFn extends EmbeddedOldDoFn {}
+
+    private EmbeddedOldDoFn getEmbedded() {
+      return new DeeperEmbeddedOldDoFn();
+    }
+  }
+
+  private class EmbeddedPTransform extends PTransform<PBegin, PDone> {
+    @Override
+    public PDone expand(PBegin begin) {
+      throw new IllegalArgumentException("Should never be applied");
+    }
+
+    private class Bound extends PTransform<PBegin, PDone> {
+      @Override
+      public PDone expand(PBegin begin) {
+        throw new IllegalArgumentException("Should never be applied");
+      }
+    }
+
+    private Bound getBound() {
+      return new Bound();
+    }
+  }
+
+  private interface AnonymousClass {
+    Object getInnerClassInstance();
+  }
+
+  @Test
+  public void testSimpleName() {
+    assertEquals("Embedded", NameUtils.approximateSimpleName(EmbeddedOldDoFn.class));
+  }
+
+  @Test
+  public void testAnonSimpleName() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+
+    EmbeddedOldDoFn anon = new EmbeddedOldDoFn(){};
+
+    NameUtils.approximateSimpleName(anon.getClass());
+  }
+
+  @Test
+  public void testNestedSimpleName() {
+    EmbeddedOldDoFn fn = new EmbeddedOldDoFn();
+    EmbeddedOldDoFn inner = fn.getEmbedded();
+
+    assertEquals("DeeperEmbedded", NameUtils.approximateSimpleName(inner.getClass()));
+  }
+
+  @Test
+  public void testPTransformName() {
+    EmbeddedPTransform transform = new EmbeddedPTransform();
+    assertEquals(
+        "NameUtilsTest.EmbeddedPTransform",
+        NameUtils.approximatePTransformName(transform.getClass()));
+    assertEquals(
+        "NameUtilsTest.EmbeddedPTransform",
+        NameUtils.approximatePTransformName(transform.getBound().getClass()));
+    assertEquals("TextIO.Write", NameUtils.approximatePTransformName(TextIO.Write.Bound.class));
+  }
+
+  @Test
+  public void testPTransformNameWithAnonOuterClass() throws Exception {
+    AnonymousClass anonymousClassObj = new AnonymousClass() {
+      class NamedInnerClass extends PTransform<PBegin, PDone> {
+        @Override
+        public PDone expand(PBegin begin) {
+          throw new IllegalArgumentException("Should never be applied");
+        }
+      }
+
+      @Override
+      public Object getInnerClassInstance() {
+        return new NamedInnerClass();
+      }
+    };
+
+    assertEquals("NamedInnerClass",
+        NameUtils.approximateSimpleName(anonymousClassObj.getInnerClassInstance().getClass()));
+    assertEquals("NameUtilsTest.NamedInnerClass",
+        NameUtils.approximatePTransformName(
+            anonymousClassObj.getInnerClassInstance().getClass()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/38d902be/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
index 1ac176b..df6a300 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
@@ -20,13 +20,7 @@ package org.apache.beam.sdk.util;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PDone;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -35,8 +29,6 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class StringUtilsTest {
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
 
   @Test
   public void testTranscodeEmptyByteArray() {
@@ -55,98 +47,6 @@ public class StringUtilsTest {
     assertArrayEquals(bytes, StringUtils.jsonStringToByteArray(string));
   }
 
-  /**
-   * Inner class for simple name test.
-   */
-  private class EmbeddedOldDoFn {
-
-    private class DeeperEmbeddedOldDoFn extends EmbeddedOldDoFn {}
-
-    private EmbeddedOldDoFn getEmbedded() {
-      return new DeeperEmbeddedOldDoFn();
-    }
-  }
-
-  private class EmbeddedPTransform extends PTransform<PBegin, PDone> {
-    @Override
-    public PDone expand(PBegin begin) {
-      throw new IllegalArgumentException("Should never be applied");
-    }
-
-    private class Bound extends PTransform<PBegin, PDone> {
-      @Override
-      public PDone expand(PBegin begin) {
-        throw new IllegalArgumentException("Should never be applied");
-      }
-    }
-
-    private Bound getBound() {
-      return new Bound();
-    }
-  }
-
-  private interface AnonymousClass {
-    Object getInnerClassInstance();
-  }
-
-  @Test
-  public void testSimpleName() {
-    assertEquals("Embedded",
-        StringUtils.approximateSimpleName(EmbeddedOldDoFn.class));
-  }
-
-  @Test
-  public void testAnonSimpleName() throws Exception {
-    thrown.expect(IllegalArgumentException.class);
-
-    EmbeddedOldDoFn anon = new EmbeddedOldDoFn(){};
-
-    StringUtils.approximateSimpleName(anon.getClass());
-  }
-
-  @Test
-  public void testNestedSimpleName() {
-    EmbeddedOldDoFn fn = new EmbeddedOldDoFn();
-    EmbeddedOldDoFn inner = fn.getEmbedded();
-
-    assertEquals("DeeperEmbedded", StringUtils.approximateSimpleName(inner.getClass()));
-  }
-
-  @Test
-  public void testPTransformName() {
-    EmbeddedPTransform transform = new EmbeddedPTransform();
-    assertEquals(
-        "StringUtilsTest.EmbeddedPTransform",
-        StringUtils.approximatePTransformName(transform.getClass()));
-    assertEquals(
-        "StringUtilsTest.EmbeddedPTransform",
-        StringUtils.approximatePTransformName(transform.getBound().getClass()));
-    assertEquals("TextIO.Write", StringUtils.approximatePTransformName(TextIO.Write.Bound.class));
-  }
-
-  @Test
-  public void testPTransformNameWithAnonOuterClass() throws Exception {
-    AnonymousClass anonymousClassObj = new AnonymousClass() {
-      class NamedInnerClass extends PTransform<PBegin, PDone> {
-        @Override
-        public PDone expand(PBegin begin) {
-          throw new IllegalArgumentException("Should never be applied");
-        }
-      }
-
-      @Override
-      public Object getInnerClassInstance() {
-        return new NamedInnerClass();
-      }
-    };
-
-    assertEquals("NamedInnerClass",
-        StringUtils.approximateSimpleName(anonymousClassObj.getInnerClassInstance().getClass()));
-    assertEquals("StringUtilsTest.NamedInnerClass",
-        StringUtils.approximatePTransformName(
-            anonymousClassObj.getInnerClassInstance().getClass()));
-  }
-
   @Test
   public void testLevenshteinDistance() {
     assertEquals(0, StringUtils.getLevenshteinDistance("", "")); // equal


[2/4] beam git commit: Remove unused name fields from ParDo constructors

Posted by bc...@apache.org.
Remove unused name fields from ParDo constructors


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e5a3f759
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e5a3f759
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e5a3f759

Branch: refs/heads/master
Commit: e5a3f75988ae30944817d00b035c46a92f63ee83
Parents: 38d902b
Author: bchambers <bc...@google.com>
Authored: Thu Dec 29 13:42:30 2016 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Jan 3 13:15:36 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/ParDo.java   | 30 ++++++--------------
 1 file changed, 8 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e5a3f759/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 059effd..7e54a54 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -594,15 +594,13 @@ public class ParDo {
    * bind the input/output types of this {@link PTransform}.
    */
   public static class Unbound {
-    private final String name;
     private final List<PCollectionView<?>> sideInputs;
 
     Unbound() {
-      this(null, ImmutableList.<PCollectionView<?>>of());
+      this(ImmutableList.<PCollectionView<?>>of());
     }
 
-    Unbound(String name, List<PCollectionView<?>> sideInputs) {
-      this.name = name;
+    Unbound(List<PCollectionView<?>> sideInputs) {
       this.sideInputs = sideInputs;
     }
 
@@ -632,7 +630,7 @@ public class ParDo {
       ImmutableList.Builder<PCollectionView<?>> builder = ImmutableList.builder();
       builder.addAll(this.sideInputs);
       builder.addAll(sideInputs);
-      return new Unbound(name, builder.build());
+      return new Unbound(builder.build());
     }
 
     /**
@@ -658,12 +656,12 @@ public class ParDo {
      */
     public <OutputT> UnboundMulti<OutputT> withOutputTags(
         TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags) {
-      return new UnboundMulti<>(name, sideInputs, mainOutputTag, sideOutputTags);
+      return new UnboundMulti<>(sideInputs, mainOutputTag, sideOutputTags);
     }
 
     private <InputT, OutputT> Bound<InputT, OutputT> of(
         DoFn<InputT, OutputT> doFn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
-      return new Bound<>(name, doFn, sideInputs, fnDisplayData);
+      return new Bound<>(doFn, sideInputs, fnDisplayData);
     }
   }
 
@@ -681,17 +679,14 @@ public class ParDo {
    */
   public static class Bound<InputT, OutputT>
       extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
-    // Inherits name.
     private final List<PCollectionView<?>> sideInputs;
     private final DoFn<InputT, OutputT> fn;
     private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
 
     Bound(
-        String name,
         DoFn<InputT, OutputT> fn,
         List<PCollectionView<?>> sideInputs,
         DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
-      super(name);
       this.fn = SerializableUtils.clone(fn);
       this.fnDisplayData = fnDisplayData;
       this.sideInputs = sideInputs;
@@ -720,7 +715,6 @@ public class ParDo {
     public Bound<InputT, OutputT> withSideInputs(
         Iterable<? extends PCollectionView<?>> sideInputs) {
       return new Bound<>(
-          name,
           fn,
           ImmutableList.<PCollectionView<?>>builder()
               .addAll(this.sideInputs)
@@ -739,7 +733,7 @@ public class ParDo {
      */
     public BoundMulti<InputT, OutputT> withOutputTags(
         TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags) {
-      return new BoundMulti<>(name, fn, sideInputs, mainOutputTag, sideOutputTags, fnDisplayData);
+      return new BoundMulti<>(fn, sideInputs, mainOutputTag, sideOutputTags, fnDisplayData);
     }
 
     @Override
@@ -809,16 +803,13 @@ public class ParDo {
    * @param <OutputT> the type of the main output {@code PCollection} elements
    */
   public static class UnboundMulti<OutputT> {
-    private final String name;
     private final List<PCollectionView<?>> sideInputs;
     private final TupleTag<OutputT> mainOutputTag;
     private final TupleTagList sideOutputTags;
 
-    UnboundMulti(String name,
-                 List<PCollectionView<?>> sideInputs,
+    UnboundMulti(List<PCollectionView<?>> sideInputs,
                  TupleTag<OutputT> mainOutputTag,
                  TupleTagList sideOutputTags) {
-      this.name = name;
       this.sideInputs = sideInputs;
       this.mainOutputTag = mainOutputTag;
       this.sideOutputTags = sideOutputTags;
@@ -850,7 +841,6 @@ public class ParDo {
     public UnboundMulti<OutputT> withSideInputs(
         Iterable<? extends PCollectionView<?>> sideInputs) {
       return new UnboundMulti<>(
-          name,
           ImmutableList.<PCollectionView<?>>builder()
               .addAll(this.sideInputs)
               .addAll(sideInputs)
@@ -874,7 +864,7 @@ public class ParDo {
 
     private <InputT> BoundMulti<InputT, OutputT> of(
         DoFn<InputT, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
-      return new BoundMulti<>(name, fn, sideInputs, mainOutputTag, sideOutputTags, fnDisplayData);
+      return new BoundMulti<>(fn, sideInputs, mainOutputTag, sideOutputTags, fnDisplayData);
     }
   }
 
@@ -891,7 +881,6 @@ public class ParDo {
    */
   public static class BoundMulti<InputT, OutputT>
       extends PTransform<PCollection<? extends InputT>, PCollectionTuple> {
-    // Inherits name.
     private final List<PCollectionView<?>> sideInputs;
     private final TupleTag<OutputT> mainOutputTag;
     private final TupleTagList sideOutputTags;
@@ -899,13 +888,11 @@ public class ParDo {
     private final DoFn<InputT, OutputT> fn;
 
     BoundMulti(
-        String name,
         DoFn<InputT, OutputT> fn,
         List<PCollectionView<?>> sideInputs,
         TupleTag<OutputT> mainOutputTag,
         TupleTagList sideOutputTags,
         DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
-      super(name);
       this.sideInputs = sideInputs;
       this.mainOutputTag = mainOutputTag;
       this.sideOutputTags = sideOutputTags;
@@ -937,7 +924,6 @@ public class ParDo {
     public BoundMulti<InputT, OutputT> withSideInputs(
         Iterable<? extends PCollectionView<?>> sideInputs) {
       return new BoundMulti<>(
-          name,
           fn,
           ImmutableList.<PCollectionView<?>>builder()
               .addAll(this.sideInputs)


[4/4] beam git commit: Closes #1718

Posted by bc...@apache.org.
Closes #1718


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6c1e4697
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6c1e4697
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6c1e4697

Branch: refs/heads/master
Commit: 6c1e4697640ab2a5053adb96083918a39fb3b744
Parents: 0616245 16b2667
Author: bchambers <bc...@google.com>
Authored: Tue Jan 3 13:15:50 2017 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Jan 3 13:15:50 2017 -0800

----------------------------------------------------------------------
 .../core/UnboundedReadFromBoundedSource.java    |   4 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   9 +-
 .../DataflowUnboundedReadFromBoundedSource.java |  11 +-
 .../sdk/io/BoundedReadFromUnboundedSource.java  |   5 +-
 .../main/java/org/apache/beam/sdk/io/Read.java  |   7 +-
 .../org/apache/beam/sdk/transforms/Combine.java | 175 +++++++++---------
 .../org/apache/beam/sdk/transforms/Count.java   |   4 +-
 .../org/apache/beam/sdk/transforms/Max.java     |  20 +-
 .../org/apache/beam/sdk/transforms/Mean.java    |   4 +-
 .../org/apache/beam/sdk/transforms/Min.java     |  20 +-
 .../apache/beam/sdk/transforms/PTransform.java  |   4 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |  46 ++---
 .../org/apache/beam/sdk/transforms/Sum.java     |  12 +-
 .../org/apache/beam/sdk/transforms/Top.java     |  27 +--
 .../org/apache/beam/sdk/util/NameUtils.java     | 162 +++++++++++++++++
 .../org/apache/beam/sdk/util/StringUtils.java   | 100 ----------
 .../org/apache/beam/sdk/values/PValueBase.java  |   4 +-
 .../apache/beam/sdk/transforms/CombineTest.java |  23 +--
 .../apache/beam/sdk/transforms/CountTest.java   |   2 +-
 .../org/apache/beam/sdk/transforms/MaxTest.java |  14 +-
 .../apache/beam/sdk/transforms/MeanTest.java    |   5 +-
 .../org/apache/beam/sdk/transforms/MinTest.java |  15 +-
 .../org/apache/beam/sdk/transforms/SumTest.java |  12 +-
 .../org/apache/beam/sdk/transforms/TopTest.java |  13 +-
 .../org/apache/beam/sdk/util/NameUtilsTest.java | 181 +++++++++++++++++++
 .../apache/beam/sdk/util/StringUtilsTest.java   | 100 ----------
 26 files changed, 544 insertions(+), 435 deletions(-)
----------------------------------------------------------------------



[3/4] beam git commit: Remove .named from Combine

Posted by bc...@apache.org.
Remove .named from Combine

Introduces a NameOverride interface that allows some classes to define
custom behavior for getting the name. This is necessary for
parameterized CombineFns to expose details about their parameter values.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/16b26673
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/16b26673
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/16b26673

Branch: refs/heads/master
Commit: 16b266738ed46966400bf0ad1807359a5f763419
Parents: e5a3f75
Author: bchambers <bc...@google.com>
Authored: Thu Dec 29 13:10:13 2016 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Jan 3 13:15:45 2017 -0800

----------------------------------------------------------------------
 .../core/UnboundedReadFromBoundedSource.java    |   2 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   4 +-
 .../DataflowUnboundedReadFromBoundedSource.java |   9 +-
 .../sdk/io/BoundedReadFromUnboundedSource.java  |   2 +-
 .../main/java/org/apache/beam/sdk/io/Read.java  |   4 +-
 .../org/apache/beam/sdk/transforms/Combine.java | 175 ++++++++++---------
 .../org/apache/beam/sdk/transforms/Count.java   |   4 +-
 .../org/apache/beam/sdk/transforms/Max.java     |  20 +--
 .../org/apache/beam/sdk/transforms/Mean.java    |   4 +-
 .../org/apache/beam/sdk/transforms/Min.java     |  20 +--
 .../org/apache/beam/sdk/transforms/ParDo.java   |  14 +-
 .../org/apache/beam/sdk/transforms/Sum.java     |  12 +-
 .../org/apache/beam/sdk/transforms/Top.java     |  27 +--
 .../org/apache/beam/sdk/util/NameUtils.java     |  40 ++++-
 .../apache/beam/sdk/transforms/CombineTest.java |  23 +--
 .../apache/beam/sdk/transforms/CountTest.java   |   2 +-
 .../org/apache/beam/sdk/transforms/MaxTest.java |  14 +-
 .../apache/beam/sdk/transforms/MeanTest.java    |   5 +-
 .../org/apache/beam/sdk/transforms/MinTest.java |  15 +-
 .../org/apache/beam/sdk/transforms/SumTest.java |  12 +-
 .../org/apache/beam/sdk/transforms/TopTest.java |  13 +-
 .../org/apache/beam/sdk/util/NameUtilsTest.java |  33 ++--
 22 files changed, 239 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index 645a411..3073076 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -100,7 +100,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
 
   @Override
   public String getKindString() {
-    return String.format("Read(%s)", NameUtils.approximateSimpleName(source.getClass()));
+    return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 69c9c18..03e5dfc 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -2308,7 +2308,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     @Override
     public String getKindString() {
-      return String.format("Read(%s)", NameUtils.approximateSimpleName(source.getClass()));
+      return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
     }
 
     static {
@@ -2784,7 +2784,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
           ? "streaming" : "batch";
       String name =
           transform == null
-              ? NameUtils.approximateSimpleName(doFn.getClass())
+              ? NameUtils.approximateSimpleName(doFn)
               : NameUtils.approximatePTransformName(transform.getClass());
       throw new UnsupportedOperationException(
           String.format("The DataflowRunner in %s mode does not support %s.", mode, name));

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
index db87e21..a2ae799 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
@@ -105,14 +105,7 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin
 
   @Override
   public String getKindString() {
-    String sourceName;
-    if (source.getClass().isAnonymousClass()) {
-      sourceName = "AnonymousSource";
-    } else {
-      sourceName = NameUtils.approximateSimpleName(source.getClass());
-    }
-
-    return String.format("Read(%s)", sourceName);
+    return String.format("Read(%s)", NameUtils.approximateSimpleName(source, "AnonymousSource"));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index ac84c5e..8b63bfd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -117,7 +117,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
 
   @Override
   public String getKindString() {
-    return "Read(" + NameUtils.approximateSimpleName(source.getClass()) + ")";
+    return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 7404cba..0e269a2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -117,7 +117,7 @@ public class Read {
 
     @Override
     public String getKindString() {
-      return "Read(" + NameUtils.approximateSimpleName(source.getClass()) + ")";
+      return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
     }
 
     @Override
@@ -184,7 +184,7 @@ public class Read {
 
     @Override
     public String getKindString() {
-      return String.format("Read(%s)", NameUtils.approximateSimpleName(source.getClass()));
+      return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 3b07260..92c04ca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -58,6 +58,8 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.AppliedCombineFn;
+import org.apache.beam.sdk.util.NameUtils;
+import org.apache.beam.sdk.util.NameUtils.NameOverride;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -473,56 +475,72 @@ public class Combine {
     @Override
     public <K> KeyedCombineFn<K, InputT, AccumT, OutputT> asKeyedFn() {
       // The key, an object, is never even looked at.
-      return new KeyedCombineFn<K, InputT, AccumT, OutputT>() {
-        @Override
-        public AccumT createAccumulator(K key) {
-          return CombineFn.this.createAccumulator();
-        }
+      return new KeyIgnoringCombineFn<>(this);
+    }
 
-        @Override
-        public AccumT addInput(K key, AccumT accumulator, InputT input) {
-          return CombineFn.this.addInput(accumulator, input);
-        }
+    private static class KeyIgnoringCombineFn<K, InputT, AccumT, OutputT>
+        extends KeyedCombineFn<K, InputT, AccumT, OutputT>
+        implements NameOverride {
 
-        @Override
-        public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators) {
-          return CombineFn.this.mergeAccumulators(accumulators);
-        }
+      private final CombineFn<InputT, AccumT, OutputT> fn;
 
-        @Override
-        public OutputT extractOutput(K key, AccumT accumulator) {
-          return CombineFn.this.extractOutput(accumulator);
-        }
+      private KeyIgnoringCombineFn(CombineFn<InputT, AccumT, OutputT> fn) {
+        this.fn = fn;
+      }
 
-        @Override
-        public AccumT compact(K key, AccumT accumulator) {
-          return CombineFn.this.compact(accumulator);
-        }
+      @Override
+      public AccumT createAccumulator(K key) {
+        return fn.createAccumulator();
+      }
 
-        @Override
-        public Coder<AccumT> getAccumulatorCoder(
-            CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder)
-            throws CannotProvideCoderException {
-          return CombineFn.this.getAccumulatorCoder(registry, inputCoder);
-        }
+      @Override
+      public AccumT addInput(K key, AccumT accumulator, InputT input) {
+        return fn.addInput(accumulator, input);
+      }
 
-        @Override
-        public Coder<OutputT> getDefaultOutputCoder(
-            CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder)
-            throws CannotProvideCoderException {
-          return CombineFn.this.getDefaultOutputCoder(registry, inputCoder);
-        }
+      @Override
+      public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators) {
+        return fn.mergeAccumulators(accumulators);
+      }
 
-        @Override
-        public CombineFn<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) {
-          return CombineFn.this;
-        }
+      @Override
+      public OutputT extractOutput(K key, AccumT accumulator) {
+        return fn.extractOutput(accumulator);
+      }
 
-        @Override
-        public void populateDisplayData(DisplayData.Builder builder) {
-          builder.delegate(CombineFn.this);
-        }
-      };
+      @Override
+      public AccumT compact(K key, AccumT accumulator) {
+        return fn.compact(accumulator);
+      }
+
+      @Override
+      public Coder<AccumT> getAccumulatorCoder(
+          CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder)
+          throws CannotProvideCoderException {
+        return fn.getAccumulatorCoder(registry, inputCoder);
+      }
+
+      @Override
+      public Coder<OutputT> getDefaultOutputCoder(
+          CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder)
+          throws CannotProvideCoderException {
+        return fn.getDefaultOutputCoder(registry, inputCoder);
+      }
+
+      @Override
+      public CombineFn<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) {
+        return fn;
+      }
+
+      @Override
+      public void populateDisplayData(Builder builder) {
+        builder.delegate(fn);
+      }
+
+      @Override
+      public String getNameOverride() {
+        return NameUtils.approximateSimpleName(fn);
+      }
     }
   }
 
@@ -1338,20 +1356,9 @@ public class Combine {
       this.sideInputs = ImmutableList.of();
     }
 
-    private Globally(String name, GlobalCombineFn<? super InputT, ?, OutputT> fn,
-        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
-      super(name);
-      this.fn = fn;
-      this.fnDisplayData = fnDisplayData;
-      this.insertDefault = insertDefault;
-      this.fanout = fanout;
-      this.sideInputs = ImmutableList.of();
-    }
-
-    private Globally(String name, GlobalCombineFn<? super InputT, ?, OutputT> fn,
+    private Globally(GlobalCombineFn<? super InputT, ?, OutputT> fn,
         DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout,
         List<PCollectionView<?>> sideInputs) {
-      super(name);
       this.fn = fn;
       this.fnDisplayData = fnDisplayData;
       this.insertDefault = insertDefault;
@@ -1359,12 +1366,9 @@ public class Combine {
       this.sideInputs = sideInputs;
     }
 
-    /**
-     * Return a new {@code Globally} transform that's like this transform but with the
-     * specified name. Does not modify this transform.
-     */
-    public Globally<InputT, OutputT> named(String name) {
-      return new Globally<>(name, fn, fnDisplayData, insertDefault, fanout);
+    @Override
+    protected String getKindString() {
+      return String.format("Combine.globally(%s)", NameUtils.approximateSimpleName(fn));
     }
 
     /**
@@ -1384,7 +1388,7 @@ public class Combine {
      * is not globally windowed and the output is not being used as a side input.
      */
     public Globally<InputT, OutputT> withoutDefaults() {
-      return new Globally<>(name, fn, fnDisplayData, false, fanout);
+      return new Globally<>(fn, fnDisplayData, false, fanout);
     }
 
     /**
@@ -1395,7 +1399,7 @@ public class Combine {
      * that will be used.
      */
     public Globally<InputT, OutputT> withFanout(int fanout) {
-      return new Globally<>(name, fn, fnDisplayData, insertDefault, fanout);
+      return new Globally<>(fn, fnDisplayData, insertDefault, fanout);
     }
 
     /**
@@ -1405,7 +1409,7 @@ public class Combine {
     public Globally<InputT, OutputT> withSideInputs(
         Iterable<? extends PCollectionView<?>> sideInputs) {
       checkState(fn instanceof RequiresContextInternal);
-      return new Globally<>(name, fn, fnDisplayData, insertDefault, fanout,
+      return new Globally<>(fn, fnDisplayData, insertDefault, fanout,
           ImmutableList.copyOf(sideInputs));
     }
 
@@ -1613,7 +1617,9 @@ public class Combine {
    * {@link #perKey(SerializableFunction)}, and
    * {@link #groupedValues(SerializableFunction)}.
    */
-  public static class IterableCombineFn<V> extends CombineFn<V, List<V>, V> {
+  public static class IterableCombineFn<V>
+      extends CombineFn<V, List<V>, V>
+      implements NameOverride {
     /**
      * Returns a {@code CombineFn} that uses the given
      * {@code SerializableFunction} to combine values.
@@ -1693,6 +1699,11 @@ public class Combine {
       singleton.add(combiner.apply(values));
       return singleton;
     }
+
+    @Override
+    public String getNameOverride() {
+      return NameUtils.approximateSimpleName(combiner);
+    }
   }
 
   /**
@@ -1774,33 +1785,19 @@ public class Combine {
       this.sideInputs = ImmutableList.of();
     }
 
-    private PerKey(String name,
+    private PerKey(
         PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
         DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
         boolean fewKeys, List<PCollectionView<?>> sideInputs) {
-      super(name);
       this.fn = fn;
       this.fnDisplayData = fnDisplayData;
       this.fewKeys = fewKeys;
       this.sideInputs = sideInputs;
     }
 
-    private PerKey(
-        String name, PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean fewKeys) {
-      super(name);
-      this.fn = fn;
-      this.fnDisplayData = fnDisplayData;
-      this.fewKeys = fewKeys;
-      this.sideInputs = ImmutableList.of();
-    }
-
-    /**
-     * Return a new {@code Globally} transform that's like this transform but with the
-     * specified name. Does not modify this transform.
-     */
-    public PerKey<K, InputT, OutputT> named(String name) {
-      return new PerKey<>(name, fn, fnDisplayData, fewKeys);
+    @Override
+    protected String getKindString() {
+      return String.format("Combine.perKey(%s)", NameUtils.approximateSimpleName(fn));
     }
 
     /**
@@ -1810,7 +1807,7 @@ public class Combine {
     public PerKey<K, InputT, OutputT> withSideInputs(
         Iterable<? extends PCollectionView<?>> sideInputs) {
       checkState(fn instanceof RequiresContextInternal);
-      return new PerKey<>(name, fn, fnDisplayData, fewKeys,
+      return new PerKey<>(fn, fnDisplayData, fewKeys,
           ImmutableList.copyOf(sideInputs));
     }
 
@@ -1827,7 +1824,7 @@ public class Combine {
      */
     public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(
         SerializableFunction<? super K, Integer> hotKeyFanout) {
-      return new PerKeyWithHotKeyFanout<>(name, fn, fnDisplayData, hotKeyFanout);
+      return new PerKeyWithHotKeyFanout<>(fn, fnDisplayData, hotKeyFanout);
     }
 
     /**
@@ -1835,7 +1832,7 @@ public class Combine {
      * constant value for every key.
      */
     public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(final int hotKeyFanout) {
-      return new PerKeyWithHotKeyFanout<>(name, fn, fnDisplayData,
+      return new PerKeyWithHotKeyFanout<>(fn, fnDisplayData,
           new SimpleFunction<K, Integer>() {
             @Override
             public void populateDisplayData(Builder builder) {
@@ -1890,17 +1887,21 @@ public class Combine {
     private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
     private final SerializableFunction<? super K, Integer> hotKeyFanout;
 
-    private PerKeyWithHotKeyFanout(String name,
+    private PerKeyWithHotKeyFanout(
         PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
         DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
         SerializableFunction<? super K, Integer> hotKeyFanout) {
-      super(name);
       this.fn = fn;
       this.fnDisplayData = fnDisplayData;
       this.hotKeyFanout = hotKeyFanout;
     }
 
     @Override
+    protected String getKindString() {
+      return String.format("Combine.perKeyWithFanout(%s)", NameUtils.approximateSimpleName(fn));
+    }
+
+    @Override
     public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
       return applyHelper(input);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index 9101996..d164978 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -51,7 +51,7 @@ public class Count {
    * its input {@link PCollection}.
    */
   public static <T> Combine.Globally<T, Long> globally() {
-    return Combine.globally(new CountFn<T>()).named("Count.Globally");
+    return Combine.globally(new CountFn<T>());
   }
 
   /**
@@ -59,7 +59,7 @@ public class Count {
    * associated with each key of its input {@link PCollection}.
    */
   public static <K, V> Combine.PerKey<K, V, Long> perKey() {
-    return Combine.<K, V, Long>perKey(new CountFn<V>()).named("Count.PerKey");
+    return Combine.<K, V, Long>perKey(new CountFn<V>());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index c44d9b6..0990ca4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -52,7 +52,7 @@ public class Max {
    * elements, or {@code Integer.MIN_VALUE} if there are no elements.
    */
   public static Combine.Globally<Integer, Integer> integersGlobally() {
-    return Combine.globally(new MaxIntegerFn()).named("Max.Globally");
+    return Combine.globally(new MaxIntegerFn());
   }
 
   /**
@@ -64,7 +64,7 @@ public class Max {
    * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
    */
   public static <K> Combine.PerKey<K, Integer, Integer> integersPerKey() {
-    return Combine.<K, Integer, Integer>perKey(new MaxIntegerFn()).named("Max.PerKey");
+    return Combine.<K, Integer, Integer>perKey(new MaxIntegerFn());
   }
 
   /**
@@ -73,7 +73,7 @@ public class Max {
    * or {@code Long.MIN_VALUE} if there are no elements.
    */
   public static Combine.Globally<Long, Long> longsGlobally() {
-    return Combine.globally(new MaxLongFn()).named("Max.Globally");
+    return Combine.globally(new MaxLongFn());
   }
 
   /**
@@ -85,7 +85,7 @@ public class Max {
    * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
    */
   public static <K> Combine.PerKey<K, Long, Long> longsPerKey() {
-    return Combine.<K, Long, Long>perKey(new MaxLongFn()).named("Max.PerKey");
+    return Combine.<K, Long, Long>perKey(new MaxLongFn());
   }
 
   /**
@@ -94,7 +94,7 @@ public class Max {
    * elements, or {@code Double.NEGATIVE_INFINITY} if there are no elements.
    */
   public static Combine.Globally<Double, Double> doublesGlobally() {
-    return Combine.globally(new MaxDoubleFn()).named("Max.Globally");
+    return Combine.globally(new MaxDoubleFn());
   }
 
   /**
@@ -106,7 +106,7 @@ public class Max {
    * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
    */
   public static <K> Combine.PerKey<K, Double, Double> doublesPerKey() {
-    return Combine.<K, Double, Double>perKey(new MaxDoubleFn()).named("Max.PerKey");
+    return Combine.<K, Double, Double>perKey(new MaxDoubleFn());
   }
 
   /**
@@ -116,7 +116,7 @@ public class Max {
    */
   public static <T extends Comparable<? super T>>
   Combine.Globally<T, T> globally() {
-    return Combine.<T, T>globally(MaxFn.<T>naturalOrder()).named("Max.Globally");
+    return Combine.<T, T>globally(MaxFn.<T>naturalOrder());
   }
 
   /**
@@ -129,7 +129,7 @@ public class Max {
    */
   public static <K, T extends Comparable<? super T>>
   Combine.PerKey<K, T, T> perKey() {
-    return Combine.<K, T, T>perKey(MaxFn.<T>naturalOrder()).named("Max.PerKey");
+    return Combine.<K, T, T>perKey(MaxFn.<T>naturalOrder());
   }
 
   /**
@@ -139,7 +139,7 @@ public class Max {
    */
   public static <T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.Globally<T, T> globally(ComparatorT comparator) {
-    return Combine.<T, T>globally(MaxFn.of(comparator)).named("Max.Globally");
+    return Combine.<T, T>globally(MaxFn.of(comparator));
   }
 
   /**
@@ -151,7 +151,7 @@ public class Max {
    */
   public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
-    return Combine.<K, T, T>perKey(MaxFn.of(comparator)).named("Max.PerKey");
+    return Combine.<K, T, T>perKey(MaxFn.of(comparator));
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index 9eea3a0..cb77ba3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -64,7 +64,7 @@ public class Mean {
    * @param <NumT> the type of the {@code Number}s being combined
    */
   public static <NumT extends Number> Combine.Globally<NumT, Double> globally() {
-    return Combine.<NumT, Double>globally(new MeanFn<>()).named("Mean.Globally");
+    return Combine.<NumT, Double>globally(new MeanFn<>());
   }
 
   /**
@@ -81,7 +81,7 @@ public class Mean {
    * @param <NumT> the type of the {@code Number}s being combined
    */
   public static <K, NumT extends Number> Combine.PerKey<K, NumT, Double> perKey() {
-    return Combine.<K, NumT, Double>perKey(new MeanFn<>()).named("Mean.PerKey");
+    return Combine.<K, NumT, Double>perKey(new MeanFn<>());
   }
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index f046779..5003594 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -52,7 +52,7 @@ public class Min {
    * {@code PCollection}'s elements, or {@code Integer.MAX_VALUE} if there are no elements.
    */
   public static Combine.Globally<Integer, Integer> integersGlobally() {
-    return Combine.globally(new MinIntegerFn()).named("Min.Globally");
+    return Combine.globally(new MinIntegerFn());
   }
 
   /**
@@ -64,7 +64,7 @@ public class Min {
    * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
    */
   public static <K> Combine.PerKey<K, Integer, Integer> integersPerKey() {
-    return Combine.<K, Integer, Integer>perKey(new MinIntegerFn()).named("Min.PerKey");
+    return Combine.<K, Integer, Integer>perKey(new MinIntegerFn());
   }
 
   /**
@@ -73,7 +73,7 @@ public class Min {
    * or {@code Long.MAX_VALUE} if there are no elements.
    */
   public static Combine.Globally<Long, Long> longsGlobally() {
-    return Combine.globally(new MinLongFn()).named("Min.Globally");
+    return Combine.globally(new MinLongFn());
   }
 
   /**
@@ -85,7 +85,7 @@ public class Min {
    * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
    */
   public static <K> Combine.PerKey<K, Long, Long> longsPerKey() {
-   return Combine.<K, Long, Long>perKey(new MinLongFn()).named("Min.PerKey");
+   return Combine.<K, Long, Long>perKey(new MinLongFn());
   }
 
   /**
@@ -94,7 +94,7 @@ public class Min {
    * elements, or {@code Double.POSITIVE_INFINITY} if there are no elements.
    */
   public static Combine.Globally<Double, Double> doublesGlobally() {
-    return Combine.globally(new MinDoubleFn()).named("Min.Globally");
+    return Combine.globally(new MinDoubleFn());
   }
 
   /**
@@ -106,7 +106,7 @@ public class Min {
    * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing.
    */
   public static <K> Combine.PerKey<K, Double, Double> doublesPerKey() {
-    return Combine.<K, Double, Double>perKey(new MinDoubleFn()).named("Min.PerKey");
+    return Combine.<K, Double, Double>perKey(new MinDoubleFn());
   }
 
   /**
@@ -116,7 +116,7 @@ public class Min {
    */
   public static <T extends Comparable<? super T>>
   Combine.Globally<T, T> globally() {
-    return Combine.<T, T>globally(MinFn.<T>naturalOrder()).named("Min.Globally");
+    return Combine.<T, T>globally(MinFn.<T>naturalOrder());
   }
 
   /**
@@ -129,7 +129,7 @@ public class Min {
    */
   public static <K, T extends Comparable<? super T>>
   Combine.PerKey<K, T, T> perKey() {
-    return Combine.<K, T, T>perKey(MinFn.<T>naturalOrder()).named("Min.PerKey");
+    return Combine.<K, T, T>perKey(MinFn.<T>naturalOrder());
   }
 
   /**
@@ -139,7 +139,7 @@ public class Min {
    */
   public static <T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.Globally<T, T> globally(ComparatorT comparator) {
-    return Combine.<T, T>globally(MinFn.of(comparator)).named("Min.Globally");
+    return Combine.<T, T>globally(MinFn.of(comparator));
   }
 
   /**
@@ -151,7 +151,7 @@ public class Min {
    */
   public static <K, T, ComparatorT extends Comparator<? super T> & Serializable>
   Combine.PerKey<K, T, T> perKey(ComparatorT comparator) {
-    return Combine.<K, T, T>perKey(MinFn.of(comparator)).named("Min.PerKey");
+    return Combine.<K, T, T>perKey(MinFn.of(comparator));
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 7e54a54..5b4fa19 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -762,12 +762,7 @@ public class ParDo {
 
     @Override
     protected String getKindString() {
-      Class<?> clazz = getFn().getClass();
-      if (clazz.isAnonymousClass()) {
-        return "AnonymousParDo";
-      } else {
-        return String.format("ParDo(%s)", NameUtils.approximateSimpleName(clazz));
-      }
+      return String.format("ParDo(%s)", NameUtils.approximateSimpleName(getFn()));
     }
 
     /**
@@ -976,12 +971,7 @@ public class ParDo {
 
     @Override
     protected String getKindString() {
-      Class<?> clazz = getFn().getClass();
-      if (clazz.isAnonymousClass()) {
-        return "AnonymousParMultiDo";
-      } else {
-        return String.format("ParMultiDo(%s)", NameUtils.approximateSimpleName(clazz));
-      }
+      return String.format("ParMultiDo(%s)", NameUtils.approximateSimpleName(getFn()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
index 27c5ced..48eafc3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java
@@ -50,7 +50,7 @@ public class Sum {
    * {@code 0} if there are no elements.
    */
   public static Combine.Globally<Integer, Integer> integersGlobally() {
-    return Combine.globally(new SumIntegerFn()).named("Sum.Globally");
+    return Combine.globally(new SumIntegerFn());
   }
 
   /**
@@ -62,7 +62,7 @@ public class Sum {
    * that key in the input {@code PCollection}.
    */
   public static <K> Combine.PerKey<K, Integer, Integer> integersPerKey() {
-    return Combine.<K, Integer, Integer>perKey(new SumIntegerFn()).named("Sum.PerKey");
+    return Combine.<K, Integer, Integer>perKey(new SumIntegerFn());
   }
 
   /**
@@ -73,7 +73,7 @@ public class Sum {
    * {@code 0} if there are no elements.
    */
   public static Combine.Globally<Long, Long> longsGlobally() {
-    return Combine.globally(new SumLongFn()).named("Sum.Globally");
+    return Combine.globally(new SumLongFn());
   }
 
   /**
@@ -85,7 +85,7 @@ public class Sum {
    * that key in the input {@code PCollection}.
    */
   public static <K> Combine.PerKey<K, Long, Long> longsPerKey() {
-    return Combine.<K, Long, Long>perKey(new SumLongFn()).named("Sum.PerKey");
+    return Combine.<K, Long, Long>perKey(new SumLongFn());
   }
 
   /**
@@ -96,7 +96,7 @@ public class Sum {
    * {@code 0} if there are no elements.
    */
   public static Combine.Globally<Double, Double> doublesGlobally() {
-    return Combine.globally(new SumDoubleFn()).named("Sum.Globally");
+    return Combine.globally(new SumDoubleFn());
   }
 
   /**
@@ -108,7 +108,7 @@ public class Sum {
    * that key in the input {@code PCollection}.
    */
   public static <K> Combine.PerKey<K, Double, Double> doublesPerKey() {
-    return Combine.<K, Double, Double>perKey(new SumDoubleFn()).named("Sum.PerKey");
+    return Combine.<K, Double, Double>perKey(new SumDoubleFn());
   }
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index 992a341..47be9b9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -39,6 +39,8 @@ import org.apache.beam.sdk.transforms.Combine.PerKey;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.util.NameUtils;
+import org.apache.beam.sdk.util.NameUtils.NameOverride;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -97,7 +99,7 @@ public class Top {
    */
   public static <T, ComparatorT extends Comparator<T> & Serializable>
       Combine.Globally<T, List<T>> of(int count, ComparatorT compareFn) {
-    return Combine.globally(new TopCombineFn<>(count, compareFn)).named("Top.Globally");
+    return Combine.globally(new TopCombineFn<>(count, compareFn));
   }
 
   /**
@@ -141,8 +143,7 @@ public class Top {
    * {@code KV}s and return the top values associated with each key.
    */
   public static <T extends Comparable<T>> Combine.Globally<T, List<T>> smallest(int count) {
-    return Combine.globally(new TopCombineFn<>(count, new Smallest<T>()))
-        .named("Smallest.Globally");
+    return Combine.globally(new TopCombineFn<>(count, new Smallest<T>()));
   }
 
   /**
@@ -186,7 +187,7 @@ public class Top {
    * {@code KV}s and return the top values associated with each key.
    */
   public static <T extends Comparable<T>> Combine.Globally<T, List<T>> largest(int count) {
-    return Combine.globally(new TopCombineFn<>(count, new Largest<T>())).named("Largest.Globally");
+    return Combine.globally(new TopCombineFn<>(count, new Largest<T>()));
   }
 
   /**
@@ -233,8 +234,7 @@ public class Top {
   public static <K, V, ComparatorT extends Comparator<V> & Serializable>
       PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>>
       perKey(int count, ComparatorT compareFn) {
-    return Combine.perKey(
-        new TopCombineFn<>(count, compareFn).<K>asKeyedFn()).named("Top.PerKey");
+    return Combine.perKey(new TopCombineFn<>(count, compareFn).<K>asKeyedFn());
   }
 
   /**
@@ -280,8 +280,7 @@ public class Top {
   public static <K, V extends Comparable<V>>
       PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>>
       smallestPerKey(int count) {
-    return Combine.perKey(new TopCombineFn<>(count, new Smallest<V>()).<K>asKeyedFn())
-        .named("Smallest.PerKey");
+    return Combine.perKey(new TopCombineFn<>(count, new Smallest<V>()).<K>asKeyedFn());
   }
 
   /**
@@ -327,9 +326,7 @@ public class Top {
   public static <K, V extends Comparable<V>>
       PerKey<K, V, List<V>>
       largestPerKey(int count) {
-    return Combine.perKey(
-new TopCombineFn<>(count, new Largest<V>()).<K>asKeyedFn())
-        .named("Largest.PerKey");
+    return Combine.perKey(new TopCombineFn<>(count, new Largest<V>()).<K>asKeyedFn());
   }
 
   /**
@@ -368,7 +365,8 @@ new TopCombineFn<>(count, new Largest<V>()).<K>asKeyedFn())
    * @param <T> type of element being compared
    */
   public static class TopCombineFn<T, ComparatorT extends Comparator<T> & Serializable>
-      extends AccumulatingCombineFn<T, BoundedHeap<T, ComparatorT>, List<T>> {
+      extends AccumulatingCombineFn<T, BoundedHeap<T, ComparatorT>, List<T>>
+      implements NameOverride {
 
     private final int count;
     private final ComparatorT compareFn;
@@ -380,6 +378,11 @@ new TopCombineFn<>(count, new Largest<V>()).<K>asKeyedFn())
     }
 
     @Override
+    public String getNameOverride() {
+      return String.format("Top(%s)", NameUtils.approximateSimpleName(compareFn));
+    }
+
+    @Override
     public BoundedHeap<T, ComparatorT> createAccumulator() {
       return new BoundedHeap<>(count, compareFn, new ArrayList<T>());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
index 60a0e41..1c59af7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
@@ -29,12 +29,18 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 
 /**
- * Helpers for extracting the name of objects (most commonly {@link DoFn} and {@link CombineFn}).
+ * Helpers for extracting the name of objects and classes.
  */
 public class NameUtils {
 
+  /** Classes may implement this interface to change how names are generated for their instances. */
+  public interface NameOverride {
+    /** Return the name to use for this instance. */
+    String getNameOverride();
+  }
+
   private static final String[] STANDARD_NAME_SUFFIXES =
-      new String[]{"OldDoFn", "DoFn", "Fn"};
+      new String[]{"OldDoFn", "DoFn", "CombineFn", "Fn"};
 
   /**
    * Pattern to match a non-anonymous inner class.
@@ -87,7 +93,16 @@ public class NameUtils {
   }
 
   /**
-   * Returns a simple name for a class.
+   * As {@link #approximateSimpleName(Object, String)} but returning {@code "Anonymous"} when
+   * {@code object} is an instance of an anonymous class.
+   */
+  public static String approximateSimpleName(Object object) {
+    return approximateSimpleName(object, "Anonymous");
+  }
+
+  /**
+   * Returns a simple name describing a class that is being used as a function (e.g., a {@link DoFn}
+   * or {@link CombineFn}).
    *
    * <p>Note: this is non-invertible - the name may be simplified to an
    * extent that it cannot be mapped back to the original class.
@@ -96,15 +111,28 @@ public class NameUtils {
    * removes the package and outer classes from the name,
    * and removes common suffixes.
    *
+   * <p>If the object is an instance of {@link NameOverride}, the result of
+   * {@link NameOverride#getNameOverride()} is returned. This allows classes that act as wrappers to
+   * override the handling of names by delegating to the objects they wrap.
+   *
+   * <p>If the class is anonymous, the string {@code anonymousValue} is returned.
+   *
    * <p>Examples:
    * <ul>
    *   <li>{@code some.package.Word.SummaryDoFn} becomes "Summary"
    *   <li>{@code another.package.PairingFn} becomes "Pairing"
    * </ul>
-   *
-   * @throws IllegalArgumentException if the class is anonymous
    */
-  public static String approximateSimpleName(Class<?> clazz) {
+  public static String approximateSimpleName(Object object, String anonymousValue) {
+    if (object instanceof NameOverride) {
+      return ((NameOverride) object).getNameOverride();
+    }
+
+    Class<?> clazz = object.getClass();
+    if (clazz.isAnonymousClass()) {
+      return anonymousValue;
+    }
+
     return approximateSimpleName(clazz, /* dropOuterClassNames */ true);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index f783928..fef47fb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -24,6 +24,7 @@ import static org.apache.beam.sdk.TestUtils.checkCombineFn;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -470,8 +471,7 @@ public class CombineTest implements Serializable {
     runTestAccumulatingCombine(EMPTY_TABLE, 0.0, Collections.<KV<String, Double>>emptyList());
   }
 
-  // Checks that Min, Max, Mean, Sum (operations that pass-through to Combine),
-  // provide their own top-level name.
+  // Checks that Min, Max, Mean, Sum (operations that pass through to Combine) have good names.
   @Test
   public void testCombinerNames() {
     Combine.PerKey<String, Integer, Integer> min = Min.integersPerKey();
@@ -479,10 +479,10 @@ public class CombineTest implements Serializable {
     Combine.PerKey<String, Integer, Double> mean = Mean.perKey();
     Combine.PerKey<String, Integer, Integer> sum = Sum.integersPerKey();
 
-    assertThat(min.getName(), Matchers.startsWith("Min"));
-    assertThat(max.getName(), Matchers.startsWith("Max"));
-    assertThat(mean.getName(), Matchers.startsWith("Mean"));
-    assertThat(sum.getName(), Matchers.startsWith("Sum"));
+    assertThat(min.getName(), equalTo("Combine.perKey(MinInteger)"));
+    assertThat(max.getName(), equalTo("Combine.perKey(MaxInteger)"));
+    assertThat(mean.getName(), equalTo("Combine.perKey(Mean)"));
+    assertThat(sum.getName(), equalTo("Combine.perKey(SumInteger)"));
   }
 
   private static final SerializableFunction<String, Integer> hotKeyFanout =
@@ -635,18 +635,13 @@ public class CombineTest implements Serializable {
 
   @Test
   public void testCombineGetName() {
-    assertEquals("Combine.Globally", Combine.globally(new SumInts()).getName());
-    assertEquals(
-        "MyCombineGlobally", Combine.globally(new SumInts()).named("MyCombineGlobally").getName());
+    assertEquals("Combine.globally(SumInts)", Combine.globally(new SumInts()).getName());
     assertEquals(
         "Combine.GloballyAsSingletonView",
         Combine.globally(new SumInts()).asSingletonView().getName());
-    assertEquals("Combine.PerKey", Combine.perKey(new TestKeyedCombineFn()).getName());
-    assertEquals(
-        "MyCombinePerKey",
-        Combine.perKey(new TestKeyedCombineFn()).named("MyCombinePerKey").getName());
+    assertEquals("Combine.perKey(TestKeyed)", Combine.perKey(new TestKeyedCombineFn()).getName());
     assertEquals(
-        "Combine.PerKeyWithHotKeyFanout",
+        "Combine.perKeyWithFanout(TestKeyed)",
         Combine.perKey(new TestKeyedCombineFn()).withHotKeyFanout(10).getName());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
index eafb12d..dca0542 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
@@ -110,6 +110,6 @@ public class CountTest {
   @Test
   public void testCountGetName() {
     assertEquals("Count.PerElement", Count.perElement().getName());
-    assertEquals("Count.Globally", Count.globally().getName());
+    assertEquals("Combine.globally(Count)", Count.globally().getName());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
index 5c78b3f..4aa39a3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
@@ -34,13 +34,13 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class MaxTest {
   @Test
-  public void testMeanGetNames() {
-    assertEquals("Max.Globally", Max.integersGlobally().getName());
-    assertEquals("Max.Globally", Max.doublesGlobally().getName());
-    assertEquals("Max.Globally", Max.longsGlobally().getName());
-    assertEquals("Max.PerKey", Max.integersPerKey().getName());
-    assertEquals("Max.PerKey", Max.doublesPerKey().getName());
-    assertEquals("Max.PerKey", Max.longsPerKey().getName());
+  public void testMaxGetNames() {
+    assertEquals("Combine.globally(MaxInteger)", Max.integersGlobally().getName());
+    assertEquals("Combine.globally(MaxDouble)", Max.doublesGlobally().getName());
+    assertEquals("Combine.globally(MaxLong)", Max.longsGlobally().getName());
+    assertEquals("Combine.perKey(MaxInteger)", Max.integersPerKey().getName());
+    assertEquals("Combine.perKey(MaxDouble)", Max.doublesPerKey().getName());
+    assertEquals("Combine.perKey(MaxLong)", Max.longsPerKey().getName());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
index 1c94e35..84741ee 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MeanTest.java
@@ -36,10 +36,11 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class MeanTest {
+
   @Test
   public void testMeanGetNames() {
-    assertEquals("Mean.Globally", Mean.globally().getName());
-    assertEquals("Mean.PerKey", Mean.perKey().getName());
+    assertEquals("Combine.globally(Mean)", Mean.globally().getName());
+    assertEquals("Combine.perKey(Mean)", Mean.perKey().getName());
   }
 
   private static final Coder<CountSum<Number>> TEST_CODER = new CountSumCoder<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
index a0eca07..4334ed9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
@@ -35,15 +35,14 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class MinTest {
   @Test
-  public void testMeanGetNames() {
-    assertEquals("Min.Globally", Min.integersGlobally().getName());
-    assertEquals("Min.Globally", Min.doublesGlobally().getName());
-    assertEquals("Min.Globally", Min.longsGlobally().getName());
-    assertEquals("Min.PerKey", Min.integersPerKey().getName());
-    assertEquals("Min.PerKey", Min.doublesPerKey().getName());
-    assertEquals("Min.PerKey", Min.longsPerKey().getName());
+  public void testMinGetNames() {
+    assertEquals("Combine.globally(MinInteger)", Min.integersGlobally().getName());
+    assertEquals("Combine.globally(MinDouble)", Min.doublesGlobally().getName());
+    assertEquals("Combine.globally(MinLong)", Min.longsGlobally().getName());
+    assertEquals("Combine.perKey(MinInteger)", Min.integersPerKey().getName());
+    assertEquals("Combine.perKey(MinDouble)", Min.doublesPerKey().getName());
+    assertEquals("Combine.perKey(MinLong)", Min.longsPerKey().getName());
   }
-
   @Test
   public void testMinIntegerFn() {
     checkCombineFn(

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
index b4f723d..04c0186 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SumTest.java
@@ -41,12 +41,12 @@ public class SumTest {
 
   @Test
   public void testSumGetNames() {
-    assertEquals("Sum.Globally", Sum.integersGlobally().getName());
-    assertEquals("Sum.Globally", Sum.doublesGlobally().getName());
-    assertEquals("Sum.Globally", Sum.longsGlobally().getName());
-    assertEquals("Sum.PerKey", Sum.integersPerKey().getName());
-    assertEquals("Sum.PerKey", Sum.doublesPerKey().getName());
-    assertEquals("Sum.PerKey", Sum.longsPerKey().getName());
+    assertEquals("Combine.globally(SumInteger)", Sum.integersGlobally().getName());
+    assertEquals("Combine.globally(SumDouble)", Sum.doublesGlobally().getName());
+    assertEquals("Combine.globally(SumLong)", Sum.longsGlobally().getName());
+    assertEquals("Combine.perKey(SumInteger)", Sum.integersPerKey().getName());
+    assertEquals("Combine.perKey(SumDouble)", Sum.doublesPerKey().getName());
+    assertEquals("Combine.perKey(SumLong)", Sum.longsPerKey().getName());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
index d011197..89e0076 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
@@ -234,12 +234,13 @@ public class TopTest {
 
   @Test
   public void testTopGetNames() {
-    assertEquals("Top.Globally", Top.of(1, new OrderByLength()).getName());
-    assertEquals("Smallest.Globally", Top.smallest(1).getName());
-    assertEquals("Largest.Globally", Top.largest(2).getName());
-    assertEquals("Top.PerKey", Top.perKey(1, new IntegerComparator()).getName());
-    assertEquals("Smallest.PerKey", Top.<String, Integer>smallestPerKey(1).getName());
-    assertEquals("Largest.PerKey", Top.<String, Integer>largestPerKey(2).getName());
+    assertEquals("Combine.globally(Top(OrderByLength))", Top.of(1, new OrderByLength()).getName());
+    assertEquals("Combine.globally(Top(Smallest))", Top.smallest(1).getName());
+    assertEquals("Combine.globally(Top(Largest))", Top.largest(2).getName());
+    assertEquals("Combine.perKey(Top(IntegerComparator))",
+        Top.perKey(1, new IntegerComparator()).getName());
+    assertEquals("Combine.perKey(Top(Smallest))", Top.<String, Integer>smallestPerKey(1).getName());
+    assertEquals("Combine.perKey(Top(Largest))", Top.<String, Integer>largestPerKey(2).getName());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/16b26673/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
index b35e942..b81aa36 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.NameUtils.NameOverride;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PDone;
 import org.junit.Rule;
@@ -111,16 +112,12 @@ public class NameUtilsTest {
 
   @Test
   public void testSimpleName() {
-    assertEquals("Embedded", NameUtils.approximateSimpleName(EmbeddedOldDoFn.class));
+    assertEquals("Embedded", NameUtils.approximateSimpleName(new EmbeddedOldDoFn()));
   }
 
   @Test
   public void testAnonSimpleName() throws Exception {
-    thrown.expect(IllegalArgumentException.class);
-
-    EmbeddedOldDoFn anon = new EmbeddedOldDoFn(){};
-
-    NameUtils.approximateSimpleName(anon.getClass());
+    assertEquals("Anonymous", NameUtils.approximateSimpleName(new EmbeddedOldDoFn() {}));
   }
 
   @Test
@@ -128,7 +125,7 @@ public class NameUtilsTest {
     EmbeddedOldDoFn fn = new EmbeddedOldDoFn();
     EmbeddedOldDoFn inner = fn.getEmbedded();
 
-    assertEquals("DeeperEmbedded", NameUtils.approximateSimpleName(inner.getClass()));
+    assertEquals("DeeperEmbedded", NameUtils.approximateSimpleName(inner));
   }
 
   @Test
@@ -160,9 +157,25 @@ public class NameUtilsTest {
     };
 
     assertEquals("NamedInnerClass",
-        NameUtils.approximateSimpleName(anonymousClassObj.getInnerClassInstance().getClass()));
+        NameUtils.approximateSimpleName(anonymousClassObj.getInnerClassInstance()));
     assertEquals("NameUtilsTest.NamedInnerClass",
-        NameUtils.approximatePTransformName(
-            anonymousClassObj.getInnerClassInstance().getClass()));
+        NameUtils.approximatePTransformName(anonymousClassObj.getInnerClassInstance().getClass()));
+  }
+
+  @Test
+  public void testApproximateSimpleNameOverride() {
+    Object overriddenName = new NameOverride() {
+      @Override
+      public String getNameOverride() {
+        return "CUSTOM_NAME";
+      }
+    };
+    assertEquals("CUSTOM_NAME", NameUtils.approximateSimpleName(overriddenName));
+  }
+
+  @Test
+  public void testApproximateSimpleNameCustomAnonymous() {
+    Object overriddenName = new Object() {};
+    assertEquals("CUSTOM_NAME", NameUtils.approximateSimpleName(overriddenName, "CUSTOM_NAME"));
   }
 }