You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/04/15 23:54:53 UTC

[2/3] incubator-beam git commit: Add DisplayData for combine transforms

Add DisplayData for combine transforms

If more than one combineFn has the same namespace, add a sequential suffix.
This is necessary because each namespace/key pair must be unique within
the transform.

Add a `ClassForDisplay` wrapper around a name/simple-name for a class. This is
necessary in cases where the class may be serialized to support
accessing `DisplayData` since `Class` is not serializable in some cases.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b0baa4c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b0baa4c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b0baa4c9

Branch: refs/heads/master
Commit: b0baa4c9d66750b1cbdbb0dc7f02e62385436bc2
Parents: d440d94
Author: Scott Wegner <sw...@google.com>
Authored: Mon Apr 11 09:08:23 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Fri Apr 15 14:26:51 2016 -0700

----------------------------------------------------------------------
 .../sdk/transforms/ApproximateQuantiles.java    |   8 +
 .../beam/sdk/transforms/ApproximateUnique.java  |  44 ++++++
 .../org/apache/beam/sdk/transforms/Combine.java | 155 +++++++++++++++----
 .../beam/sdk/transforms/CombineFnBase.java      |  27 +++-
 .../apache/beam/sdk/transforms/CombineFns.java  |  65 ++++++++
 .../beam/sdk/transforms/CombineWithContext.java |   6 +
 .../org/apache/beam/sdk/transforms/Max.java     |   6 +
 .../org/apache/beam/sdk/transforms/Min.java     |   6 +
 .../org/apache/beam/sdk/transforms/Sample.java  |  14 ++
 .../org/apache/beam/sdk/transforms/Top.java     |   8 +
 .../sdk/transforms/display/ClassForDisplay.java |  93 +++++++++++
 .../sdk/transforms/display/DisplayData.java     | 111 +++++++++++--
 .../org/apache/beam/sdk/util/CombineFnUtil.java |  13 ++
 .../transforms/ApproximateQuantilesTest.java    |  13 ++
 .../sdk/transforms/ApproximateUniqueTest.java   |  17 ++
 .../beam/sdk/transforms/CombineFnsTest.java     |  69 ++++++++-
 .../apache/beam/sdk/transforms/CombineTest.java |  22 ++-
 .../org/apache/beam/sdk/transforms/MaxTest.java |  13 +-
 .../org/apache/beam/sdk/transforms/MinTest.java |  13 +-
 .../apache/beam/sdk/transforms/SampleTest.java  |  14 ++
 .../org/apache/beam/sdk/transforms/TopTest.java |  13 ++
 .../transforms/display/ClassForDisplayTest.java |  66 ++++++++
 .../transforms/display/DisplayDataMatchers.java |  51 +++---
 .../sdk/transforms/display/DisplayDataTest.java |  18 ++-
 .../display/ClassForDisplayJava8Test.java       |  46 ++++++
 .../beam/sdk/transforms/CombineJava8Test.java   |  42 +++++
 26 files changed, 878 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
index 2ed7a85..c58c736 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn;
 import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.WeightedValue;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.KV;
@@ -359,6 +360,13 @@ public class ApproximateQuantiles {
         CoderRegistry registry, Coder<T> elementCoder) {
       return new QuantileStateCoder<>(compareFn, elementCoder);
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder
+          .add("numQuantiles", numQuantiles)
+          .add("comparer", compareFn.getClass());
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
index 4f9dfc4..175897b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -31,6 +32,8 @@ import com.google.common.hash.Hashing;
 import com.google.common.hash.HashingOutputStream;
 import com.google.common.io.ByteStreams;
 
+import org.apache.avro.reflect.Nullable;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
@@ -168,6 +171,12 @@ public class ApproximateUnique {
     private final long sampleSize;
 
     /**
+     * The desired maximum estimation error or null if not specified.
+     */
+    @Nullable
+    private final Double maximumEstimationError;
+
+    /**
      * @see ApproximateUnique#globally(int)
      */
     public Globally(int sampleSize) {
@@ -178,7 +187,9 @@ public class ApproximateUnique {
             + "In general, the estimation "
             + "error is about 2 / sqrt(sampleSize).");
       }
+
       this.sampleSize = sampleSize;
+      this.maximumEstimationError = null;
     }
 
     /**
@@ -190,7 +201,9 @@ public class ApproximateUnique {
             "ApproximateUnique needs an "
             + "estimation error between 1% (0.01) and 50% (0.5).");
       }
+
       this.sampleSize = sampleSizeFromEstimationError(maximumEstimationError);
+      this.maximumEstimationError = maximumEstimationError;
     }
 
     @Override
@@ -200,6 +213,11 @@ public class ApproximateUnique {
           Combine.globally(
               new ApproximateUniqueCombineFn<>(sampleSize, coder)));
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      ApproximateUnique.populateDisplayData(builder, sampleSize, maximumEstimationError);
+    }
   }
 
   /**
@@ -213,9 +231,19 @@ public class ApproximateUnique {
   static class PerKey<K, V>
       extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Long>>> {
 
+    /**
+     * The number of entries in the statistical sample; the higher this number,
+     * the more accurate the estimate will be.
+     */
     private final long sampleSize;
 
     /**
+     * The desired maximum estimation error or null if not specified.
+     */
+    @Nullable
+    private final Double maximumEstimationError;
+
+    /**
      * @see ApproximateUnique#perKey(int)
      */
     public PerKey(int sampleSize) {
@@ -225,7 +253,9 @@ public class ApproximateUnique {
             + "sampleSize >= 16 for an estimation error <= 50%.  In general, "
             + "the estimation error is about 2 / sqrt(sampleSize).");
       }
+
       this.sampleSize = sampleSize;
+      this.maximumEstimationError = null;
     }
 
     /**
@@ -237,7 +267,9 @@ public class ApproximateUnique {
             "ApproximateUnique.PerKey needs an "
             + "estimation error between 1% (0.01) and 50% (0.5).");
       }
+
       this.sampleSize = sampleSizeFromEstimationError(estimationError);
+      this.maximumEstimationError = estimationError;
     }
 
     @Override
@@ -254,6 +286,11 @@ public class ApproximateUnique {
           Combine.perKey(new ApproximateUniqueCombineFn<>(
               sampleSize, coder).<K>asKeyedFn()));
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      ApproximateUnique.populateDisplayData(builder, sampleSize, maximumEstimationError);
+    }
   }
 
 
@@ -418,4 +455,11 @@ public class ApproximateUnique {
   static long sampleSizeFromEstimationError(double estimationError) {
     return Math.round(Math.ceil(4.0 / Math.pow(estimationError, 2.0)));
   }
+
+  private static void populateDisplayData(
+      DisplayData.Builder builder, long sampleSize, Double maxEstimationError) {
+    builder
+        .add("sampleSize", sampleSize)
+        .addIfNotNull("maximumEstimationError", maxEstimationError);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 28bbeed..3566fa5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.AbstractPerKeyCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
@@ -37,6 +36,9 @@ import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
+import org.apache.beam.sdk.transforms.display.ClassForDisplay;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -101,9 +103,10 @@ public class Combine {
    */
   public static <V> Globally<V, V> globally(
       SerializableFunction<Iterable<V>, V> combiner) {
-    return globally(IterableCombineFn.of(combiner));
+    return globally(IterableCombineFn.of(combiner), ClassForDisplay.fromInstance(combiner));
   }
 
+
   /**
    * Returns a {@link Globally Combine.Globally} {@code PTransform}
    * that uses the given {@code GloballyCombineFn} to combine all
@@ -121,7 +124,12 @@ public class Combine {
    */
   public static <InputT, OutputT> Globally<InputT, OutputT> globally(
       GlobalCombineFn<? super InputT, ?, OutputT> fn) {
-    return new Globally<>(fn, true, 0);
+    return globally(fn, ClassForDisplay.fromInstance(fn));
+  }
+
+  private static <InputT, OutputT> Globally<InputT, OutputT> globally(
+          GlobalCombineFn<? super InputT, ?, OutputT> fn, ClassForDisplay fnClass) {
+    return new Globally<>(fn, fnClass, true, 0);
   }
 
   /**
@@ -142,7 +150,7 @@ public class Combine {
    */
   public static <K, V> PerKey<K, V, V> perKey(
       SerializableFunction<Iterable<V>, V> fn) {
-    return perKey(Combine.IterableCombineFn.of(fn));
+    return perKey(IterableCombineFn.of(fn).<K>asKeyedFn(), ClassForDisplay.fromInstance(fn));
   }
 
   /**
@@ -163,7 +171,7 @@ public class Combine {
    */
   public static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(
       GlobalCombineFn<? super InputT, ?, OutputT> fn) {
-    return perKey(fn.<K>asKeyedFn());
+    return perKey(fn.<K>asKeyedFn(), ClassForDisplay.fromInstance(fn));
   }
 
   /**
@@ -184,7 +192,12 @@ public class Combine {
    */
   public static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(
       PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn) {
-    return new PerKey<>(fn, false /*fewKeys*/);
+    return perKey(fn, ClassForDisplay.fromInstance(fn));
+  }
+
+  private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(
+          PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, ClassForDisplay fnClass) {
+    return new PerKey<>(fn, fnClass, false /*fewKeys*/);
   }
 
   /**
@@ -192,8 +205,8 @@ public class Combine {
    * in {@link GroupByKey}.
    */
   private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> fewKeys(
-      PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn) {
-    return new PerKey<>(fn, true /*fewKeys*/);
+      PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, ClassForDisplay fnClass) {
+    return new PerKey<>(fn, fnClass, true /*fewKeys*/);
   }
 
   /**
@@ -219,7 +232,7 @@ public class Combine {
    */
   public static <K, V> GroupedValues<K, V, V> groupedValues(
       SerializableFunction<Iterable<V>, V> fn) {
-    return groupedValues(IterableCombineFn.of(fn));
+    return groupedValues(IterableCombineFn.of(fn).<K>asKeyedFn(), ClassForDisplay.fromInstance(fn));
   }
 
   /**
@@ -245,7 +258,7 @@ public class Combine {
    */
   public static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(
       GlobalCombineFn<? super InputT, ?, OutputT> fn) {
-    return groupedValues(fn.<K>asKeyedFn());
+    return groupedValues(fn.<K>asKeyedFn(), ClassForDisplay.fromInstance(fn));
   }
 
   /**
@@ -271,9 +284,13 @@ public class Combine {
    */
   public static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(
       PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn) {
-    return new GroupedValues<>(fn);
+    return groupedValues(fn, ClassForDisplay.fromInstance(fn));
   }
 
+  private static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(
+          PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, ClassForDisplay fnClass) {
+    return new GroupedValues<>(fn, fnClass);
+  }
 
   /////////////////////////////////////////////////////////////////////////////
 
@@ -495,6 +512,11 @@ public class Combine {
         public CombineFn<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) {
           return CombineFn.this;
         }
+
+        @Override
+        public void populateDisplayData(DisplayData.Builder builder) {
+          CombineFn.this.populateDisplayData(builder);
+        }
       };
     }
   }
@@ -1168,6 +1190,11 @@ public class Combine {
             CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
           return KeyedCombineFn.this.getDefaultOutputCoder(registry, keyCoder, inputCoder);
         }
+
+        @Override
+        public void populateDisplayData(DisplayData.Builder builder) {
+          KeyedCombineFn.this.populateDisplayData(builder);
+        }
       };
     }
 
@@ -1233,31 +1260,36 @@ public class Combine {
       extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
 
     private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
+    private final ClassForDisplay fnClass;
     private final boolean insertDefault;
     private final int fanout;
     private final List<PCollectionView<?>> sideInputs;
 
-    private Globally(GlobalCombineFn<? super InputT, ?, OutputT> fn,
+    private Globally(GlobalCombineFn<? super InputT, ?, OutputT> fn, ClassForDisplay fnClass,
         boolean insertDefault, int fanout) {
       this.fn = fn;
+      this.fnClass = fnClass;
       this.insertDefault = insertDefault;
       this.fanout = fanout;
       this.sideInputs = ImmutableList.<PCollectionView<?>>of();
     }
 
     private Globally(String name, GlobalCombineFn<? super InputT, ?, OutputT> fn,
-        boolean insertDefault, int fanout) {
+        ClassForDisplay fnClass, boolean insertDefault, int fanout) {
       super(name);
       this.fn = fn;
+      this.fnClass = fnClass;
       this.insertDefault = insertDefault;
       this.fanout = fanout;
       this.sideInputs = ImmutableList.<PCollectionView<?>>of();
     }
 
     private Globally(String name, GlobalCombineFn<? super InputT, ?, OutputT> fn,
-        boolean insertDefault, int fanout, List<PCollectionView<?>> sideInputs) {
+        ClassForDisplay fnClass, boolean insertDefault, int fanout,
+        List<PCollectionView<?>> sideInputs) {
       super(name);
       this.fn = fn;
+      this.fnClass = fnClass;
       this.insertDefault = insertDefault;
       this.fanout = fanout;
       this.sideInputs = sideInputs;
@@ -1268,7 +1300,7 @@ public class Combine {
      * specified name. Does not modify this transform.
      */
     public Globally<InputT, OutputT> named(String name) {
-      return new Globally<>(name, fn, insertDefault, fanout);
+      return new Globally<>(name, fn, fnClass, insertDefault, fanout);
     }
 
     /**
@@ -1279,7 +1311,7 @@ public class Combine {
      * to an empty input set will be returned.
      */
     public GloballyAsSingletonView<InputT, OutputT> asSingletonView() {
-      return new GloballyAsSingletonView<>(fn, insertDefault, fanout);
+      return new GloballyAsSingletonView<>(fn, fnClass, insertDefault, fanout);
     }
 
     /**
@@ -1288,7 +1320,7 @@ public class Combine {
      * is not globally windowed and the output is not being used as a side input.
      */
     public Globally<InputT, OutputT> withoutDefaults() {
-      return new Globally<>(name, fn, false, fanout);
+      return new Globally<>(name, fn, fnClass, false, fanout);
     }
 
     /**
@@ -1299,7 +1331,7 @@ public class Combine {
      * that will be used.
      */
     public Globally<InputT, OutputT> withFanout(int fanout) {
-      return new Globally<>(name, fn, insertDefault, fanout);
+      return new Globally<>(name, fn, fnClass, insertDefault, fanout);
     }
 
     /**
@@ -1309,7 +1341,7 @@ public class Combine {
     public Globally<InputT, OutputT> withSideInputs(
         Iterable<? extends PCollectionView<?>> sideInputs) {
       Preconditions.checkState(fn instanceof RequiresContextInternal);
-      return new Globally<InputT, OutputT>(name, fn, insertDefault, fanout,
+      return new Globally<InputT, OutputT>(name, fn, fnClass, insertDefault, fanout,
           ImmutableList.<PCollectionView<?>>copyOf(sideInputs));
     }
 
@@ -1320,7 +1352,7 @@ public class Combine {
           .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()));
 
       Combine.PerKey<Void, InputT, OutputT> combine =
-          Combine.<Void, InputT, OutputT>fewKeys(fn.asKeyedFn());
+          Combine.<Void, InputT, OutputT>fewKeys(fn.asKeyedFn(), fnClass);
       if (!sideInputs.isEmpty()) {
         combine = combine.withSideInputs(sideInputs);
       }
@@ -1344,6 +1376,12 @@ public class Combine {
       }
     }
 
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      Combine.populateDisplayData(builder, fn, fnClass);
+      Combine.populateGlobalDisplayData(builder, fanout, insertDefault);
+    }
+
     private PCollection<OutputT> insertDefaultValueIfEmpty(PCollection<OutputT> maybeEmpty) {
       final PCollectionView<Iterable<OutputT>> maybeEmptyView = maybeEmpty.apply(
           View.<OutputT>asIterable());
@@ -1370,6 +1408,20 @@ public class Combine {
     }
   }
 
+  private static void populateDisplayData(
+      DisplayData.Builder builder, HasDisplayData fn, ClassForDisplay fnClass) {
+    builder
+        .include(fn, fnClass)
+        .add("combineFn", fnClass);
+  }
+
+  private static void populateGlobalDisplayData(
+      DisplayData.Builder builder, int fanout, boolean insertDefault) {
+    builder
+        .addIfNotDefault("fanout", fanout, 0)
+        .add("emitDefaultOnEmptyInput", insertDefault);
+  }
+
   /**
    * {@code Combine.GloballyAsSingletonView<InputT, OutputT>} takes a {@code PCollection<InputT>}
    * and returns a {@code PCollectionView<OutputT>} whose elements are the result of
@@ -1413,12 +1465,15 @@ public class Combine {
       extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
 
     private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
+    private final ClassForDisplay fnClass;
     private final boolean insertDefault;
     private final int fanout;
 
     private GloballyAsSingletonView(
-        GlobalCombineFn<? super InputT, ?, OutputT> fn, boolean insertDefault, int fanout) {
+        GlobalCombineFn<? super InputT, ?, OutputT> fn, ClassForDisplay fnClass,
+        boolean insertDefault, int fanout) {
       this.fn = fn;
+      this.fnClass = fnClass;
       this.insertDefault = insertDefault;
       this.fanout = fanout;
     }
@@ -1449,6 +1504,12 @@ public class Combine {
     public GlobalCombineFn<? super InputT, ?, OutputT> getCombineFn() {
       return fn;
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      Combine.populateDisplayData(builder, fn, fnClass);
+      Combine.populateGlobalDisplayData(builder, fanout, insertDefault);
+    }
   }
 
   /**
@@ -1528,6 +1589,11 @@ public class Combine {
       return accumulator.size() > 1 ? mergeToSingleton(accumulator) : accumulator;
     }
 
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.add("combineFn", combiner.getClass());
+    }
+
     private List<V> mergeToSingleton(Iterable<V> values) {
       List<V> singleton = new ArrayList<>();
       singleton.add(combiner.apply(values));
@@ -1601,30 +1667,35 @@ public class Combine {
     extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
 
     private final transient PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
+    private final ClassForDisplay fnClass;
     private final boolean fewKeys;
     private final List<PCollectionView<?>> sideInputs;
 
     private PerKey(
-        PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, boolean fewKeys) {
+        PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, ClassForDisplay fnClass,
+        boolean fewKeys) {
       this.fn = fn;
+      this.fnClass = fnClass;
       this.fewKeys = fewKeys;
       this.sideInputs = ImmutableList.of();
     }
 
     private PerKey(String name,
-        PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+        PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, ClassForDisplay fnClass,
         boolean fewKeys, List<PCollectionView<?>> sideInputs) {
       super(name);
       this.fn = fn;
+      this.fnClass = fnClass;
       this.fewKeys = fewKeys;
       this.sideInputs = sideInputs;
     }
 
     private PerKey(
         String name, PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-        boolean fewKeys) {
+        ClassForDisplay fnClass, boolean fewKeys) {
       super(name);
       this.fn = fn;
+      this.fnClass = fnClass;
       this.fewKeys = fewKeys;
       this.sideInputs = ImmutableList.of();
     }
@@ -1634,7 +1705,7 @@ public class Combine {
      * specified name. Does not modify this transform.
      */
     public PerKey<K, InputT, OutputT> named(String name) {
-      return new PerKey<K, InputT, OutputT>(name, fn, fewKeys);
+      return new PerKey<K, InputT, OutputT>(name, fn, fnClass, fewKeys);
     }
 
     /**
@@ -1644,7 +1715,7 @@ public class Combine {
     public PerKey<K, InputT, OutputT> withSideInputs(
         Iterable<? extends PCollectionView<?>> sideInputs) {
       Preconditions.checkState(fn instanceof RequiresContextInternal);
-      return new PerKey<K, InputT, OutputT>(name, fn, fewKeys,
+      return new PerKey<K, InputT, OutputT>(name, fn, fnClass, fewKeys,
           ImmutableList.<PCollectionView<?>>copyOf(sideInputs));
     }
 
@@ -1661,7 +1732,7 @@ public class Combine {
      */
     public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(
         SerializableFunction<? super K, Integer> hotKeyFanout) {
-      return new PerKeyWithHotKeyFanout<K, InputT, OutputT>(name, fn, hotKeyFanout);
+      return new PerKeyWithHotKeyFanout<K, InputT, OutputT>(name, fn, fnClass, hotKeyFanout);
     }
 
     /**
@@ -1669,7 +1740,7 @@ public class Combine {
      * constant value for every key.
      */
     public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(final int hotKeyFanout) {
-      return new PerKeyWithHotKeyFanout<K, InputT, OutputT>(name, fn,
+      return new PerKeyWithHotKeyFanout<K, InputT, OutputT>(name, fn, fnClass,
           new SerializableFunction<K, Integer>(){
             @Override
             public Integer apply(K unused) {
@@ -1698,6 +1769,11 @@ public class Combine {
           .apply(GroupByKey.<K, InputT>create(fewKeys))
           .apply(Combine.<K, InputT, OutputT>groupedValues(fn).withSideInputs(sideInputs));
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      Combine.populateDisplayData(builder, fn, fnClass);
+    }
   }
 
   /**
@@ -1707,13 +1783,16 @@ public class Combine {
       extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
 
     private final transient PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
+    private final ClassForDisplay fnClass;
     private final SerializableFunction<? super K, Integer> hotKeyFanout;
 
     private PerKeyWithHotKeyFanout(String name,
         PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+        ClassForDisplay fnClass,
         SerializableFunction<? super K, Integer> hotKeyFanout) {
       super(name);
       this.fn = fn;
+      this.fnClass = fnClass;
       this.hotKeyFanout = hotKeyFanout;
     }
 
@@ -1996,6 +2075,12 @@ public class Combine {
           .apply("PostCombine", Combine.perKey(postCombine));
     }
 
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      Combine.populateDisplayData(builder, fn, fnClass);
+      builder.add("fanoutFn", hotKeyFanout.getClass());
+    }
+
     /**
      * Used to store either an input or accumulator value, for flattening
      * the hot and cold key paths.
@@ -2137,23 +2222,28 @@ public class Combine {
                          PCollection<KV<K, OutputT>>> {
 
     private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
+    private final ClassForDisplay fnClass;
     private final List<PCollectionView<?>> sideInputs;
 
-    private GroupedValues(PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn) {
+    private GroupedValues(
+        PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, ClassForDisplay fnClass) {
       this.fn = SerializableUtils.clone(fn);
+      this.fnClass = fnClass;
       this.sideInputs = ImmutableList.<PCollectionView<?>>of();
     }
 
     private GroupedValues(
         PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
+        ClassForDisplay fnClass,
         List<PCollectionView<?>> sideInputs) {
       this.fn = SerializableUtils.clone(fn);
+      this.fnClass = fnClass;
       this.sideInputs = sideInputs;
     }
 
     public GroupedValues<K, InputT, OutputT> withSideInputs(
         Iterable<? extends PCollectionView<?>> sideInputs) {
-      return new GroupedValues<>(fn, ImmutableList.<PCollectionView<?>>copyOf(sideInputs));
+      return new GroupedValues<>(fn, fnClass, ImmutableList.<PCollectionView<?>>copyOf(sideInputs));
     }
 
     /**
@@ -2240,5 +2330,10 @@ public class Combine {
               kvCoder.getKeyCoder(), kvCoder.getValueCoder());
       return KvCoder.of(kvCoder.getKeyCoder(), outputValueCoder);
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      Combine.populateDisplayData(builder, fn, fnClass);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
index a57d446..1b64bb2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java
@@ -24,6 +24,8 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 import com.google.common.collect.ImmutableMap;
@@ -52,7 +54,7 @@ public class CombineFnBase {
    * @param <AccumT> type of mutable accumulator values
    * @param <OutputT> type of output values
    */
-  public interface GlobalCombineFn<InputT, AccumT, OutputT> extends Serializable {
+  public interface GlobalCombineFn<InputT, AccumT, OutputT> extends Serializable, HasDisplayData {
 
     /**
      * Returns the {@code Coder} to use for accumulator {@code AccumT}
@@ -117,7 +119,8 @@ public class CombineFnBase {
    * @param <AccumT> type of mutable accumulator values
    * @param <OutputT> type of output values
    */
-  public interface PerKeyCombineFn<K, InputT, AccumT, OutputT> extends Serializable {
+  public interface PerKeyCombineFn<K, InputT, AccumT, OutputT>
+  extends Serializable, HasDisplayData {
     /**
      * Returns the {@code Coder} to use for accumulator {@code AccumT}
      * values, or null if it is not able to be inferred.
@@ -217,6 +220,16 @@ public class CombineFnBase {
       return (TypeVariable<?>)
           new TypeDescriptor<OutputT>(AbstractGlobalCombineFn.class) {}.getType();
     }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>By default, does not register any display data. Implementors may override this method
+     * to provide their own display metadata.
+     */
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+    }
   }
 
   /**
@@ -282,5 +295,15 @@ public class CombineFnBase {
       return (TypeVariable<?>)
           new TypeDescriptor<OutputT>(AbstractPerKeyCombineFn.class) {}.getType();
     }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>By default, does not register any display data. Implementors may override this method
+     * to provide their own display metadata.
+     */
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index d98bd13..ed45498 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -31,14 +31,19 @@ import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.values.TupleTag;
 
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -47,6 +52,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -455,6 +461,11 @@ public class CombineFns {
       }
       return new ComposedAccumulatorCoder(coders);
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      CombineFns.populateDisplayData(builder, combineFns);
+    }
   }
 
   /**
@@ -588,6 +599,11 @@ public class CombineFns {
       }
       return new ComposedAccumulatorCoder(coders);
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      CombineFns.populateDisplayData(builder, combineFnWithContexts);
+    }
   }
 
   /**
@@ -769,6 +785,11 @@ public class CombineFns {
       }
       return new ComposedAccumulatorCoder(coders);
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      CombineFns.populateDisplayData(builder, keyedCombineFns);
+    }
   }
 
   /**
@@ -915,6 +936,11 @@ public class CombineFns {
       }
       return new ComposedAccumulatorCoder(coders);
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      CombineFns.populateDisplayData(builder, keyedCombineFns);
+    }
   }
 
   /////////////////////////////////////////////////////////////////////////////
@@ -1008,4 +1034,43 @@ public class CombineFns {
         "Cannot compose with tuple tag %s because it is already present in the composition.",
         outputTag);
   }
+
+  /**
+   * Populate display data for the {@code combineFns} that make up a composed combine transform.
+   *
+   * <p>A combineFn class used once is included directly; a class used several times gets one
+   * namespace per instance, {@code <class name>#<n>}, with n counting in list order.
+   */
+  private static void populateDisplayData(
+      DisplayData.Builder builder, List<? extends HasDisplayData> combineFns) {
+
+    // NB: ArrayListMultimap necessary to maintain ordering of combineFns of the same type.
+    Multimap<Class<?>, HasDisplayData> combineFnMap = ArrayListMultimap.create();
+
+    for (int i = 0; i < combineFns.size(); i++) {
+      HasDisplayData combineFn = combineFns.get(i);
+      builder.add("combineFn" + (i + 1), combineFn.getClass());
+      combineFnMap.put(combineFn.getClass(), combineFn);
+    }
+
+    for (Map.Entry<Class<?>, Collection<HasDisplayData>> combineFnEntries :
+        combineFnMap.asMap().entrySet()) {
+
+      Collection<HasDisplayData> classCombineFns = combineFnEntries.getValue();
+      if (classCombineFns.size() == 1) {
+        // Only one combineFn of this type, include it directly.
+        builder.include(Iterables.getOnlyElement(classCombineFns));
+
+      } else {
+        // Multiple combineFns of same type: suffix the namespace with the position within
+        // this class's group only, so other classes' combineFns keep their own namespace.
+        String baseNamespace = combineFnEntries.getKey().getName();
+        int position = 0;
+        for (HasDisplayData combineFn : classCombineFns) {
+          String namespace = String.format("%s#%d", baseNamespace, ++position);
+          builder.include(combineFn, namespace);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
index 77a7e53..9bb4a01 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollectionView;
 
 /**
@@ -167,6 +168,11 @@ public class CombineWithContext {
         public CombineFnWithContext<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) {
           return CombineFnWithContext.this;
         }
+
+        @Override
+        public void populateDisplayData(DisplayData.Builder builder) {
+          CombineFnWithContext.this.populateDisplayData(builder);
+        }
       };
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
index 132d7f2..28749d7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.common.Counter;
 import org.apache.beam.sdk.util.common.Counter.AggregationKind;
 import org.apache.beam.sdk.util.common.CounterProvider;
@@ -204,6 +205,11 @@ public class Max {
     public T apply(T left, T right) {
       return comparator.compare(left, right) >= 0 ? left : right;
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.add("comparer", comparator.getClass());
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
index dcee91f..8f3082e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Min.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.common.Counter;
 import org.apache.beam.sdk.util.common.Counter.AggregationKind;
 import org.apache.beam.sdk.util.common.CounterProvider;
@@ -204,6 +205,11 @@ public class Min {
     public T apply(T left, T right) {
       return comparator.compare(left, right) <= 0 ? left : right;
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.add("comparer", comparator.getClass());
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index 1e621d4..6362bd4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -153,6 +154,11 @@ public class Sample {
                  .of(new SampleAnyDoFn<>(limit, iterableView)))
           .setCoder(in.getCoder());
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.add("sampleSize", limit);
+    }
   }
 
   /**
@@ -188,6 +194,7 @@ public class Sample {
       extends CombineFn<T,
           Top.BoundedHeap<KV<Integer, T>, SerializableComparator<KV<Integer, T>>>,
           Iterable<T>> {
+    private final int sampleSize;
     private final Top.TopCombineFn<KV<Integer, T>, SerializableComparator<KV<Integer, T>>>
         topCombineFn;
     private final Random rand = new Random();
@@ -196,6 +203,8 @@ public class Sample {
       if (sampleSize < 0) {
         throw new IllegalArgumentException("sample size must be >= 0");
       }
+
+      this.sampleSize = sampleSize;
       topCombineFn = new Top.TopCombineFn<KV<Integer, T>, SerializableComparator<KV<Integer, T>>>(
           sampleSize, new KV.OrderByKey<Integer, T>());
     }
@@ -244,5 +253,10 @@ public class Sample {
         CoderRegistry registry, Coder<T> inputCoder) {
       return IterableCoder.of(inputCoder);
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.add("sampleSize", sampleSize);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index 82747c2..4b366bc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn;
 import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator;
 import org.apache.beam.sdk.transforms.Combine.PerKey;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
@@ -393,6 +394,13 @@ new TopCombineFn<>(count, new Largest<V>()).<K>asKeyedFn())
     }
 
     @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder
+          .add("count", count)
+          .add("comparer", compareFn.getClass());
+    }
+
+    @Override
     public String getIncompatibleGlobalWindowErrorMessage() {
       return "Default values are not supported in Top.[of, smallest, largest]() if the output "
           + "PCollection is not windowed by GlobalWindows. Instead, use "

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/ClassForDisplay.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/ClassForDisplay.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/ClassForDisplay.java
new file mode 100644
index 0000000..455d6e1
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/ClassForDisplay.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.display;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Display metadata representing a Java class.
+ *
+ * <p>Java classes can be registered as display metadata via
+ * {@link DisplayData.Builder#add(String, ClassForDisplay)}. {@link ClassForDisplay} is
+ * serializable, unlike {@link Class} which can fail to serialize for Java 8 lambda functions.
+ */
+public class ClassForDisplay implements Serializable {
+  private final String simpleName;
+  private final String name;
+
+  private ClassForDisplay(Class<?> clazz) {
+    name = clazz.getName();
+    simpleName = clazz.getSimpleName();
+  }
+
+  /**
+   * Create a {@link ClassForDisplay} instance representing the specified class.
+   */
+  public static ClassForDisplay of(Class<?> clazz) {
+    return new ClassForDisplay(checkNotNull(clazz));
+  }
+
+  /**
+   * Create a {@link ClassForDisplay} from the class of the specified object instance.
+   */
+  public static ClassForDisplay fromInstance(Object obj) {
+    checkNotNull(obj);
+    return new ClassForDisplay(obj.getClass());
+  }
+
+  /**
+   * Retrieve the fully-qualified name of the class.
+   *
+   * @see Class#getName()
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Retrieve a simple representation of the class name.
+   *
+   * @see Class#getSimpleName()
+   */
+  public String getSimpleName() {
+    return simpleName;
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+
+  @Override
+  public int hashCode() {
+    return name.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof ClassForDisplay) {
+      ClassForDisplay that = (ClassForDisplay) obj;
+      return Objects.equals(this.name, that.name);
+    }
+
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index 3aeed83..6065dc4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -128,7 +128,7 @@ public class DisplayData {
     return builder.toString();
   }
 
-  private static String namespaceOf(Class<?> clazz) {
+  private static String namespaceOf(ClassForDisplay clazz) {
     return clazz.getName();
   }
 
@@ -138,19 +138,35 @@ public class DisplayData {
    */
   public interface Builder {
     /**
-     * Register display metadata from the specified subcomponent. For example, a {@link ParDo}
-     * transform includes display metadata from the encapsulated {@link DoFn}.
+     * Register display metadata from the specified subcomponent.
+     *
+     * @see #include(HasDisplayData, String)
      */
     Builder include(HasDisplayData subComponent);
 
     /**
      * Register display metadata from the specified subcomponent, using the specified namespace.
-     * For example, a {@link ParDo} transform includes display metadata from the encapsulated
-     * {@link DoFn}.
+     *
+     * @see #include(HasDisplayData, String)
      */
     Builder include(HasDisplayData subComponent, Class<?> namespace);
 
     /**
+     * Register display metadata from the specified subcomponent, using the specified namespace.
+     *
+     * @see #include(HasDisplayData, String)
+     */
+    Builder include(HasDisplayData subComponent, ClassForDisplay namespace);
+
+    /**
+     * Register display metadata from the specified subcomponent, using the specified namespace.
+     *
+     * <p>For example, a {@link ParDo} transform includes display metadata from the encapsulated
+     * {@link DoFn}.
+     */
+    Builder include(HasDisplayData subComponent, String namespace);
+
+    /**
      * Register the given string display metadata. The metadata item will be registered with type
      * {@link DisplayData.Type#STRING}, and is identified by the specified key and namespace from
      * the current transform or component.
@@ -289,6 +305,13 @@ public class DisplayData {
     ItemBuilder add(String key, Class<?> value);
 
     /**
+     * Register the given class display metadata. The metadata item will be registered with type
+     * {@link DisplayData.Type#JAVA_CLASS}, and is identified by the specified key and namespace
+     * from the current transform or component.
+     */
+    ItemBuilder add(String key, ClassForDisplay value);
+
+    /**
      * Register the given class display data if the value is not null.
      *
      * @see DisplayData.Builder#add(String, Class)
@@ -296,6 +319,13 @@ public class DisplayData {
     ItemBuilder addIfNotNull(String key, @Nullable Class<?> value);
 
     /**
+     * Register the given class display data if the value is not null.
+     *
+     * @see DisplayData.Builder#add(String, ClassForDisplay)
+     */
+    ItemBuilder addIfNotNull(String key, @Nullable ClassForDisplay value);
+
+    /**
      * Register the given class display data if the value is different than the specified default.
      *
      * @see DisplayData.Builder#add(String, Class)
@@ -303,6 +333,13 @@ public class DisplayData {
     ItemBuilder addIfNotDefault(
         String key, @Nullable Class<?> value, @Nullable Class<?> defaultValue);
 
+    /**
+     * Register the given class display data if the value is different than the specified default.
+     *
+     * @see DisplayData.Builder#add(String, ClassForDisplay)
+     */
+    ItemBuilder addIfNotDefault(
+        String key, @Nullable ClassForDisplay value, @Nullable ClassForDisplay defaultValue);
   /**
    * Register the given display metadata with the specified type.
    *
@@ -345,6 +382,14 @@ public class DisplayData {
      * <p>Leaving the namespace unspecified will default to the registering instance's class.
      */
     ItemBuilder withNamespace(Class<?> namespace);
+
+    /**
+     * Adds an explicit namespace to the most-recently added display metadata. The namespace
+     * and key uniquely identify the display metadata.
+     *
+     * <p>Leaving the namespace unspecified will default to the registering instance's class.
+     */
+    ItemBuilder withNamespace(ClassForDisplay namespace);
   }
 
   /**
@@ -362,11 +407,10 @@ public class DisplayData {
     private final String label;
     private final String url;
 
-    private static Item create(Class<?> nsClass, String key, Type type, Object value) {
+    private static Item create(String nsClass, String key, Type type, Object value) {
       FormattedItemValue formatted = type.format(value);
-      String namespace = namespaceOf(nsClass);
       return new Item(
-          namespace, key, type, formatted.getLongValue(), formatted.getShortValue(), null, null);
+          nsClass, key, type, formatted.getLongValue(), formatted.getShortValue(), null, null);
     }
 
     private Item(
@@ -494,7 +538,7 @@ public class DisplayData {
       return new Item(this.ns, this.key, this.type, this.value, this.shortValue, url, this.label);
     }
 
-    private Item withNamespace(Class<?> nsClass) {
+    private Item withNamespace(ClassForDisplay nsClass) {
       String namespace = namespaceOf(nsClass);
       return new Item(
           namespace, this.key, this.type, this.value, this.shortValue, this.url, this.label);
@@ -515,7 +559,7 @@ public class DisplayData {
     private final String ns;
     private final String key;
 
-    public static Identifier of(Class<?> namespace, String key) {
+    public static Identifier of(ClassForDisplay namespace, String key) {
       return of(namespaceOf(namespace), key);
     }
 
@@ -608,7 +652,12 @@ public class DisplayData {
     JAVA_CLASS {
       @Override
       FormattedItemValue format(Object value) {
-        Class<?> clazz = checkType(value, Class.class, JAVA_CLASS);
+        if (value instanceof Class<?>) {
+          ClassForDisplay classForDisplay = ClassForDisplay.of((Class<?>) value);
+          return format(classForDisplay);
+        }
+
+        ClassForDisplay clazz = checkType(value, ClassForDisplay.class, JAVA_CLASS);
         return new FormattedItemValue(clazz.getName(), clazz.getSimpleName());
       }
     };
@@ -644,7 +693,7 @@ public class DisplayData {
         return  TIMESTAMP;
       } else if (value instanceof Duration) {
         return  DURATION;
-      } else if (value instanceof Class<?>) {
+      } else if (value instanceof Class<?> || value instanceof ClassForDisplay) {
         return  JAVA_CLASS;
       } else if (value instanceof String) {
         return  STRING;
@@ -680,7 +729,7 @@ public class DisplayData {
     private final Map<Identifier, Item> entries;
     private final Set<Object> visited;
 
-    private Class<?> latestNs;
+    private String latestNs;
 
     @Nullable
     private Item latestItem;
@@ -704,13 +753,25 @@ public class DisplayData {
 
     @Override
     public Builder include(HasDisplayData subComponent, Class<?> namespace) {
+      checkNotNull(namespace);
+      return include(subComponent, ClassForDisplay.of(namespace));
+    }
+
+    @Override
+    public Builder include(HasDisplayData subComponent, ClassForDisplay namespace) {
+      checkNotNull(namespace);
+      return include(subComponent, namespaceOf(namespace));
+    }
+
+    @Override
+    public Builder include(HasDisplayData subComponent, String namespace) {
       checkNotNull(subComponent);
       checkNotNull(namespace);
 
       commitLatest();
       boolean newComponent = visited.add(subComponent);
       if (newComponent) {
-        Class prevNs = this.latestNs;
+        String prevNs = this.latestNs;
         this.latestNs = namespace;
         subComponent.populateDisplayData(this);
         this.latestNs = prevNs;
@@ -822,17 +883,34 @@ public class DisplayData {
     }
 
     @Override
+    public ItemBuilder add(String key, ClassForDisplay value) {
+      checkNotNull(value);
+      return addItemIf(true, key, Type.JAVA_CLASS, value);
+    }
+
+    @Override
     public ItemBuilder addIfNotNull(String key, @Nullable Class<?> value) {
       return addItemIf(value != null, key, Type.JAVA_CLASS, value);
     }
 
     @Override
+    public ItemBuilder addIfNotNull(String key, @Nullable ClassForDisplay value) {
+      return addItemIf(value != null, key, Type.JAVA_CLASS, value);
+    }
+
+    @Override
     public ItemBuilder addIfNotDefault(
         String key, @Nullable Class<?> value, @Nullable Class<?> defaultValue) {
       return addItemIf(!Objects.equals(value, defaultValue), key, Type.JAVA_CLASS, value);
     }
 
     @Override
+    public ItemBuilder addIfNotDefault(
+        String key, @Nullable ClassForDisplay value, @Nullable ClassForDisplay defaultValue) {
+      return addItemIf(!Objects.equals(value, defaultValue), key, Type.JAVA_CLASS, value);
+    }
+
+    @Override
     public ItemBuilder add(String key, Type type, Object value) {
       checkNotNull(value);
       checkNotNull(type);
@@ -887,6 +965,11 @@ public class DisplayData {
     @Override
     public ItemBuilder withNamespace(Class<?> namespace) {
       checkNotNull(namespace);
+      return withNamespace(ClassForDisplay.of(namespace));
+    }
+
+    @Override
+    public ItemBuilder withNamespace(ClassForDisplay namespace) {
       if (latestItem != null) {
         latestItem = latestItem.withNamespace(namespace);
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
index fbb683c..34197f7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.state.StateContext;
 
 import java.io.IOException;
@@ -101,6 +102,10 @@ public class CombineFnUtil {
             CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
           return combineFn.getDefaultOutputCoder(registry, inputCoder);
         }
+        @Override
+        public void populateDisplayData(DisplayData.Builder builder) {
+          combineFn.populateDisplayData(builder);
+        }
       };
     }
   }
@@ -150,6 +155,10 @@ public class CombineFnUtil {
             Coder<InputT> inputCoder) throws CannotProvideCoderException {
           return keyedCombineFn.getDefaultOutputCoder(registry, keyCoder, inputCoder);
         }
+        @Override
+        public void populateDisplayData(DisplayData.Builder builder) {
+          keyedCombineFn.populateDisplayData(builder);
+        }
       };
     }
   }
@@ -195,6 +204,10 @@ public class CombineFnUtil {
         Coder<InputT> inputCoder) throws CannotProvideCoderException {
       return combineFn.getDefaultOutputCoder(registry, keyCoder, inputCoder);
     }
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      combineFn.populateDisplayData(builder);
+    }
 
     private void writeObject(@SuppressWarnings("unused") ObjectOutputStream out)
         throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
index 6bc5c1e..cc81748 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.TestUtils.checkCombineFn;
 
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 
 import org.apache.beam.sdk.Pipeline;
@@ -28,6 +30,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.ApproximateQuantiles.ApproximateQuantilesCombineFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -232,6 +235,16 @@ public class ApproximateQuantilesTest {
         Arrays.asList("b", "aaa", "ccccc"));
   }
 
+  @Test
+  public void testDisplayData() {
+    Top.Largest<Integer> comparer = new Top.Largest<>();
+    PTransform<?, ?> approxQuantiles = ApproximateQuantiles.globally(20, comparer);
+    DisplayData quantilesDisplayData = DisplayData.from(approxQuantiles);
+
+    assertThat(quantilesDisplayData, hasDisplayItem("numQuantiles", 20));
+    assertThat(quantilesDisplayData, hasDisplayItem("comparer", comparer.getClass()));
+  }
+
   private Matcher<Iterable<? extends Integer>> quantileMatcher(
       int size, int numQuantiles, int absoluteError) {
     List<Matcher<? super Integer>> quantiles = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
index 3a4b813..c94c9f1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -26,6 +29,7 @@ import org.apache.beam.sdk.TestUtils;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -290,4 +294,17 @@ public class ApproximateUniqueTest implements Serializable {
       return null;
     }
   }
+
+  @Test
+  public void testDisplayData() {
+    ApproximateUnique.Globally<Integer> specifiedSampleSize = ApproximateUnique.globally(1234);
+    ApproximateUnique.PerKey<String, Integer> specifiedMaxError = ApproximateUnique.perKey(0.1234);
+
+    assertThat(DisplayData.from(specifiedSampleSize), hasDisplayItem("sampleSize", 1234));
+
+    DisplayData maxErrorDisplayData = DisplayData.from(specifiedMaxError);
+    assertThat(maxErrorDisplayData, hasDisplayItem("maximumEstimationError", 0.1234));
+    assertThat("calculated sampleSize should be included", maxErrorDisplayData,
+        hasDisplayItem(hasKey("sampleSize")));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index 0a02538..e66f13a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes;
+
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.Pipeline;
@@ -35,6 +38,7 @@ import org.apache.beam.sdk.transforms.CombineFns.CoCombineResult;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
 import org.apache.beam.sdk.transforms.Min.MinIntegerFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -61,7 +65,7 @@ import java.util.List;
  * Unit tests for {@link CombineFns}.
  */
 @RunWith(JUnit4.class)
-public class CombineFnsTest {
+public class  CombineFnsTest {
   @Rule public ExpectedException expectedException = ExpectedException.none();
 
   @Test
@@ -278,6 +282,69 @@ public class CombineFnsTest {
     p.run();
   }
 
+  /** Two combine fns of the same class are reported under "#1"/"#2"-suffixed namespaces. */
+  @Test
+  public void testComposedCombineDisplayData() {
+    DisplayDataCombineFn firstFn = new DisplayDataCombineFn("value1");
+    DisplayDataCombineFn secondFn = new DisplayDataCombineFn("value2");
+    SimpleFunction<String, String> identityFn = new SimpleFunction<String, String>() {
+      @Override
+      public String apply(String input) {
+        return input;
+      }
+    };
+
+    CombineFns.ComposedCombineFn<String> composed = CombineFns.compose()
+        .with(identityFn, firstFn, new TupleTag<String>())
+        .with(identityFn, secondFn, new TupleTag<String>());
+    DisplayData composedData = DisplayData.from(composed);
+    // Each component fn is listed by class under an index-based key.
+    assertThat(composedData, hasDisplayItem("combineFn1", firstFn.getClass()));
+    assertThat(composedData, hasDisplayItem("combineFn2", secondFn.getClass()));
+    // Identical namespaces get a sequential suffix so namespace/key pairs stay unique.
+    String sharedNamespace = DisplayDataCombineFn.class.getName();
+    assertThat(composedData, includes(firstFn, sharedNamespace + "#1"));
+    assertThat(composedData, includes(secondFn, sharedNamespace + "#2"));
+  }
+
+  /** Stub combine fn that only contributes display data; combining methods return null. */
+  private static class DisplayDataCombineFn extends Combine.CombineFn<String, String, String> {
+    private static int instanceCount;  // instances created so far; supplies each id
+    private final int id;
+    private final String value;
+
+    DisplayDataCombineFn(String value) {
+      this.value = value;
+      this.id = ++instanceCount;
+    }
+
+    @Override
+    public String createAccumulator() {
+      return null;
+    }
+
+    @Override
+    public String addInput(String accumulator, String input) {
+      return null;
+    }
+
+    @Override
+    public String mergeAccumulators(Iterable<String> accumulators) {
+      return null;
+    }
+
+    @Override
+    public String extractOutput(String accumulator) {
+      return null;
+    }
+
+    /** Registers one key that differs per instance and one key shared by all instances. */
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.add("uniqueKey" + id, value).add("sharedKey", value);
+    }
+  }
+
   private static class UserString implements Serializable {
     private String strValue;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 049baa3..b710641 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -18,7 +18,8 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.TestUtils.checkCombineFn;
-
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import static org.junit.Assert.assertEquals;
@@ -44,6 +45,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
 import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -682,6 +684,24 @@ public class CombineTest implements Serializable {
         Combine.perKey(new TestKeyedCombineFn()).withHotKeyFanout(10).getName());
   }
 
+  /** Checks the combineFn, emitDefaultOnEmptyInput and fanout items plus the fn's own data. */
+  @Test
+  public void testDisplayData() {
+    UniqueInts fnWithMetadata = new UniqueInts() {
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.add("fnMetadata", "foobar");
+      }
+    };
+    Combine.Globally<?, ?> fannedOut = Combine.globally(fnWithMetadata).withFanout(1234);
+    DisplayData combineData = DisplayData.from(fannedOut);
+
+    assertThat(combineData, hasDisplayItem("combineFn", fnWithMetadata.getClass()));
+    assertThat(combineData, hasDisplayItem("emitDefaultOnEmptyInput", true));
+    assertThat(combineData, hasDisplayItem("fanout", 1234));
+    assertThat(combineData, includes(fnWithMetadata));
+  }
+
   ////////////////////////////////////////////////////////////////////////////
   // Test classes, for different kinds of combining fns.
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
index 5722365..226255a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
@@ -18,9 +18,12 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.TestUtils.checkCombineFn;
-
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
 import com.google.common.collect.Lists;
 
 import org.junit.Test;
@@ -65,4 +68,12 @@ public class MaxTest {
         Lists.newArrayList(1.0, 2.0, 3.0, 4.0),
         4.0);
   }
+
+  /** The comparator class handed to {@code Max.globally} is listed under "comparer". */
+  @Test
+  public void testDisplayData() {
+    Top.Largest<Integer> largest = new Top.Largest<>();
+    Combine.Globally<Integer, Integer> maxTransform = Max.globally(largest);
+    assertThat(DisplayData.from(maxTransform), hasDisplayItem("comparer", largest.getClass()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
index 8f1d301..d7ec322 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
@@ -17,10 +17,13 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import static org.apache.beam.sdk.TestUtils.checkCombineFn;
 
+import static org.apache.beam.sdk.TestUtils.checkCombineFn;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import com.google.common.collect.Lists;
 
 import org.junit.Test;
@@ -65,4 +68,12 @@ public class MinTest {
         Lists.newArrayList(1.0, 2.0, 3.0, 4.0),
         1.0);
   }
+
+  /** The comparator class handed to {@code Min.globally} is listed under "comparer". */
+  @Test
+  public void testDisplayData() {
+    Top.Smallest<Integer> smallest = new Top.Smallest<>();
+    Combine.Globally<Integer, Integer> minTransform = Min.globally(smallest);
+    assertThat(DisplayData.from(minTransform), hasDisplayItem("comparer", smallest.getClass()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index 0c2af3f..4b1d5dc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.transforms;
 import static org.apache.beam.sdk.TestUtils.LINES;
 import static org.apache.beam.sdk.TestUtils.NO_LINES;
 
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -29,6 +31,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 
 import com.google.common.base.Joiner;
@@ -260,4 +263,15 @@ public class SampleTest {
   public void testSampleGetName() {
     assertEquals("Sample.SampleAny", Sample.<String>any(1).getName());
   }
+
+  /** Both sampling transforms report the requested size under "sampleSize". */
+  @Test
+  public void testDisplayData() {
+    // Size passed to Sample.any.
+    PTransform<?, ?> anySample = Sample.any(1234);
+    assertThat(DisplayData.from(anySample), hasDisplayItem("sampleSize", 1234));
+    // Size passed to Sample.fixedSizePerKey.
+    PTransform<?, ?> perKeySample = Sample.fixedSizePerKey(2345);
+    assertThat(DisplayData.from(perKeySample), hasDisplayItem("sampleSize", 2345));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
index 1815cc9..6d580e7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.beam.sdk.Pipeline;
@@ -25,6 +27,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.Window.Bound;
@@ -233,6 +236,16 @@ public class TopTest {
     assertEquals("Largest.PerKey", Top.<String, Integer>largestPerKey(2).getName());
   }
 
+  /** {@code Top.of} reports the requested count and the comparator class. */
+  @Test
+  public void testDisplayData() {
+    Top.Largest<Integer> largest = new Top.Largest<>();
+    Combine.Globally<Integer, List<Integer>> topTransform = Top.of(1234, largest);
+    DisplayData topData = DisplayData.from(topTransform);
+    assertThat(topData, hasDisplayItem("count", 1234));
+    assertThat(topData, hasDisplayItem("comparer", largest.getClass()));
+  }
+
   private static class OrderByLength implements Comparator<String>, Serializable {
     @Override
     public int compare(String a, String b) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/ClassForDisplayTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/ClassForDisplayTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/ClassForDisplayTest.java
new file mode 100644
index 0000000..19f56c6
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/ClassForDisplayTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.display;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.util.SerializableUtils;
+import com.google.common.testing.EqualsTester;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link ClassForDisplay}. */
+@RunWith(JUnit4.class)
+public class ClassForDisplayTest {
+  @Rule public final ExpectedException thrown = ExpectedException.none();
+
+  /** The wrapper mirrors the full and simple names of the wrapped class. */
+  @Test
+  public void testProperties() {
+    ClassForDisplay wrapper = ClassForDisplay.of(ClassForDisplayTest.class);
+    assertEquals(ClassForDisplayTest.class.getName(), wrapper.getName());
+    assertEquals(ClassForDisplayTest.class.getSimpleName(), wrapper.getSimpleName());
+  }
+
+  /** A null class is rejected with a {@link NullPointerException}. */
+  @Test
+  public void testInputValidation() {
+    thrown.expect(NullPointerException.class);
+    ClassForDisplay.of(null);
+  }
+
+  /** Wrappers of one class are equal (via class or instance); different classes are not. */
+  @Test
+  public void testEquality() {
+    ClassForDisplay fromThis = ClassForDisplay.fromInstance(this);
+    new EqualsTester()
+        .addEqualityGroup(ClassForDisplay.of(ClassForDisplayTest.class), fromThis)
+        .addEqualityGroup(ClassForDisplay.of(ClassForDisplay.class))
+        .addEqualityGroup(ClassForDisplay.of(Class.class))
+        .testEquals();
+  }
+
+  @Test
+  public void testSerialization() {
+    SerializableUtils.ensureSerializable(ClassForDisplay.of(ClassForDisplayTest.class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
index 8cfb5c2..abdc350 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
@@ -149,40 +149,42 @@ public class DisplayDataMatchers {
     }
   }
 
-  /**
-   * Create a matcher that matches if the examined {@link DisplayData} contains all display data
-   * registered from the specified subcomponent.
-   */
-  public static Matcher<DisplayData> includes(final HasDisplayData subComponent) {
+  /** Like {@link #includes(HasDisplayData, String)}, namespaced by the sub-component's class. */
+  public static Matcher<DisplayData> includes(HasDisplayData subComponent) {
     return includes(subComponent, subComponent.getClass());
   }
 
+  /** Like {@link #includes(HasDisplayData, String)}, using the class name as the namespace. */
+  public static Matcher<DisplayData> includes(
+      HasDisplayData subComponent, Class<? extends HasDisplayData> namespace) {
+    return includes(subComponent, namespace.getName());
+  }
+
   /**
    * Create a matcher that matches if the examined {@link DisplayData} contains all display data
    * registered from the specified subcomponent and namespace.
    */
   public static Matcher<DisplayData> includes(
-      final HasDisplayData subComponent, final Class<? extends HasDisplayData> namespace) {
+      final HasDisplayData subComponent, final String namespace) {
     return new CustomTypeSafeMatcher<DisplayData>("includes subcomponent") {
       @Override
       protected boolean matchesSafely(DisplayData displayData) {
-        DisplayData subComponentData = DisplayData.from(subComponent);
+        DisplayData subComponentData = subComponentData();
         if (subComponentData.items().size() == 0) {
           throw new UnsupportedOperationException("subComponent contains no display data; " +
               "cannot verify whether it is included");
         }
 
-        DisplayDataComparision comparison = checkSubset(displayData, subComponentData, namespace);
+        DisplayDataComparison comparison = checkSubset(displayData, subComponentData);
         return comparison.missingItems.isEmpty();
       }
 
-
       @Override
       protected void describeMismatchSafely(
           DisplayData displayData, Description mismatchDescription) {
-        DisplayData subComponentDisplayData = DisplayData.from(subComponent);
-        DisplayDataComparision comparison = checkSubset(
-            displayData, subComponentDisplayData, subComponent.getClass());
+        DisplayData subComponentDisplayData = subComponentData();
+        DisplayDataComparison comparison = checkSubset(
+            displayData, subComponentDisplayData);
 
         mismatchDescription
             .appendText("did not include:\n")
@@ -191,12 +193,21 @@ public class DisplayDataMatchers {
             .appendValue(comparison.unmatchedItems);
       }
 
-      private DisplayDataComparision checkSubset(
-          DisplayData displayData, DisplayData included, Class<?> namespace) {
-        DisplayDataComparision comparison = new DisplayDataComparision(displayData.items());
+      private DisplayData subComponentData() {
+        return DisplayData.from(new HasDisplayData() {
+          @Override
+          public void populateDisplayData(DisplayData.Builder builder) {
+            builder.include(subComponent, namespace);  // expected items carry this namespace
+          }
+        });
+      }
+
+      private DisplayDataComparison checkSubset(
+          DisplayData displayData, DisplayData included) {
+        DisplayDataComparison comparison = new DisplayDataComparison(displayData.items());
         for (Item item : included.items()) {
           Item matchedItem = displayData.asMap().get(
-              DisplayData.Identifier.of(namespace, item.getKey()));
+              DisplayData.Identifier.of(item.getNamespace(), item.getKey()));
 
           if (matchedItem != null) {
             comparison.matched(matchedItem);
@@ -208,11 +219,11 @@ public class DisplayDataMatchers {
         return comparison;
       }
 
-      class DisplayDataComparision {
+      class DisplayDataComparison {
         Collection<DisplayData.Item> missingItems;
         Collection<DisplayData.Item> unmatchedItems;
 
-        DisplayDataComparision(Collection<Item> superset) {
+        DisplayDataComparison(Collection<Item> superset) {
           missingItems = Sets.newHashSet();
           unmatchedItems = Sets.newHashSet(superset);
         }
@@ -315,7 +326,9 @@ public class DisplayDataMatchers {
             valueMatcher, "with value", "value") {
       @Override
       protected T featureValueOf(DisplayData.Item actual) {
-        return (T) actual.getValue();
+        @SuppressWarnings("unchecked")
+        T value = (T) actual.getValue();
+        return value;
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0baa4c9/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
index 5aee8dd..106c441 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
@@ -340,7 +340,7 @@ public class DisplayDataTest {
       @Override
       public void populateDisplayData(Builder builder) {
         builder.add("foo", "bar")
-            .withNamespace(null);
+            .withNamespace((Class<?>) null);
       }
     });
   }
@@ -349,10 +349,10 @@ public class DisplayDataTest {
   public void testIdentifierEquality() {
     new EqualsTester()
         .addEqualityGroup(
-            DisplayData.Identifier.of(DisplayDataTest.class, "1"),
-            DisplayData.Identifier.of(DisplayDataTest.class, "1"))
-        .addEqualityGroup(DisplayData.Identifier.of(Object.class, "1"))
-        .addEqualityGroup(DisplayData.Identifier.of(DisplayDataTest.class, "2"))
+            DisplayData.Identifier.of(ClassForDisplay.of(DisplayDataTest.class), "1"),
+            DisplayData.Identifier.of(ClassForDisplay.of(DisplayDataTest.class), "1"))
+        .addEqualityGroup(DisplayData.Identifier.of(ClassForDisplay.of(Object.class), "1"))
+        .addEqualityGroup(DisplayData.Identifier.of(ClassForDisplay.of(DisplayDataTest.class), "2"))
         .testEquals();
   }
 
@@ -568,6 +568,7 @@ public class DisplayDataTest {
                     .add("float", 3.14)
                     .add("boolean", true)
                     .add("java_class", DisplayDataTest.class)
+                    .add("java_class2", ClassForDisplay.of(DisplayDataTest.class))
                     .add("timestamp", Instant.now())
                     .add("duration", Duration.standardHours(1));
               }
@@ -585,6 +586,9 @@ public class DisplayDataTest {
         hasItem(allOf(hasKey("java_class"), hasType(DisplayData.Type.JAVA_CLASS))));
     assertThat(
         items,
+        hasItem(allOf(hasKey("java_class2"), hasType(DisplayData.Type.JAVA_CLASS))));
+    assertThat(
+        items,
         hasItem(allOf(hasKey("timestamp"), hasType(DisplayData.Type.TIMESTAMP))));
     assertThat(
         items, hasItem(allOf(hasKey("duration"), hasType(DisplayData.Type.DURATION))));
@@ -678,6 +682,8 @@ public class DisplayDataTest {
     assertEquals(DisplayData.Type.BOOLEAN, DisplayData.inferType(true));
     assertEquals(DisplayData.Type.TIMESTAMP, DisplayData.inferType(Instant.now()));
     assertEquals(DisplayData.Type.DURATION, DisplayData.inferType(Duration.millis(1234)));
+    assertEquals(DisplayData.Type.JAVA_CLASS,
+        DisplayData.inferType(ClassForDisplay.of(DisplayDataTest.class)));
     assertEquals(DisplayData.Type.JAVA_CLASS, DisplayData.inferType(DisplayDataTest.class));
     assertEquals(DisplayData.Type.STRING, DisplayData.inferType("hello world"));
 
@@ -773,7 +779,7 @@ public class DisplayDataTest {
     DisplayData.from(new HasDisplayData() {
         @Override
         public void populateDisplayData(Builder builder) {
-          builder.include(subComponent, null);
+          builder.include(subComponent, (ClassForDisplay) null);
         }
       });
   }