You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/10/20 18:19:00 UTC

[1/3] incubator-beam git commit: Add Display Data 'path' metadata

Repository: incubator-beam
Updated Branches:
  refs/heads/master ff6301bd6 -> 5047cf746


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
index 5a58519..4d7814c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.io;
 
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -74,7 +74,7 @@ public class BoundedReadFromUnboundedSourceTest implements Serializable{
     };
 
     BoundedReadFromUnboundedSource<KV<Integer, Integer>> read = Read.from(src).withMaxNumRecords(5);
-    assertThat(DisplayData.from(read), includesDisplayDataFrom(src));
+    assertThat(DisplayData.from(read), includesDisplayDataFor("source", src));
   }
 
   private static class Checker

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index d7c451d..f8769ea 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
@@ -424,7 +424,7 @@ public class CompressedSourceTest {
     assertThat(compressedSourceDisplayData, hasDisplayItem("compressionMode"));
     assertThat(gzipDisplayData, hasDisplayItem("compressionMode", CompressionMode.GZIP.toString()));
     assertThat(compressedSourceDisplayData, hasDisplayItem("source", inputSource.getClass()));
-    assertThat(compressedSourceDisplayData, includesDisplayDataFrom(inputSource));
+    assertThat(compressedSourceDisplayData, includesDisplayDataFor("source", inputSource));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
index a5138c5..2e90f9a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasItem;
 
@@ -96,11 +96,11 @@ public class ReadTest implements Serializable{
 
     DisplayData boundedDisplayData = DisplayData.from(bounded);
     assertThat(boundedDisplayData, hasDisplayItem("source", boundedSource.getClass()));
-    assertThat(boundedDisplayData, includesDisplayDataFrom(boundedSource));
+    assertThat(boundedDisplayData, includesDisplayDataFor("source", boundedSource));
 
     DisplayData unboundedDisplayData = DisplayData.from(unbounded);
     assertThat(unboundedDisplayData, hasDisplayItem("source", unboundedSource.getClass()));
-    assertThat(unboundedDisplayData, includesDisplayDataFrom(unboundedSource));
+    assertThat(unboundedDisplayData, includesDisplayDataFor("source", unboundedSource));
     assertThat(unboundedDisplayData, hasDisplayItem("maxRecords", 1234));
     assertThat(unboundedDisplayData, hasDisplayItem("maxReadTime", maxReadTime));
   }
@@ -142,12 +142,12 @@ public class ReadTest implements Serializable{
     Set<DisplayData> boundedDisplayData = evaluator
         .displayDataForPrimitiveSourceTransforms(bounded);
     assertThat(boundedDisplayData, hasItem(hasDisplayItem("source", boundedSource.getClass())));
-    assertThat(boundedDisplayData, hasItem(includesDisplayDataFrom(boundedSource)));
+    assertThat(boundedDisplayData, hasItem(includesDisplayDataFor("source", boundedSource)));
 
     Set<DisplayData> unboundedDisplayData = evaluator
         .displayDataForPrimitiveSourceTransforms(unbounded);
     assertThat(unboundedDisplayData, hasItem(hasDisplayItem("source")));
-    assertThat(unboundedDisplayData, hasItem(includesDisplayDataFrom(unboundedSource)));
+    assertThat(unboundedDisplayData, hasItem(includesDisplayDataFor("source", unboundedSource)));
   }
 
   private abstract static class CustomBoundedSource extends BoundedSource<String> {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 997566a..5be5ff1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
@@ -274,7 +274,7 @@ public class WriteTest {
     DisplayData displayData = DisplayData.from(write);
 
     assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
-    assertThat(displayData, includesDisplayDataFrom(sink));
+    assertThat(displayData, includesDisplayDataFor("sink", sink));
   }
 
   @Test
@@ -288,7 +288,7 @@ public class WriteTest {
     Write.Bound<String> write = Write.to(sink).withNumShards(1);
     DisplayData displayData = DisplayData.from(write);
     assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
-    assertThat(displayData, includesDisplayDataFrom(sink));
+    assertThat(displayData, includesDisplayDataFor("sink", sink));
     assertThat(displayData, hasDisplayItem("numShards", 1));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
index 32222d3..52b98ee 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
@@ -24,6 +24,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -34,6 +35,7 @@ import static org.junit.Assert.assertTrue;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonValue;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
@@ -48,11 +50,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.ExternalResource;
 import org.junit.rules.TestRule;
@@ -756,6 +764,38 @@ public class ProxyInvocationHandlerTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
+  public void pipelineOptionsDisplayDataExceptionShouldFail() {
+    Object brokenValueType = new Object() {
+      @JsonValue
+      public int getValue () {
+        return 42;
+      }
+
+      @Override
+      public String toString() {
+        throw new RuntimeException("oh noes!!");
+      }
+    };
+
+    Pipeline p = TestPipeline.create();
+    p.getOptions().as(ObjectPipelineOptions.class).setValue(brokenValueType);
+
+    p.apply(Create.of(1, 2, 3));
+
+    expectedException.expectMessage(
+        ProxyInvocationHandler.PipelineOptionsDisplayData.class.getName());
+    expectedException.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!!")));
+    p.run();
+  }
+
+  /** {@link PipelineOptions} to inject bad object implementations. */
+  public interface ObjectPipelineOptions extends PipelineOptions {
+    Object getValue();
+    void setValue(Object value);
+  }
+
+  @Test
   public void testDisplayDataInheritanceNamespace() {
     ExtendsBaseOptions options = PipelineOptionsFactory.as(ExtendsBaseOptions.class);
     options.setFoo("bar");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index 35f9858..8862531 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableList;
@@ -298,9 +298,8 @@ public class  CombineFnsTest {
     assertThat(displayData, hasDisplayItem("combineFn1", combineFn1.getClass()));
     assertThat(displayData, hasDisplayItem("combineFn2", combineFn2.getClass()));
 
-    String nsBase = DisplayDataCombineFn.class.getName();
-    assertThat(displayData, includesDisplayDataFrom(combineFn1, nsBase + "#1"));
-    assertThat(displayData, includesDisplayDataFrom(combineFn2, nsBase + "#2"));
+    assertThat(displayData, includesDisplayDataFor("combineFn1", combineFn1));
+    assertThat(displayData, includesDisplayDataFor("combineFn2", combineFn2));
   }
 
   private static class DisplayDataCombineFn extends Combine.CombineFn<String, String, String> {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 5ce8055..671f00e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -23,7 +23,7 @@ import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.sdk.TestUtils.checkCombineFn;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -682,7 +682,7 @@ public class CombineTest implements Serializable {
     assertThat(displayData, hasDisplayItem("combineFn", combineFn.getClass()));
     assertThat(displayData, hasDisplayItem("emitDefaultOnEmptyInput", true));
     assertThat(displayData, hasDisplayItem("fanout", 1234));
-    assertThat(displayData, includesDisplayDataFrom(combineFn));
+    assertThat(displayData, includesDisplayDataFor("combineFn", combineFn));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index bda696f..52244a0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -21,7 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
 import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
 import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
 import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
@@ -1432,7 +1432,7 @@ public class ParDoTest implements Serializable {
         hasType(DisplayData.Type.JAVA_CLASS),
         DisplayDataMatchers.hasValue(fn.getClass().getName()))));
 
-    assertThat(displayData, includesDisplayDataFrom(fn));
+    assertThat(displayData, includesDisplayDataFor("fn", fn));
   }
 
   @Test
@@ -1450,7 +1450,7 @@ public class ParDoTest implements Serializable {
     Bound<String, String> parDo = ParDo.of(fn);
 
     DisplayData displayData = DisplayData.from(parDo);
-    assertThat(displayData, includesDisplayDataFrom(fn));
+    assertThat(displayData, includesDisplayDataFor("fn", fn));
     assertThat(displayData, hasDisplayItem("fn", fn.getClass()));
   }
 
@@ -1494,7 +1494,7 @@ public class ParDoTest implements Serializable {
             .of(fn);
 
     DisplayData displayData = DisplayData.from(parDo);
-    assertThat(displayData, includesDisplayDataFrom(fn));
+    assertThat(displayData, includesDisplayDataFor("fn", fn));
     assertThat(displayData, hasDisplayItem("fn", fn.getClass()));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
index e9db522..7e0bd12 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
@@ -18,8 +18,10 @@
 package org.apache.beam.sdk.transforms.display;
 
 import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.is;
 
 import com.google.common.collect.Sets;
+import java.util.Arrays;
 import java.util.Collection;
 import org.apache.beam.sdk.transforms.display.DisplayData.Item;
 import org.hamcrest.CustomTypeSafeMatcher;
@@ -44,10 +46,10 @@ public class DisplayDataMatchers {
    * Creates a matcher that matches if the examined {@link DisplayData} contains any items.
    */
   public static Matcher<DisplayData> hasDisplayItem() {
-    return new FeatureMatcher<DisplayData, Collection<DisplayData.Item<?>>>(
+    return new FeatureMatcher<DisplayData, Collection<DisplayData.Item>>(
         Matchers.not(Matchers.empty()), "DisplayData", "DisplayData") {
       @Override
-      protected Collection<Item<?>> featureValueOf(DisplayData actual) {
+      protected Collection<Item> featureValueOf(DisplayData actual) {
         return actual.items();
       }
     };
@@ -130,14 +132,14 @@ public class DisplayDataMatchers {
    * Creates a matcher that matches if the examined {@link DisplayData} contains any item
    * matching the specified {@code itemMatcher}.
    */
-  public static Matcher<DisplayData> hasDisplayItem(Matcher<DisplayData.Item<?>> itemMatcher) {
+  public static Matcher<DisplayData> hasDisplayItem(Matcher<DisplayData.Item> itemMatcher) {
     return new HasDisplayDataItemMatcher(itemMatcher);
   }
 
   private static class HasDisplayDataItemMatcher extends TypeSafeDiagnosingMatcher<DisplayData> {
-    private final Matcher<Item<?>> itemMatcher;
+    private final Matcher<Item> itemMatcher;
 
-    private HasDisplayDataItemMatcher(Matcher<DisplayData.Item<?>> itemMatcher) {
+    private HasDisplayDataItemMatcher(Matcher<DisplayData.Item> itemMatcher) {
       this.itemMatcher = itemMatcher;
     }
 
@@ -149,7 +151,7 @@ public class DisplayDataMatchers {
 
     @Override
     protected boolean matchesSafely(DisplayData data, Description mismatchDescription) {
-      Collection<Item<?>> items = data.items();
+      Collection<Item> items = data.items();
       boolean isMatch = Matchers.hasItem(itemMatcher).matches(items);
       if (!isMatch) {
         mismatchDescription.appendText("found " + items.size() + " non-matching item(s):\n");
@@ -160,42 +162,31 @@ public class DisplayDataMatchers {
     }
   }
 
-  /** @see #includesDisplayDataFrom(HasDisplayData, String) */
-  public static Matcher<DisplayData> includesDisplayDataFrom(HasDisplayData subComponent) {
-    return includesDisplayDataFrom(subComponent, subComponent.getClass());
-  }
-
-  /** @see #includesDisplayDataFrom(HasDisplayData, String) */
-  public static Matcher<DisplayData> includesDisplayDataFrom(
-      HasDisplayData subComponent, Class<? extends HasDisplayData> namespace) {
-    return includesDisplayDataFrom(subComponent, namespace.getName());
-  }
-
   /**
    * Create a matcher that matches if the examined {@link DisplayData} contains all display data
    * registered from the specified subcomponent and namespace.
    */
-  public static Matcher<DisplayData> includesDisplayDataFrom(
-      final HasDisplayData subComponent, final String namespace) {
+  public static Matcher<DisplayData> includesDisplayDataFor(
+      final String path, final HasDisplayData subComponent) {
     return new CustomTypeSafeMatcher<DisplayData>("includes subcomponent") {
       @Override
       protected boolean matchesSafely(DisplayData displayData) {
-        DisplayData subComponentData = subComponentData();
+        DisplayData subComponentData = subComponentData(path);
         if (subComponentData.items().size() == 0) {
           throw new UnsupportedOperationException("subComponent contains no display data; "
               + "cannot verify whether it is included");
         }
 
-        DisplayDataComparison comparison = checkSubset(displayData, subComponentData);
+        DisplayDataComparison comparison = checkSubset(displayData, subComponentData, path);
         return comparison.missingItems.isEmpty();
       }
 
       @Override
       protected void describeMismatchSafely(
           DisplayData displayData, Description mismatchDescription) {
-        DisplayData subComponentDisplayData = subComponentData();
+        DisplayData subComponentDisplayData = subComponentData(path);
         DisplayDataComparison comparison = checkSubset(
-            displayData, subComponentDisplayData);
+            displayData, subComponentDisplayData, path);
 
         mismatchDescription
             .appendText("did not include:\n")
@@ -204,21 +195,21 @@ public class DisplayDataMatchers {
             .appendValue(comparison.unmatchedItems);
       }
 
-      private DisplayData subComponentData() {
+      private DisplayData subComponentData(final String path) {
         return DisplayData.from(new HasDisplayData() {
           @Override
           public void populateDisplayData(DisplayData.Builder builder) {
-            builder.include(subComponent, namespace);
+            builder.include(path, subComponent);
           }
         });
       }
 
       private DisplayDataComparison checkSubset(
-          DisplayData displayData, DisplayData included) {
+          DisplayData displayData, DisplayData included, String path) {
         DisplayDataComparison comparison = new DisplayDataComparison(displayData.items());
-        for (Item<?> item : included.items()) {
-          Item<?> matchedItem = displayData.asMap().get(
-              DisplayData.Identifier.of(item.getNamespace(), item.getKey()));
+        for (Item item : included.items()) {
+          Item matchedItem = displayData.asMap().get(DisplayData.Identifier.of(
+              DisplayData.Path.absolute(path), item.getNamespace(), item.getKey()));
 
           if (matchedItem != null) {
             comparison.matched(matchedItem);
@@ -231,19 +222,19 @@ public class DisplayDataMatchers {
       }
 
       class DisplayDataComparison {
-        Collection<DisplayData.Item<?>> missingItems;
-        Collection<DisplayData.Item<?>> unmatchedItems;
+        Collection<Item> missingItems;
+        Collection<Item> unmatchedItems;
 
-        DisplayDataComparison(Collection<Item<?>> superset) {
+        DisplayDataComparison(Collection<Item> superset) {
           missingItems = Sets.newHashSet();
           unmatchedItems = Sets.newHashSet(superset);
         }
 
-        void matched(Item<?> supersetItem) {
+        void matched(Item supersetItem) {
           unmatchedItems.remove(supersetItem);
         }
 
-        void missing(Item<?> subsetItem) {
+        void missing(Item subsetItem) {
           missingItems.add(subsetItem);
         }
       }
@@ -254,59 +245,59 @@ public class DisplayDataMatchers {
    * Creates a matcher that matches if the examined {@link DisplayData.Item} contains a key
    * with the specified value.
    */
-  public static Matcher<DisplayData.Item<?>> hasKey(String key) {
-    return hasKey(Matchers.is(key));
+  public static Matcher<DisplayData.Item> hasKey(String key) {
+    return hasKey(is(key));
   }
 
   /**
    * Creates a matcher that matches if the examined {@link DisplayData.Item} contains a key
    * matching the specified key matcher.
    */
-  public static Matcher<DisplayData.Item<?>> hasKey(Matcher<String> keyMatcher) {
-    return new FeatureMatcher<DisplayData.Item<?>, String>(keyMatcher, "with key", "key") {
+  public static Matcher<DisplayData.Item> hasKey(Matcher<String> keyMatcher) {
+    return new FeatureMatcher<DisplayData.Item, String>(keyMatcher, "with key", "key") {
       @Override
-      protected String featureValueOf(DisplayData.Item<?> actual) {
+      protected String featureValueOf(DisplayData.Item actual) {
         return actual.getKey();
       }
     };
   }
 
+
   /**
-   * Creates a matcher that matches if the examined {@link DisplayData.Item} contains the
-   * specified namespace.
+   * Creates a matcher that matches if the examined {@link DisplayData.Item} has the path formed
+   * by the specified components, or the root path when no components are given.
    */
-  public static Matcher<DisplayData.Item<?>> hasNamespace(Class<?> namespace) {
-    return hasNamespace(Matchers.<Class<?>>is(namespace));
+  public static Matcher<DisplayData.Item> hasPath(String... paths) {
+    DisplayData.Path path = (paths.length == 0)
+        ? DisplayData.Path.root()
+        : DisplayData.Path.absolute(paths[0], Arrays.copyOfRange(paths, 1, paths.length));
+    return new FeatureMatcher<DisplayData.Item, DisplayData.Path>(
+        is(path), " with namespace", "namespace") {
+      @Override
+      protected DisplayData.Path featureValueOf(DisplayData.Item actual) {
+        return actual.getPath();
+      }
+    };
   }
 
   /**
    * Creates a matcher that matches if the examined {@link DisplayData.Item} contains the
    * specified namespace.
    */
-  public static Matcher<DisplayData.Item<?>> hasNamespace(String namespace) {
-    return new FeatureMatcher<DisplayData.Item<?>, String>(
-        Matchers.is(namespace), "display item with namespace", "namespace") {
-      @Override
-      protected String featureValueOf(Item<?> actual) {
-        return actual.getNamespace();
-      }
-    };
+  public static Matcher<DisplayData.Item> hasNamespace(Class<?> namespace) {
+    return hasNamespace(Matchers.<Class<?>>is(namespace));
   }
 
   /**
    * Creates a matcher that matches if the examined {@link DisplayData.Item} contains a namespace
    * matching the specified namespace matcher.
    */
-  public static Matcher<DisplayData.Item<?>> hasNamespace(Matcher<Class<?>> namespaceMatcher) {
-    return new FeatureMatcher<DisplayData.Item<?>, Class<?>>(
-        namespaceMatcher, "display item with namespace", "namespace") {
+  public static Matcher<DisplayData.Item> hasNamespace(Matcher<Class<?>> namespaceMatcher) {
+    return new FeatureMatcher<DisplayData.Item, Class<?>>(
+        namespaceMatcher, " with namespace", "namespace") {
       @Override
-      protected Class<?> featureValueOf(DisplayData.Item<?> actual) {
-        try {
-          return Class.forName(actual.getNamespace());
-        } catch (ClassNotFoundException e) {
-          return null;
-        }
+      protected Class<?> featureValueOf(DisplayData.Item actual) {
+        return actual.getNamespace();
       }
     };
   }
@@ -315,19 +306,19 @@ public class DisplayDataMatchers {
    * Creates a matcher that matches if the examined {@link DisplayData.Item} matches the
    * specified type.
    */
-  public static Matcher<DisplayData.Item<?>> hasType(DisplayData.Type type) {
-    return hasType(Matchers.is(type));
+  public static Matcher<DisplayData.Item> hasType(DisplayData.Type type) {
+    return hasType(is(type));
   }
 
   /**
    * Creates a matcher that matches if the examined {@link DisplayData.Item} has a type
    * matching the specified type matcher.
    */
-  public static Matcher<DisplayData.Item<?>> hasType(Matcher<DisplayData.Type> typeMatcher) {
-    return new FeatureMatcher<DisplayData.Item<?>, DisplayData.Type>(
+  public static Matcher<DisplayData.Item> hasType(Matcher<DisplayData.Type> typeMatcher) {
+    return new FeatureMatcher<DisplayData.Item, DisplayData.Type>(
             typeMatcher, "with type", "type") {
       @Override
-      protected DisplayData.Type featureValueOf(DisplayData.Item<?> actual) {
+      protected DisplayData.Type featureValueOf(DisplayData.Item actual) {
         return actual.getType();
       }
     };
@@ -338,19 +329,19 @@ public class DisplayDataMatchers {
    * value.
    */
 
-  public static Matcher<DisplayData.Item<?>> hasValue(Object value) {
-    return hasValue(Matchers.is(value));
+  public static Matcher<DisplayData.Item> hasValue(Object value) {
+    return hasValue(is(value));
   }
 
   /**
    * Creates a matcher that matches if the examined {@link DisplayData.Item} contains a value
    * matching the specified value matcher.
    */
-  public static <T> Matcher<DisplayData.Item<?>> hasValue(Matcher<T> valueMatcher) {
-    return new FeatureMatcher<DisplayData.Item<?>, T>(
+  public static <T> Matcher<DisplayData.Item> hasValue(Matcher<T> valueMatcher) {
+    return new FeatureMatcher<DisplayData.Item, T>(
             valueMatcher, "with value", "value") {
       @Override
-      protected T featureValueOf(DisplayData.Item<?> actual) {
+      protected T featureValueOf(DisplayData.Item actual) {
         @SuppressWarnings("unchecked")
         T value = (T) actual.getValue();
         return value;
@@ -362,19 +353,19 @@ public class DisplayDataMatchers {
    * Creates a matcher that matches if the examined {@link DisplayData.Item} has the specified
    * label.
    */
-  public static Matcher<DisplayData.Item<?>> hasLabel(String label) {
-    return hasLabel(Matchers.is(label));
+  public static Matcher<DisplayData.Item> hasLabel(String label) {
+    return hasLabel(is(label));
   }
 
   /**
    * Creates a matcher that matches if the examined {@link DisplayData.Item} has a label matching
    * the specified label matcher.
    */
-  public static Matcher<DisplayData.Item<?>> hasLabel(Matcher<String> labelMatcher) {
-    return new FeatureMatcher<DisplayData.Item<?>, String>(
+  public static Matcher<DisplayData.Item> hasLabel(Matcher<String> labelMatcher) {
+    return new FeatureMatcher<DisplayData.Item, String>(
         labelMatcher, "display item with label", "label") {
       @Override
-      protected String featureValueOf(DisplayData.Item<?> actual) {
+      protected String featureValueOf(DisplayData.Item actual) {
         return actual.getLabel();
       }
     };

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
index 3ea6830..f7f8d40 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
@@ -19,9 +19,10 @@ package org.apache.beam.sdk.transforms.display;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasPath;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -100,6 +101,32 @@ public class DisplayDataMatchersTest {
   }
 
   @Test
+  public void testHasPath() {
+    Matcher<DisplayData> matcher = hasDisplayItem(hasPath("a", "b"));
+
+    final HasDisplayData subComponent = new HasDisplayData() {
+      @Override
+      public void populateDisplayData(Builder builder) {
+        builder.include("b", new HasDisplayData() {
+          @Override
+          public void populateDisplayData(Builder builder) {
+            builder.add(DisplayData.item("foo", "bar"));
+          }
+        });
+      }
+    };
+
+    assertFalse(matcher.matches(DisplayData.from(subComponent)));
+
+    assertThat(DisplayData.from(new HasDisplayData() {
+      @Override
+      public void populateDisplayData(Builder builder) {
+        builder.include("a", subComponent);
+      }
+    }), matcher);
+  }
+
+  @Test
   public void testHasNamespace() {
     Matcher<DisplayData> matcher = hasDisplayItem(hasNamespace(SampleTransform.class));
 
@@ -124,25 +151,47 @@ public class DisplayDataMatchersTest {
     HasDisplayData hasSubcomponent = new HasDisplayData() {
       @Override
       public void populateDisplayData(Builder builder) {
-        builder
-          .include(subComponent)
-          .add(DisplayData.item("foo2", "bar2"));
+        builder.include("p", subComponent);
       }
     };
-    HasDisplayData sameKeyDifferentNamespace = new HasDisplayData() {
+
+    HasDisplayData wrongPath = new HasDisplayData() {
+      @Override
+      public void populateDisplayData(Builder builder) {
+        builder.include("q", subComponent);
+      }
+    };
+
+    HasDisplayData deeplyNested = new HasDisplayData() {
+      @Override
+      public void populateDisplayData(Builder builder) {
+        builder.include("p", new HasDisplayData() {
+          @Override
+          public void populateDisplayData(Builder builder) {
+            builder.include("p", subComponent);
+          }
+        });
+      }
+    };
+
+    HasDisplayData sameDisplayItemDifferentComponent = new HasDisplayData() {
       @Override
       public void populateDisplayData(Builder builder) {
         builder.add(DisplayData.item("foo", "bar"));
       }
     };
-    Matcher<DisplayData> matcher = includesDisplayDataFrom(subComponent);
 
-    assertFalse(matcher.matches(DisplayData.from(sameKeyDifferentNamespace)));
+    Matcher<DisplayData> matcher = includesDisplayDataFor("p", subComponent);
+
+    assertFalse("should not match sub-component at different path",
+        matcher.matches(DisplayData.from(wrongPath)));
+    assertFalse("should not match deeply nested sub-component",
+        matcher.matches(DisplayData.from(deeplyNested)));
+    assertFalse("should not match identical display data from different component",
+        matcher.matches(DisplayData.from(sameDisplayItemDifferentComponent)));
     assertThat(DisplayData.from(hasSubcomponent), matcher);
-    assertThat(DisplayData.from(subComponent), matcher);
   }
 
-
   private DisplayData createDisplayDataWithItem(final String key, final String value) {
     return DisplayData.from(new SampleTransform(key, value));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
index a709bd8..770b836 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
@@ -21,13 +21,15 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLabel;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasPath;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isA;
@@ -51,11 +53,10 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
 import java.util.regex.Pattern;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.DisplayData.Item;
+import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.CustomTypeSafeMatcher;
 import org.hamcrest.FeatureMatcher;
@@ -115,8 +116,8 @@ public class DisplayDataTest implements Serializable {
           @Override
           public void populateDisplayData(DisplayData.Builder builder) {
             builder
-                .include(subComponent1)
-                .include(subComponent2)
+                .include("p1", subComponent1)
+                .include("p2", subComponent2)
                 .add(DisplayData.item("minSproggles", 200)
                   .withLabel("Minimum Required Sproggles"))
                 .add(DisplayData.item("fireLasers", true))
@@ -174,7 +175,7 @@ public class DisplayDataTest implements Serializable {
               }
             });
 
-    Map<DisplayData.Identifier, DisplayData.Item<?>> map = data.asMap();
+    Map<DisplayData.Identifier, DisplayData.Item> map = data.asMap();
     assertEquals(map.size(), 1);
     assertThat(data, hasDisplayItem("foo", "bar"));
     assertEquals(map.values(), data.items());
@@ -194,10 +195,10 @@ public class DisplayDataTest implements Serializable {
     });
 
     @SuppressWarnings("unchecked")
-    DisplayData.Item<?> item = (DisplayData.Item<?>) data.items().toArray()[0];
+    DisplayData.Item item = (DisplayData.Item) data.items().toArray()[0];
 
     @SuppressWarnings("unchecked")
-    Matcher<Item<?>> matchesAllOf = Matchers.allOf(
+    Matcher<Item> matchesAllOf = Matchers.allOf(
         hasNamespace(DisplayDataTest.class),
         hasKey("now"),
         hasType(DisplayData.Type.TIMESTAMP),
@@ -327,6 +328,82 @@ public class DisplayDataTest implements Serializable {
     DisplayData.from(component); // should not throw
   }
 
+  @Test
+  public void testRootPath() {
+    DisplayData.Path root = DisplayData.Path.root();
+    assertThat(root.getComponents(), Matchers.empty());
+  }
+
+  @Test
+  public void testExtendPath() {
+    DisplayData.Path a = DisplayData.Path.root().extend("a");
+    assertThat(a.getComponents(), hasItems("a"));
+
+    DisplayData.Path b = a.extend("b");
+    assertThat(b.getComponents(), hasItems("a", "b"));
+  }
+
+  @Test
+  public void testExtendNullPathValidation() {
+    DisplayData.Path root = DisplayData.Path.root();
+    thrown.expect(NullPointerException.class);
+    root.extend(null);
+  }
+
+  @Test
+  public void testExtendEmptyPathValidation() {
+    DisplayData.Path root = DisplayData.Path.root();
+    thrown.expect(IllegalArgumentException.class);
+    root.extend("");
+  }
+
+  @Test
+  public void testAbsolute() {
+    DisplayData.Path path = DisplayData.Path.absolute("a", "b", "c");
+    assertThat(path.getComponents(), hasItems("a", "b", "c"));
+  }
+
+  @Test
+  public void testAbsoluteValidationNullFirstPath() {
+    thrown.expect(NullPointerException.class);
+    DisplayData.Path.absolute(null, "foo", "bar");
+  }
+
+  @Test
+  public void testAbsoluteValidationEmptyFirstPath() {
+    thrown.expect(IllegalArgumentException.class);
+    DisplayData.Path.absolute("", "foo", "bar");
+  }
+
+  @Test
+  public void testAbsoluteValidationNullSubsequentPath() {
+    thrown.expect(NullPointerException.class);
+    DisplayData.Path.absolute("a", "b", null, "c");
+  }
+
+  @Test
+  public void testAbsoluteValidationEmptySubsequentPath() {
+    thrown.expect(IllegalArgumentException.class);
+    DisplayData.Path.absolute("a", "b", "", "c");
+  }
+
+  @Test
+  public void testPathToString() {
+    assertEquals("root string", "[]", DisplayData.Path.root().toString());
+    assertEquals("single component", "[a]", DisplayData.Path.absolute("a").toString());
+    assertEquals("hierarchy", "[a/b/c]", DisplayData.Path.absolute("a", "b", "c").toString());
+  }
+
+  @Test
+  public void testPathEquality() {
+    new EqualsTester()
+        .addEqualityGroup(DisplayData.Path.root(), DisplayData.Path.root())
+        .addEqualityGroup(DisplayData.Path.root().extend("a"), DisplayData.Path.absolute("a"))
+        .addEqualityGroup(
+            DisplayData.Path.root().extend("a").extend("b"),
+            DisplayData.Path.absolute("a", "b"))
+        .testEquals();
+  }
 
   @Test
   public void testIncludes() {
@@ -343,69 +420,56 @@ public class DisplayDataTest implements Serializable {
             new HasDisplayData() {
               @Override
               public void populateDisplayData(DisplayData.Builder builder) {
-                builder.include(subComponent);
+                builder.include("p", subComponent);
               }
             });
 
-    assertThat(data, includesDisplayDataFrom(subComponent));
+    assertThat(data, includesDisplayDataFor("p", subComponent));
   }
 
   @Test
-  public void testIncludesNamespaceOverride() {
-    final HasDisplayData subComponent = new HasDisplayData() {
-        @Override
-        public void populateDisplayData(DisplayData.Builder builder) {
-          builder.add(DisplayData.item("foo", "bar"));
-        }
-    };
-
-    final HasDisplayData namespaceOverride = new HasDisplayData(){
+  public void testIncludeSameComponentAtDifferentPaths() {
+    final HasDisplayData subComponent1 = new HasDisplayData() {
       @Override
       public void populateDisplayData(Builder builder) {
+        builder.add(DisplayData.item("foo", "bar"));
       }
     };
-
-    DisplayData data = DisplayData.from(new HasDisplayData() {
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        builder.include(subComponent, namespaceOverride.getClass());
-      }
-    });
-
-    assertThat(data, includesDisplayDataFrom(subComponent, namespaceOverride.getClass()));
-  }
-
-  @Test
-  public void testNamespaceOverrideMultipleLevels() {
-    final HasDisplayData componentA = new HasDisplayData() {
+    final HasDisplayData subComponent2 = new HasDisplayData() {
       @Override
       public void populateDisplayData(Builder builder) {
-        builder.add(DisplayData.item("foo", "bar"));
+        builder.add(DisplayData.item("foo2", "bar2"));
       }
     };
 
-    final HasDisplayData componentB = new HasDisplayData() {
+    HasDisplayData component = new HasDisplayData() {
       @Override
       public void populateDisplayData(Builder builder) {
         builder
-            .add(DisplayData.item("foo", "bar"))
-            .include(componentA);
+            .include("p1", subComponent1)
+            .include("p2", subComponent2);
+
       }
     };
 
-    final HasDisplayData componentC = new HasDisplayData() {
+    DisplayData data = DisplayData.from(component);
+    assertThat(data, includesDisplayDataFor("p1", subComponent1));
+    assertThat(data, includesDisplayDataFor("p2", subComponent2));
+  }
+
+  @Test
+  public void testIncludesComponentsAtSamePath() {
+    HasDisplayData component = new HasDisplayData() {
       @Override
       public void populateDisplayData(Builder builder) {
         builder
-            .add(DisplayData.item("foo", "bar"))
-            .include(componentB, "overrideB");
+            .include("p", new NoopDisplayData())
+            .include("p", new NoopDisplayData());
       }
     };
 
-    DisplayData data = DisplayData.from(componentC);
-    assertThat(data, hasDisplayItem(hasNamespace(componentC.getClass())));
-    assertThat(data, hasDisplayItem(hasNamespace("overrideB")));
-    assertThat(data, hasDisplayItem(hasNamespace(componentA.getClass())));
+    thrown.expectCause(isA(IllegalArgumentException.class));
+    DisplayData.from(component);
   }
 
   @Test
@@ -416,7 +480,7 @@ public class DisplayDataTest implements Serializable {
       @Override
       public void populateDisplayData(Builder builder) {
         builder.add(DisplayData.item("foo", "bar")
-            .withNamespace((Class<?>) null));
+            .withNamespace(null));
       }
     });
   }
@@ -425,10 +489,14 @@ public class DisplayDataTest implements Serializable {
   public void testIdentifierEquality() {
     new EqualsTester()
         .addEqualityGroup(
-            DisplayData.Identifier.of(DisplayDataTest.class, "1"),
-            DisplayData.Identifier.of(DisplayDataTest.class, "1"))
-        .addEqualityGroup(DisplayData.Identifier.of(Object.class, "1"))
-        .addEqualityGroup(DisplayData.Identifier.of(DisplayDataTest.class, "2"))
+            DisplayData.Identifier.of(DisplayData.Path.absolute("a"), DisplayDataTest.class, "1"),
+            DisplayData.Identifier.of(DisplayData.Path.absolute("a"), DisplayDataTest.class, "1"))
+        .addEqualityGroup(
+            DisplayData.Identifier.of(DisplayData.Path.absolute("b"), DisplayDataTest.class, "1"))
+        .addEqualityGroup(
+            DisplayData.Identifier.of(DisplayData.Path.absolute("a"), Object.class, "1"))
+        .addEqualityGroup(
+            DisplayData.Identifier.of(DisplayData.Path.absolute("a"), DisplayDataTest.class, "2"))
         .testEquals();
   }
 
@@ -466,30 +534,6 @@ public class DisplayDataTest implements Serializable {
   }
 
   @Test
-  public void testAnonymousClassNamespace() {
-    DisplayData data =
-        DisplayData.from(
-            new HasDisplayData() {
-              @Override
-              public void populateDisplayData(DisplayData.Builder builder) {
-                builder.add(DisplayData.item("foo", "bar"));
-              }
-            });
-
-    DisplayData.Item<?> item = (DisplayData.Item<?>) data.items().toArray()[0];
-    final Pattern anonClassRegex = Pattern.compile(
-        Pattern.quote(DisplayDataTest.class.getName()) + "\\$\\d+$");
-    assertThat(item.getNamespace(), new CustomTypeSafeMatcher<String>(
-        "anonymous class regex: " + anonClassRegex) {
-      @Override
-      protected boolean matchesSafely(String item) {
-        java.util.regex.Matcher m = anonClassRegex.matcher(item);
-        return m.matches();
-      }
-    });
-  }
-
-  @Test
   public void testAcceptsKeysWithDifferentNamespaces() {
     DisplayData data =
         DisplayData.from(
@@ -498,7 +542,7 @@ public class DisplayDataTest implements Serializable {
               public void populateDisplayData(DisplayData.Builder builder) {
                 builder
                     .add(DisplayData.item("foo", "bar"))
-                    .include(
+                    .include("p",
                         new HasDisplayData() {
                           @Override
                           public void populateDisplayData(DisplayData.Builder builder) {
@@ -551,7 +595,7 @@ public class DisplayDataTest implements Serializable {
     };
 
     DisplayData data = DisplayData.from(component);
-    assertEquals(String.format("%s:foo=bar", component.getClass().getName()), data.toString());
+    assertEquals(String.format("[]%s:foo=bar", component.getClass().getName()), data.toString());
   }
 
   @Test
@@ -576,7 +620,7 @@ public class DisplayDataTest implements Serializable {
         new HasDisplayData() {
           @Override
           public void populateDisplayData(Builder builder) {
-            builder.include(componentA);
+            builder.include("p", componentA);
           }
         };
 
@@ -588,13 +632,38 @@ public class DisplayDataTest implements Serializable {
   }
 
   @Test
+  public void testHandlesIncludeCyclesDifferentInstances() {
+    HasDisplayData component =
+        new DelegatingDisplayData(
+          new DelegatingDisplayData(
+              new NoopDisplayData()));
+
+    DisplayData data = DisplayData.from(component);
+    assertThat(data.items(), hasSize(2));
+  }
+
+  private class DelegatingDisplayData implements HasDisplayData {
+    private final HasDisplayData subComponent;
+    public DelegatingDisplayData(HasDisplayData subComponent) {
+      this.subComponent = subComponent;
+    }
+
+    @Override
+    public void populateDisplayData(Builder builder) {
+      builder
+          .add(DisplayData.item("subComponent", subComponent.getClass()))
+          .include("p", subComponent);
+    }
+  }
+
+  @Test
   public void testIncludesSubcomponentsWithObjectEquality() {
     DisplayData data = DisplayData.from(new HasDisplayData() {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
         builder
-          .include(new EqualsEverything("foo1", "bar1"))
-          .include(new EqualsEverything("foo2", "bar2"));
+          .include("p1", new EqualsEverything("foo1", "bar1"))
+          .include("p2", new EqualsEverything("foo2", "bar2"));
       }
     });
 
@@ -626,6 +695,44 @@ public class DisplayDataTest implements Serializable {
     }
   }
 
+  @Test
+  public void testDelegate() {
+    final HasDisplayData subcomponent = new HasDisplayData() {
+      @Override
+      public void populateDisplayData(Builder builder) {
+        builder.add(DisplayData.item("subCompKey", "foo"));
+      }
+    };
+
+    final HasDisplayData wrapped = new HasDisplayData() {
+      @Override
+      public void populateDisplayData(Builder builder) {
+        builder
+            .add(DisplayData.item("wrappedKey", "bar"))
+            .include("p", subcomponent);
+      }
+    };
+
+    HasDisplayData wrapper = new HasDisplayData() {
+      @Override
+      public void populateDisplayData(Builder builder) {
+        builder.delegate(wrapped);
+      }
+    };
+
+    DisplayData data = DisplayData.from(wrapper);
+    assertThat(data, hasDisplayItem(allOf(
+        hasKey("wrappedKey"),
+        hasNamespace(wrapped.getClass()),
+        hasPath(/* root */)
+    )));
+    assertThat(data, hasDisplayItem(allOf(
+        hasKey("subCompKey"),
+        hasNamespace(subcomponent.getClass()),
+        hasPath("p")
+    )));
+  }
+
   abstract static class IncludeSubComponent implements HasDisplayData {
     HasDisplayData subComponent;
 
@@ -633,7 +740,7 @@ public class DisplayDataTest implements Serializable {
     public void populateDisplayData(DisplayData.Builder builder) {
       builder
           .add(DisplayData.item("id", getId()))
-          .include(subComponent);
+          .include(getId(), subComponent);
     }
 
     abstract String getId();
@@ -657,7 +764,7 @@ public class DisplayDataTest implements Serializable {
               }
             });
 
-    Collection<Item<?>> items = data.items();
+    Collection<Item> items = data.items();
     assertThat(
         items, hasItem(allOf(hasKey("string"), hasType(DisplayData.Type.STRING))));
     assertThat(
@@ -813,7 +920,7 @@ public class DisplayDataTest implements Serializable {
           @Override
           public void populateDisplayData(DisplayData.Builder builder) {
             builder
-              .include(subComponent)
+              .include("p", subComponent)
               .add(DisplayData.item("alpha", "bravo"));
           }
         };
@@ -840,26 +947,31 @@ public class DisplayDataTest implements Serializable {
         new HasDisplayData() {
           @Override
           public void populateDisplayData(Builder builder) {
-            builder.include(null);
+            builder.include("p", null);
           }
         });
   }
 
   @Test
-  public void testIncludeNullNamespace() {
-    final HasDisplayData subComponent = new HasDisplayData() {
+  public void testIncludeNullPath() {
+    thrown.expectCause(isA(NullPointerException.class));
+    DisplayData.from(new HasDisplayData() {
       @Override
       public void populateDisplayData(Builder builder) {
+        builder.include(null, new NoopDisplayData());
       }
-    };
+    });
+  }
 
-    thrown.expectCause(isA(NullPointerException.class));
+  @Test
+  public void testIncludeEmptyPath() {
+    thrown.expectCause(isA(IllegalArgumentException.class));
     DisplayData.from(new HasDisplayData() {
-        @Override
-        public void populateDisplayData(Builder builder) {
-          builder.include(subComponent, (Class<?>) null);
-        }
-      });
+      @Override
+      public void populateDisplayData(Builder builder) {
+        builder.include("", new NoopDisplayData());
+      }
+    });
   }
 
   @Test
@@ -964,6 +1076,44 @@ public class DisplayDataTest implements Serializable {
         quoted("DisplayDataTest"), "baz", "http://abc"));
   }
 
+
+  @Test
+  public void testJsonSerializationAnonymousClassNamespace() throws IOException {
+    HasDisplayData component = new HasDisplayData() {
+      @Override
+      public void populateDisplayData(Builder builder) {
+        builder.add(DisplayData.item("foo", "bar"));
+      }
+    };
+    DisplayData data = DisplayData.from(component);
+
+    JsonNode json = MAPPER.readTree(MAPPER.writeValueAsBytes(data));
+    String namespace = json.elements().next().get("namespace").asText();
+    final Pattern anonClassRegex = Pattern.compile(
+        Pattern.quote(DisplayDataTest.class.getName()) + "\\$\\d+$");
+    assertThat(namespace, new CustomTypeSafeMatcher<String>(
+        "anonymous class regex: " + anonClassRegex) {
+      @Override
+      protected boolean matchesSafely(String item) {
+        java.util.regex.Matcher m = anonClassRegex.matcher(item);
+        return m.matches();
+      }
+    });
+  }
+
+  @Test
+  public void testCanSerializeItemSpecReference() {
+    DisplayData.ItemSpec<?> spec = DisplayData.item("clazz", DisplayDataTest.class);
+    SerializableUtils.ensureSerializable(new HoldsItemSpecReference(spec));
+  }
+
+  private static class HoldsItemSpecReference implements Serializable {
+    private final DisplayData.ItemSpec<?> spec;
+    public HoldsItemSpecReference(DisplayData.ItemSpec<?> spec) {
+      this.spec = spec;
+    }
+  }
+
   /**
    * Verify that {@link DisplayData.Builder} can recover from exceptions thrown in user code.
    * This is not used within the Beam SDK since we want all code to produce valid DisplayData.
@@ -993,14 +1143,14 @@ public class DisplayDataTest implements Serializable {
             .add(DisplayData.item("c", "c"));
 
         try {
-          builder.include(failingComponent);
+          builder.include("p", failingComponent);
           fail("Expected exception not thrown");
         } catch (RuntimeException e) {
           // Expected
         }
 
         builder
-            .include(safeComponent)
+            .include("p", safeComponent)
             .add(DisplayData.item("d", "d"));
       }
     });
@@ -1033,7 +1183,7 @@ public class DisplayDataTest implements Serializable {
     HasDisplayData component = new HasDisplayData() {
       @Override
       public void populateDisplayData(Builder builder) {
-        builder.include(new HasDisplayData() {
+        builder.include("p", new HasDisplayData() {
           @Override
           public void populateDisplayData(Builder builder) {
             throw cause;
@@ -1046,18 +1196,6 @@ public class DisplayDataTest implements Serializable {
     DisplayData.from(component);
   }
 
-  private static class IdentityTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {
-    @Override
-    public PCollection<T> apply(PCollection<T> input) {
-      return input.apply(ParDo.of(new DoFn<T, T>() {
-        @ProcessElement
-        public void processElement(ProcessContext c) throws Exception {
-          c.output(c.element());
-        }
-      }));
-    }
-  }
-
   private String quoted(Object obj) {
     return String.format("\"%s\"", obj);
   }
@@ -1101,21 +1239,26 @@ public class DisplayDataTest implements Serializable {
     return hasItem(jsonNode);
   }
 
-  private static Matcher<DisplayData.Item<?>> hasUrl(Matcher<String> urlMatcher) {
-    return new FeatureMatcher<DisplayData.Item<?>, String>(
+  private static class NoopDisplayData implements HasDisplayData {
+    @Override
+    public void populateDisplayData(Builder builder) {}
+  }
+
+  private static Matcher<DisplayData.Item> hasUrl(Matcher<String> urlMatcher) {
+    return new FeatureMatcher<DisplayData.Item, String>(
         urlMatcher, "display item with url", "URL") {
       @Override
-      protected String featureValueOf(DisplayData.Item<?> actual) {
+      protected String featureValueOf(DisplayData.Item actual) {
         return actual.getLinkUrl();
       }
     };
   }
 
-  private static  <T> Matcher<DisplayData.Item<?>> hasShortValue(Matcher<T> valueStringMatcher) {
-    return new FeatureMatcher<DisplayData.Item<?>, T>(
+  private static  <T> Matcher<DisplayData.Item> hasShortValue(Matcher<T> valueStringMatcher) {
+    return new FeatureMatcher<DisplayData.Item, T>(
         valueStringMatcher, "display item with short value", "short value") {
       @Override
-      protected T featureValueOf(DisplayData.Item<?> actual) {
+      protected T featureValueOf(DisplayData.Item actual) {
         @SuppressWarnings("unchecked")
         T shortValue = (T) actual.getShortValue();
         return shortValue;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 9744fc6..30228fe 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.transforms.windowing;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.isOneOf;
 import static org.hamcrest.Matchers.not;
@@ -256,7 +256,7 @@ public class WindowTest implements Serializable {
     DisplayData displayData = DisplayData.from(window);
 
     assertThat(displayData, hasDisplayItem("windowFn", windowFn.getClass()));
-    assertThat(displayData, includesDisplayDataFrom(windowFn));
+    assertThat(displayData, includesDisplayDataFor("windowFn", windowFn));
 
     assertThat(displayData, hasDisplayItem("trigger", triggerBuilder.toString()));
     assertThat(displayData,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 5914ba2..5626067 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2059,11 +2059,6 @@ public class BigQueryIO {
           c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
         }
       }
-
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-      }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index c1b882a..90b9584 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -582,7 +582,7 @@ public class BigtableIO {
 
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
-        Write.this.populateDisplayData(builder);
+        builder.delegate(Write.this);
       }
 
       ///////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index bfbff32..3727f92 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -802,7 +802,7 @@ public class DatastoreV1 {
       builder
           .addIfNotNull(DisplayData.item("projectId", projectId)
               .withLabel("Output Project"))
-          .include(mutationFn);
+          .include("mutationFn", mutationFn);
     }
 
     public String getProjectId() {


[2/3] incubator-beam git commit: Add Display Data 'path' metadata

Posted by bc...@apache.org.
Add Display Data 'path' metadata

Display Data supports the notion of "sub components", components within
a transform class which can contribute their own display data. We add a
namespace to display data items based on the originating component,
which keeps the display data items unique within the step.

There are instances where a component is included multiple times within
a step. We handle the case of the same instance being shared by simply
ignoring it the second time. However, we don't handle the case where a
separate instance of the same class is added. Currently the separate
instances will add display data with the same namespace and key, causing
a failure.

This can come up, for example, when infrastructure at different levels
wraps and re-wraps a component. We saw this with a bounded source being
adapted multiple times, Bounded -> Unbounded -> Bounded -> Unbounded.
The BoundedToUnboundedSourceAdapter was included multiple times with
separate instances and caused a failure while populating display data.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ad03d07a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ad03d07a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ad03d07a

Branch: refs/heads/master
Commit: ad03d07ae783f054a31e8b2e14100afff8cdf747
Parents: ff6301b
Author: Scott Wegner <sw...@google.com>
Authored: Wed Oct 12 14:49:41 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Oct 20 11:10:13 2016 -0700

----------------------------------------------------------------------
 .../core/UnboundedReadFromBoundedSource.java    |   2 +-
 runners/direct-java/pom.xml                     |   5 -
 .../runners/direct/ForwardingPTransform.java    |   2 +-
 .../beam/runners/direct/DirectRunnerTest.java   |  32 --
 .../direct/ForwardingPTransformTest.java        |   7 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   5 +-
 .../DataflowUnboundedReadFromBoundedSource.java |   4 +-
 .../sdk/io/BoundedReadFromUnboundedSource.java  |   5 +-
 .../apache/beam/sdk/io/CompressedSource.java    |   2 +-
 .../java/org/apache/beam/sdk/io/PubsubIO.java   |   5 +-
 .../main/java/org/apache/beam/sdk/io/Read.java  |   4 +-
 .../main/java/org/apache/beam/sdk/io/Write.java |   6 +-
 .../sdk/options/ProxyInvocationHandler.java     | 149 +++---
 .../org/apache/beam/sdk/transforms/Combine.java |  60 +--
 .../apache/beam/sdk/transforms/CombineFns.java  |  33 +-
 .../beam/sdk/transforms/CombineWithContext.java |   3 +-
 .../beam/sdk/transforms/DoFnAdapters.java       |   2 +-
 .../beam/sdk/transforms/FlatMapElements.java    |   6 +-
 .../apache/beam/sdk/transforms/MapElements.java |   8 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |  60 ++-
 .../apache/beam/sdk/transforms/Partition.java   |   2 +-
 .../sdk/transforms/display/DisplayData.java     | 518 +++++++++++++------
 .../beam/sdk/transforms/windowing/Window.java   |   2 +-
 .../io/BoundedReadFromUnboundedSourceTest.java  |   4 +-
 .../beam/sdk/io/CompressedSourceTest.java       |   4 +-
 .../java/org/apache/beam/sdk/io/ReadTest.java   |  10 +-
 .../java/org/apache/beam/sdk/io/WriteTest.java  |   6 +-
 .../sdk/options/ProxyInvocationHandlerTest.java |  40 ++
 .../beam/sdk/transforms/CombineFnsTest.java     |   7 +-
 .../apache/beam/sdk/transforms/CombineTest.java |   4 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |   8 +-
 .../transforms/display/DisplayDataMatchers.java | 141 +++--
 .../display/DisplayDataMatchersTest.java        |  67 ++-
 .../sdk/transforms/display/DisplayDataTest.java | 367 +++++++++----
 .../sdk/transforms/windowing/WindowTest.java    |   4 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |   5 -
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    |   2 +-
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  |   2 +-
 38 files changed, 988 insertions(+), 605 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index 91a1715..2afdcf2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -108,7 +108,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
     // We explicitly do not register base-class data, instead we use the delegate inner source.
     builder
         .add(DisplayData.item("source", source.getClass()))
-        .include(source);
+        .include("source", source);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 354c8c7..6cb1838 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -286,11 +286,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-annotations</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
index 3160b58..77311c2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
@@ -57,6 +57,6 @@ public abstract class ForwardingPTransform<InputT extends PInput, OutputT extend
 
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
-    delegate().populateDisplayData(builder);
+    builder.delegate(delegate());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 37af90c..4027d25 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -25,7 +25,6 @@ import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
-import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.io.InputStream;
@@ -242,37 +241,6 @@ public class DirectRunnerTest implements Serializable {
     p.run();
   }
 
-  @Test
-  public void pipelineOptionsDisplayDataExceptionShouldFail() {
-    Object brokenValueType = new Object() {
-      @JsonValue
-      public int getValue () {
-        return 42;
-      }
-
-      @Override
-      public String toString() {
-        throw new RuntimeException("oh noes!!");
-      }
-    };
-
-    Pipeline p = getPipeline();
-    p.getOptions().as(ObjectPipelineOptions.class).setValue(brokenValueType);
-
-    p.apply(Create.of(1, 2, 3));
-
-    thrown.expectMessage(PipelineOptions.class.getName());
-    thrown.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!!")));
-    p.run();
-  }
-
-  /** {@link PipelineOptions} to inject bad object implementations. */
-  public interface ObjectPipelineOptions extends PipelineOptions {
-    Object getValue();
-    void setValue(Object value);
-  }
-
-
   /**
    * Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the
    * {@link DirectRunner}.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
index 6abaf92..c75adaa 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -102,10 +103,10 @@ public class ForwardingPTransformTest {
 
   @Test
   public void populateDisplayDataDelegates() {
-    DisplayData.Builder builder = mock(DisplayData.Builder.class);
-    doThrow(RuntimeException.class).when(delegate).populateDisplayData(builder);
+    doThrow(RuntimeException.class)
+        .when(delegate).populateDisplayData(any(DisplayData.Builder.class));
 
     thrown.expect(RuntimeException.class);
-    forwarding.populateDisplayData(builder);
+    DisplayData.from(forwarding);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 5f83788..7bf270d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -2249,8 +2249,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
         super.populateDisplayData(builder);
-        builder.add(DisplayData.item("source", source.getClass()));
-        builder.include(source);
+        builder
+            .add(DisplayData.item("source", source.getClass()))
+            .include("source", source);
       }
 
       public UnboundedSource<T, ?> getSource() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
index e4257d1..96a35bc 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
@@ -120,7 +120,7 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin
     // We explicitly do not register base-class data, instead we use the delegate inner source.
     builder
         .add(DisplayData.item("source", source.getClass()))
-        .include(source);
+        .include("source", source);
   }
 
   /**
@@ -195,7 +195,7 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder.add(DisplayData.item("source", boundedSource.getClass()));
-      builder.include(boundedSource);
+      builder.include("source", boundedSource);
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index 630a8a3..40c52a2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -119,7 +119,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
           .withLabel("Maximum Read Records"), Long.MAX_VALUE)
         .addIfNotNull(DisplayData.item("maxReadTime", maxReadTime)
           .withLabel("Maximum Read Time"))
-        .include(source);
+        .include("source", source);
   }
 
   private static class UnboundedToBoundedSourceAdapter<T>
@@ -204,8 +204,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
-      builder.add(DisplayData.item("source", source.getClass()));
-      builder.include(source);
+      builder.delegate(source);
     }
 
     private class Reader extends BoundedReader<ValueWithRecordId<T>> {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index 680dc2c..bf871b7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -390,7 +390,7 @@ public class CompressedSource<T> extends FileBasedSource<T> {
   public void populateDisplayData(DisplayData.Builder builder) {
     // We explicitly do not register base-class data, instead we use the delegate inner source.
     builder
-        .include(sourceDelegate)
+        .include("source", sourceDelegate)
         .add(DisplayData.item("source", sourceDelegate.getClass())
           .withLabel("Read Source"));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 6091156..72a6399 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -792,8 +792,7 @@ public class PubsubIO {
 
         @Override
         public void populateDisplayData(DisplayData.Builder builder) {
-          super.populateDisplayData(builder);
-          Bound.this.populateDisplayData(builder);
+          builder.delegate(Bound.this);
         }
       }
     }
@@ -1043,7 +1042,7 @@ public class PubsubIO {
         @Override
         public void populateDisplayData(DisplayData.Builder builder) {
           super.populateDisplayData(builder);
-          Bound.this.populateDisplayData(builder);
+          builder.delegate(Bound.this);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 29c4e47..f04fbaf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -127,7 +127,7 @@ public class Read {
       builder
           .add(DisplayData.item("source", source.getClass())
             .withLabel("Read Source"))
-          .include(source);
+          .include("source", source);
     }
   }
 
@@ -194,7 +194,7 @@ public class Read {
       builder
           .add(DisplayData.item("source", source.getClass())
             .withLabel("Read Source"))
-          .include(source);
+          .include("source", source);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index e8b19d9..7559fca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -118,7 +118,7 @@ public class Write {
       super.populateDisplayData(builder);
       builder
           .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
-          .include(sink)
+          .include("sink", sink)
           .addIfNotDefault(
               DisplayData.item("numShards", getNumShards()).withLabel("Fixed Number of Shards"),
               0);
@@ -209,7 +209,7 @@ public class Write {
 
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
-        Write.Bound.this.populateDisplayData(builder);
+        builder.delegate(Write.Bound.this);
       }
     }
 
@@ -261,7 +261,7 @@ public class Write {
 
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
-        Write.Bound.this.populateDisplayData(builder);
+        builder.delegate(Write.Bound.this);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index a77dcc6..3e74916 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -86,7 +86,7 @@ import org.apache.beam.sdk.util.common.ReflectHelpers;
  * {@link PipelineOptions#as(Class)}.
  */
 @ThreadSafe
-class ProxyInvocationHandler implements InvocationHandler, HasDisplayData {
+class ProxyInvocationHandler implements InvocationHandler {
   private static final ObjectMapper MAPPER = new ObjectMapper();
   /**
    * No two instances of this class are considered equivalent hence we generate a random hash code.
@@ -138,8 +138,7 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData {
         && args[0] instanceof DisplayData.Builder) {
       @SuppressWarnings("unchecked")
       DisplayData.Builder builder = (DisplayData.Builder) args[0];
-      // Explicitly set display data namespace so thrown exceptions will have sensible type.
-      builder.include(this, PipelineOptions.class);
+      builder.delegate(new PipelineOptionsDisplayData());
       return Void.TYPE;
     }
     String methodName = method.getName();
@@ -243,88 +242,116 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData {
   }
 
   /**
-   * Populate display data. See {@link HasDisplayData#populateDisplayData}. All explicitly set
-   * pipeline options will be added as display data.
+   * Nested class to handle display data in order to set the display data namespace to something
+   * sensible.
    */
-  public void populateDisplayData(DisplayData.Builder builder) {
-    Set<PipelineOptionSpec> optionSpecs = PipelineOptionsReflector.getOptionSpecs(knownInterfaces);
-    Multimap<String, PipelineOptionSpec> optionsMap = buildOptionNameToSpecMap(optionSpecs);
-
-    for (Map.Entry<String, BoundValue> option : options.entrySet()) {
-      BoundValue boundValue = option.getValue();
-      if (boundValue.isDefault()) {
-        continue;
-      }
+  class PipelineOptionsDisplayData implements HasDisplayData {
+    /**
+     * Populate display data. See {@link HasDisplayData#populateDisplayData}. All explicitly set
+     * pipeline options will be added as display data.
+     */
+    public void populateDisplayData(DisplayData.Builder builder) {
+      Set<PipelineOptionSpec> optionSpecs =
+          PipelineOptionsReflector.getOptionSpecs(knownInterfaces);
 
-      Object value = boundValue.getValue() == null ? "" : boundValue.getValue();
-      DisplayData.Type type = DisplayData.inferType(value);
-      HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(option.getKey()));
+      Multimap<String, PipelineOptionSpec> optionsMap = buildOptionNameToSpecMap(optionSpecs);
 
-      for (PipelineOptionSpec optionSpec : specs) {
-        if (!optionSpec.shouldSerialize()) {
-          // Options that are excluded for serialization (i.e. those with @JsonIgnore) are also
-          // excluded from display data. These options are generally not useful for display.
+      for (Map.Entry<String, BoundValue> option : options.entrySet()) {
+        BoundValue boundValue = option.getValue();
+        if (boundValue.isDefault()) {
           continue;
         }
 
-        Class<?> pipelineInterface = optionSpec.getDefiningInterface();
-        if (type != null) {
-          builder.add(DisplayData.item(option.getKey(), type, value)
-              .withNamespace(pipelineInterface));
-        } else {
-          builder.add(DisplayData.item(option.getKey(), displayDataString(value))
-              .withNamespace(pipelineInterface));
+        DisplayDataValue resolved = DisplayDataValue.resolve(boundValue.getValue());
+        HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(option.getKey()));
+
+        for (PipelineOptionSpec optionSpec : specs) {
+          if (!optionSpec.shouldSerialize()) {
+            // Options that are excluded for serialization (i.e. those with @JsonIgnore) are also
+            // excluded from display data. These options are generally not useful for display.
+            continue;
+          }
+
+          builder.add(DisplayData.item(option.getKey(), resolved.getType(), resolved.getValue())
+              .withNamespace(optionSpec.getDefiningInterface()));
         }
       }
-    }
 
-    for (Map.Entry<String, JsonNode> jsonOption : jsonOptions.entrySet()) {
-      if (options.containsKey(jsonOption.getKey())) {
-        // Option overwritten since deserialization; don't re-write
-        continue;
-      }
+      for (Map.Entry<String, JsonNode> jsonOption : jsonOptions.entrySet()) {
+        if (options.containsKey(jsonOption.getKey())) {
+          // Option overwritten since deserialization; don't re-write
+          continue;
+        }
+
+        HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(jsonOption.getKey()));
+        if (specs.isEmpty()) {
+          // No PipelineOptions interface for this key is currently loaded
+          builder.add(DisplayData.item(jsonOption.getKey(), jsonOption.getValue().toString())
+              .withNamespace(UnknownPipelineOptions.class));
+          continue;
+        }
 
-      HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(jsonOption.getKey()));
-      if (specs.isEmpty()) {
-        builder.add(DisplayData.item(jsonOption.getKey(), jsonOption.getValue().toString())
-          .withNamespace(UnknownPipelineOptions.class));
-      } else {
         for (PipelineOptionSpec spec : specs) {
           if (!spec.shouldSerialize()) {
             continue;
           }
 
           Object value = getValueFromJson(jsonOption.getKey(), spec.getGetterMethod());
-          value = value == null ? "" : value;
-          DisplayData.Type type = DisplayData.inferType(value);
-          if (type != null) {
-            builder.add(DisplayData.item(jsonOption.getKey(), type, value)
-                .withNamespace(spec.getDefiningInterface()));
-          } else {
-            builder.add(DisplayData.item(jsonOption.getKey(), displayDataString(value))
-                .withNamespace(spec.getDefiningInterface()));
-          }
+          DisplayDataValue resolved = DisplayDataValue.resolve(value);
+          builder.add(DisplayData.item(jsonOption.getKey(), resolved.getType(), resolved.getValue())
+              .withNamespace(spec.getDefiningInterface()));
         }
       }
     }
   }
 
   /**
-   * {@link Object#toString()} wrapper to extract display data values for various types.
+   * Helper class to resolve a {@link DisplayData} type and value from {@link PipelineOptions}.
    */
-  private String displayDataString(Object value) {
-    checkNotNull(value, "value cannot be null");
-    if (!value.getClass().isArray()) {
-      return value.toString();
-    }
-    if (!value.getClass().getComponentType().isPrimitive()) {
-      return Arrays.deepToString((Object[]) value);
+  @AutoValue
+  abstract static class DisplayDataValue {
+    /**
+     * The resolved display data value. May differ from the input to {@link #resolve(Object)}
+     */
+    abstract Object getValue();
+
+    /** The resolved display data type. */
+    abstract DisplayData.Type getType();
+
+    /**
+     * Infer the value and {@link DisplayData.Type type} for the given
+     * {@link PipelineOptions} value.
+     */
+    static DisplayDataValue resolve(@Nullable Object value) {
+      DisplayData.Type type = DisplayData.inferType(value);
+
+      if (type == null) {
+        value = displayDataString(value);
+        type = DisplayData.Type.STRING;
+      }
+
+      return new AutoValue_ProxyInvocationHandler_DisplayDataValue(value, type);
     }
 
-    // At this point, we have some type of primitive array. Arrays.deepToString(..) requires an
-    // Object array, but will unwrap nested primitive arrays.
-    String wrapped = Arrays.deepToString(new Object[] {value});
-    return wrapped.substring(1, wrapped.length() - 1);
+    /**
+     * Safe {@link Object#toString()} wrapper to extract display data values for various types.
+     */
+    private static String displayDataString(@Nullable Object value) {
+      if (value == null) {
+        return "";
+      }
+      if (!value.getClass().isArray()) {
+        return value.toString();
+      }
+      if (!value.getClass().getComponentType().isPrimitive()) {
+        return Arrays.deepToString((Object[]) value);
+      }
+
+      // At this point, we have some type of primitive array. Arrays.deepToString(..) requires an
+      // Object array, but will unwrap nested primitive arrays.
+      String wrapped = Arrays.deepToString(new Object[]{value});
+      return wrapped.substring(1, wrapped.length() - 1);
+    }
   }
 
   /**
@@ -587,7 +614,7 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData {
 
         List<Map<String, Object>> serializedDisplayData = Lists.newArrayList();
         DisplayData displayData = DisplayData.from(value);
-        for (DisplayData.Item<?> item : displayData.items()) {
+        for (DisplayData.Item item : displayData.items()) {
           @SuppressWarnings("unchecked")
           Map<String, Object> serializedItem = MAPPER.convertValue(item, Map.class);
           serializedDisplayData.add(serializedItem);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index df9a306..7719c73 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -124,14 +124,14 @@ public class Combine {
     return globally(fn, displayDataForFn(fn));
   }
 
-  private static <T> DisplayData.Item<? extends Class<?>> displayDataForFn(T fn) {
+  private static <T> DisplayData.ItemSpec<? extends Class<?>> displayDataForFn(T fn) {
     return DisplayData.item("combineFn", fn.getClass())
         .withLabel("Combiner");
   }
 
   private static <InputT, OutputT> Globally<InputT, OutputT> globally(
       GlobalCombineFn<? super InputT, ?, OutputT> fn,
-      DisplayData.Item<? extends Class<?>> fnDisplayData) {
+      DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
     return new Globally<>(fn, fnDisplayData, true, 0);
   }
 
@@ -200,7 +200,7 @@ public class Combine {
 
   private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(
           PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-      DisplayData.Item<? extends Class<?>> fnDisplayData) {
+      DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
     return new PerKey<>(fn, fnDisplayData, false /*fewKeys*/);
   }
 
@@ -210,7 +210,7 @@ public class Combine {
    */
   private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> fewKeys(
       PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-      DisplayData.Item<? extends Class<?>> fnDisplayData) {
+      DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
     return new PerKey<>(fn, fnDisplayData, true /*fewKeys*/);
   }
 
@@ -294,7 +294,7 @@ public class Combine {
 
   private static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(
       PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-      DisplayData.Item<? extends Class<?>> fnDisplayData) {
+      DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
     return new GroupedValues<>(fn, fnDisplayData);
   }
 
@@ -521,7 +521,7 @@ public class Combine {
 
         @Override
         public void populateDisplayData(DisplayData.Builder builder) {
-          builder.include(CombineFn.this);
+          builder.delegate(CombineFn.this);
         }
       };
     }
@@ -1258,7 +1258,7 @@ public class Combine {
 
         @Override
         public void populateDisplayData(DisplayData.Builder builder) {
-          builder.include(KeyedCombineFn.this);
+          builder.delegate(KeyedCombineFn.this);
         }
       };
     }
@@ -1325,13 +1325,13 @@ public class Combine {
       extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
 
     private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
-    private final DisplayData.Item<? extends Class<?>> fnDisplayData;
+    private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
     private final boolean insertDefault;
     private final int fanout;
     private final List<PCollectionView<?>> sideInputs;
 
     private Globally(GlobalCombineFn<? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
       this.fn = fn;
       this.fnDisplayData = fnDisplayData;
       this.insertDefault = insertDefault;
@@ -1340,7 +1340,7 @@ public class Combine {
     }
 
     private Globally(String name, GlobalCombineFn<? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
       super(name);
       this.fn = fn;
       this.fnDisplayData = fnDisplayData;
@@ -1350,7 +1350,7 @@ public class Combine {
     }
 
     private Globally(String name, GlobalCombineFn<? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout,
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout,
         List<PCollectionView<?>> sideInputs) {
       super(name);
       this.fn = fn;
@@ -1498,9 +1498,9 @@ public class Combine {
 
   private static void populateDisplayData(
       DisplayData.Builder builder, HasDisplayData fn,
-      DisplayData.Item<? extends Class<?>> fnDisplayItem) {
+      DisplayData.ItemSpec<? extends Class<?>> fnDisplayItem) {
     builder
-        .include(fn)
+        .include("combineFn", fn)
         .add(fnDisplayItem);
   }
 
@@ -1556,13 +1556,13 @@ public class Combine {
       extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
 
     private final GlobalCombineFn<? super InputT, ?, OutputT> fn;
-    private final DisplayData.Item<? extends Class<?>> fnDisplayData;
+    private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
     private final boolean insertDefault;
     private final int fanout;
 
     private GloballyAsSingletonView(
         GlobalCombineFn<? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
       this.fn = fn;
       this.fnDisplayData = fnDisplayData;
       this.insertDefault = insertDefault;
@@ -1762,13 +1762,13 @@ public class Combine {
     extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
 
     private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
-    private final DisplayData.Item<? extends Class<?>> fnDisplayData;
+    private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
     private final boolean fewKeys;
     private final List<PCollectionView<?>> sideInputs;
 
     private PerKey(
         PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData, boolean fewKeys) {
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean fewKeys) {
       this.fn = fn;
       this.fnDisplayData = fnDisplayData;
       this.fewKeys = fewKeys;
@@ -1777,7 +1777,7 @@ public class Combine {
 
     private PerKey(String name,
         PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData,
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
         boolean fewKeys, List<PCollectionView<?>> sideInputs) {
       super(name);
       this.fn = fn;
@@ -1788,7 +1788,7 @@ public class Combine {
 
     private PerKey(
         String name, PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData, boolean fewKeys) {
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean fewKeys) {
       super(name);
       this.fn = fn;
       this.fnDisplayData = fnDisplayData;
@@ -1888,12 +1888,12 @@ public class Combine {
       extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
 
     private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
-    private final DisplayData.Item<? extends Class<?>> fnDisplayData;
+    private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
     private final SerializableFunction<? super K, Integer> hotKeyFanout;
 
     private PerKeyWithHotKeyFanout(String name,
         PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData,
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
         SerializableFunction<? super K, Integer> hotKeyFanout) {
       super(name);
       this.fn = fn;
@@ -1976,7 +1976,7 @@ public class Combine {
 
               @Override
               public void populateDisplayData(DisplayData.Builder builder) {
-                builder.include(PerKeyWithHotKeyFanout.this);
+                builder.delegate(PerKeyWithHotKeyFanout.this);
               }
             };
         postCombine =
@@ -2024,7 +2024,7 @@ public class Combine {
               }
               @Override
               public void populateDisplayData(DisplayData.Builder builder) {
-                builder.include(PerKeyWithHotKeyFanout.this);
+                builder.delegate(PerKeyWithHotKeyFanout.this);
               }
             };
       } else {
@@ -2068,7 +2068,7 @@ public class Combine {
               }
               @Override
               public void populateDisplayData(DisplayData.Builder builder) {
-                builder.include(PerKeyWithHotKeyFanout.this);
+                builder.delegate(PerKeyWithHotKeyFanout.this);
               }
             };
         postCombine =
@@ -2117,7 +2117,7 @@ public class Combine {
               }
               @Override
               public void populateDisplayData(DisplayData.Builder builder) {
-                builder.include(PerKeyWithHotKeyFanout.this);
+                builder.delegate(PerKeyWithHotKeyFanout.this);
               }
             };
       }
@@ -2202,7 +2202,7 @@ public class Combine {
 
       Combine.populateDisplayData(builder, fn, fnDisplayData);
       if (hotKeyFanout instanceof HasDisplayData) {
-        builder.include((HasDisplayData) hotKeyFanout);
+        builder.include("hotKeyFanout", (HasDisplayData) hotKeyFanout);
       }
       builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass())
         .withLabel("Fanout Function"));
@@ -2349,12 +2349,12 @@ public class Combine {
                          PCollection<KV<K, OutputT>>> {
 
     private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn;
-    private final DisplayData.Item<? extends Class<?>> fnDisplayData;
+    private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
     private final List<PCollectionView<?>> sideInputs;
 
     private GroupedValues(
         PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData) {
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
       this.fn = SerializableUtils.clone(fn);
       this.fnDisplayData = fnDisplayData;
       this.sideInputs = ImmutableList.of();
@@ -2362,7 +2362,7 @@ public class Combine {
 
     private GroupedValues(
         PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn,
-        DisplayData.Item<? extends Class<?>> fnDisplayData,
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData,
         List<PCollectionView<?>> sideInputs) {
       this.fn = SerializableUtils.clone(fn);
       this.fnDisplayData = fnDisplayData;
@@ -2402,7 +2402,7 @@ public class Combine {
 
             @Override
             public void populateDisplayData(DisplayData.Builder builder) {
-              Combine.GroupedValues.this.populateDisplayData(builder);
+              builder.delegate(Combine.GroupedValues.this);
             }
           }).withSideInputs(sideInputs));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index 229b1d2..1b3e525 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -21,18 +21,14 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -1044,35 +1040,12 @@ public class CombineFns {
    */
   private static void populateDisplayData(
       DisplayData.Builder builder, List<? extends HasDisplayData> combineFns) {
-
-    // NB: ArrayListMultimap necessary to maintain ordering of combineFns of the same type.
-    Multimap<Class<?>, HasDisplayData> combineFnMap = ArrayListMultimap.create();
-
     for (int i = 0; i < combineFns.size(); i++) {
       HasDisplayData combineFn = combineFns.get(i);
-      builder.add(DisplayData.item("combineFn" + (i + 1), combineFn.getClass())
+      String token = "combineFn" + (i + 1);
+      builder.add(DisplayData.item(token, combineFn.getClass())
         .withLabel("Combine Function"));
-      combineFnMap.put(combineFn.getClass(), combineFn);
-    }
-
-    for (Map.Entry<Class<?>, Collection<HasDisplayData>> combineFnEntries :
-        combineFnMap.asMap().entrySet()) {
-
-      Collection<HasDisplayData> classCombineFns = combineFnEntries.getValue();
-      if (classCombineFns.size() == 1) {
-        // Only one combineFn of this type, include it directly.
-        builder.include(Iterables.getOnlyElement(classCombineFns));
-
-      } else {
-        // Multiple combineFns of same type, add a namespace suffix so display data is
-        // unique and ordered.
-        String baseNamespace = combineFnEntries.getKey().getName();
-        for (int i = 0; i < combineFns.size(); i++) {
-          HasDisplayData combineFn = combineFns.get(i);
-          String namespace = String.format("%s#%d", baseNamespace, i + 1);
-          builder.include(combineFn, namespace);
-        }
-      }
+      builder.include(token, combineFn);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
index 3dd4fe2..7ac952c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
@@ -171,8 +171,7 @@ public class CombineWithContext {
 
         @Override
         public void populateDisplayData(DisplayData.Builder builder) {
-          super.populateDisplayData(builder);
-          CombineFnWithContext.this.populateDisplayData(builder);
+          builder.delegate(CombineFnWithContext.this);
         }
       };
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 12d4824..18d9333 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -246,7 +246,7 @@ public class DoFnAdapters {
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
-      builder.include(fn);
+      builder.delegate(fn);
     }
 
     private void readObject(java.io.ObjectInputStream in)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index b590d45..4ef809f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -119,7 +119,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
   //////////////////////////////////////////////////////////////////////////////////////////////////
 
   private final SimpleFunction<InputT, ? extends Iterable<OutputT>> fn;
-  private final DisplayData.Item<?> fnClassDisplayData;
+  private final DisplayData.ItemSpec<?> fnClassDisplayData;
 
   private FlatMapElements(
       SimpleFunction<InputT, ? extends Iterable<OutputT>> fn,
@@ -166,7 +166,9 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
-    builder.include(fn).add(fnClassDisplayData);
+    builder
+        .include("flatMapFn", fn)
+        .add(fnClassDisplayData);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index 73e4359..c109034 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -103,7 +103,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
   ///////////////////////////////////////////////////////////////////
 
   private final SimpleFunction<InputT, OutputT> fn;
-  private final DisplayData.Item<?> fnClassDisplayData;
+  private final DisplayData.ItemSpec<?> fnClassDisplayData;
 
   private MapElements(SimpleFunction<InputT, OutputT> fn, Class<?> fnClass) {
     this.fn = fn;
@@ -123,7 +123,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
 
               @Override
               public void populateDisplayData(DisplayData.Builder builder) {
-                MapElements.this.populateDisplayData(builder);
+                builder.delegate(MapElements.this);
               }
 
               @Override
@@ -141,6 +141,8 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
-    builder.include(fn).add(fnClassDisplayData);
+    builder
+        .include("mapFn", fn)
+        .add(fnClassDisplayData);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 8aa87e4..93eb1ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -521,7 +521,7 @@ public class ParDo {
    */
   public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
     validate(fn);
-    return of(adapt(fn), fn.getClass());
+    return of(adapt(fn), displayDataForFn(fn));
   }
 
   /**
@@ -538,12 +538,17 @@ public class ParDo {
    */
   @Deprecated
   public static <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
-    return of(fn, fn.getClass());
+    return of(fn, displayDataForFn(fn));
   }
 
   private static <InputT, OutputT> Bound<InputT, OutputT> of(
-          OldDoFn<InputT, OutputT> fn, Class<?> fnClass) {
-    return new Unbound().of(fn, fnClass);
+          OldDoFn<InputT, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
+    return new Unbound().of(fn, fnDisplayData);
+  }
+
+  private static <T> DisplayData.ItemSpec<? extends Class<?>> displayDataForFn(T fn) {
+    return DisplayData.item("fn", fn.getClass())
+        .withLabel("Transform Function");
   }
 
   /**
@@ -666,7 +671,7 @@ public class ParDo {
      */
     public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
       validate(fn);
-      return of(adapt(fn), fn.getClass());
+      return of(adapt(fn), displayDataForFn(fn));
     }
 
     /**
@@ -681,12 +686,12 @@ public class ParDo {
      */
     @Deprecated
     public <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
-      return of(fn, fn.getClass());
+      return of(fn, displayDataForFn(fn));
     }
 
     private <InputT, OutputT> Bound<InputT, OutputT> of(
-        OldDoFn<InputT, OutputT> fn, Class<?> fnClass) {
-      return new Bound<>(name, sideInputs, fn, fnClass);
+        OldDoFn<InputT, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
+      return new Bound<>(name, sideInputs, fn, fnDisplayData);
     }
   }
 
@@ -707,16 +712,16 @@ public class ParDo {
     // Inherits name.
     private final List<PCollectionView<?>> sideInputs;
     private final OldDoFn<InputT, OutputT> fn;
-    private final Class<?> fnClass;
+    private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
 
     Bound(String name,
           List<PCollectionView<?>> sideInputs,
           OldDoFn<InputT, OutputT> fn,
-          Class<?> fnClass) {
+          DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
       super(name);
       this.sideInputs = sideInputs;
       this.fn = SerializableUtils.clone(fn);
-      this.fnClass = fnClass;
+      this.fnDisplayData = fnDisplayData;
     }
 
     /**
@@ -744,7 +749,7 @@ public class ParDo {
       ImmutableList.Builder<PCollectionView<?>> builder = ImmutableList.builder();
       builder.addAll(this.sideInputs);
       builder.addAll(sideInputs);
-      return new Bound<>(name, builder.build(), fn, fnClass);
+      return new Bound<>(name, builder.build(), fn, fnDisplayData);
     }
 
     /**
@@ -758,7 +763,7 @@ public class ParDo {
     public BoundMulti<InputT, OutputT> withOutputTags(TupleTag<OutputT> mainOutputTag,
                                            TupleTagList sideOutputTags) {
       return new BoundMulti<>(
-          name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass);
+          name, sideInputs, mainOutputTag, sideOutputTags, fn, fnDisplayData);
     }
 
     @Override
@@ -802,7 +807,7 @@ public class ParDo {
     @Override
     public void populateDisplayData(Builder builder) {
       super.populateDisplayData(builder);
-      ParDo.populateDisplayData(builder, fn, fnClass);
+      ParDo.populateDisplayData(builder, fn, fnDisplayData);
     }
 
     public OldDoFn<InputT, OutputT> getFn() {
@@ -883,7 +888,7 @@ public class ParDo {
      */
     public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
       validate(fn);
-      return of(adapt(fn), fn.getClass());
+      return of(adapt(fn), displayDataForFn(fn));
     }
 
     /**
@@ -898,12 +903,13 @@ public class ParDo {
      */
     @Deprecated
     public <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
-      return of(fn, fn.getClass());
+      return of(fn, displayDataForFn(fn));
     }
 
-    private <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn, Class<?> fnClass) {
+    private <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn,
+        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
       return new BoundMulti<>(
-              name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass);
+              name, sideInputs, mainOutputTag, sideOutputTags, fn, fnDisplayData);
     }
   }
 
@@ -925,20 +931,20 @@ public class ParDo {
     private final TupleTag<OutputT> mainOutputTag;
     private final TupleTagList sideOutputTags;
     private final OldDoFn<InputT, OutputT> fn;
-    private final Class<?> fnClass;
+    private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
 
     BoundMulti(String name,
                List<PCollectionView<?>> sideInputs,
                TupleTag<OutputT> mainOutputTag,
                TupleTagList sideOutputTags,
                OldDoFn<InputT, OutputT> fn,
-               Class<?> fnClass) {
+               DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
       super(name);
       this.sideInputs = sideInputs;
       this.mainOutputTag = mainOutputTag;
       this.sideOutputTags = sideOutputTags;
       this.fn = SerializableUtils.clone(fn);
-      this.fnClass = fnClass;
+      this.fnDisplayData = fnDisplayData;
     }
 
     /**
@@ -969,7 +975,7 @@ public class ParDo {
       builder.addAll(sideInputs);
       return new BoundMulti<>(
           name, builder.build(),
-          mainOutputTag, sideOutputTags, fn, fnClass);
+          mainOutputTag, sideOutputTags, fn, fnDisplayData);
     }
 
 
@@ -1023,7 +1029,7 @@ public class ParDo {
     @Override
     public void populateDisplayData(Builder builder) {
       super.populateDisplayData(builder);
-      ParDo.populateDisplayData(builder, fn, fnClass);
+      ParDo.populateDisplayData(builder, fn, fnDisplayData);
     }
 
     public OldDoFn<InputT, OutputT> getFn() {
@@ -1044,11 +1050,11 @@ public class ParDo {
   }
 
   private static void populateDisplayData(
-      DisplayData.Builder builder, OldDoFn<?, ?> fn, Class<?> fnClass) {
+      DisplayData.Builder builder, OldDoFn<?, ?> fn,
+      DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
     builder
-        .include(fn)
-        .add(DisplayData.item("fn", fnClass)
-            .withLabel("Transform Function"));
+        .include("fn", fn)
+        .add(fnDisplayData);
   }
 
   private static boolean isSplittable(OldDoFn<?, ?> oldDoFn) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
index 9247942..5b4eead 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
@@ -124,7 +124,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>>
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
-    builder.include(partitionDoFn);
+    builder.include("partitionFn", partitionDoFn);
   }
 
   private final transient PartitionDoFn<T> partitionDoFn;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index 394666b..5ab6342 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -20,15 +20,19 @@ package org.apache.beam.sdk.transforms.display;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.auto.value.AutoValue;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -48,12 +52,12 @@ import org.joda.time.format.ISODateTimeFormat;
  * interface.
  */
 public class DisplayData implements Serializable {
-  private static final DisplayData EMPTY = new DisplayData(Maps.<Identifier, Item<?>>newHashMap());
+  private static final DisplayData EMPTY = new DisplayData(Maps.<Identifier, Item>newHashMap());
   private static final DateTimeFormatter TIMESTAMP_FORMATTER = ISODateTimeFormat.dateTime();
 
-  private final ImmutableMap<Identifier, Item<?>> entries;
+  private final ImmutableMap<Identifier, Item> entries;
 
-  private DisplayData(Map<Identifier, Item<?>> entries) {
+  private DisplayData(Map<Identifier, Item> entries) {
     this.entries = ImmutableMap.copyOf(entries);
   }
 
@@ -71,7 +75,11 @@ public class DisplayData implements Serializable {
    */
   public static DisplayData from(HasDisplayData component) {
     checkNotNull(component, "component argument cannot be null");
-    return InternalBuilder.forRoot(component).build();
+
+    InternalBuilder builder = new InternalBuilder();
+    builder.include(Path.root(), component);
+
+    return builder.build();
   }
 
   /**
@@ -99,11 +107,11 @@ public class DisplayData implements Serializable {
   }
 
   @JsonValue
-  public Collection<Item<?>> items() {
+  public Collection<Item> items() {
     return entries.values();
   }
 
-  public Map<Identifier, Item<?>> asMap() {
+  public Map<Identifier, Item> asMap() {
     return entries;
   }
 
@@ -126,7 +134,7 @@ public class DisplayData implements Serializable {
   public String toString() {
     StringBuilder builder = new StringBuilder();
     boolean isFirstLine = true;
-    for (Item<?> entry : entries.values()) {
+    for (Item entry : entries.values()) {
       if (isFirstLine) {
         isFirstLine = false;
       } else {
@@ -149,70 +157,81 @@ public class DisplayData implements Serializable {
    */
   public interface Builder {
     /**
-     * Register display data from the specified subcomponent. For example, a {@link PTransform}
-     * which delegates to a user-provided function can implement {@link HasDisplayData} on the
-     * function and include it from the {@link PTransform}:
+     * Register display data from the specified subcomponent at the given path. For example, a
+     * {@link PTransform} which delegates to a user-provided function can implement
+     * {@link HasDisplayData} on the function and include it from the {@link PTransform}:
      *
      * <pre><code>{@literal @Override}
      * public void populateDisplayData(DisplayData.Builder builder) {
      *   super.populateDisplayData(builder);
      *
      *   builder
-     *     .add(DisplayData.item("userFn", userFn)) // To register the class name of the userFn
-     *     .include(userFn); // To allow the userFn to register additional display data
+     *     // To register the class name of the userFn
+     *     .add(DisplayData.item("userFn", userFn.getClass()))
+     *     // To allow the userFn to register additional display data
+     *     .include("userFn", userFn);
      * }
      * </code></pre>
      *
-     * <p>Using {@code include(subcomponent)} will associate each of the registered items with the
-     * namespace of the {@code subcomponent} being registered. To register display data in the
-     * current namespace, such as from a base class implementation, use
+     * <p>Using {@code include(path, subcomponent)} will associate each of the registered items with
+     * the namespace of the {@code subcomponent} being registered, with the specified path element
+     * relative to the current path. To register display data in the current path and namespace,
+     * such as from a base class implementation, use
      * {@code subcomponent.populateDisplayData(builder)} instead.
      *
      * @see HasDisplayData#populateDisplayData(DisplayData.Builder)
      */
-    Builder include(HasDisplayData subComponent);
+    Builder include(String path, HasDisplayData subComponent);
 
     /**
-     * Register display data from the specified subcomponent, overriding the namespace of
-     * subcomponent display items with the specified namespace.
+     * Register display data from the specified component on behalf of the current component.
+     * Display data items will be added with the namespace of the specified component but the
+     * path of the current component.
      *
-     * @see #include(HasDisplayData)
-     */
-    Builder include(HasDisplayData subComponent, Class<?> namespace);
-
-    /**
-     * Register display data from the specified subcomponent, overriding the namespace of
-     * subcomponent display items with the specified namespace.
+     * <p>This is useful for components which simply wrap other components and wish to retain the
+     * display data from the wrapped component. Such components should implement
+     * {@code populateDisplayData} as:
      *
-     * @see #include(HasDisplayData)
+     * <pre><code>{@literal @Override}
+     * public void populateDisplayData(DisplayData.Builder builder) {
+     *   builder.delegate(wrapped);
+     * }
+     * </code></pre>
      */
-    Builder include(HasDisplayData subComponent, String namespace);
+    Builder delegate(HasDisplayData component);
 
     /**
      * Register the given display item.
      */
-    Builder add(Item<?> item);
+    Builder add(ItemSpec<?> item);
 
     /**
      * Register the given display item if the value is not null.
      */
-    Builder addIfNotNull(Item<?> item);
+    Builder addIfNotNull(ItemSpec<?> item);
 
     /**
      * Register the given display item if the value is different than the specified default.
      */
-    <T> Builder addIfNotDefault(Item<T> item, @Nullable T defaultValue);
+    <T> Builder addIfNotDefault(ItemSpec<T> item, @Nullable T defaultValue);
   }
 
   /**
-   * {@link Item Items} are the unit of display data. Each item is identified by a given key
+   * {@link Item Items} are the unit of display data. Each item is identified by a given path, key,
    * and namespace from the component the display item belongs to.
    *
    * <p>{@link Item Items} are registered via {@link DisplayData.Builder#add}
    * within {@link HasDisplayData#populateDisplayData} implementations.
    */
   @AutoValue
-  public abstract static class Item<T> implements Serializable {
+  public abstract static class Item implements Serializable {
+
+    /**
+     * The path for the display item within a component hierarchy.
+     */
+    @Nullable
+    @JsonIgnore
+    public abstract Path getPath();
 
     /**
      * The namespace for the display item. The namespace defaults to the component which
@@ -220,7 +239,7 @@ public class DisplayData implements Serializable {
      */
     @Nullable
     @JsonGetter("namespace")
-    public abstract String getNamespace();
+    public abstract Class<?> getNamespace();
 
     /**
      * The key for the display item. Each display item is created with a key and value
@@ -240,11 +259,8 @@ public class DisplayData implements Serializable {
      * Retrieve the value of the display item. The value is translated from the input to
      * {@link DisplayData#item} into a format suitable for display. Translation is based on the
      * item's {@link #getType() type}.
-     *
-     * <p>The value will only be {@literal null} if the input value during creation was null.
      */
     @JsonGetter("value")
-    @Nullable
     public abstract Object getValue();
 
     /**
@@ -285,27 +301,104 @@ public class DisplayData implements Serializable {
     @Nullable
     public abstract String getLinkUrl();
 
-    private static <T> Item<T> create(String key, Type type, @Nullable T value) {
-      FormattedItemValue formatted = type.safeFormat(value);
-      return of(null, key, type, formatted.getLongValue(), formatted.getShortValue(), null, null);
+    private static Item create(ItemSpec<?> spec, Path path) {
+      checkNotNull(spec, "spec cannot be null");
+      checkNotNull(path, "path cannot be null");
+      Class<?> ns = checkNotNull(spec.getNamespace(), "namespace must be set");
+
+      return new AutoValue_DisplayData_Item(path, ns, spec.getKey(), spec.getType(),
+          spec.getValue(), spec.getShortValue(), spec.getLabel(), spec.getLinkUrl());
     }
 
+    @Override
+    public String toString() {
+      return String.format("%s%s:%s=%s", getPath(), getNamespace().getName(), getKey(), getValue());
+    }
+  }
+
+  /**
+   * Specifies an {@link Item} to register as display data. Each item is identified by a given
+   * path, key, and namespace from the component the display item belongs to.
+   *
+   * <p>{@link Item Items} are registered via {@link DisplayData.Builder#add}
+   * within {@link HasDisplayData#populateDisplayData} implementations.
+   */
+  @AutoValue
+  public abstract static class ItemSpec<T> implements Serializable {
+    /**
+     * The namespace for the display item. If unset, defaults to the component which
+     * the display item is registered to.
+     */
+    @Nullable
+    public abstract Class<?> getNamespace();
+
+    /**
+     * The key for the display item. Each display item is created with a key and value
+     * via {@link DisplayData#item}.
+     */
+    public abstract String getKey();
+
+    /**
+     * The {@link DisplayData.Type} of display data. All display data conforms to a predefined set
+     * of allowed types.
+     */
+    public abstract Type getType();
+
+    /**
+     * The value of the display item. The value is translated from the input to
+     * {@link DisplayData#item} into a format suitable for display. Translation is based on the
+     * item's {@link #getType() type}.
+     */
+    @Nullable
+    public abstract Object getValue();
+
     /**
-     * Set the item {@link Item#getNamespace() namespace} from the given {@link Class}.
+     * The optional short value for an item, or {@code null} if none is provided.
      *
-     * <p>This method does not alter the current instance, but instead returns a new {@link Item}
-     * with the namespace set.
+     * <p>The short value is an alternative display representation for items having a long display
+     * value. For example, the {@link #getValue() value} for {@link Type#JAVA_CLASS} items contains
+     * the full class name with package, while the short value contains just the class name.
+     *
+     * <p>A {@link #getValue() value} will be provided for each display item, and some types may
+     * also provide a short value. If a short value is provided, display data consumers may
+     * choose to display it instead of or in addition to the {@link #getValue() value}.
      */
-    public Item<T> withNamespace(Class<?> namespace) {
-      checkNotNull(namespace, "namespace argument cannot be null");
-      return withNamespace(namespaceOf(namespace));
+    @Nullable
+    public abstract Object getShortValue();
+
+    /**
+     * The optional label for an item. The label is a human-readable description of what
+     * the metadata represents. UIs may choose to display the label instead of the item key.
+     */
+    @Nullable
+    public abstract String getLabel();
+
+    /**
+     * The optional link URL for an item. The URL points to an address where the reader
+     * can find additional context for the display data.
+     */
+    @Nullable
+    public abstract String getLinkUrl();
+
+    private static <T> ItemSpec<T> create(String key, Type type, @Nullable T value) {
+      return ItemSpec.<T>builder()
+          .setKey(key)
+          .setType(type)
+          .setRawValue(value)
+          .build();
     }
 
-    /** @see #withNamespace(Class) */
-    public Item<T> withNamespace(String namespace) {
+    /**
+     * Set the item {@link ItemSpec#getNamespace() namespace} from the given {@link Class}.
+     *
+     * <p>This method does not alter the current instance, but instead returns a new
+     * {@link ItemSpec} with the namespace set.
+     */
+    public ItemSpec<T> withNamespace(Class<?> namespace) {
       checkNotNull(namespace, "namespace argument cannot be null");
-      return of(
-          namespace, getKey(), getType(), getValue(), getShortValue(), getLabel(), getLinkUrl());
+      return toBuilder()
+          .setNamespace(namespace)
+          .build();
     }
 
     /**
@@ -313,12 +406,13 @@ public class DisplayData implements Serializable {
      *
      * <p>Specifying a null value will clear the label if it was previously defined.
      *
-     * <p>This method does not alter the current instance, but instead returns a new {@link Item}
-     * with the label set.
+     * <p>This method does not alter the current instance, but instead returns a new
+     * {@link ItemSpec} with the label set.
      */
-    public Item<T> withLabel(String label) {
-      return of(
-          getNamespace(), getKey(), getType(), getValue(), getShortValue(), label, getLinkUrl());
+    public ItemSpec<T> withLabel(@Nullable String label) {
+      return toBuilder()
+          .setLabel(label)
+          .build();
     }
 
     /**
@@ -326,11 +420,13 @@ public class DisplayData implements Serializable {
      *
      * <p>Specifying a null value will clear the link url if it was previously defined.
      *
-     * <p>This method does not alter the current instance, but instead returns a new {@link Item}
-     * with the link url set.
+     * <p>This method does not alter the current instance, but instead returns a new
+     * {@link ItemSpec} with the link url set.
      */
-    public Item<T> withLinkUrl(String url) {
-      return of(getNamespace(), getKey(), getType(), getValue(), getShortValue(), getLabel(), url);
+    public ItemSpec<T> withLinkUrl(@Nullable String url) {
+      return toBuilder()
+          .setLinkUrl(url)
+          .build();
     }
 
     /**
@@ -339,84 +435,166 @@ public class DisplayData implements Serializable {
      * <p>This should only be used internally. It is useful to compare the value of a
      * {@link DisplayData.Item} to the value derived from a specified input.
      */
-    private Item<T> withValue(Object value) {
-      FormattedItemValue formatted = getType().safeFormat(value);
-      return of(getNamespace(), getKey(), getType(), formatted.getLongValue(),
-          formatted.getShortValue(), getLabel(), getLinkUrl());
-    }
-
-    private static <T> Item<T> of(
-        @Nullable String namespace,
-        String key,
-        Type type,
-        @Nullable Object value,
-        @Nullable Object shortValue,
-        @Nullable String label,
-        @Nullable String linkUrl) {
-      return new AutoValue_DisplayData_Item<>(
-          namespace, key, type, value, shortValue, label, linkUrl);
+    private ItemSpec<T> withValue(T value) {
+      return toBuilder()
+          .setRawValue(value)
+          .build();
     }
 
     @Override
     public String toString() {
       return String.format("%s:%s=%s", getNamespace(), getKey(), getValue());
     }
+
+    static <T> ItemSpec.Builder<T> builder() {
+      return new AutoValue_DisplayData_ItemSpec.Builder<>();
+    }
+
+    abstract ItemSpec.Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      public abstract ItemSpec.Builder<T> setKey(String key);
+      public abstract ItemSpec.Builder<T> setNamespace(@Nullable Class<?> namespace);
+      public abstract ItemSpec.Builder<T> setType(Type type);
+      public abstract ItemSpec.Builder<T> setValue(@Nullable Object longValue);
+      public abstract ItemSpec.Builder<T> setShortValue(@Nullable Object shortValue);
+      public abstract ItemSpec.Builder<T> setLabel(@Nullable String label);
+      public abstract ItemSpec.Builder<T> setLinkUrl(@Nullable String url);
+      public abstract ItemSpec<T> build();
+
+
+      abstract Type getType();
+
+      ItemSpec.Builder<T> setRawValue(@Nullable T value) {
+        FormattedItemValue formatted = getType().safeFormat(value);
+        return this
+            .setValue(formatted.getLongValue())
+            .setShortValue(formatted.getShortValue());
+      }
+    }
   }
 
   /**
    * Unique identifier for a display data item within a component.
-   * Identifiers are composed of the key they are registered with and a namespace generated from
-   * the class of the component which registered the item.
+   *
+   * <p>Identifiers are composed of:
+   *
+   * <ul>
+   *   <li>A {@link #getPath() path} based on the component hierarchy</li>
+   *   <li>The {@link #getKey() key} it is registered with</li>
+   *   <li>A {@link #getNamespace() namespace} generated from the class of the component which
+   *   registered the item.</li>
+   * </ul>
    *
    * <p>Display data registered with the same key from different components will have different
    * namespaces and thus will both be represented in the composed {@link DisplayData}. If a
    * single component registers multiple metadata items with the same key, only the most recent
    * item will be retained; previous versions are discarded.
    */
-  public static class Identifier {
-    private final String ns;
-    private final String key;
+  @AutoValue
+  public abstract static class Identifier {
+    public abstract Path getPath();
+    public abstract Class<?> getNamespace();
+    public abstract String getKey();
 
-    public static Identifier of(Class<?> namespace, String key) {
-      return of(namespaceOf(namespace), key);
+    public static Identifier of(Path path, Class<?> namespace, String key) {
+      return new AutoValue_DisplayData_Identifier(path, namespace, key);
     }
 
-    public static Identifier of(String namespace, String key) {
-      return new Identifier(namespace, key);
+    @Override
+    public String toString() {
+      return String.format("%s%s:%s", getPath(), getNamespace(), getKey());
     }
+  }
 
-    private Identifier(String ns, String key) {
-      this.ns = ns;
-      this.key = key;
+  /**
+   * Structured path of registered display data within a component hierarchy.
+   *
+   * <p>Display data items registered directly by a component will have the {@link Path#root() root}
+   * path. If the component {@link Builder#include includes} a sub-component, its display data will
+   * be registered at the path specified. Each sub-component path is created by appending a child
+   * element to the path of its parent component, forming a hierarchy.
+   */
+  public static class Path {
+    private final ImmutableList<String> components;
+    private Path(ImmutableList<String> components) {
+      this.components = components;
     }
 
-    public String getNamespace() {
-      return ns;
+    /**
+     * Path for display data registered by a top-level component.
+     */
+    public static Path root() {
+      return new Path(ImmutableList.<String>of());
     }
 
-    public String getKey() {
-      return key;
+    /**
+     * Construct a path from an absolute component path hierarchy.
+     *
+     * <p>For the root path, use {@link Path#root()}.
+     *
+     * @param firstPath Path of the first sub-component.
+     * @param paths Additional path components.
+     */
+    public static Path absolute(String firstPath, String... paths) {
+      ImmutableList.Builder<String> builder = ImmutableList.builder();
+
+      validatePathElement(firstPath);
+      builder.add(firstPath);
+      for (String path : paths) {
+        validatePathElement(path);
+        builder.add(path);
+      }
+
+      return new Path(builder.build());
     }
 
-    @Override
-    public boolean equals(Object obj) {
-      if (obj instanceof Identifier) {
-        Identifier that = (Identifier) obj;
-        return Objects.equals(this.ns, that.ns)
-          && Objects.equals(this.key, that.key);
-      }
+    /**
+     * Hierarchy list of component paths making up the full path, starting with the top-level child
+     * component path. For the {@link #root root} path, returns the empty list.
+     */
+    public List<String> getComponents() {
+      return components;
+    }
 
-      return false;
+    /**
+     * Extend the path by appending a sub-component path. The new path element is added to the end
+     * of the path hierarchy.
+     *
+     * <p>Returns a new {@link Path} instance; the originating {@link Path} is not modified.
+     */
+    public Path extend(String path) {
+      validatePathElement(path);
+      return new Path(ImmutableList.<String>builder()
+          .addAll(components.iterator())
+          .add(path)
+          .build());
     }
 
-    @Override
-    public int hashCode() {
-      return Objects.hash(ns, key);
+    private static void validatePathElement(String path) {
+      checkNotNull(path);
+      checkArgument(!"".equals(path), "path cannot be empty");
     }
 
     @Override
     public String toString() {
-      return String.format("%s:%s", ns, key);
+      StringBuilder b = new StringBuilder().append("[");
+      Joiner.on("/").appendTo(b, components);
+      b.append("]");
+      return b.toString();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof Path
+          && Objects.equals(components, ((Path) obj).components);
+
+    }
+
+    @Override
+    public int hashCode() {
+      return components.hashCode();
     }
   }
 
@@ -551,65 +729,79 @@ public class DisplayData implements Serializable {
     Object getLongValue() {
       return this.longValue;
     }
-
     Object getShortValue() {
       return this.shortValue;
     }
   }
 
   private static class InternalBuilder implements Builder {
-    private final Map<Identifier, Item<?>> entries;
-    private final Set<Object> visited;
+    private final Map<Identifier, Item> entries;
+    private final Set<HasDisplayData> visitedComponents;
+    private final Map<Path, HasDisplayData> visitedPathMap;
 
-    private String latestNs;
+    private Path latestPath;
+    private Class<?> latestNs;
 
     private InternalBuilder() {
       this.entries = Maps.newHashMap();
-      this.visited = Sets.newIdentityHashSet();
-    }
-
-    private static InternalBuilder forRoot(HasDisplayData instance) {
-      InternalBuilder builder = new InternalBuilder();
-      builder.include(instance);
-      return builder;
+      this.visitedComponents = Sets.newIdentityHashSet();
+      this.visitedPathMap = Maps.newHashMap();
     }
 
     @Override
-    public Builder include(HasDisplayData subComponent) {
+    public Builder include(String path, HasDisplayData subComponent) {
       checkNotNull(subComponent, "subComponent argument cannot be null");
-      return include(subComponent, subComponent.getClass());
+      checkNotNull(path, "path argument cannot be null");
+
+      Path absolutePath = latestPath.extend(path);
+
+      HasDisplayData existingComponent = visitedPathMap.get(absolutePath);
+      if (existingComponent != null) {
+        throw new IllegalArgumentException(String.format("Specified path '%s' already used for "
+                + "subcomponent %s. Subcomponents must be included using unique paths.",
+            path, existingComponent));
+      }
+
+      return include(absolutePath, subComponent);
     }
 
     @Override
-    public Builder include(HasDisplayData subComponent, Class<?> namespace) {
-      checkNotNull(namespace, "Input namespace override cannot be null");
-      return include(subComponent, namespaceOf(namespace));
+    public Builder delegate(HasDisplayData component) {
+      checkNotNull(component);
+
+      return include(latestPath, component);
     }
 
-    @Override
-    public Builder include(HasDisplayData subComponent, String namespace) {
-      checkNotNull(subComponent, "subComponent argument cannot be null");
-      checkNotNull(namespace, "Input namespace override cannot be null");
-
-      boolean newComponent = visited.add(subComponent);
-      if (newComponent) {
-        String prevNs = this.latestNs;
-        this.latestNs = namespace;
-
-        try {
-          subComponent.populateDisplayData(this);
-        } catch (PopulateDisplayDataException e) {
-          // Don't re-wrap exceptions recursively.
-          throw e;
-        } catch (Throwable e) {
-          String msg = String.format("Error while populating display data for component: %s",
-              namespace);
-          throw new PopulateDisplayDataException(msg, e);
-        }
+    private Builder include(Path path, HasDisplayData subComponent) {
+      if (visitedComponents.contains(subComponent)) {
+        // Component previously registered; ignore in order to break cyclic dependencies
+        return this;
+      }
 
-        this.latestNs = prevNs;
+      // New component; add it.
+      visitedComponents.add(subComponent);
+      visitedPathMap.put(path, subComponent);
+      Class<?> namespace = subComponent.getClass();
+
+      Path prevPath = latestPath;
+      Class<?> prevNs = latestNs;
+      latestPath = path;
+      latestNs = namespace;
+
+      try {
+        subComponent.populateDisplayData(this);
+      } catch (PopulateDisplayDataException e) {
+        // Don't re-wrap exceptions recursively.
+        throw e;
+      } catch (Throwable e) {
+        String msg = String.format("Error while populating display data for component: %s",
+            namespace.getName());
+        throw new PopulateDisplayDataException(msg, e);
       }
 
+      latestPath = prevPath;
+      latestNs = prevNs;
+
       return this;
     }
 
@@ -623,39 +815,41 @@ public class DisplayData implements Serializable {
     }
 
     @Override
-    public Builder add(Item<?> item) {
+    public Builder add(ItemSpec<?> item) {
       checkNotNull(item, "Input display item cannot be null");
       return addItemIf(true, item);
     }
 
     @Override
-    public Builder addIfNotNull(Item<?> item) {
+    public Builder addIfNotNull(ItemSpec<?> item) {
       checkNotNull(item, "Input display item cannot be null");
       return addItemIf(item.getValue() != null, item);
     }
 
     @Override
-    public <T> Builder addIfNotDefault(Item<T> item, @Nullable T defaultValue) {
+    public <T> Builder addIfNotDefault(ItemSpec<T> item, @Nullable T defaultValue) {
       checkNotNull(item, "Input display item cannot be null");
-      Item<T> defaultItem = item.withValue(defaultValue);
+      ItemSpec<T> defaultItem = item.withValue(defaultValue);
       return addItemIf(!Objects.equals(item, defaultItem), item);
     }
 
-    private Builder addItemIf(boolean condition, Item<?> item) {
+    private Builder addItemIf(boolean condition, ItemSpec<?> spec) {
       if (!condition) {
         return this;
       }
 
-      checkNotNull(item, "Input display item cannot be null");
-      checkNotNull(item.getValue(), "Input display value cannot be null");
-      if (item.getNamespace() == null) {
-        item = item.withNamespace(latestNs);
+      checkNotNull(spec, "Input display item cannot be null");
+      checkNotNull(spec.getValue(), "Input display value cannot be null");
+
+      if (spec.getNamespace() == null) {
+        spec = spec.withNamespace(latestNs);
       }
+      Item item = Item.create(spec, latestPath);
 
-      Identifier id = Identifier.of(item.getNamespace(), item.getKey());
+      Identifier id = Identifier.of(item.getPath(), item.getNamespace(), item.getKey());
       checkArgument(!entries.containsKey(id),
-          "Display data key (%s) is not unique within the specified namespace (%s).",
-          item.getKey(), item.getNamespace());
+          "Display data key (%s) is not unique within the specified path and namespace: %s%s.",
+          item.getKey(), item.getPath(), item.getNamespace());
 
       entries.put(id, item);
       return this;
@@ -669,63 +863,63 @@ public class DisplayData implements Serializable {
   /**
    * Create a display item for the specified key and string value.
    */
-  public static Item<String> item(String key, @Nullable String value) {
+  public static ItemSpec<String> item(String key, @Nullable String value) {
     return item(key, Type.STRING, value);
   }
 
   /**
    * Create a display item for the specified key and integer value.
    */
-  public static Item<Integer> item(String key, @Nullable Integer value) {
+  public static ItemSpec<Integer> item(String key, @Nullable Integer value) {
     return item(key, Type.INTEGER, value);
   }
 
   /**
    * Create a display item for the specified key and integer value.
    */
-  public static Item<Long> item(String key, @Nullable Long value) {
+  public static ItemSpec<Long> item(String key, @Nullable Long value) {
     return item(key, Type.INTEGER, value);
   }
 
   /**
    * Create a display item for the specified key and floating point value.
    */
-  public static Item<Float> item(String key, @Nullable Float value) {
+  public static ItemSpec<Float> item(String key, @Nullable Float value) {
     return item(key, Type.FLOAT, value);
   }
 
   /**
    * Create a display item for the specified key and floating point value.
    */
-  public static Item<Double> item(String key, @Nullable Double value) {
+  public static ItemSpec<Double> item(String key, @Nullable Double value) {
     return item(key, Type.FLOAT, value);
   }
 
   /**
    * Create a display item for the specified key and boolean value.
    */
-  public static Item<Boolean> item(String key, @Nullable Boolean value) {
+  public static ItemSpec<Boolean> item(String key, @Nullable Boolean value) {
     return item(key, Type.BOOLEAN, value);
   }
 
   /**
    * Create a display item for the specified key and timestamp value.
    */
-  public static Item<Instant> item(String key, @Nullable Instant value) {
+  public static ItemSpec<Instant> item(String key, @Nullable Instant value) {
     return item(key, Type.TIMESTAMP, value);
   }
 
   /**
    * Create a display item for the specified key and duration value.
    */
-  public static Item<Duration> item(String key, @Nullable Duration value) {
+  public static ItemSpec<Duration> item(String key, @Nullable Duration value) {
     return item(key, Type.DURATION, value);
   }
 
   /**
    * Create a display item for the specified key and class value.
    */
-  public static <T> Item<Class<T>> item(String key, @Nullable Class<T> value) {
+  public static <T> ItemSpec<Class<T>> item(String key, @Nullable Class<T> value) {
     return item(key, Type.JAVA_CLASS, value);
   }
 
@@ -739,10 +933,10 @@ public class DisplayData implements Serializable {
    *
    *  @see Type#inferType(Object)
    */
-  public static <T> Item<T> item(String key, Type type, @Nullable T value) {
+  public static <T> ItemSpec<T> item(String key, Type type, @Nullable T value) {
     checkNotNull(key, "key argument cannot be null");
     checkNotNull(type, "type argument cannot be null");
 
-    return Item.create(key, type, value);
+    return ItemSpec.create(key, type, value);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 57f7716..684a776 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -578,7 +578,7 @@ public class Window {
         builder
             .add(DisplayData.item("windowFn", windowFn.getClass())
               .withLabel("Windowing Function"))
-            .include(windowFn);
+            .include("windowFn", windowFn);
       }
 
       if (allowedLateness != null) {



[3/3] incubator-beam git commit: Closes #1088

Posted by bc...@apache.org.
Closes #1088


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5047cf74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5047cf74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5047cf74

Branch: refs/heads/master
Commit: 5047cf746f5040e54e840c0cf9898c41affe17bd
Parents: ff6301b ad03d07
Author: bchambers <bc...@google.com>
Authored: Thu Oct 20 11:10:27 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Thu Oct 20 11:10:27 2016 -0700

----------------------------------------------------------------------
 .../core/UnboundedReadFromBoundedSource.java    |   2 +-
 runners/direct-java/pom.xml                     |   5 -
 .../runners/direct/ForwardingPTransform.java    |   2 +-
 .../beam/runners/direct/DirectRunnerTest.java   |  32 --
 .../direct/ForwardingPTransformTest.java        |   7 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   5 +-
 .../DataflowUnboundedReadFromBoundedSource.java |   4 +-
 .../sdk/io/BoundedReadFromUnboundedSource.java  |   5 +-
 .../apache/beam/sdk/io/CompressedSource.java    |   2 +-
 .../java/org/apache/beam/sdk/io/PubsubIO.java   |   5 +-
 .../main/java/org/apache/beam/sdk/io/Read.java  |   4 +-
 .../main/java/org/apache/beam/sdk/io/Write.java |   6 +-
 .../sdk/options/ProxyInvocationHandler.java     | 149 +++---
 .../org/apache/beam/sdk/transforms/Combine.java |  60 +--
 .../apache/beam/sdk/transforms/CombineFns.java  |  33 +-
 .../beam/sdk/transforms/CombineWithContext.java |   3 +-
 .../beam/sdk/transforms/DoFnAdapters.java       |   2 +-
 .../beam/sdk/transforms/FlatMapElements.java    |   6 +-
 .../apache/beam/sdk/transforms/MapElements.java |   8 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |  60 ++-
 .../apache/beam/sdk/transforms/Partition.java   |   2 +-
 .../sdk/transforms/display/DisplayData.java     | 518 +++++++++++++------
 .../beam/sdk/transforms/windowing/Window.java   |   2 +-
 .../io/BoundedReadFromUnboundedSourceTest.java  |   4 +-
 .../beam/sdk/io/CompressedSourceTest.java       |   4 +-
 .../java/org/apache/beam/sdk/io/ReadTest.java   |  10 +-
 .../java/org/apache/beam/sdk/io/WriteTest.java  |   6 +-
 .../sdk/options/ProxyInvocationHandlerTest.java |  40 ++
 .../beam/sdk/transforms/CombineFnsTest.java     |   7 +-
 .../apache/beam/sdk/transforms/CombineTest.java |   4 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |   8 +-
 .../transforms/display/DisplayDataMatchers.java | 141 +++--
 .../display/DisplayDataMatchersTest.java        |  67 ++-
 .../sdk/transforms/display/DisplayDataTest.java | 367 +++++++++----
 .../sdk/transforms/windowing/WindowTest.java    |   4 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |   5 -
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    |   2 +-
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  |   2 +-
 38 files changed, 988 insertions(+), 605 deletions(-)
----------------------------------------------------------------------