You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/26 03:46:29 UTC

[2/2] incubator-beam git commit: Register DisplayData from composite IO transforms

Register DisplayData from composite IO transforms


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/15b11ede
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/15b11ede
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/15b11ede

Branch: refs/heads/master
Commit: 15b11edef1e24fbffaa37b57494746d53522e04e
Parents: ebc7035
Author: Scott Wegner <sw...@google.com>
Authored: Thu Apr 21 14:04:36 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Apr 25 18:46:13 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 25 ++++++++++-
 .../java/org/apache/beam/sdk/io/BigQueryIO.java | 34 ++++++++++++++-
 .../org/apache/beam/sdk/io/CountingInput.java   | 22 ++++++++++
 .../java/org/apache/beam/sdk/io/PubsubIO.java   | 33 ++++++++++++++
 .../java/org/apache/beam/sdk/io/TextIO.java     | 27 +++++++++++-
 .../apache/beam/sdk/io/bigtable/BigtableIO.java | 27 ++++++++++++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 33 ++++++++++++++
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  | 46 ++++++++++++++++++++
 .../apache/beam/sdk/io/CountingInputTest.java   | 32 ++++++++++++++
 .../org/apache/beam/sdk/io/PubsubIOTest.java    | 44 +++++++++++++++++++
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 34 +++++++++++++++
 .../beam/sdk/io/bigtable/BigtableIOTest.java    | 39 ++++++++++++++++-
 12 files changed, 392 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 8f76ae8..c706fc1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PCollection;
@@ -327,6 +328,14 @@ public class AvroIO {
       }
 
       @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
+        builder
+          .addIfNotNull("filePattern", filepattern)
+          .addIfNotDefault("validation", validate, true);
+      }
+
+      @Override
       protected Coder<T> getDefaultOutputCoder() {
         return AvroCoder.of(type, schema);
       }
@@ -467,6 +476,8 @@ public class AvroIO {
      * @param <T> the type of each of the elements of the input PCollection
      */
     public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
+      private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
+
       /** The filename to write to. */
       @Nullable
       final String filenamePrefix;
@@ -485,7 +496,7 @@ public class AvroIO {
       final boolean validate;
 
       Bound(Class<T> type) {
-        this(null, null, "", 0, ShardNameTemplate.INDEX_OF_MAX, type, null, true);
+        this(null, null, "", 0, DEFAULT_SHARD_TEMPLATE, type, null, true);
       }
 
       Bound(
@@ -679,6 +690,18 @@ public class AvroIO {
                     filenamePrefix, filenameSuffix, shardTemplate, AvroCoder.of(type, schema))));
       }
 
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
+        builder
+            .add("schema", type)
+            .addIfNotNull("filePrefix", filenamePrefix)
+            .addIfNotDefault("shardNameTemplate", shardTemplate, DEFAULT_SHARD_TEMPLATE)
+            .addIfNotDefault("fileSuffix", filenameSuffix, "")
+            .addIfNotDefault("numShards", numShards, 0)
+            .addIfNotDefault("validation", validate, true);
+      }
+
       /**
        * Returns the current shard name template string.
        */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index 9239514..1d5b993 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -44,6 +44,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.BigQueryServices;
 import org.apache.beam.sdk.util.BigQueryServices.LoadService;
@@ -80,7 +81,6 @@ import com.google.common.collect.Lists;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -517,6 +517,20 @@ public class BigQueryIO {
         return TableRowJsonCoder.of();
       }
 
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
+
+        if (table != null) {
+          builder.add("table", toTableSpec(table));
+        }
+
+        builder
+            .addIfNotNull("query", query)
+            .addIfNotNull("flattenResults", flattenResults)
+            .addIfNotDefault("validation", validate, true);
+      }
+
       static {
         DirectPipelineRunner.registerDefaultTransformEvaluator(
             Bound.class, new DirectPipelineRunner.TransformEvaluator<Bound>() {
@@ -1048,6 +1062,24 @@ public class BigQueryIO {
         return VoidCoder.of();
       }
 
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
+
+        builder
+            .addIfNotNull("table", jsonTableRef)
+            .addIfNotNull("schema", jsonSchema);
+
+        if (tableRefFunction != null) {
+          builder.add("tableFn", tableRefFunction.getClass());
+        }
+
+        builder
+            .add("createDisposition", createDisposition.toString())
+            .add("writeDisposition", writeDisposition.toString())
+            .addIfNotDefault("validation", validate, true);
+      }
+
       /** Returns the create disposition. */
       public CreateDisposition getCreateDisposition() {
         return createDisposition;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
index 55e6d96..f6b1256 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.io.CountingSource.NowTimestampFn;
 import org.apache.beam.sdk.io.Read.Unbounded;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -114,6 +115,12 @@ public class CountingInput {
     public PCollection<Long> apply(PBegin begin) {
       return begin.apply(Read.from(CountingSource.upTo(numElements)));
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add("upTo", numElements);
+    }
   }
 
   /**
@@ -221,5 +228,20 @@ public class CountingInput {
             read.withMaxReadTime(maxReadTime.get()).withMaxNumRecords(maxNumRecords.get()));
       }
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+
+      builder.add("timestampFn", timestampFn.getClass());
+
+      if (maxReadTime.isPresent()) {
+        builder.add("maxReadTime", maxReadTime.get());
+      }
+
+      if (maxNumRecords.isPresent()) {
+        builder.add("maxRecords", maxNumRecords.get());
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 970fb51..1b0ec4a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.Transport;
@@ -689,6 +690,25 @@ public class PubsubIO {
       }
 
       @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
+
+        builder
+            .addIfNotNull("timestampLabel", timestampLabel)
+            .addIfNotNull("idLabel", idLabel)
+            .addIfNotNull("maxReadTime", maxReadTime)
+            .addIfNotDefault("maxNumRecords", maxNumRecords, 0);
+
+        if (topic != null) {
+          builder.add("topic", topic.asPath());
+        }
+
+        if (subscription != null) {
+          builder.add("subscription", subscription.asPath());
+        }
+      }
+
+      @Override
       protected Coder<T> getDefaultOutputCoder() {
         return coder;
       }
@@ -974,6 +994,19 @@ public class PubsubIO {
       }
 
       @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
+
+        builder
+            .addIfNotNull("timestampLabel", timestampLabel)
+            .addIfNotNull("idLabel", idLabel);
+
+        if (topic != null) {
+          builder.add("topic", topic.asPath());
+        }
+      }
+
+      @Override
       protected Coder<Void> getDefaultOutputCoder() {
         return VoidCoder.of();
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index b95fe15..3882ee1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PCollection;
@@ -339,6 +340,16 @@ public class TextIO {
       }
 
       @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
+
+        builder
+            .add("compressionType", compressionType.toString())
+            .addIfNotDefault("validation", validate, true)
+            .addIfNotNull("filePattern", filepattern);
+      }
+
+      @Override
       protected Coder<T> getDefaultOutputCoder() {
         return coder;
       }
@@ -467,6 +478,8 @@ public class TextIO {
      * @param <T> the type of the elements of the input PCollection
      */
     public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
+      private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
+
       /** The prefix of each file written, combined with suffix and shardTemplate. */
       @Nullable private final String filenamePrefix;
       /** The suffix of each file written, combined with prefix and shardTemplate. */
@@ -485,7 +498,7 @@ public class TextIO {
       private final boolean validate;
 
       Bound(Coder<T> coder) {
-        this(null, null, "", coder, 0, ShardNameTemplate.INDEX_OF_MAX, true);
+        this(null, null, "", coder, 0, DEFAULT_SHARD_TEMPLATE, true);
       }
 
       private Bound(String name, String filenamePrefix, String filenameSuffix, Coder<T> coder,
@@ -631,6 +644,18 @@ public class TextIO {
                 filenamePrefix, filenameSuffix, shardTemplate, coder)));
       }
 
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
+
+        builder
+            .addIfNotNull("filePrefix", filenamePrefix)
+            .addIfNotDefault("fileSuffix", filenameSuffix, "")
+            .addIfNotDefault("shardNameTemplate", shardTemplate, DEFAULT_SHARD_TEMPLATE)
+            .addIfNotDefault("validation", validate, true)
+            .addIfNotDefault("numShards", numShards, 0);
+      }
+
       /**
        * Returns the current shard name template string.
        */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java
index 5177262..28ff335 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
@@ -262,6 +263,21 @@ public class BigtableIO {
     }
 
     @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+
+      builder.add("tableId", tableId);
+
+      if (options != null) {
+        builder.add("bigtableOptions", options.toString());
+      }
+
+      if (filter != null) {
+        builder.add("rowFilter", filter.toString());
+      }
+    }
+
+    @Override
     public String toString() {
       return MoreObjects.toStringHelper(Read.class)
           .add("options", options)
@@ -429,6 +445,17 @@ public class BigtableIO {
     }
 
     @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+
+      builder.add("tableId", tableId);
+
+      if (options != null) {
+        builder.add("bigtableOptions", options.toString());
+      }
+    }
+
+    @Override
     public String toString() {
       return MoreObjects.toStringHelper(Write.class)
           .add("options", options)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 57312c0..43b1219 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io;
 
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -29,6 +31,7 @@ import org.apache.beam.sdk.io.AvroIO.Write.Bound;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -256,4 +259,34 @@ public class AvroIOTest {
   }
   // TODO: for Write only, test withSuffix,
   // withShardNameTemplate and withoutSharding.
+
+  @Test
+  public void testReadDisplayData() {
+    AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*")
+        .withoutValidation();
+
+    DisplayData displayData = DisplayData.from(read);
+    assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
+    assertThat(displayData, hasDisplayItem("validation", false));
+  }
+
+  @Test
+  public void testWriteDisplayData() {
+    AvroIO.Write.Bound<?> write = AvroIO.Write
+        .to("foo")
+        .withShardNameTemplate("-SS-of-NN-")
+        .withSuffix("bar")
+        .withSchema(GenericClass.class)
+        .withNumShards(100)
+        .withoutValidation();
+
+    DisplayData displayData = DisplayData.from(write);
+
+    assertThat(displayData, hasDisplayItem("filePrefix", "foo"));
+    assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
+    assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
+    assertThat(displayData, hasDisplayItem("schema", GenericClass.class));
+    assertThat(displayData, hasDisplayItem("numShards", 100));
+    assertThat(displayData, hasDisplayItem("validation", false));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index e1f8e4d..63ff22c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -17,8 +17,12 @@
  */
 package org.apache.beam.sdk.io;
 
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderException;
@@ -32,6 +36,7 @@ import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.BigQueryServices;
 import org.apache.beam.sdk.util.BigQueryServices.Status;
@@ -391,6 +396,24 @@ public class BigQueryIOTest {
   }
 
   @Test
+  public void testBuildSourceDisplayData() {
+    String tableSpec = "project:dataset.tableid";
+
+    BigQueryIO.Read.Bound read = BigQueryIO.Read
+        .from(tableSpec)
+        .fromQuery("myQuery")
+        .withoutResultFlattening()
+        .withoutValidation();
+
+    DisplayData displayData = DisplayData.from(read);
+
+    assertThat(displayData, hasDisplayItem("table", tableSpec));
+    assertThat(displayData, hasDisplayItem("query", "myQuery"));
+    assertThat(displayData, hasDisplayItem("flattenResults", false));
+    assertThat(displayData, hasDisplayItem("validation", false));
+  }
+
+  @Test
   public void testBuildSink() {
     BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
         .to("foo.com:project:somedataset.sometable");
@@ -502,6 +525,29 @@ public class BigQueryIOTest {
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
   }
 
+  @Test
+  public void testBuildSinkDisplayData() {
+    String tableSpec = "project:dataset.table";
+    TableSchema schema = new TableSchema().set("col1", "type1").set("col2", "type2");
+
+    BigQueryIO.Write.Bound write = BigQueryIO.Write
+        .to(tableSpec)
+        .withSchema(schema)
+        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
+        .withoutValidation();
+
+    DisplayData displayData = DisplayData.from(write);
+
+    assertThat(displayData, hasDisplayItem(hasKey("table")));
+    assertThat(displayData, hasDisplayItem(hasKey("schema")));
+    assertThat(displayData,
+        hasDisplayItem("createDisposition", CreateDisposition.CREATE_IF_NEEDED.toString()));
+    assertThat(displayData,
+        hasDisplayItem("writeDisposition", WriteDisposition.WRITE_APPEND.toString()));
+    assertThat(displayData, hasDisplayItem("validation", false));
+  }
+
   private void testWriteValidatesDataset(boolean streaming) {
     BigQueryOptions options = PipelineOptionsFactory.as(BigQueryOptions.class);
     options.setProject("someproject");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
index b3b9b08..c08b6f8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.beam.sdk.io;
 
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
@@ -30,9 +31,11 @@ import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 
 import org.joda.time.Duration;
@@ -76,6 +79,13 @@ public class CountingInputTest {
   }
 
   @Test
+  public void testBoundedDisplayData() {
+    PTransform<?, ?> input = CountingInput.upTo(1234);
+    DisplayData displayData = DisplayData.from(input);
+    assertThat(displayData, hasDisplayItem("upTo", 1234));
+  }
+
+  @Test
   @Category(RunnableOnService.class)
   public void testUnboundedInput() {
     Pipeline p = TestPipeline.create();
@@ -138,6 +148,28 @@ public class CountingInputTest {
     p.run();
   }
 
+  @Test
+  public void testUnboundedDisplayData() {
+    Duration maxReadTime = Duration.standardHours(5);
+    SerializableFunction<Long, Instant> timestampFn = new SerializableFunction<Long, Instant>() {
+      @Override
+      public Instant apply(Long input) {
+        return Instant.now();
+      }
+    };
+
+    PTransform<?, ?> input = CountingInput.unbounded()
+        .withMaxNumRecords(1234)
+        .withMaxReadTime(maxReadTime)
+        .withTimestampFn(timestampFn);
+
+    DisplayData displayData = DisplayData.from(input);
+
+    assertThat(displayData, hasDisplayItem("maxRecords", 1234));
+    assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime));
+    assertThat(displayData, hasDisplayItem("timestampFn", timestampFn.getClass()));
+  }
+
   /**
    * A timestamp function that uses the given value as the timestamp. Because the input values will
    * not wrap, this function is non-decreasing and meets the timestamp function criteria laid out

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index 852ab7d..1e5bf51 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -17,12 +17,18 @@
  */
 package org.apache.beam.sdk.io;
 
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.transforms.display.DisplayData;
 
 import com.google.api.client.testing.http.FixedClock;
 import com.google.api.client.util.Clock;
 import com.google.api.services.pubsub.model.PubsubMessage;
 
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
@@ -231,4 +237,42 @@ public class PubsubIOTest {
     // Year 10000 out of range.
     parseTimestamp("10000-10-29T23:41:41.123999Z");
   }
+
+  @Test
+  public void testReadDisplayData() {
+    String topic = "projects/project/topics/topic";
+    String subscription = "projects/project/subscriptions/subscription";
+    Duration maxReadTime = Duration.standardMinutes(5);
+    PubsubIO.Read.Bound<String> read = PubsubIO.Read
+        .topic(topic)
+        .subscription(subscription)
+        .timestampLabel("myTimestamp")
+        .idLabel("myId")
+        .maxNumRecords(1234)
+        .maxReadTime(maxReadTime);
+
+    DisplayData displayData = DisplayData.from(read);
+
+    assertThat(displayData, hasDisplayItem("topic", topic));
+    assertThat(displayData, hasDisplayItem("subscription", subscription));
+    assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
+    assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+    assertThat(displayData, hasDisplayItem("maxNumRecords", 1234));
+    assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime));
+  }
+
+  @Test
+  public void testWriteDisplayData() {
+    String topic = "projects/project/topics/topic";
+    PubsubIO.Write.Bound<?> write = PubsubIO.Write
+        .topic(topic)
+        .timestampLabel("myTimestamp")
+        .idLabel("myId");
+
+    DisplayData displayData = DisplayData.from(write);
+
+    assertThat(displayData, hasDisplayItem("topic", topic));
+    assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
+    assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 09b7d22..4d6d8dd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.TestUtils.INTS_ARRAY;
 import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
@@ -41,6 +42,7 @@ import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.values.PCollection;
@@ -155,6 +157,20 @@ public class TextIOTest {
     }
   }
 
+  @Test
+  public void testReadDisplayData() {
+    TextIO.Read.Bound<?> read = TextIO.Read
+        .from("foo.*")
+        .withCompressionType(CompressionType.BZIP2)
+        .withoutValidation();
+
+    DisplayData displayData = DisplayData.from(read);
+
+    assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
+    assertThat(displayData, hasDisplayItem("compressionType", CompressionType.BZIP2.toString()));
+    assertThat(displayData, hasDisplayItem("validation", false));
+  }
+
   <T> void runTestWrite(T[] elems, Coder<T> coder) throws Exception {
     runTestWrite(elems, coder, 1);
   }
@@ -275,6 +291,24 @@ public class TextIOTest {
   }
 
   @Test
+  public void testWriteDisplayData() {
+    TextIO.Write.Bound<?> write = TextIO.Write
+        .to("foo")
+        .withSuffix("bar")
+        .withShardNameTemplate("-SS-of-NN-")
+        .withNumShards(100)
+        .withoutValidation();
+
+    DisplayData displayData = DisplayData.from(write);
+
+    assertThat(displayData, hasDisplayItem("filePrefix", "foo"));
+    assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
+    assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
+    assertThat(displayData, hasDisplayItem("numShards", 100));
+    assertThat(displayData, hasDisplayItem("validation", false));
+  }
+
+  @Test
   public void testUnsupportedFilePattern() throws IOException {
     File outFolder = tmpFolder.newFolder();
     // Windows doesn't like resolving paths with * in them.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/test/java/org/apache/beam/sdk/io/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/bigtable/BigtableIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/bigtable/BigtableIOTest.java
index 135fcb8..7c176e4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/bigtable/BigtableIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/bigtable/BigtableIOTest.java
@@ -20,7 +20,10 @@ package org.apache.beam.sdk.io.bigtable;
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSourcesEqualReferenceSource;
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
-import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
+import static org.apache.beam.sdk.testing.SourceTestUtils
+    .assertSplitAtFractionSucceedsAndConsistent;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Verify.verifyNotNull;
@@ -39,6 +42,7 @@ import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -407,6 +411,26 @@ public class BigtableIOTest {
     assertSourcesEqualReferenceSource(source, splits, null /* options */);
   }
 
+  @Test
+  public void testReadingDisplayData() {
+    RowFilter rowFilter = RowFilter.newBuilder()
+        .setRowKeyRegexFilter(ByteString.copyFromUtf8("foo.*"))
+        .build();
+
+    BigtableIO.Read read = BigtableIO.read()
+        .withBigtableOptions(BIGTABLE_OPTIONS)
+        .withTableId("fooTable")
+        .withRowFilter(rowFilter);
+
+    DisplayData displayData = DisplayData.from(read);
+
+    assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
+    assertThat(displayData, hasDisplayItem("rowFilter", rowFilter.toString()));
+
+    // BigtableIO adds user-agent to options; assert only on key and not value.
+    assertThat(displayData, hasDisplayItem(hasKey("bigtableOptions")));
+  }
+
   /** Tests that a record gets written to the service and messages are logged. */
   @Test
   public void testWriting() throws Exception {
@@ -463,6 +487,19 @@ public class BigtableIOTest {
     p.run();
   }
 
+  @Test
+  public void testWritingDisplayData() {
+    BigtableIO.Write write = BigtableIO.write()
+        .withTableId("fooTable")
+        .withBigtableOptions(BIGTABLE_OPTIONS);
+
+    DisplayData displayData = DisplayData.from(write);
+
+    assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
+    // BigtableIO adds user-agent to options; assert only on key and not value.
+    assertThat(displayData, hasDisplayItem(hasKey("bigtableOptions")));
+  }
+
   ////////////////////////////////////////////////////////////////////////////////////////////
   private static final String COLUMN_FAMILY_NAME = "family";
   private static final ByteString COLUMN_NAME = ByteString.copyFromUtf8("column");