You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:14:54 UTC

[20/50] [abbrv] beam git commit: [BEAM-1914] XmlIO now complies with PTransform style guide

[BEAM-1914] XmlIO now complies with PTransform style guide


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0c0a60c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0c0a60c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0c0a60c

Branch: refs/heads/DSL_SQL
Commit: d0c0a60c83a9d2a6caa29f91f89d8c0ee3b0eb93
Parents: 57929fb
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 16:25:42 2017 -0700
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Apr 19 10:34:46 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/sdk/io/CompressedSource.java    |   4 +-
 .../main/java/org/apache/beam/sdk/io/XmlIO.java | 477 +++++++++++++++++++
 .../java/org/apache/beam/sdk/io/XmlSink.java    | 226 ++-------
 .../java/org/apache/beam/sdk/io/XmlSource.java  | 191 +-------
 .../sdk/transforms/display/DisplayData.java     |   6 +
 .../org/apache/beam/sdk/io/XmlSinkTest.java     |  89 ++--
 .../org/apache/beam/sdk/io/XmlSourceTest.java   | 248 ++++++----
 .../sdk/transforms/display/DisplayDataTest.java |  17 +
 8 files changed, 740 insertions(+), 518 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index ecd0fd9..1d940cb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -46,10 +46,10 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
  * A Source that reads from compressed files. A {@code CompressedSources} wraps a delegate
  * {@link FileBasedSource} that is able to read the decompressed file format.
  *
- * <p>For example, use the following to read from a gzip-compressed XML file:
+ * <p>For example, use the following to read from a gzip-compressed file-based source:
  *
  * <pre> {@code
- * XmlSource mySource = XmlSource.from(...);
+ * FileBasedSource<T> mySource = ...;
  * PCollection<T> collection = p.apply(Read.from(CompressedSource
  *     .from(mySource)
  *     .withDecompression(CompressedSource.CompressionMode.GZIP)));

http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java
new file mode 100644
index 0000000..a53fb86
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import javax.annotation.Nullable;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/** Transforms for reading and writing XML files using JAXB mappers. */
+public class XmlIO {
+  // CHECKSTYLE.OFF: JavadocStyle
+  /**
+   * Reads XML files. This source reads one or more XML files and
+   * creates a {@link PCollection} of a given type. Please note the example given below.
+   *
+   * <p>The XML file must be of the following form, where {@code root} and {@code record} are XML
+   * element names that are defined by the user:
+   *
+   * <pre>{@code
+   * <root>
+   * <record> ... </record>
+   * <record> ... </record>
+   * <record> ... </record>
+   * ...
+   * <record> ... </record>
+   * </root>
+   * }</pre>
+   *
+   * <p>Basically, the XML document should contain a single root element with an inner list
+   * consisting entirely of record elements. The records may contain arbitrary XML content; however,
+   * that content <b>must not</b> contain the start {@code <record>} or end {@code </record>} tags.
+   * This restriction enables reading from large XML files in parallel from different offsets in the
+   * file.
+   *
+   * <p>Root and/or record elements may additionally contain an arbitrary number of XML attributes.
+   * Additionally users must provide a class of a JAXB annotated Java type that can be used convert
+   * records into Java objects and vice versa using JAXB marshalling/unmarshalling mechanisms.
+   * Reading the source will generate a {@code PCollection} of the given JAXB annotated Java type.
+   * Optionally users may provide a minimum size of a bundle that should be created for the source.
+   *
+   * <p>The following example shows how to use this method in a Beam pipeline:
+   *
+   * <pre>{@code
+   * PCollection<String> output = p.apply(XmlIO.<Record>read()
+   *     .from(file.toPath().toString())
+   *     .withRootElement("root")
+   *     .withRecordElement("record")
+   *     .withRecordClass(Record.class));
+   * }</pre>
+   *
+   * <p>Currently, only XML files that use single-byte characters are supported. Using a file that
+   * contains multi-byte characters may result in data loss or duplication.
+   *
+   * <p>To use this method:
+   *
+   * <ol>
+   * <li>Explicitly declare a dependency on org.codehaus.woodstox:stax2-api
+   * <li>Include a compatible implementation on the classpath at run-time, such as
+   *     org.codehaus.woodstox:woodstox-core-asl
+   * </ol>
+   *
+   * <p>These dependencies have been declared as optional in the sdks/java/core/pom.xml file of
+   * Apache Beam.
+   *
+   * <h3>Permissions</h3>
+   * Permission requirements depend on the {@link org.apache.beam.sdk.runners.PipelineRunner
+   * PipelineRunner} that is used to execute the Beam pipeline. Please refer to the documentation of
+   * corresponding {@link PipelineRunner PipelineRunners} for more details.
+   *
+   * @param <T> Type of the objects that represent the records of the XML file. The {@code
+   *     PCollection} generated by this source will be of this type.
+   */
+  // CHECKSTYLE.ON: JavadocStyle
+  public static <T> Read<T> read() {
+    return new AutoValue_XmlIO_Read.Builder<T>()
+        .setMinBundleSize(Read.DEFAULT_MIN_BUNDLE_SIZE)
+        .setCompressionType(Read.CompressionType.AUTO)
+        .build();
+  }
+
+  // CHECKSTYLE.OFF: JavadocStyle
+  /**
+   * A {@link Sink} that outputs records as XML-formatted elements. Writes a {@link PCollection} of
+   * records from JAXB-annotated classes to a single file location.
+   *
+   * <p>Given a PCollection containing records of type T that can be marshalled to XML elements,
+   * this Sink will produce a single file consisting of a single root element that contains all of
+   * the elements in the PCollection.
+   *
+   * <p>XML Sinks are created with a base filename to write to, a root element name that will be
+   * used for the root element of the output files, and a class to bind to an XML element. This
+   * class will be used in the marshalling of records in an input PCollection to their XML
+   * representation and must be able to be bound using JAXB annotations (checked at pipeline
+   * construction time).
+   *
+   * <p>XML Sinks can be written to using the {@link Write} transform:
+   *
+   * <pre>{@code
+   * p.apply(XmlIO.<Type>write()
+   *      .withRecordClass(Type.class)
+   *      .withRootElement(root_element)
+   *      .toFilenamePrefix(output_filename));
+   * }</pre>
+   *
+   * <p>For example, consider the following class with JAXB annotations:
+   *
+   * <pre>
+   *  {@literal @}XmlRootElement(name = "word_count_result")
+   *  {@literal @}XmlType(propOrder = {"word", "frequency"})
+   *  public class WordFrequency {
+   *    private String word;
+   *    private long frequency;
+   *
+   *    public WordFrequency() { }
+   *
+   *    public WordFrequency(String word, long frequency) {
+   *      this.word = word;
+   *      this.frequency = frequency;
+   *    }
+   *
+   *    public void setWord(String word) {
+   *      this.word = word;
+   *    }
+   *
+   *    public void setFrequency(long frequency) {
+   *      this.frequency = frequency;
+   *    }
+   *
+   *    public long getFrequency() {
+   *      return frequency;
+   *    }
+   *
+   *    public String getWord() {
+   *      return word;
+   *    }
+   *  }
+   * </pre>
+   *
+   * <p>The following will produce XML output with a root element named "words" from a PCollection
+   * of WordFrequency objects:
+   *
+   * <pre>{@code
+   * p.apply(XmlIO.<WordFrequency>write()
+   *     .withRecordClass(WordFrequency.class)
+   *     .withRootElement("words")
+   *     .toFilenamePrefix(output_file));
+   * }</pre>
+   *
+   * <p>The output of which will look like:
+   *
+   * <pre>{@code
+   * <words>
+   *
+   *  <word_count_result>
+   *    <word>decreased</word>
+   *    <frequency>1</frequency>
+   *  </word_count_result>
+   *
+   *  <word_count_result>
+   *    <word>War</word>
+   *    <frequency>4</frequency>
+   *  </word_count_result>
+   *
+   *  <word_count_result>
+   *    <word>empress'</word>
+   *    <frequency>14</frequency>
+   *  </word_count_result>
+   *
+   *  <word_count_result>
+   *    <word>stoops</word>
+   *    <frequency>6</frequency>
+   *  </word_count_result>
+   *
+   *  ...
+   * </words>
+   * }</pre>
+   */
+  // CHECKSTYLE.ON: JavadocStyle
+  public static <T> Write<T> write() {
+    return new AutoValue_XmlIO_Write.Builder<T>().build();
+  }
+
+  /** Implementation of {@link #read}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    private static final int DEFAULT_MIN_BUNDLE_SIZE = 8 * 1024;
+
+    @Nullable
+    abstract String getFileOrPatternSpec();
+
+    @Nullable
+    abstract String getRootElement();
+
+    @Nullable
+    abstract String getRecordElement();
+
+    @Nullable
+    abstract Class<T> getRecordClass();
+
+    abstract CompressionType getCompressionType();
+
+    abstract long getMinBundleSize();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFileOrPatternSpec(String fileOrPatternSpec);
+
+      abstract Builder<T> setRootElement(String rootElement);
+
+      abstract Builder<T> setRecordElement(String recordElement);
+
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+      abstract Builder<T> setMinBundleSize(long minBundleSize);
+
+      abstract Builder<T> setCompressionType(CompressionType compressionType);
+
+      abstract Read<T> build();
+    }
+
+    /** Strategy for determining the compression type of XML files being read. */
+    public enum CompressionType {
+      /** Automatically determine the compression type based on filename extension. */
+      AUTO(""),
+      /** Uncompressed (i.e., may be split). */
+      UNCOMPRESSED(""),
+      /** GZipped. */
+      GZIP(".gz"),
+      /** BZipped. */
+      BZIP2(".bz2"),
+      /** Zipped. */
+      ZIP(".zip"),
+      /** Deflate compressed. */
+      DEFLATE(".deflate");
+
+      private String filenameSuffix;
+
+      CompressionType(String suffix) {
+        this.filenameSuffix = suffix;
+      }
+
+      /**
+       * Determine if a given filename matches a compression type based on its extension.
+       * @param filename the filename to match
+       * @return true iff the filename ends with the compression type's known extension.
+       */
+      public boolean matches(String filename) {
+        return filename.toLowerCase().endsWith(filenameSuffix.toLowerCase());
+      }
+    }
+
+    /**
+     * Reads a single XML file or a set of XML files defined by a Java "glob"
+     * file pattern. Each XML file should be of the form defined in {@link #read}.
+     */
+    public Read<T> from(String fileOrPatternSpec) {
+      return toBuilder().setFileOrPatternSpec(fileOrPatternSpec).build();
+    }
+
+    /**
+     * Sets name of the root element of the XML document. This will be used to create a valid
+     * starting root element when initiating a bundle of records created from an XML document. This
+     * is a required parameter.
+     */
+    public Read<T> withRootElement(String rootElement) {
+      return toBuilder().setRootElement(rootElement).build();
+    }
+
+    /**
+     * Sets name of the record element of the XML document. This will be used to determine offset of
+     * the first record of a bundle created from the XML document. This is a required parameter.
+     */
+    public Read<T> withRecordElement(String recordElement) {
+      return toBuilder().setRecordElement(recordElement).build();
+    }
+
+    /**
+     * Sets a JAXB annotated class that can be populated using a record of the provided XML file.
+     * This will be used when unmarshalling record objects from the XML file. This is a required
+     * parameter.
+     */
+    public Read<T> withRecordClass(Class<T> recordClass) {
+      return toBuilder().setRecordClass(recordClass).build();
+    }
+
+    /**
+     * Sets a parameter {@code minBundleSize} for the minimum bundle size of the source. Please
+     * refer to {@link OffsetBasedSource} for the definition of minBundleSize. This is an optional
+     * parameter.
+     */
+    public Read<T> withMinBundleSize(long minBundleSize) {
+      return toBuilder().setMinBundleSize(minBundleSize).build();
+    }
+
+    /**
+     * Decompresses all input files using the specified compression type.
+     *
+     * <p>If no compression type is specified, the default is {@link CompressionType#AUTO}.
+     * In this mode, the compression type of the file is determined by its extension.
+     * Supports .gz, .bz2, .zip and .deflate compression.
+     */
+    public Read<T> withCompressionType(CompressionType compressionType) {
+      return toBuilder().setCompressionType(compressionType).build();
+    }
+
+    @Override
+    public void validate(PBegin input) {
+      checkNotNull(
+          getRootElement(),
+          "rootElement is null. Use builder method withRootElement() to set this.");
+      checkNotNull(
+          getRecordElement(),
+          "recordElement is null. Use builder method withRecordElement() to set this.");
+      checkNotNull(
+          getRecordClass(),
+          "recordClass is null. Use builder method withRecordClass() to set this.");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder
+          .addIfNotDefault(
+              DisplayData.item("minBundleSize", getMinBundleSize())
+                  .withLabel("Minimum Bundle Size"),
+              1L)
+          .add(DisplayData.item("filePattern", getFileOrPatternSpec()).withLabel("File Pattern"))
+          .addIfNotNull(
+              DisplayData.item("rootElement", getRootElement()).withLabel("XML Root Element"))
+          .addIfNotNull(
+              DisplayData.item("recordElement", getRecordElement()).withLabel("XML Record Element"))
+          .addIfNotNull(
+              DisplayData.item("recordClass", getRecordClass()).withLabel("XML Record Class"));
+    }
+
+    @VisibleForTesting
+    BoundedSource<T> createSource() {
+      XmlSource<T> source = new XmlSource<>(this);
+      switch (getCompressionType()) {
+        case UNCOMPRESSED:
+          return source;
+        case AUTO:
+          return CompressedSource.from(source);
+        case BZIP2:
+          return CompressedSource.from(source)
+              .withDecompression(CompressedSource.CompressionMode.BZIP2);
+        case GZIP:
+          return CompressedSource.from(source)
+              .withDecompression(CompressedSource.CompressionMode.GZIP);
+        case ZIP:
+          return CompressedSource.from(source)
+              .withDecompression(CompressedSource.CompressionMode.ZIP);
+        case DEFLATE:
+          return CompressedSource.from(source)
+              .withDecompression(CompressedSource.CompressionMode.DEFLATE);
+        default:
+          throw new IllegalArgumentException("Unknown compression type: " + getCompressionType());
+      }
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      return input.apply(org.apache.beam.sdk.io.Read.from(createSource()));
+    }
+  }
+
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract String getFilenamePrefix();
+
+    @Nullable
+    abstract Class<T> getRecordClass();
+
+    @Nullable
+    abstract String getRootElement();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFilenamePrefix(String baseOutputFilename);
+
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+      abstract Builder<T> setRootElement(String rootElement);
+
+      abstract Write<T> build();
+    }
+
+
+    /**
+     * Writes to files with the given path prefix.
+     *
+     * <p>Output files will have the name {@literal {filenamePrefix}-0000i-of-0000n.xml} where n is
+     * the number of output bundles.
+     */
+    public Write<T> toFilenamePrefix(String filenamePrefix) {
+      return toBuilder().setFilenamePrefix(filenamePrefix).build();
+    }
+
+    /**
+     * Writes objects of the given class mapped to XML elements using JAXB.
+     *
+     * <p>The specified class must be able to be used to create a JAXB context.
+     */
+    public Write<T> withRecordClass(Class<T> recordClass) {
+      return toBuilder().setRecordClass(recordClass).build();
+    }
+
+    /**
+     * Sets the enclosing root element for the generated XML files.
+     */
+    public Write<T> withRootElement(String rootElement) {
+      return toBuilder().setRootElement(rootElement).build();
+    }
+
+    @Override
+    public void validate(PCollection<T> input) {
+      checkNotNull(getRecordClass(), "Missing a class to bind to a JAXB context.");
+      checkNotNull(getRootElement(), "Missing a root element name.");
+      checkNotNull(getFilenamePrefix(), "Missing a filename to write to.");
+      try {
+        JAXBContext.newInstance(getRecordClass());
+      } catch (JAXBException e) {
+        throw new RuntimeException("Error binding classes to a JAXB Context.", e);
+      }
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      return input.apply(org.apache.beam.sdk.io.Write.to(createSink()));
+    }
+
+    @VisibleForTesting
+    XmlSink<T> createSink() {
+      return new XmlSink<>(this);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      createSink().populateFileBasedDisplayData(builder);
+      builder
+          .addIfNotNull(DisplayData.item("rootElement", getRootElement())
+              .withLabel("XML Root Element"))
+          .addIfNotNull(DisplayData.item("recordClass", getRecordClass())
+              .withLabel("XML Record Class"));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
index 2159c8f..7700329 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
@@ -17,226 +17,58 @@
  */
 package org.apache.beam.sdk.io;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import java.io.OutputStream;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
-import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.PCollection;
 
-// CHECKSTYLE.OFF: JavadocStyle
-/**
- * A {@link Sink} that outputs records as XML-formatted elements. Writes a {@link PCollection} of
- * records from JAXB-annotated classes to a single file location.
- *
- * <p>Given a PCollection containing records of type T that can be marshalled to XML elements, this
- * Sink will produce a single file consisting of a single root element that contains all of the
- * elements in the PCollection.
- *
- * <p>XML Sinks are created with a base filename to write to, a root element name that will be used
- * for the root element of the output files, and a class to bind to an XML element. This class
- * will be used in the marshalling of records in an input PCollection to their XML representation
- * and must be able to be bound using JAXB annotations (checked at pipeline construction time).
- *
- * <p>XML Sinks can be written to using the {@link Write} transform:
- *
- * <pre>
- * p.apply(Write.to(
- *      XmlSink.ofRecordClass(Type.class)
- *          .withRootElementName(root_element)
- *          .toFilenamePrefix(output_filename)));
- * </pre>
- *
- * <p>For example, consider the following class with JAXB annotations:
- *
- * <pre>
- *  {@literal @}XmlRootElement(name = "word_count_result")
- *  {@literal @}XmlType(propOrder = {"word", "frequency"})
- *  public class WordFrequency {
- *    private String word;
- *    private long frequency;
- *
- *    public WordFrequency() { }
- *
- *    public WordFrequency(String word, long frequency) {
- *      this.word = word;
- *      this.frequency = frequency;
- *    }
- *
- *    public void setWord(String word) {
- *      this.word = word;
- *    }
- *
- *    public void setFrequency(long frequency) {
- *      this.frequency = frequency;
- *    }
- *
- *    public long getFrequency() {
- *      return frequency;
- *    }
- *
- *    public String getWord() {
- *      return word;
- *    }
- *  }
- * </pre>
- *
- * <p>The following will produce XML output with a root element named "words" from a PCollection of
- * WordFrequency objects:
- * <pre>
- * p.apply(Write.to(
- *  XmlSink.ofRecordClass(WordFrequency.class)
- *      .withRootElement("words")
- *      .toFilenamePrefix(output_file)));
- * </pre>
- *
- * <p>The output of which will look like:
- * <pre>
- * {@code
- * <words>
- *
- *  <word_count_result>
- *    <word>decreased</word>
- *    <frequency>1</frequency>
- *  </word_count_result>
- *
- *  <word_count_result>
- *    <word>War</word>
- *    <frequency>4</frequency>
- *  </word_count_result>
- *
- *  <word_count_result>
- *    <word>empress'</word>
- *    <frequency>14</frequency>
- *  </word_count_result>
- *
- *  <word_count_result>
- *    <word>stoops</word>
- *    <frequency>6</frequency>
- *  </word_count_result>
- *
- *  ...
- * </words>
- * }</pre>
- */
-// CHECKSTYLE.ON: JavadocStyle
-@SuppressWarnings("checkstyle:javadocstyle")
-public class XmlSink {
+/** Implementation of {@link XmlIO#write}. */
+class XmlSink<T> extends FileBasedSink<T> {
   protected static final String XML_EXTENSION = "xml";
 
-  /**
-   * Returns a builder for an XmlSink. You'll need to configure the class to bind, the root
-   * element name, and the output file prefix with {@link Bound#ofRecordClass}, {@link
-   * Bound#withRootElement}, and {@link Bound#toFilenamePrefix}, respectively.
-   */
-  public static Bound<?> write() {
-    return new Bound<>(null, null, null);
+  private final XmlIO.Write<T> spec;
+
+  XmlSink(XmlIO.Write<T> spec) {
+    super(spec.getFilenamePrefix(), XML_EXTENSION);
+    this.spec = spec;
   }
 
   /**
-   * Returns an XmlSink that writes objects as XML entities.
-   *
-   * <p>Output files will have the name {@literal {baseOutputFilename}-0000i-of-0000n.xml} where n
-   * is the number of output bundles.
-   *
-   * @param klass the class of the elements to write.
-   * @param rootElementName the enclosing root element.
-   * @param baseOutputFilename the output filename prefix.
+   * Validates that the root element, class to bind to a JAXB context, and filenamePrefix have
+   * been set and that the class can be bound in a JAXB context.
    */
-  public static <T> Bound<T> writeOf(
-      Class<T> klass, String rootElementName, String baseOutputFilename) {
-    return new Bound<>(klass, rootElementName, baseOutputFilename);
+  @Override
+  public void validate(PipelineOptions options) {
+    spec.validate(null);
   }
 
   /**
-   * A {@link FileBasedSink} that writes objects as XML elements.
+   * Creates an {@link XmlWriteOperation}.
    */
-  public static class Bound<T> extends FileBasedSink<T> {
-    final Class<T> classToBind;
-    final String rootElementName;
-
-    private Bound(Class<T> classToBind, String rootElementName, String baseOutputFilename) {
-      super(baseOutputFilename, XML_EXTENSION);
-      this.classToBind = classToBind;
-      this.rootElementName = rootElementName;
-    }
-
-    /**
-     * Returns an XmlSink that writes objects of the class specified as XML elements.
-     *
-     * <p>The specified class must be able to be used to create a JAXB context.
-     */
-    public <T> Bound<T> ofRecordClass(Class<T> classToBind) {
-      return new Bound<>(classToBind, rootElementName, getBaseOutputFilenameProvider().get());
-    }
-
-    /**
-     * Returns an XmlSink that writes to files with the given prefix.
-     *
-     * <p>Output files will have the name {@literal {filenamePrefix}-0000i-of-0000n.xml} where n is
-     * the number of output bundles.
-     */
-    public Bound<T> toFilenamePrefix(String baseOutputFilename) {
-      return new Bound<>(classToBind, rootElementName, baseOutputFilename);
-    }
-
-    /**
-     * Returns an XmlSink that writes XML files with an enclosing root element of the
-     * supplied name.
-     */
-    public Bound<T> withRootElement(String rootElementName) {
-      return new Bound<>(classToBind, rootElementName, getBaseOutputFilenameProvider().get());
-    }
-
-    /**
-     * Validates that the root element, class to bind to a JAXB context, and filenamePrefix have
-     * been set and that the class can be bound in a JAXB context.
-     */
-    @Override
-    public void validate(PipelineOptions options) {
-      checkNotNull(classToBind, "Missing a class to bind to a JAXB context.");
-      checkNotNull(rootElementName, "Missing a root element name.");
-      checkNotNull(getBaseOutputFilenameProvider().get(), "Missing a filename to write to.");
-      try {
-        JAXBContext.newInstance(classToBind);
-      } catch (JAXBException e) {
-        throw new RuntimeException("Error binding classes to a JAXB Context.", e);
-      }
-    }
+  @Override
+  public XmlWriteOperation<T> createWriteOperation(PipelineOptions options) {
+    return new XmlWriteOperation<>(this);
+  }
 
-    /**
-     * Creates an {@link XmlWriteOperation}.
-     */
-    @Override
-    public XmlWriteOperation<T> createWriteOperation(PipelineOptions options) {
-      return new XmlWriteOperation<>(this);
-    }
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    spec.populateDisplayData(builder);
+  }
 
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder
-          .addIfNotNull(DisplayData.item("rootElement", rootElementName)
-            .withLabel("XML Root Element"))
-          .addIfNotNull(DisplayData.item("recordClass", classToBind)
-            .withLabel("XML Record Class"));
-    }
+  void populateFileBasedDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
   }
 
   /**
    * {@link Sink.WriteOperation} for XML {@link Sink}s.
    */
   protected static final class XmlWriteOperation<T> extends FileBasedWriteOperation<T> {
-    public XmlWriteOperation(XmlSink.Bound<T> sink) {
+    public XmlWriteOperation(XmlSink<T> sink) {
       super(sink);
     }
 
@@ -247,7 +79,7 @@ public class XmlSink {
     public XmlWriter<T> createWriter(PipelineOptions options) throws Exception {
       JAXBContext context;
       Marshaller marshaller;
-      context = JAXBContext.newInstance(getSink().classToBind);
+      context = JAXBContext.newInstance(getSink().spec.getRecordClass());
       marshaller = context.createMarshaller();
       marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
       marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);
@@ -259,8 +91,8 @@ public class XmlSink {
      * Return the XmlSink.Bound for this write operation.
      */
     @Override
-    public XmlSink.Bound<T> getSink() {
-      return (XmlSink.Bound<T>) super.getSink();
+    public XmlSink<T> getSink() {
+      return (XmlSink<T>) super.getSink();
     }
   }
 
@@ -289,7 +121,7 @@ public class XmlSink {
      */
     @Override
     protected void writeHeader() throws Exception {
-      String rootElementName = getWriteOperation().getSink().rootElementName;
+      String rootElementName = getWriteOperation().getSink().spec.getRootElement();
       os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "<" + rootElementName + ">\n"));
     }
 
@@ -298,7 +130,7 @@ public class XmlSink {
      */
     @Override
     protected void writeFooter() throws Exception {
-      String rootElementName = getWriteOperation().getSink().rootElementName;
+      String rootElementName = getWriteOperation().getSink().spec.getRootElement();
       os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "\n</" + rootElementName + ">"));
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java
index 6bf2015..7416c85 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.io;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -45,154 +43,29 @@ import javax.xml.stream.XMLStreamReader;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.JAXBCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.PCollection;
 import org.codehaus.stax2.XMLInputFactory2;
 
-// CHECKSTYLE.OFF: JavadocStyle
-/**
- * A source that can be used to read XML files. This source reads one or more
- * XML files and creates a {@link PCollection} of a given type. A {@link Read} transform can be
- * created by passing an {@link XmlSource} object to {@link Read#from}. Please note the
- * example given below.
- *
- * <p>The XML file must be of the following form, where {@code root} and {@code record} are XML
- * element names that are defined by the user:
- *
- * <pre>
- * {@code
- * <root>
- * <record> ... </record>
- * <record> ... </record>
- * <record> ... </record>
- * ...
- * <record> ... </record>
- * </root>
- * }
- * </pre>
- *
- * <p>Basically, the XML document should contain a single root element with an inner list consisting
- * entirely of record elements. The records may contain arbitrary XML content; however, that content
- * <b>must not</b> contain the start {@code <record>} or end {@code </record>} tags. This
- * restriction enables reading from large XML files in parallel from different offsets in the file.
- *
- * <p>Root and/or record elements may additionally contain an arbitrary number of XML attributes.
- * Additionally users must provide a class of a JAXB annotated Java type that can be used convert
- * records into Java objects and vice versa using JAXB marshalling/unmarshalling mechanisms. Reading
- * the source will generate a {@code PCollection} of the given JAXB annotated Java type.
- * Optionally users may provide a minimum size of a bundle that should be created for the source.
- *
- * <p>The following example shows how to read from {@link XmlSource} in a Beam pipeline:
- *
- * <pre>
- * {@code
- * XmlSource<String> source = XmlSource.<String>from(file.toPath().toString())
- *     .withRootElement("root")
- *     .withRecordElement("record")
- *     .withRecordClass(Record.class);
- * PCollection<String> output = p.apply(Read.from(source));
- * }
- * </pre>
- *
- * <p>Currently, only XML files that use single-byte characters are supported. Using a file that
- * contains multi-byte characters may result in data loss or duplication.
- *
- * <p>To use {@link XmlSource}:
- * <ol>
- *   <li>Explicitly declare a dependency on org.codehaus.woodstox:stax2-api</li>
- *   <li>Include a compatible implementation on the classpath at run-time,
- *       such as org.codehaus.woodstox:woodstox-core-asl</li>
- * </ol>
- *
- * <p>These dependencies have been declared as optional in the sdks/java/core/pom.xml file of
- * Apache Beam.
- *
- * <h3>Permissions</h3>
- * Permission requirements depend on the
- * {@link org.apache.beam.sdk.runners.PipelineRunner PipelineRunner} that is
- * used to execute the Beam pipeline. Please refer to the documentation of corresponding
- * {@link PipelineRunner PipelineRunners} for more details.
- *
- * @param <T> Type of the objects that represent the records of the XML file. The
- *        {@code PCollection} generated by this source will be of this type.
- */
-// CHECKSTYLE.ON: JavadocStyle
+/** Implementation of {@link XmlIO#read}. */
 public class XmlSource<T> extends FileBasedSource<T> {
 
   private static final String XML_VERSION = "1.1";
-  private static final int DEFAULT_MIN_BUNDLE_SIZE = 8 * 1024;
-  private final String rootElement;
-  private final String recordElement;
-  private final Class<T> recordClass;
-
-  /**
-   * Creates an XmlSource for a single XML file or a set of XML files defined by a Java "glob" file
-   * pattern. Each XML file should be of the form defined in {@link XmlSource}.
-   */
-  public static <T> XmlSource<T> from(String fileOrPatternSpec) {
-    return new XmlSource<>(fileOrPatternSpec, DEFAULT_MIN_BUNDLE_SIZE, null, null, null);
-  }
-
-  /**
-   * Sets name of the root element of the XML document. This will be used to create a valid starting
-   * root element when initiating a bundle of records created from an XML document. This is a
-   * required parameter.
-   */
-  public XmlSource<T> withRootElement(String rootElement) {
-    return new XmlSource<>(
-        getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
-  }
 
-  /**
-   * Sets name of the record element of the XML document. This will be used to determine offset of
-   * the first record of a bundle created from the XML document. This is a required parameter.
-   */
-  public XmlSource<T> withRecordElement(String recordElement) {
-    return new XmlSource<>(
-        getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
-  }
+  private final XmlIO.Read<T> spec;
 
-  /**
-   * Sets a JAXB annotated class that can be populated using a record of the provided XML file. This
-   * will be used when unmarshalling record objects from the XML file.  This is a required
-   * parameter.
-   */
-  public XmlSource<T> withRecordClass(Class<T> recordClass) {
-    return new XmlSource<>(
-        getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
+  XmlSource(XmlIO.Read<T> spec) {
+    super(spec.getFileOrPatternSpec(), spec.getMinBundleSize());
+    this.spec = spec;
   }
 
-  /**
-   * Sets a parameter {@code minBundleSize} for the minimum bundle size of the source. Please refer
-   * to {@link OffsetBasedSource} for the definition of minBundleSize.  This is an optional
-   * parameter.
-   */
-  public XmlSource<T> withMinBundleSize(long minBundleSize) {
-    return new XmlSource<>(
-        getFileOrPatternSpec(), minBundleSize, rootElement, recordElement, recordClass);
-  }
-
-  private XmlSource(String fileOrPattern, long minBundleSize, String rootElement,
-      String recordElement, Class<T> recordClass) {
-    super(fileOrPattern, minBundleSize);
-    this.rootElement = rootElement;
-    this.recordElement = recordElement;
-    this.recordClass = recordClass;
-  }
-
-  private XmlSource(String fileOrPattern, long minBundleSize, long startOffset, long endOffset,
-      String rootElement, String recordElement, Class<T> recordClass) {
-    super(fileOrPattern, minBundleSize, startOffset, endOffset);
-    this.rootElement = rootElement;
-    this.recordElement = recordElement;
-    this.recordClass = recordClass;
+  private XmlSource(XmlIO.Read<T> spec, long startOffset, long endOffset) {
+    super(spec.getFileOrPatternSpec(), spec.getMinBundleSize(), startOffset, endOffset);
+    this.spec = spec;
   }
 
   @Override
   protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
-    return new XmlSource<T>(
-        fileName, getMinBundleSize(), start, end, rootElement, recordElement, recordClass);
+    return new XmlSource<T>(spec.from(fileName), start, end);
   }
 
   @Override
@@ -203,42 +76,17 @@ public class XmlSource<T> extends FileBasedSource<T> {
   @Override
   public void validate() {
     super.validate();
-    checkNotNull(
-        rootElement, "rootElement is null. Use builder method withRootElement() to set this.");
-    checkNotNull(
-        recordElement,
-        "recordElement is null. Use builder method withRecordElement() to set this.");
-    checkNotNull(
-        recordClass, "recordClass is null. Use builder method withRecordClass() to set this.");
+    spec.validate(null);
   }
 
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
-    super.populateDisplayData(builder);
-    builder
-        .addIfNotNull(DisplayData.item("rootElement", rootElement)
-          .withLabel("XML Root Element"))
-        .addIfNotNull(DisplayData.item("recordElement", recordElement)
-          .withLabel("XML Record Element"))
-        .addIfNotNull(DisplayData.item("recordClass", recordClass)
-          .withLabel("XML Record Class"));
+    spec.populateDisplayData(builder);
   }
 
   @Override
   public Coder<T> getDefaultOutputCoder() {
-    return JAXBCoder.of(recordClass);
-  }
-
-  public String getRootElement() {
-    return rootElement;
-  }
-
-  public String getRecordElement() {
-    return recordElement;
-  }
-
-  public Class<T> getRecordClass() {
-    return recordClass;
+    return JAXBCoder.of(spec.getRecordClass());
   }
 
   /**
@@ -289,7 +137,7 @@ public class XmlSource<T> extends FileBasedSource<T> {
 
       // Set up a JAXB Unmarshaller that can be used to unmarshall record objects.
       try {
-        JAXBContext jaxbContext = JAXBContext.newInstance(getCurrentSource().recordClass);
+        JAXBContext jaxbContext = JAXBContext.newInstance(getCurrentSource().spec.getRecordClass());
         jaxbUnmarshaller = jaxbContext.createUnmarshaller();
 
         // Throw errors if validation fails. JAXB by default ignores validation errors.
@@ -334,8 +182,10 @@ public class XmlSource<T> extends FileBasedSource<T> {
       // this XML parsing may fail or may produce incorrect results.
 
       byte[] dummyStartDocumentBytes =
-          ("<?xml version=\"" + XML_VERSION + "\" encoding=\"UTF-8\" ?>"
-              + "<" + getCurrentSource().rootElement + ">").getBytes(StandardCharsets.UTF_8);
+          (String.format(
+                  "<?xml version=\"%s\" encoding=\"UTF-8\" ?><%s>",
+                  XML_VERSION, getCurrentSource().spec.getRootElement()))
+              .getBytes(StandardCharsets.UTF_8);
       preambleByteBuffer.write(dummyStartDocumentBytes);
       // Gets the byte offset (in the input file) of the first record in ReadableByteChannel. This
       // method returns the offset and stores any bytes that should be used when creating the XML
@@ -383,7 +233,7 @@ public class XmlSource<T> extends FileBasedSource<T> {
 
       ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
       byte[] recordStartBytes =
-          ("<" + getCurrentSource().recordElement).getBytes(StandardCharsets.UTF_8);
+          ("<" + getCurrentSource().spec.getRecordElement()).getBytes(StandardCharsets.UTF_8);
 
       outer: while (channel.read(buf) > 0) {
         buf.flip();
@@ -494,7 +344,7 @@ public class XmlSource<T> extends FileBasedSource<T> {
           int event = parser.next();
           if (event == XMLStreamConstants.START_ELEMENT) {
             String localName = parser.getLocalName();
-            if (localName.equals(getCurrentSource().recordElement)) {
+            if (localName.equals(getCurrentSource().spec.getRecordElement())) {
               break;
             }
           }
@@ -521,7 +371,8 @@ public class XmlSource<T> extends FileBasedSource<T> {
             return false;
           }
         }
-        JAXBElement<T> jb = jaxbUnmarshaller.unmarshal(parser, getCurrentSource().recordClass);
+        JAXBElement<T> jb =
+            jaxbUnmarshaller.unmarshal(parser, getCurrentSource().spec.getRecordClass());
         currentRecord = jb.getValue();
         return true;
       } catch (JAXBException | XMLStreamException e) {

http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index 669dc6d..3c4337b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -778,6 +778,12 @@ public class DisplayData implements Serializable {
       visitedComponents.add(subComponent);
       visitedPathMap.put(path, subComponent);
       Class<?> namespace = subComponent.getClass();
+      // Common case: AutoValue classes such as AutoValue_FooIO_Read. It's more useful
+      // to show the user the FooIO.Read class, which is the direct superclass of the AutoValue
+      // generated class.
+      if (namespace.getSimpleName().startsWith("AutoValue_")) {
+        namespace = namespace.getSuperclass();
+      }
 
       Path prevPath = latestPath;
       Class<?> prevNs = latestNs;

http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
index 63b5d11..7f559d1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
@@ -59,7 +59,6 @@ public class XmlSinkTest {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
-  private Class<Bird> testClass = Bird.class;
   private String testRootElement = "testElement";
   private String testFilePrefix = "/path/to/testPrefix";
 
@@ -70,7 +69,12 @@ public class XmlSinkTest {
   public void testXmlWriter() throws Exception {
     PipelineOptions options = PipelineOptionsFactory.create();
     XmlWriteOperation<Bird> writeOp =
-        XmlSink.writeOf(Bird.class, "birds", testFilePrefix).createWriteOperation(options);
+        XmlIO.<Bird>write()
+            .toFilenamePrefix(testFilePrefix)
+            .withRecordClass(Bird.class)
+            .withRootElement("birds")
+            .createSink()
+            .createWriteOperation(options);
     XmlWriter<Bird> writer = writeOp.createWriter(options);
 
     List<Bird> bundle =
@@ -85,51 +89,37 @@ public class XmlSinkTest {
    * Builder methods correctly initialize an XML Sink.
    */
   @Test
-  public void testBuildXmlSink() {
-    XmlSink.Bound<Bird> sink =
-        XmlSink.write()
+  public void testBuildXmlWriteTransform() {
+    XmlIO.Write<Bird> write =
+        XmlIO.<Bird>write()
             .toFilenamePrefix(testFilePrefix)
-            .ofRecordClass(testClass)
+            .withRecordClass(Bird.class)
             .withRootElement(testRootElement);
-    assertEquals(testClass, sink.classToBind);
-    assertEquals(testRootElement, sink.rootElementName);
-    assertEquals(testFilePrefix, sink.getBaseOutputFilenameProvider().get());
+    assertEquals(Bird.class, write.getRecordClass());
+    assertEquals(testRootElement, write.getRootElement());
+    assertEquals(testFilePrefix, write.getFilenamePrefix());
   }
 
-  /**
-   * Alternate builder method correctly initializes an XML Sink.
-   */
+  /** Validation ensures no fields are missing. */
   @Test
-  public void testBuildXmlSinkDirect() {
-    XmlSink.Bound<Bird> sink =
-        XmlSink.writeOf(Bird.class, testRootElement, testFilePrefix);
-    assertEquals(testClass, sink.classToBind);
-    assertEquals(testRootElement, sink.rootElementName);
-    assertEquals(testFilePrefix, sink.getBaseOutputFilenameProvider().get());
+  public void testValidateXmlSinkMissingRecordClass() {
+    thrown.expect(NullPointerException.class);
+    XmlIO.<Bird>write()
+        .withRootElement(testRootElement)
+        .toFilenamePrefix(testFilePrefix)
+        .validate(null);
   }
 
-  /**
-   * Validation ensures no fields are missing.
-   */
   @Test
-  public void testValidateXmlSinkMissingFields() {
-    XmlSink.Bound<Bird> sink;
-    sink = XmlSink.writeOf(null, testRootElement, testFilePrefix);
-    validateAndFailIfSucceeds(sink, NullPointerException.class);
-    sink = XmlSink.writeOf(testClass, null, testFilePrefix);
-    validateAndFailIfSucceeds(sink, NullPointerException.class);
-    sink = XmlSink.writeOf(testClass, testRootElement, null);
-    validateAndFailIfSucceeds(sink, NullPointerException.class);
+  public void testValidateXmlSinkMissingRootElement() {
+    thrown.expect(NullPointerException.class);
+    XmlIO.<Bird>write().withRecordClass(Bird.class).toFilenamePrefix(testFilePrefix).validate(null);
   }
 
-  /**
-   * Call validate and fail if validation does not throw the expected exception.
-   */
-  private <T> void validateAndFailIfSucceeds(
-      XmlSink.Bound<T> sink, Class<? extends Exception> expected) {
-    thrown.expect(expected);
-    PipelineOptions options = PipelineOptionsFactory.create();
-    sink.validate(options);
+  @Test
+  public void testValidateXmlSinkMissingFilePrefix() {
+    thrown.expect(NullPointerException.class);
+    XmlIO.<Bird>write().withRecordClass(Bird.class).withRootElement(testRootElement).validate(null);
   }
 
   /**
@@ -138,13 +128,13 @@ public class XmlSinkTest {
   @Test
   public void testCreateWriteOperations() {
     PipelineOptions options = PipelineOptionsFactory.create();
-    XmlSink.Bound<Bird> sink =
-        XmlSink.writeOf(testClass, testRootElement, testFilePrefix);
+    XmlSink<Bird> sink =
+        XmlIO.<Bird>write()
+            .withRecordClass(Bird.class)
+            .withRootElement(testRootElement)
+            .toFilenamePrefix(testFilePrefix)
+            .createSink();
     XmlWriteOperation<Bird> writeOp = sink.createWriteOperation(options);
-    assertEquals(testClass, writeOp.getSink().classToBind);
-    assertEquals(testFilePrefix, writeOp.getSink().getBaseOutputFilenameProvider().get());
-    assertEquals(testRootElement, writeOp.getSink().rootElementName);
-   // assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().getFilenamePolicy().extension);
     Path outputPath = new File(testFilePrefix).toPath();
     Path tempPath = new File(writeOp.tempDirectory.get()).toPath();
     assertEquals(outputPath.getParent(), tempPath.getParent());
@@ -159,7 +149,11 @@ public class XmlSinkTest {
   public void testCreateWriter() throws Exception {
     PipelineOptions options = PipelineOptionsFactory.create();
     XmlWriteOperation<Bird> writeOp =
-        XmlSink.writeOf(testClass, testRootElement, testFilePrefix)
+        XmlIO.<Bird>write()
+            .withRecordClass(Bird.class)
+            .withRootElement(testRootElement)
+            .toFilenamePrefix(testFilePrefix)
+            .createSink()
             .createWriteOperation(options);
     XmlWriter<Bird> writer = writeOp.createWriter(options);
     Path outputPath = new File(testFilePrefix).toPath();
@@ -167,18 +161,17 @@ public class XmlSinkTest {
     assertEquals(outputPath.getParent(), tempPath.getParent());
     assertThat(
         tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName()));
-    assertEquals(testRootElement, writer.getWriteOperation().getSink().rootElementName);
     assertNotNull(writer.marshaller);
   }
 
   @Test
   public void testDisplayData() {
-    XmlSink.Bound<Integer> sink = XmlSink.write()
+    XmlIO.Write<Integer> write = XmlIO.<Integer>write()
         .toFilenamePrefix("foobar")
         .withRootElement("bird")
-        .ofRecordClass(Integer.class);
+        .withRecordClass(Integer.class);
 
-    DisplayData displayData = DisplayData.from(sink);
+    DisplayData displayData = DisplayData.from(write);
 
     assertThat(displayData, hasDisplayItem("fileNamePattern", "foobar-SSSSS-of-NNNNN.xml"));
     assertThat(displayData, hasDisplayItem("rootElement", "bird"));

http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
index 5f71f30..0120b8b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
@@ -285,12 +285,14 @@ public class XmlSourceTest {
     File file = tempFolder.newFile("trainXMLTiny");
     Files.write(file.toPath(), tinyXML.getBytes(StandardCharsets.UTF_8));
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRootElement("trains")
             .withRecordElement("train")
             .withRecordClass(Train.class)
-            .withMinBundleSize(1024);
+            .withMinBundleSize(1024)
+            .createSource();
 
     List<Train> expectedResults = ImmutableList.of(
         new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null),
@@ -308,12 +310,14 @@ public class XmlSourceTest {
     File file = tempFolder.newFile("trainXMLTiny");
     Files.write(file.toPath(), xmlWithMultiByteChars.getBytes(StandardCharsets.UTF_8));
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRootElement("trains")
             .withRecordElement("train")
             .withRecordClass(Train.class)
-            .withMinBundleSize(1024);
+            .withMinBundleSize(1024)
+            .createSource();
 
     List<Train> expectedResults = ImmutableList.of(
         new Train("Thomas¥", Train.TRAIN_NUMBER_UNDEFINED, null, null),
@@ -334,12 +338,14 @@ public class XmlSourceTest {
     File file = tempFolder.newFile("trainXMLTiny");
     Files.write(file.toPath(), xmlWithMultiByteElementName.getBytes(StandardCharsets.UTF_8));
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRootElement("\u0daf\u0dd4\u0db8\u0dca\u0dbb\u0dd2\u0dba\u0db1\u0dca")
             .withRecordElement("\u0daf\u0dd4\u0db8\u0dca\u0dbb\u0dd2\u0dba")
             .withRecordClass(Train.class)
-            .withMinBundleSize(1024);
+            .withMinBundleSize(1024)
+            .createSource();
 
     List<Train> expectedResults = ImmutableList.of(
         new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null),
@@ -357,18 +363,20 @@ public class XmlSourceTest {
     File file = tempFolder.newFile("trainXMLTiny");
     Files.write(file.toPath(), tinyXML.getBytes(StandardCharsets.UTF_8));
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRootElement("trains")
             .withRecordElement("train")
             .withRecordClass(Train.class)
-            .withMinBundleSize(10);
-    List<? extends FileBasedSource<Train>> splits = source.split(50, null);
+            .withMinBundleSize(10)
+            .createSource();
+    List<? extends BoundedSource<Train>> splits = source.split(50, null);
 
     assertTrue(splits.size() > 2);
 
     List<Train> results = new ArrayList<>();
-    for (FileBasedSource<Train> split : splits) {
+    for (BoundedSource<Train> split : splits) {
       results.addAll(readEverythingFromReader(split.createReader(null)));
     }
 
@@ -394,12 +402,14 @@ public class XmlSourceTest {
     File file = tempFolder.newFile("trainXMLSmall");
     Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRootElement("trains")
             .withRecordElement("train")
             .withRecordClass(Train.class)
-            .withMinBundleSize(1024);
+            .withMinBundleSize(1024)
+            .createSource();
 
     List<Train> expectedResults =
         ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null),
@@ -417,10 +427,12 @@ public class XmlSourceTest {
     File file = tempFolder.newFile("trainXMLSmall");
     Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRecordElement("train")
-            .withRecordClass(Train.class);
+            .withRecordClass(Train.class)
+            .createSource();
 
     exception.expect(NullPointerException.class);
     exception.expectMessage(
@@ -433,10 +445,12 @@ public class XmlSourceTest {
     File file = tempFolder.newFile("trainXMLSmall");
     Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRootElement("trains")
-            .withRecordClass(Train.class);
+            .withRecordClass(Train.class)
+            .createSource();
 
     exception.expect(NullPointerException.class);
     exception.expectMessage(
@@ -449,10 +463,12 @@ public class XmlSourceTest {
     File file = tempFolder.newFile("trainXMLSmall");
     Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRootElement("trains")
-            .withRecordElement("train");
+            .withRecordElement("train")
+            .createSource();
 
     exception.expect(NullPointerException.class);
     exception.expectMessage(
@@ -465,11 +481,13 @@ public class XmlSourceTest {
     File file = tempFolder.newFile("trainXMLSmall");
     Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRootElement("something")
             .withRecordElement("train")
-            .withRecordClass(Train.class);
+            .withRecordClass(Train.class)
+            .createSource();
 
     exception.expectMessage("Unexpected close tag </trains>; expected </something>.");
     readEverythingFromReader(source.createReader(null));
@@ -480,11 +498,13 @@ public class XmlSourceTest {
     File file = tempFolder.newFile("trainXMLSmall");
     Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRootElement("trains")
             .withRecordElement("something")
-            .withRecordClass(Train.class);
+            .withRecordClass(Train.class)
+            .createSource();
 
     assertEquals(readEverythingFromReader(source.createReader(null)), new ArrayList<Train>());
   }
@@ -500,11 +520,13 @@ public class XmlSourceTest {
     File file = tempFolder.newFile("trainXMLSmall");
     Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
 
-    XmlSource<WrongTrainType> source =
-        XmlSource.<WrongTrainType>from(file.toPath().toString())
+    BoundedSource<WrongTrainType> source =
+        XmlIO.<WrongTrainType>read()
+            .from(file.toPath().toString())
             .withRootElement("trains")
             .withRecordElement("train")
-            .withRecordClass(WrongTrainType.class);
+            .withRecordClass(WrongTrainType.class)
+            .createSource();
 
     exception.expect(RuntimeException.class);
 
@@ -525,11 +547,13 @@ public class XmlSourceTest {
     File file = tempFolder.newFile("trainXMLSmall");
     Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRootElement("trains")
             .withRecordElement("train")
-            .withRecordClass(Train.class);
+            .withRecordClass(Train.class)
+            .createSource();
 
     List<Train> expectedResults =
         ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null),
@@ -548,12 +572,14 @@ public class XmlSourceTest {
     File file = tempFolder.newFile("trainXMLSmall");
     Files.write(file.toPath(), trainXMLWithEmptyTags.getBytes(StandardCharsets.UTF_8));
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRootElement("trains")
             .withRecordElement("train")
             .withRecordClass(Train.class)
-            .withMinBundleSize(1024);
+            .withMinBundleSize(1024)
+            .createSource();
 
     List<Train> expectedResults = ImmutableList.of(new Train("Thomas", 1, "blue", null),
         new Train("Henry", 3, "green", null), new Train("Toby", 7, "brown", null),
@@ -572,14 +598,15 @@ public class XmlSourceTest {
     File file = tempFolder.newFile("trainXMLSmall");
     Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordElement("train")
-            .withRecordClass(Train.class)
-            .withMinBundleSize(1024);
-
-    PCollection<Train> output = p.apply("ReadFileData", Read.from(source));
+    PCollection<Train> output =
+        p.apply(
+            "ReadFileData",
+            XmlIO.<Train>read()
+                .from(file.toPath().toString())
+                .withRootElement("trains")
+                .withRecordElement("train")
+                .withRecordClass(Train.class)
+                .withMinBundleSize(1024));
 
     List<Train> expectedResults =
         ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null),
@@ -595,12 +622,14 @@ public class XmlSourceTest {
     File file = tempFolder.newFile("trainXMLSmall");
     Files.write(file.toPath(), trainXMLWithAttributes.getBytes(StandardCharsets.UTF_8));
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRootElement("trains")
             .withRecordElement("train")
             .withRecordClass(Train.class)
-            .withMinBundleSize(1024);
+            .withMinBundleSize(1024)
+            .createSource();
 
     List<Train> expectedResults = ImmutableList.of(new Train("Thomas", 1, "blue", "small"),
         new Train("Henry", 3, "green", "big"), new Train("Toby", 7, "brown", "small"),
@@ -618,12 +647,14 @@ public class XmlSourceTest {
     File file = tempFolder.newFile("trainXMLSmall");
     Files.write(file.toPath(), trainXMLWithSpaces.getBytes(StandardCharsets.UTF_8));
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRootElement("trains")
             .withRecordElement("train")
             .withRecordClass(Train.class)
-            .withMinBundleSize(1024);
+            .withMinBundleSize(1024)
+            .createSource();
 
     List<Train> expectedResults = ImmutableList.of(new Train("Thomas   ", 1, "blue", null),
         new Train("Henry", 3, "green", null), new Train("Toby", 7, "  brown  ", null),
@@ -642,12 +673,14 @@ public class XmlSourceTest {
     List<Train> trains = generateRandomTrainList(100);
     File file = createRandomTrainXML(fileName, trains);
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRootElement("trains")
             .withRecordElement("train")
             .withRecordClass(Train.class)
-            .withMinBundleSize(1024);
+            .withMinBundleSize(1024)
+            .createSource();
 
     assertThat(
         trainsToStrings(trains),
@@ -662,13 +695,15 @@ public class XmlSourceTest {
     List<Train> trains = generateRandomTrainList(100);
     File file = createRandomTrainXML(fileName, trains);
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordElement("train")
-            .withRecordClass(Train.class)
-            .withMinBundleSize(1024);
-    PCollection<Train> output = p.apply("ReadFileData", Read.from(source));
+    PCollection<Train> output =
+        p.apply(
+            "ReadFileData",
+            XmlIO.<Train>read()
+                .from(file.toPath().toString())
+                .withRootElement("trains")
+                .withRecordElement("train")
+                .withRecordClass(Train.class)
+                .withMinBundleSize(1024));
 
     PAssert.that(output).containsInAnyOrder(trains);
     p.run();
@@ -680,18 +715,20 @@ public class XmlSourceTest {
     List<Train> trains = generateRandomTrainList(10);
     File file = createRandomTrainXML(fileName, trains);
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRootElement("trains")
             .withRecordElement("train")
             .withRecordClass(Train.class)
-            .withMinBundleSize(10);
-    List<? extends FileBasedSource<Train>> splits = source.split(100, null);
+            .withMinBundleSize(10)
+            .createSource();
+    List<? extends BoundedSource<Train>> splits = source.split(100, null);
 
     assertTrue(splits.size() > 2);
 
     List<Train> results = new ArrayList<>();
-    for (FileBasedSource<Train> split : splits) {
+    for (BoundedSource<Train> split : splits) {
       results.addAll(readEverythingFromReader(split.createReader(null)));
     }
 
@@ -704,19 +741,21 @@ public class XmlSourceTest {
     List<Train> trains = generateRandomTrainList(100);
     File file = createRandomTrainXML(fileName, trains);
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRootElement("trains")
             .withRecordElement("train")
             .withRecordClass(Train.class)
-            .withMinBundleSize(10);
-    List<? extends FileBasedSource<Train>> splits = source.split(256, null);
+            .withMinBundleSize(10)
+            .createSource();
+    List<? extends BoundedSource<Train>> splits = source.split(256, null);
 
     // Not a trivial split
     assertTrue(splits.size() > 2);
 
     List<Train> results = new ArrayList<>();
-    for (FileBasedSource<Train> split : splits) {
+    for (BoundedSource<Train> split : splits) {
       results.addAll(readEverythingFromReader(split.createReader(null)));
     }
     assertThat(trainsToStrings(trains), containsInAnyOrder(trainsToStrings(results).toArray()));
@@ -729,14 +768,16 @@ public class XmlSourceTest {
     List<Train> trains = generateRandomTrainList(100);
     File file = createRandomTrainXML(fileName, trains);
 
-    XmlSource<Train> fileSource =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> fileSource =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRootElement("trains")
             .withRecordElement("train")
             .withRecordClass(Train.class)
-            .withMinBundleSize(10);
+            .withMinBundleSize(10)
+            .createSource();
 
-    List<? extends FileBasedSource<Train>> splits =
+    List<? extends BoundedSource<Train>> splits =
         fileSource.split(file.length() / 3, null);
     for (BoundedSource<Train> splitSource : splits) {
       int numItems = readEverythingFromReader(splitSource.createReader(null)).size();
@@ -771,11 +812,13 @@ public class XmlSourceTest {
     File file = tempFolder.newFile("trainXMLSmall");
     Files.write(file.toPath(), trainXMLWithAllFeaturesSingleByte.getBytes(StandardCharsets.UTF_8));
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRootElement("trains")
             .withRecordElement("train")
-            .withRecordClass(Train.class);
+            .withRecordClass(Train.class)
+            .createSource();
     assertSplitAtFractionExhaustive(source, options);
   }
 
@@ -788,11 +831,13 @@ public class XmlSourceTest {
     File file = tempFolder.newFile("trainXMLSmall");
     Files.write(file.toPath(), trainXMLWithAllFeaturesMultiByte.getBytes(StandardCharsets.UTF_8));
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.toPath().toString())
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
             .withRootElement("\u0daf\u0dd4\u0db8\u0dca\u0dbb\u0dd2\u0dba\u0db1\u0dca")
             .withRecordElement("\u0daf\u0dd4\u0db8\u0dca\u0dbb\u0dd2\u0dba")
-            .withRecordClass(Train.class);
+            .withRecordClass(Train.class)
+            .createSource();
     assertSplitAtFractionExhaustive(source, options);
   }
 
@@ -808,13 +853,15 @@ public class XmlSourceTest {
     generateRandomTrainList(8);
     createRandomTrainXML("otherfile.xml", trains1);
 
-    XmlSource<Train> source =
-        XmlSource.<Train>from(file.getParent() + "/" + "temp*.xml")
-            .withRootElement("trains")
-            .withRecordElement("train")
-            .withRecordClass(Train.class)
-            .withMinBundleSize(1024);
-    PCollection<Train> output = p.apply("ReadFileData", Read.from(source));
+    PCollection<Train> output =
+        p.apply(
+            "ReadFileData",
+            XmlIO.<Train>read()
+                .from(file.getParent() + "/" + "temp*.xml")
+                .withRootElement("trains")
+                .withRecordElement("train")
+                .withRecordClass(Train.class)
+                .withMinBundleSize(1024));
 
     List<Train> expectedResults = new ArrayList<>();
     expectedResults.addAll(trains1);
@@ -827,15 +874,14 @@ public class XmlSourceTest {
 
   @Test
   public void testDisplayData() {
-
-
-    XmlSource<?> source = XmlSource
-        .<Integer>from("foo.xml")
-        .withRootElement("bird")
-        .withRecordElement("cat")
-        .withMinBundleSize(1234)
-        .withRecordClass(Integer.class);
-    DisplayData displayData = DisplayData.from(source);
+    DisplayData displayData =
+        DisplayData.from(
+            XmlIO.<Integer>read()
+                .from("foo.xml")
+                .withRootElement("bird")
+                .withRecordElement("cat")
+                .withMinBundleSize(1234)
+                .withRecordClass(Integer.class));
 
     assertThat(displayData, hasDisplayItem("filePattern", "foo.xml"));
     assertThat(displayData, hasDisplayItem("rootElement", "bird"));

http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
index c617f06..9b24b69 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
@@ -44,8 +44,10 @@ import static org.junit.Assert.fail;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import com.google.common.testing.EqualsTester;
 import java.io.IOException;
@@ -1299,6 +1301,21 @@ public class DisplayDataTest implements Serializable {
     DisplayData.from(component);
   }
 
+  @AutoValue // Fixture for testAutoValue, which instantiates AutoValue_DisplayDataTest_Foo.
+  abstract static class Foo implements HasDisplayData {
+    @Override
+    public void populateDisplayData(Builder builder) {
+      builder.add(DisplayData.item("someKey", "someValue")); // The only item registered.
+    }
+  }
+
+  @Test // Expects the item namespace to be Foo rather than the instantiated AutoValue_ class.
+  public void testAutoValue() {
+    DisplayData data = DisplayData.from(new AutoValue_DisplayDataTest_Foo());
+    Item item = Iterables.getOnlyElement(data.asMap().values()); // Throws unless exactly one.
+    assertEquals(Foo.class, item.getNamespace());
+  }
+
   private String quoted(Object obj) {
     return String.format("\"%s\"", obj);
   }