You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/21 17:52:48 UTC
[04/50] [abbrv] beam git commit: [BEAM-1914] XmlIO now complies with PTransform style guide
[BEAM-1914] XmlIO now complies with PTransform style guide
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0c0a60c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0c0a60c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0c0a60c
Branch: refs/heads/gearpump-runner
Commit: d0c0a60c83a9d2a6caa29f91f89d8c0ee3b0eb93
Parents: 57929fb
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 16:25:42 2017 -0700
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Apr 19 10:34:46 2017 +0200
----------------------------------------------------------------------
.../apache/beam/sdk/io/CompressedSource.java | 4 +-
.../main/java/org/apache/beam/sdk/io/XmlIO.java | 477 +++++++++++++++++++
.../java/org/apache/beam/sdk/io/XmlSink.java | 226 ++-------
.../java/org/apache/beam/sdk/io/XmlSource.java | 191 +-------
.../sdk/transforms/display/DisplayData.java | 6 +
.../org/apache/beam/sdk/io/XmlSinkTest.java | 89 ++--
.../org/apache/beam/sdk/io/XmlSourceTest.java | 248 ++++++----
.../sdk/transforms/display/DisplayDataTest.java | 17 +
8 files changed, 740 insertions(+), 518 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index ecd0fd9..1d940cb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -46,10 +46,10 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
* A Source that reads from compressed files. A {@code CompressedSources} wraps a delegate
* {@link FileBasedSource} that is able to read the decompressed file format.
*
- * <p>For example, use the following to read from a gzip-compressed XML file:
+ * <p>For example, use the following to read from a gzip-compressed file-based source:
*
* <pre> {@code
- * XmlSource mySource = XmlSource.from(...);
+ * FileBasedSource<T> mySource = ...;
* PCollection<T> collection = p.apply(Read.from(CompressedSource
* .from(mySource)
* .withDecompression(CompressedSource.CompressionMode.GZIP)));
http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java
new file mode 100644
index 0000000..a53fb86
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import javax.annotation.Nullable;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/** Transforms for reading and writing XML files using JAXB mappers. */
+public class XmlIO {
+ // CHECKSTYLE.OFF: JavadocStyle
+ /**
+ * Reads XML files. This source reads one or more XML files and
+ * creates a {@link PCollection} of a given type. Please note the example given below.
+ *
+ * <p>The XML file must be of the following form, where {@code root} and {@code record} are XML
+ * element names that are defined by the user:
+ *
+ * <pre>{@code
+ * <root>
+ * <record> ... </record>
+ * <record> ... </record>
+ * <record> ... </record>
+ * ...
+ * <record> ... </record>
+ * </root>
+ * }</pre>
+ *
+ * <p>Basically, the XML document should contain a single root element with an inner list
+ * consisting entirely of record elements. The records may contain arbitrary XML content; however,
+ * that content <b>must not</b> contain the start {@code <record>} or end {@code </record>} tags.
+ * This restriction enables reading from large XML files in parallel from different offsets in the
+ * file.
+ *
+ * <p>Root and/or record elements may additionally contain an arbitrary number of XML attributes.
+ * Additionally users must provide a class of a JAXB annotated Java type that can be used convert
+ * records into Java objects and vice versa using JAXB marshalling/unmarshalling mechanisms.
+ * Reading the source will generate a {@code PCollection} of the given JAXB annotated Java type.
+ * Optionally users may provide a minimum size of a bundle that should be created for the source.
+ *
+ * <p>The following example shows how to use this method in a Beam pipeline:
+ *
+ * <pre>{@code
+ * PCollection<String> output = p.apply(XmlIO.<Record>read()
+ * .from(file.toPath().toString())
+ * .withRootElement("root")
+ * .withRecordElement("record")
+ * .withRecordClass(Record.class));
+ * }</pre>
+ *
+ * <p>Currently, only XML files that use single-byte characters are supported. Using a file that
+ * contains multi-byte characters may result in data loss or duplication.
+ *
+ * <p>To use this method:
+ *
+ * <ol>
+ * <li>Explicitly declare a dependency on org.codehaus.woodstox:stax2-api
+ * <li>Include a compatible implementation on the classpath at run-time, such as
+ * org.codehaus.woodstox:woodstox-core-asl
+ * </ol>
+ *
+ * <p>These dependencies have been declared as optional in the sdks/java/core/pom.xml file of
+ * Apache Beam.
+ *
+ * <h3>Permissions</h3>
+ * Permission requirements depend on the {@link org.apache.beam.sdk.runners.PipelineRunner
+ * PipelineRunner} that is used to execute the Beam pipeline. Please refer to the documentation of
+ * corresponding {@link PipelineRunner PipelineRunners} for more details.
+ *
+ * @param <T> Type of the objects that represent the records of the XML file. The {@code
+ * PCollection} generated by this source will be of this type.
+ */
+ // CHECKSTYLE.ON: JavadocStyle
+ public static <T> Read<T> read() {
+ return new AutoValue_XmlIO_Read.Builder<T>()
+ .setMinBundleSize(Read.DEFAULT_MIN_BUNDLE_SIZE)
+ .setCompressionType(Read.CompressionType.AUTO)
+ .build();
+ }
+
+ // CHECKSTYLE.OFF: JavadocStyle
+ /**
+ * A {@link Sink} that outputs records as XML-formatted elements. Writes a {@link PCollection} of
+ * records from JAXB-annotated classes to a single file location.
+ *
+ * <p>Given a PCollection containing records of type T that can be marshalled to XML elements,
+ * this Sink will produce a single file consisting of a single root element that contains all of
+ * the elements in the PCollection.
+ *
+ * <p>XML Sinks are created with a base filename to write to, a root element name that will be
+ * used for the root element of the output files, and a class to bind to an XML element. This
+ * class will be used in the marshalling of records in an input PCollection to their XML
+ * representation and must be able to be bound using JAXB annotations (checked at pipeline
+ * construction time).
+ *
+ * <p>XML Sinks can be written to using the {@link Write} transform:
+ *
+ * <pre>{@code
+ * p.apply(XmlIO.<Type>write()
+ * .withRecordClass(Type.class)
+ * .withRootElement(root_element)
+ * .toFilenamePrefix(output_filename));
+ * }</pre>
+ *
+ * <p>For example, consider the following class with JAXB annotations:
+ *
+ * <pre>
+ * {@literal @}XmlRootElement(name = "word_count_result")
+ * {@literal @}XmlType(propOrder = {"word", "frequency"})
+ * public class WordFrequency {
+ * private String word;
+ * private long frequency;
+ *
+ * public WordFrequency() { }
+ *
+ * public WordFrequency(String word, long frequency) {
+ * this.word = word;
+ * this.frequency = frequency;
+ * }
+ *
+ * public void setWord(String word) {
+ * this.word = word;
+ * }
+ *
+ * public void setFrequency(long frequency) {
+ * this.frequency = frequency;
+ * }
+ *
+ * public long getFrequency() {
+ * return frequency;
+ * }
+ *
+ * public String getWord() {
+ * return word;
+ * }
+ * }
+ * </pre>
+ *
+ * <p>The following will produce XML output with a root element named "words" from a PCollection
+ * of WordFrequency objects:
+ *
+ * <pre>{@code
+ * p.apply(XmlIO.<WordFrequency>write()
+ * .withRecordClass(WordFrequency.class)
+ * .withRootElement("words")
+ * .toFilenamePrefix(output_file));
+ * }</pre>
+ *
+ * <p>The output of which will look like:
+ *
+ * <pre>{@code
+ * <words>
+ *
+ * <word_count_result>
+ * <word>decreased</word>
+ * <frequency>1</frequency>
+ * </word_count_result>
+ *
+ * <word_count_result>
+ * <word>War</word>
+ * <frequency>4</frequency>
+ * </word_count_result>
+ *
+ * <word_count_result>
+ * <word>empress'</word>
+ * <frequency>14</frequency>
+ * </word_count_result>
+ *
+ * <word_count_result>
+ * <word>stoops</word>
+ * <frequency>6</frequency>
+ * </word_count_result>
+ *
+ * ...
+ * </words>
+ * }</pre>
+ */
+ // CHECKSTYLE.ON: JavadocStyle
+ public static <T> Write<T> write() {
+ return new AutoValue_XmlIO_Write.Builder<T>().build();
+ }
+
+ /** Implementation of {@link #read}. */
+ @AutoValue
+ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+ private static final int DEFAULT_MIN_BUNDLE_SIZE = 8 * 1024;
+
+ @Nullable
+ abstract String getFileOrPatternSpec();
+
+ @Nullable
+ abstract String getRootElement();
+
+ @Nullable
+ abstract String getRecordElement();
+
+ @Nullable
+ abstract Class<T> getRecordClass();
+
+ abstract CompressionType getCompressionType();
+
+ abstract long getMinBundleSize();
+
+ abstract Builder<T> toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> setFileOrPatternSpec(String fileOrPatternSpec);
+
+ abstract Builder<T> setRootElement(String rootElement);
+
+ abstract Builder<T> setRecordElement(String recordElement);
+
+ abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+ abstract Builder<T> setMinBundleSize(long minBundleSize);
+
+ abstract Builder<T> setCompressionType(CompressionType compressionType);
+
+ abstract Read<T> build();
+ }
+
+ /** Strategy for determining the compression type of XML files being read. */
+ public enum CompressionType {
+ /** Automatically determine the compression type based on filename extension. */
+ AUTO(""),
+ /** Uncompressed (i.e., may be split). */
+ UNCOMPRESSED(""),
+ /** GZipped. */
+ GZIP(".gz"),
+ /** BZipped. */
+ BZIP2(".bz2"),
+ /** Zipped. */
+ ZIP(".zip"),
+ /** Deflate compressed. */
+ DEFLATE(".deflate");
+
+ private String filenameSuffix;
+
+ CompressionType(String suffix) {
+ this.filenameSuffix = suffix;
+ }
+
+ /**
+ * Determine if a given filename matches a compression type based on its extension.
+ * @param filename the filename to match
+ * @return true iff the filename ends with the compression type's known extension.
+ */
+ public boolean matches(String filename) {
+ return filename.toLowerCase().endsWith(filenameSuffix.toLowerCase());
+ }
+ }
+
+ /**
+ * Reads a single XML file or a set of XML files defined by a Java "glob"
+ * file pattern. Each XML file should be of the form defined in {@link #read}.
+ */
+ public Read<T> from(String fileOrPatternSpec) {
+ return toBuilder().setFileOrPatternSpec(fileOrPatternSpec).build();
+ }
+
+ /**
+ * Sets name of the root element of the XML document. This will be used to create a valid
+ * starting root element when initiating a bundle of records created from an XML document. This
+ * is a required parameter.
+ */
+ public Read<T> withRootElement(String rootElement) {
+ return toBuilder().setRootElement(rootElement).build();
+ }
+
+ /**
+ * Sets name of the record element of the XML document. This will be used to determine offset of
+ * the first record of a bundle created from the XML document. This is a required parameter.
+ */
+ public Read<T> withRecordElement(String recordElement) {
+ return toBuilder().setRecordElement(recordElement).build();
+ }
+
+ /**
+ * Sets a JAXB annotated class that can be populated using a record of the provided XML file.
+ * This will be used when unmarshalling record objects from the XML file. This is a required
+ * parameter.
+ */
+ public Read<T> withRecordClass(Class<T> recordClass) {
+ return toBuilder().setRecordClass(recordClass).build();
+ }
+
+ /**
+ * Sets a parameter {@code minBundleSize} for the minimum bundle size of the source. Please
+ * refer to {@link OffsetBasedSource} for the definition of minBundleSize. This is an optional
+ * parameter.
+ */
+ public Read<T> withMinBundleSize(long minBundleSize) {
+ return toBuilder().setMinBundleSize(minBundleSize).build();
+ }
+
+ /**
+ * Decompresses all input files using the specified compression type.
+ *
+ * <p>If no compression type is specified, the default is {@link CompressionType#AUTO}.
+ * In this mode, the compression type of the file is determined by its extension.
+ * Supports .gz, .bz2, .zip and .deflate compression.
+ */
+ public Read<T> withCompressionType(CompressionType compressionType) {
+ return toBuilder().setCompressionType(compressionType).build();
+ }
+
+ @Override
+ public void validate(PBegin input) {
+ checkNotNull(
+ getRootElement(),
+ "rootElement is null. Use builder method withRootElement() to set this.");
+ checkNotNull(
+ getRecordElement(),
+ "recordElement is null. Use builder method withRecordElement() to set this.");
+ checkNotNull(
+ getRecordClass(),
+ "recordClass is null. Use builder method withRecordClass() to set this.");
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder
+ .addIfNotDefault(
+ DisplayData.item("minBundleSize", getMinBundleSize())
+ .withLabel("Minimum Bundle Size"),
+ 1L)
+ .add(DisplayData.item("filePattern", getFileOrPatternSpec()).withLabel("File Pattern"))
+ .addIfNotNull(
+ DisplayData.item("rootElement", getRootElement()).withLabel("XML Root Element"))
+ .addIfNotNull(
+ DisplayData.item("recordElement", getRecordElement()).withLabel("XML Record Element"))
+ .addIfNotNull(
+ DisplayData.item("recordClass", getRecordClass()).withLabel("XML Record Class"));
+ }
+
+ @VisibleForTesting
+ BoundedSource<T> createSource() {
+ XmlSource<T> source = new XmlSource<>(this);
+ switch (getCompressionType()) {
+ case UNCOMPRESSED:
+ return source;
+ case AUTO:
+ return CompressedSource.from(source);
+ case BZIP2:
+ return CompressedSource.from(source)
+ .withDecompression(CompressedSource.CompressionMode.BZIP2);
+ case GZIP:
+ return CompressedSource.from(source)
+ .withDecompression(CompressedSource.CompressionMode.GZIP);
+ case ZIP:
+ return CompressedSource.from(source)
+ .withDecompression(CompressedSource.CompressionMode.ZIP);
+ case DEFLATE:
+ return CompressedSource.from(source)
+ .withDecompression(CompressedSource.CompressionMode.DEFLATE);
+ default:
+ throw new IllegalArgumentException("Unknown compression type: " + getCompressionType());
+ }
+ }
+
+ @Override
+ public PCollection<T> expand(PBegin input) {
+ return input.apply(org.apache.beam.sdk.io.Read.from(createSource()));
+ }
+ }
+
+ /** Implementation of {@link #write}. */
+ @AutoValue
+ public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+ @Nullable
+ abstract String getFilenamePrefix();
+
+ @Nullable
+ abstract Class<T> getRecordClass();
+
+ @Nullable
+ abstract String getRootElement();
+
+ abstract Builder<T> toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> setFilenamePrefix(String baseOutputFilename);
+
+ abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+ abstract Builder<T> setRootElement(String rootElement);
+
+ abstract Write<T> build();
+ }
+
+
+ /**
+ * Writes to files with the given path prefix.
+ *
+ * <p>Output files will have the name {@literal {filenamePrefix}-0000i-of-0000n.xml} where n is
+ * the number of output bundles.
+ */
+ public Write<T> toFilenamePrefix(String filenamePrefix) {
+ return toBuilder().setFilenamePrefix(filenamePrefix).build();
+ }
+
+ /**
+ * Writes objects of the given class mapped to XML elements using JAXB.
+ *
+ * <p>The specified class must be able to be used to create a JAXB context.
+ */
+ public Write<T> withRecordClass(Class<T> recordClass) {
+ return toBuilder().setRecordClass(recordClass).build();
+ }
+
+ /**
+ * Sets the enclosing root element for the generated XML files.
+ */
+ public Write<T> withRootElement(String rootElement) {
+ return toBuilder().setRootElement(rootElement).build();
+ }
+
+ @Override
+ public void validate(PCollection<T> input) {
+ checkNotNull(getRecordClass(), "Missing a class to bind to a JAXB context.");
+ checkNotNull(getRootElement(), "Missing a root element name.");
+ checkNotNull(getFilenamePrefix(), "Missing a filename to write to.");
+ try {
+ JAXBContext.newInstance(getRecordClass());
+ } catch (JAXBException e) {
+ throw new RuntimeException("Error binding classes to a JAXB Context.", e);
+ }
+ }
+
+ @Override
+ public PDone expand(PCollection<T> input) {
+ return input.apply(org.apache.beam.sdk.io.Write.to(createSink()));
+ }
+
+ @VisibleForTesting
+ XmlSink<T> createSink() {
+ return new XmlSink<>(this);
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ createSink().populateFileBasedDisplayData(builder);
+ builder
+ .addIfNotNull(DisplayData.item("rootElement", getRootElement())
+ .withLabel("XML Root Element"))
+ .addIfNotNull(DisplayData.item("recordClass", getRecordClass())
+ .withLabel("XML Record Class"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
index 2159c8f..7700329 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
@@ -17,226 +17,58 @@
*/
package org.apache.beam.sdk.io;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
-import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.PCollection;
-// CHECKSTYLE.OFF: JavadocStyle
-/**
- * A {@link Sink} that outputs records as XML-formatted elements. Writes a {@link PCollection} of
- * records from JAXB-annotated classes to a single file location.
- *
- * <p>Given a PCollection containing records of type T that can be marshalled to XML elements, this
- * Sink will produce a single file consisting of a single root element that contains all of the
- * elements in the PCollection.
- *
- * <p>XML Sinks are created with a base filename to write to, a root element name that will be used
- * for the root element of the output files, and a class to bind to an XML element. This class
- * will be used in the marshalling of records in an input PCollection to their XML representation
- * and must be able to be bound using JAXB annotations (checked at pipeline construction time).
- *
- * <p>XML Sinks can be written to using the {@link Write} transform:
- *
- * <pre>
- * p.apply(Write.to(
- * XmlSink.ofRecordClass(Type.class)
- * .withRootElementName(root_element)
- * .toFilenamePrefix(output_filename)));
- * </pre>
- *
- * <p>For example, consider the following class with JAXB annotations:
- *
- * <pre>
- * {@literal @}XmlRootElement(name = "word_count_result")
- * {@literal @}XmlType(propOrder = {"word", "frequency"})
- * public class WordFrequency {
- * private String word;
- * private long frequency;
- *
- * public WordFrequency() { }
- *
- * public WordFrequency(String word, long frequency) {
- * this.word = word;
- * this.frequency = frequency;
- * }
- *
- * public void setWord(String word) {
- * this.word = word;
- * }
- *
- * public void setFrequency(long frequency) {
- * this.frequency = frequency;
- * }
- *
- * public long getFrequency() {
- * return frequency;
- * }
- *
- * public String getWord() {
- * return word;
- * }
- * }
- * </pre>
- *
- * <p>The following will produce XML output with a root element named "words" from a PCollection of
- * WordFrequency objects:
- * <pre>
- * p.apply(Write.to(
- * XmlSink.ofRecordClass(WordFrequency.class)
- * .withRootElement("words")
- * .toFilenamePrefix(output_file)));
- * </pre>
- *
- * <p>The output of which will look like:
- * <pre>
- * {@code
- * <words>
- *
- * <word_count_result>
- * <word>decreased</word>
- * <frequency>1</frequency>
- * </word_count_result>
- *
- * <word_count_result>
- * <word>War</word>
- * <frequency>4</frequency>
- * </word_count_result>
- *
- * <word_count_result>
- * <word>empress'</word>
- * <frequency>14</frequency>
- * </word_count_result>
- *
- * <word_count_result>
- * <word>stoops</word>
- * <frequency>6</frequency>
- * </word_count_result>
- *
- * ...
- * </words>
- * }</pre>
- */
-// CHECKSTYLE.ON: JavadocStyle
-@SuppressWarnings("checkstyle:javadocstyle")
-public class XmlSink {
+/** Implementation of {@link XmlIO#write}. */
+class XmlSink<T> extends FileBasedSink<T> {
protected static final String XML_EXTENSION = "xml";
- /**
- * Returns a builder for an XmlSink. You'll need to configure the class to bind, the root
- * element name, and the output file prefix with {@link Bound#ofRecordClass}, {@link
- * Bound#withRootElement}, and {@link Bound#toFilenamePrefix}, respectively.
- */
- public static Bound<?> write() {
- return new Bound<>(null, null, null);
+ private final XmlIO.Write<T> spec;
+
+ XmlSink(XmlIO.Write<T> spec) {
+ super(spec.getFilenamePrefix(), XML_EXTENSION);
+ this.spec = spec;
}
/**
- * Returns an XmlSink that writes objects as XML entities.
- *
- * <p>Output files will have the name {@literal {baseOutputFilename}-0000i-of-0000n.xml} where n
- * is the number of output bundles.
- *
- * @param klass the class of the elements to write.
- * @param rootElementName the enclosing root element.
- * @param baseOutputFilename the output filename prefix.
+ * Validates that the root element, class to bind to a JAXB context, and filenamePrefix have
+ * been set and that the class can be bound in a JAXB context.
*/
- public static <T> Bound<T> writeOf(
- Class<T> klass, String rootElementName, String baseOutputFilename) {
- return new Bound<>(klass, rootElementName, baseOutputFilename);
+ @Override
+ public void validate(PipelineOptions options) {
+ spec.validate(null);
}
/**
- * A {@link FileBasedSink} that writes objects as XML elements.
+ * Creates an {@link XmlWriteOperation}.
*/
- public static class Bound<T> extends FileBasedSink<T> {
- final Class<T> classToBind;
- final String rootElementName;
-
- private Bound(Class<T> classToBind, String rootElementName, String baseOutputFilename) {
- super(baseOutputFilename, XML_EXTENSION);
- this.classToBind = classToBind;
- this.rootElementName = rootElementName;
- }
-
- /**
- * Returns an XmlSink that writes objects of the class specified as XML elements.
- *
- * <p>The specified class must be able to be used to create a JAXB context.
- */
- public <T> Bound<T> ofRecordClass(Class<T> classToBind) {
- return new Bound<>(classToBind, rootElementName, getBaseOutputFilenameProvider().get());
- }
-
- /**
- * Returns an XmlSink that writes to files with the given prefix.
- *
- * <p>Output files will have the name {@literal {filenamePrefix}-0000i-of-0000n.xml} where n is
- * the number of output bundles.
- */
- public Bound<T> toFilenamePrefix(String baseOutputFilename) {
- return new Bound<>(classToBind, rootElementName, baseOutputFilename);
- }
-
- /**
- * Returns an XmlSink that writes XML files with an enclosing root element of the
- * supplied name.
- */
- public Bound<T> withRootElement(String rootElementName) {
- return new Bound<>(classToBind, rootElementName, getBaseOutputFilenameProvider().get());
- }
-
- /**
- * Validates that the root element, class to bind to a JAXB context, and filenamePrefix have
- * been set and that the class can be bound in a JAXB context.
- */
- @Override
- public void validate(PipelineOptions options) {
- checkNotNull(classToBind, "Missing a class to bind to a JAXB context.");
- checkNotNull(rootElementName, "Missing a root element name.");
- checkNotNull(getBaseOutputFilenameProvider().get(), "Missing a filename to write to.");
- try {
- JAXBContext.newInstance(classToBind);
- } catch (JAXBException e) {
- throw new RuntimeException("Error binding classes to a JAXB Context.", e);
- }
- }
+ @Override
+ public XmlWriteOperation<T> createWriteOperation(PipelineOptions options) {
+ return new XmlWriteOperation<>(this);
+ }
- /**
- * Creates an {@link XmlWriteOperation}.
- */
- @Override
- public XmlWriteOperation<T> createWriteOperation(PipelineOptions options) {
- return new XmlWriteOperation<>(this);
- }
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ spec.populateDisplayData(builder);
+ }
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .addIfNotNull(DisplayData.item("rootElement", rootElementName)
- .withLabel("XML Root Element"))
- .addIfNotNull(DisplayData.item("recordClass", classToBind)
- .withLabel("XML Record Class"));
- }
+ void populateFileBasedDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
}
/**
* {@link Sink.WriteOperation} for XML {@link Sink}s.
*/
protected static final class XmlWriteOperation<T> extends FileBasedWriteOperation<T> {
- public XmlWriteOperation(XmlSink.Bound<T> sink) {
+ public XmlWriteOperation(XmlSink<T> sink) {
super(sink);
}
@@ -247,7 +79,7 @@ public class XmlSink {
public XmlWriter<T> createWriter(PipelineOptions options) throws Exception {
JAXBContext context;
Marshaller marshaller;
- context = JAXBContext.newInstance(getSink().classToBind);
+ context = JAXBContext.newInstance(getSink().spec.getRecordClass());
marshaller = context.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);
@@ -259,8 +91,8 @@ public class XmlSink {
* Return the XmlSink.Bound for this write operation.
*/
@Override
- public XmlSink.Bound<T> getSink() {
- return (XmlSink.Bound<T>) super.getSink();
+ public XmlSink<T> getSink() {
+ return (XmlSink<T>) super.getSink();
}
}
@@ -289,7 +121,7 @@ public class XmlSink {
*/
@Override
protected void writeHeader() throws Exception {
- String rootElementName = getWriteOperation().getSink().rootElementName;
+ String rootElementName = getWriteOperation().getSink().spec.getRootElement();
os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "<" + rootElementName + ">\n"));
}
@@ -298,7 +130,7 @@ public class XmlSink {
*/
@Override
protected void writeFooter() throws Exception {
- String rootElementName = getWriteOperation().getSink().rootElementName;
+ String rootElementName = getWriteOperation().getSink().spec.getRootElement();
os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "\n</" + rootElementName + ">"));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java
index 6bf2015..7416c85 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.io;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -45,154 +43,29 @@ import javax.xml.stream.XMLStreamReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.JAXBCoder;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.PCollection;
import org.codehaus.stax2.XMLInputFactory2;
-// CHECKSTYLE.OFF: JavadocStyle
-/**
- * A source that can be used to read XML files. This source reads one or more
- * XML files and creates a {@link PCollection} of a given type. A {@link Read} transform can be
- * created by passing an {@link XmlSource} object to {@link Read#from}. Please note the
- * example given below.
- *
- * <p>The XML file must be of the following form, where {@code root} and {@code record} are XML
- * element names that are defined by the user:
- *
- * <pre>
- * {@code
- * <root>
- * <record> ... </record>
- * <record> ... </record>
- * <record> ... </record>
- * ...
- * <record> ... </record>
- * </root>
- * }
- * </pre>
- *
- * <p>Basically, the XML document should contain a single root element with an inner list consisting
- * entirely of record elements. The records may contain arbitrary XML content; however, that content
- * <b>must not</b> contain the start {@code <record>} or end {@code </record>} tags. This
- * restriction enables reading from large XML files in parallel from different offsets in the file.
- *
- * <p>Root and/or record elements may additionally contain an arbitrary number of XML attributes.
- * Additionally users must provide a class of a JAXB annotated Java type that can be used convert
- * records into Java objects and vice versa using JAXB marshalling/unmarshalling mechanisms. Reading
- * the source will generate a {@code PCollection} of the given JAXB annotated Java type.
- * Optionally users may provide a minimum size of a bundle that should be created for the source.
- *
- * <p>The following example shows how to read from {@link XmlSource} in a Beam pipeline:
- *
- * <pre>
- * {@code
- * XmlSource<String> source = XmlSource.<String>from(file.toPath().toString())
- * .withRootElement("root")
- * .withRecordElement("record")
- * .withRecordClass(Record.class);
- * PCollection<String> output = p.apply(Read.from(source));
- * }
- * </pre>
- *
- * <p>Currently, only XML files that use single-byte characters are supported. Using a file that
- * contains multi-byte characters may result in data loss or duplication.
- *
- * <p>To use {@link XmlSource}:
- * <ol>
- * <li>Explicitly declare a dependency on org.codehaus.woodstox:stax2-api</li>
- * <li>Include a compatible implementation on the classpath at run-time,
- * such as org.codehaus.woodstox:woodstox-core-asl</li>
- * </ol>
- *
- * <p>These dependencies have been declared as optional in the sdks/java/core/pom.xml file of
- * Apache Beam.
- *
- * <h3>Permissions</h3>
- * Permission requirements depend on the
- * {@link org.apache.beam.sdk.runners.PipelineRunner PipelineRunner} that is
- * used to execute the Beam pipeline. Please refer to the documentation of corresponding
- * {@link PipelineRunner PipelineRunners} for more details.
- *
- * @param <T> Type of the objects that represent the records of the XML file. The
- * {@code PCollection} generated by this source will be of this type.
- */
-// CHECKSTYLE.ON: JavadocStyle
+/** Implementation of {@link XmlIO#read}. */
public class XmlSource<T> extends FileBasedSource<T> {
private static final String XML_VERSION = "1.1";
- private static final int DEFAULT_MIN_BUNDLE_SIZE = 8 * 1024;
- private final String rootElement;
- private final String recordElement;
- private final Class<T> recordClass;
-
- /**
- * Creates an XmlSource for a single XML file or a set of XML files defined by a Java "glob" file
- * pattern. Each XML file should be of the form defined in {@link XmlSource}.
- */
- public static <T> XmlSource<T> from(String fileOrPatternSpec) {
- return new XmlSource<>(fileOrPatternSpec, DEFAULT_MIN_BUNDLE_SIZE, null, null, null);
- }
-
- /**
- * Sets name of the root element of the XML document. This will be used to create a valid starting
- * root element when initiating a bundle of records created from an XML document. This is a
- * required parameter.
- */
- public XmlSource<T> withRootElement(String rootElement) {
- return new XmlSource<>(
- getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
- }
- /**
- * Sets name of the record element of the XML document. This will be used to determine offset of
- * the first record of a bundle created from the XML document. This is a required parameter.
- */
- public XmlSource<T> withRecordElement(String recordElement) {
- return new XmlSource<>(
- getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
- }
+ private final XmlIO.Read<T> spec;
- /**
- * Sets a JAXB annotated class that can be populated using a record of the provided XML file. This
- * will be used when unmarshalling record objects from the XML file. This is a required
- * parameter.
- */
- public XmlSource<T> withRecordClass(Class<T> recordClass) {
- return new XmlSource<>(
- getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
+ XmlSource(XmlIO.Read<T> spec) {
+ super(spec.getFileOrPatternSpec(), spec.getMinBundleSize());
+ this.spec = spec;
}
- /**
- * Sets a parameter {@code minBundleSize} for the minimum bundle size of the source. Please refer
- * to {@link OffsetBasedSource} for the definition of minBundleSize. This is an optional
- * parameter.
- */
- public XmlSource<T> withMinBundleSize(long minBundleSize) {
- return new XmlSource<>(
- getFileOrPatternSpec(), minBundleSize, rootElement, recordElement, recordClass);
- }
-
- private XmlSource(String fileOrPattern, long minBundleSize, String rootElement,
- String recordElement, Class<T> recordClass) {
- super(fileOrPattern, minBundleSize);
- this.rootElement = rootElement;
- this.recordElement = recordElement;
- this.recordClass = recordClass;
- }
-
- private XmlSource(String fileOrPattern, long minBundleSize, long startOffset, long endOffset,
- String rootElement, String recordElement, Class<T> recordClass) {
- super(fileOrPattern, minBundleSize, startOffset, endOffset);
- this.rootElement = rootElement;
- this.recordElement = recordElement;
- this.recordClass = recordClass;
+ private XmlSource(XmlIO.Read<T> spec, long startOffset, long endOffset) {
+ super(spec.getFileOrPatternSpec(), spec.getMinBundleSize(), startOffset, endOffset);
+ this.spec = spec;
}
@Override
protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
- return new XmlSource<T>(
- fileName, getMinBundleSize(), start, end, rootElement, recordElement, recordClass);
+ return new XmlSource<T>(spec.from(fileName), start, end);
}
@Override
@@ -203,42 +76,17 @@ public class XmlSource<T> extends FileBasedSource<T> {
@Override
public void validate() {
super.validate();
- checkNotNull(
- rootElement, "rootElement is null. Use builder method withRootElement() to set this.");
- checkNotNull(
- recordElement,
- "recordElement is null. Use builder method withRecordElement() to set this.");
- checkNotNull(
- recordClass, "recordClass is null. Use builder method withRecordClass() to set this.");
+ spec.validate(null);
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .addIfNotNull(DisplayData.item("rootElement", rootElement)
- .withLabel("XML Root Element"))
- .addIfNotNull(DisplayData.item("recordElement", recordElement)
- .withLabel("XML Record Element"))
- .addIfNotNull(DisplayData.item("recordClass", recordClass)
- .withLabel("XML Record Class"));
+ spec.populateDisplayData(builder);
}
@Override
public Coder<T> getDefaultOutputCoder() {
- return JAXBCoder.of(recordClass);
- }
-
- public String getRootElement() {
- return rootElement;
- }
-
- public String getRecordElement() {
- return recordElement;
- }
-
- public Class<T> getRecordClass() {
- return recordClass;
+ return JAXBCoder.of(spec.getRecordClass());
}
/**
@@ -289,7 +137,7 @@ public class XmlSource<T> extends FileBasedSource<T> {
// Set up a JAXB Unmarshaller that can be used to unmarshall record objects.
try {
- JAXBContext jaxbContext = JAXBContext.newInstance(getCurrentSource().recordClass);
+ JAXBContext jaxbContext = JAXBContext.newInstance(getCurrentSource().spec.getRecordClass());
jaxbUnmarshaller = jaxbContext.createUnmarshaller();
// Throw errors if validation fails. JAXB by default ignores validation errors.
@@ -334,8 +182,10 @@ public class XmlSource<T> extends FileBasedSource<T> {
// this XML parsing may fail or may produce incorrect results.
byte[] dummyStartDocumentBytes =
- ("<?xml version=\"" + XML_VERSION + "\" encoding=\"UTF-8\" ?>"
- + "<" + getCurrentSource().rootElement + ">").getBytes(StandardCharsets.UTF_8);
+ (String.format(
+ "<?xml version=\"%s\" encoding=\"UTF-8\" ?><%s>",
+ XML_VERSION, getCurrentSource().spec.getRootElement()))
+ .getBytes(StandardCharsets.UTF_8);
preambleByteBuffer.write(dummyStartDocumentBytes);
// Gets the byte offset (in the input file) of the first record in ReadableByteChannel. This
// method returns the offset and stores any bytes that should be used when creating the XML
@@ -383,7 +233,7 @@ public class XmlSource<T> extends FileBasedSource<T> {
ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
byte[] recordStartBytes =
- ("<" + getCurrentSource().recordElement).getBytes(StandardCharsets.UTF_8);
+ ("<" + getCurrentSource().spec.getRecordElement()).getBytes(StandardCharsets.UTF_8);
outer: while (channel.read(buf) > 0) {
buf.flip();
@@ -494,7 +344,7 @@ public class XmlSource<T> extends FileBasedSource<T> {
int event = parser.next();
if (event == XMLStreamConstants.START_ELEMENT) {
String localName = parser.getLocalName();
- if (localName.equals(getCurrentSource().recordElement)) {
+ if (localName.equals(getCurrentSource().spec.getRecordElement())) {
break;
}
}
@@ -521,7 +371,8 @@ public class XmlSource<T> extends FileBasedSource<T> {
return false;
}
}
- JAXBElement<T> jb = jaxbUnmarshaller.unmarshal(parser, getCurrentSource().recordClass);
+ JAXBElement<T> jb =
+ jaxbUnmarshaller.unmarshal(parser, getCurrentSource().spec.getRecordClass());
currentRecord = jb.getValue();
return true;
} catch (JAXBException | XMLStreamException e) {
http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index 669dc6d..3c4337b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -778,6 +778,12 @@ public class DisplayData implements Serializable {
visitedComponents.add(subComponent);
visitedPathMap.put(path, subComponent);
Class<?> namespace = subComponent.getClass();
+ // Common case: AutoValue classes such as AutoValue_FooIO_Read. It's more useful
+ // to show the user the FooIO.Read class, which is the direct superclass of the AutoValue
+ // generated class.
+ if (namespace.getSimpleName().startsWith("AutoValue_")) {
+ namespace = namespace.getSuperclass();
+ }
Path prevPath = latestPath;
Class<?> prevNs = latestNs;
http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
index 63b5d11..7f559d1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
@@ -59,7 +59,6 @@ public class XmlSinkTest {
@Rule
public ExpectedException thrown = ExpectedException.none();
- private Class<Bird> testClass = Bird.class;
private String testRootElement = "testElement";
private String testFilePrefix = "/path/to/testPrefix";
@@ -70,7 +69,12 @@ public class XmlSinkTest {
public void testXmlWriter() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
XmlWriteOperation<Bird> writeOp =
- XmlSink.writeOf(Bird.class, "birds", testFilePrefix).createWriteOperation(options);
+ XmlIO.<Bird>write()
+ .toFilenamePrefix(testFilePrefix)
+ .withRecordClass(Bird.class)
+ .withRootElement("birds")
+ .createSink()
+ .createWriteOperation(options);
XmlWriter<Bird> writer = writeOp.createWriter(options);
List<Bird> bundle =
@@ -85,51 +89,37 @@ public class XmlSinkTest {
* Builder methods correctly initialize an XML Sink.
*/
@Test
- public void testBuildXmlSink() {
- XmlSink.Bound<Bird> sink =
- XmlSink.write()
+ public void testBuildXmlWriteTransform() {
+ XmlIO.Write<Bird> write =
+ XmlIO.<Bird>write()
.toFilenamePrefix(testFilePrefix)
- .ofRecordClass(testClass)
+ .withRecordClass(Bird.class)
.withRootElement(testRootElement);
- assertEquals(testClass, sink.classToBind);
- assertEquals(testRootElement, sink.rootElementName);
- assertEquals(testFilePrefix, sink.getBaseOutputFilenameProvider().get());
+ assertEquals(Bird.class, write.getRecordClass());
+ assertEquals(testRootElement, write.getRootElement());
+ assertEquals(testFilePrefix, write.getFilenamePrefix());
}
- /**
- * Alternate builder method correctly initializes an XML Sink.
- */
+ /** Validation ensures no fields are missing. */
@Test
- public void testBuildXmlSinkDirect() {
- XmlSink.Bound<Bird> sink =
- XmlSink.writeOf(Bird.class, testRootElement, testFilePrefix);
- assertEquals(testClass, sink.classToBind);
- assertEquals(testRootElement, sink.rootElementName);
- assertEquals(testFilePrefix, sink.getBaseOutputFilenameProvider().get());
+ public void testValidateXmlSinkMissingRecordClass() {
+ thrown.expect(NullPointerException.class);
+ XmlIO.<Bird>write()
+ .withRootElement(testRootElement)
+ .toFilenamePrefix(testFilePrefix)
+ .validate(null);
}
- /**
- * Validation ensures no fields are missing.
- */
@Test
- public void testValidateXmlSinkMissingFields() {
- XmlSink.Bound<Bird> sink;
- sink = XmlSink.writeOf(null, testRootElement, testFilePrefix);
- validateAndFailIfSucceeds(sink, NullPointerException.class);
- sink = XmlSink.writeOf(testClass, null, testFilePrefix);
- validateAndFailIfSucceeds(sink, NullPointerException.class);
- sink = XmlSink.writeOf(testClass, testRootElement, null);
- validateAndFailIfSucceeds(sink, NullPointerException.class);
+ public void testValidateXmlSinkMissingRootElement() {
+ thrown.expect(NullPointerException.class);
+ XmlIO.<Bird>write().withRecordClass(Bird.class).toFilenamePrefix(testFilePrefix).validate(null);
}
- /**
- * Call validate and fail if validation does not throw the expected exception.
- */
- private <T> void validateAndFailIfSucceeds(
- XmlSink.Bound<T> sink, Class<? extends Exception> expected) {
- thrown.expect(expected);
- PipelineOptions options = PipelineOptionsFactory.create();
- sink.validate(options);
+ @Test
+ public void testValidateXmlSinkMissingFilePrefix() {
+ thrown.expect(NullPointerException.class);
+ XmlIO.<Bird>write().withRecordClass(Bird.class).withRootElement(testRootElement).validate(null);
}
/**
@@ -138,13 +128,13 @@ public class XmlSinkTest {
@Test
public void testCreateWriteOperations() {
PipelineOptions options = PipelineOptionsFactory.create();
- XmlSink.Bound<Bird> sink =
- XmlSink.writeOf(testClass, testRootElement, testFilePrefix);
+ XmlSink<Bird> sink =
+ XmlIO.<Bird>write()
+ .withRecordClass(Bird.class)
+ .withRootElement(testRootElement)
+ .toFilenamePrefix(testFilePrefix)
+ .createSink();
XmlWriteOperation<Bird> writeOp = sink.createWriteOperation(options);
- assertEquals(testClass, writeOp.getSink().classToBind);
- assertEquals(testFilePrefix, writeOp.getSink().getBaseOutputFilenameProvider().get());
- assertEquals(testRootElement, writeOp.getSink().rootElementName);
- // assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().getFilenamePolicy().extension);
Path outputPath = new File(testFilePrefix).toPath();
Path tempPath = new File(writeOp.tempDirectory.get()).toPath();
assertEquals(outputPath.getParent(), tempPath.getParent());
@@ -159,7 +149,11 @@ public class XmlSinkTest {
public void testCreateWriter() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
XmlWriteOperation<Bird> writeOp =
- XmlSink.writeOf(testClass, testRootElement, testFilePrefix)
+ XmlIO.<Bird>write()
+ .withRecordClass(Bird.class)
+ .withRootElement(testRootElement)
+ .toFilenamePrefix(testFilePrefix)
+ .createSink()
.createWriteOperation(options);
XmlWriter<Bird> writer = writeOp.createWriter(options);
Path outputPath = new File(testFilePrefix).toPath();
@@ -167,18 +161,17 @@ public class XmlSinkTest {
assertEquals(outputPath.getParent(), tempPath.getParent());
assertThat(
tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName()));
- assertEquals(testRootElement, writer.getWriteOperation().getSink().rootElementName);
assertNotNull(writer.marshaller);
}
@Test
public void testDisplayData() {
- XmlSink.Bound<Integer> sink = XmlSink.write()
+ XmlIO.Write<Integer> write = XmlIO.<Integer>write()
.toFilenamePrefix("foobar")
.withRootElement("bird")
- .ofRecordClass(Integer.class);
+ .withRecordClass(Integer.class);
- DisplayData displayData = DisplayData.from(sink);
+ DisplayData displayData = DisplayData.from(write);
assertThat(displayData, hasDisplayItem("fileNamePattern", "foobar-SSSSS-of-NNNNN.xml"));
assertThat(displayData, hasDisplayItem("rootElement", "bird"));
http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
index 5f71f30..0120b8b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
@@ -285,12 +285,14 @@ public class XmlSourceTest {
File file = tempFolder.newFile("trainXMLTiny");
Files.write(file.toPath(), tinyXML.getBytes(StandardCharsets.UTF_8));
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> source =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
- .withMinBundleSize(1024);
+ .withMinBundleSize(1024)
+ .createSource();
List<Train> expectedResults = ImmutableList.of(
new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null),
@@ -308,12 +310,14 @@ public class XmlSourceTest {
File file = tempFolder.newFile("trainXMLTiny");
Files.write(file.toPath(), xmlWithMultiByteChars.getBytes(StandardCharsets.UTF_8));
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> source =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
- .withMinBundleSize(1024);
+ .withMinBundleSize(1024)
+ .createSource();
List<Train> expectedResults = ImmutableList.of(
new Train("Thomas¥", Train.TRAIN_NUMBER_UNDEFINED, null, null),
@@ -334,12 +338,14 @@ public class XmlSourceTest {
File file = tempFolder.newFile("trainXMLTiny");
Files.write(file.toPath(), xmlWithMultiByteElementName.getBytes(StandardCharsets.UTF_8));
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> source =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRootElement("\u0daf\u0dd4\u0db8\u0dca\u0dbb\u0dd2\u0dba\u0db1\u0dca")
.withRecordElement("\u0daf\u0dd4\u0db8\u0dca\u0dbb\u0dd2\u0dba")
.withRecordClass(Train.class)
- .withMinBundleSize(1024);
+ .withMinBundleSize(1024)
+ .createSource();
List<Train> expectedResults = ImmutableList.of(
new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null),
@@ -357,18 +363,20 @@ public class XmlSourceTest {
File file = tempFolder.newFile("trainXMLTiny");
Files.write(file.toPath(), tinyXML.getBytes(StandardCharsets.UTF_8));
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> source =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
- .withMinBundleSize(10);
- List<? extends FileBasedSource<Train>> splits = source.split(50, null);
+ .withMinBundleSize(10)
+ .createSource();
+ List<? extends BoundedSource<Train>> splits = source.split(50, null);
assertTrue(splits.size() > 2);
List<Train> results = new ArrayList<>();
- for (FileBasedSource<Train> split : splits) {
+ for (BoundedSource<Train> split : splits) {
results.addAll(readEverythingFromReader(split.createReader(null)));
}
@@ -394,12 +402,14 @@ public class XmlSourceTest {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> source =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
- .withMinBundleSize(1024);
+ .withMinBundleSize(1024)
+ .createSource();
List<Train> expectedResults =
ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null),
@@ -417,10 +427,12 @@ public class XmlSourceTest {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> source =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRecordElement("train")
- .withRecordClass(Train.class);
+ .withRecordClass(Train.class)
+ .createSource();
exception.expect(NullPointerException.class);
exception.expectMessage(
@@ -433,10 +445,12 @@ public class XmlSourceTest {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> source =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRootElement("trains")
- .withRecordClass(Train.class);
+ .withRecordClass(Train.class)
+ .createSource();
exception.expect(NullPointerException.class);
exception.expectMessage(
@@ -449,10 +463,12 @@ public class XmlSourceTest {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> source =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRootElement("trains")
- .withRecordElement("train");
+ .withRecordElement("train")
+ .createSource();
exception.expect(NullPointerException.class);
exception.expectMessage(
@@ -465,11 +481,13 @@ public class XmlSourceTest {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> source =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRootElement("something")
.withRecordElement("train")
- .withRecordClass(Train.class);
+ .withRecordClass(Train.class)
+ .createSource();
exception.expectMessage("Unexpected close tag </trains>; expected </something>.");
readEverythingFromReader(source.createReader(null));
@@ -480,11 +498,13 @@ public class XmlSourceTest {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> source =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("something")
- .withRecordClass(Train.class);
+ .withRecordClass(Train.class)
+ .createSource();
assertEquals(readEverythingFromReader(source.createReader(null)), new ArrayList<Train>());
}
@@ -500,11 +520,13 @@ public class XmlSourceTest {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
- XmlSource<WrongTrainType> source =
- XmlSource.<WrongTrainType>from(file.toPath().toString())
+ BoundedSource<WrongTrainType> source =
+ XmlIO.<WrongTrainType>read()
+ .from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
- .withRecordClass(WrongTrainType.class);
+ .withRecordClass(WrongTrainType.class)
+ .createSource();
exception.expect(RuntimeException.class);
@@ -525,11 +547,13 @@ public class XmlSourceTest {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> source =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
- .withRecordClass(Train.class);
+ .withRecordClass(Train.class)
+ .createSource();
List<Train> expectedResults =
ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null),
@@ -548,12 +572,14 @@ public class XmlSourceTest {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXMLWithEmptyTags.getBytes(StandardCharsets.UTF_8));
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> source =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
- .withMinBundleSize(1024);
+ .withMinBundleSize(1024)
+ .createSource();
List<Train> expectedResults = ImmutableList.of(new Train("Thomas", 1, "blue", null),
new Train("Henry", 3, "green", null), new Train("Toby", 7, "brown", null),
@@ -572,14 +598,15 @@ public class XmlSourceTest {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
- .withRootElement("trains")
- .withRecordElement("train")
- .withRecordClass(Train.class)
- .withMinBundleSize(1024);
-
- PCollection<Train> output = p.apply("ReadFileData", Read.from(source));
+ PCollection<Train> output =
+ p.apply(
+ "ReadFileData",
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
+ .withRootElement("trains")
+ .withRecordElement("train")
+ .withRecordClass(Train.class)
+ .withMinBundleSize(1024));
List<Train> expectedResults =
ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null),
@@ -595,12 +622,14 @@ public class XmlSourceTest {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXMLWithAttributes.getBytes(StandardCharsets.UTF_8));
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> source =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
- .withMinBundleSize(1024);
+ .withMinBundleSize(1024)
+ .createSource();
List<Train> expectedResults = ImmutableList.of(new Train("Thomas", 1, "blue", "small"),
new Train("Henry", 3, "green", "big"), new Train("Toby", 7, "brown", "small"),
@@ -618,12 +647,14 @@ public class XmlSourceTest {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXMLWithSpaces.getBytes(StandardCharsets.UTF_8));
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> source =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
- .withMinBundleSize(1024);
+ .withMinBundleSize(1024)
+ .createSource();
List<Train> expectedResults = ImmutableList.of(new Train("Thomas ", 1, "blue", null),
new Train("Henry", 3, "green", null), new Train("Toby", 7, " brown ", null),
@@ -642,12 +673,14 @@ public class XmlSourceTest {
List<Train> trains = generateRandomTrainList(100);
File file = createRandomTrainXML(fileName, trains);
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> source =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
- .withMinBundleSize(1024);
+ .withMinBundleSize(1024)
+ .createSource();
assertThat(
trainsToStrings(trains),
@@ -662,13 +695,15 @@ public class XmlSourceTest {
List<Train> trains = generateRandomTrainList(100);
File file = createRandomTrainXML(fileName, trains);
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
- .withRootElement("trains")
- .withRecordElement("train")
- .withRecordClass(Train.class)
- .withMinBundleSize(1024);
- PCollection<Train> output = p.apply("ReadFileData", Read.from(source));
+ PCollection<Train> output =
+ p.apply(
+ "ReadFileData",
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
+ .withRootElement("trains")
+ .withRecordElement("train")
+ .withRecordClass(Train.class)
+ .withMinBundleSize(1024));
PAssert.that(output).containsInAnyOrder(trains);
p.run();
@@ -680,18 +715,20 @@ public class XmlSourceTest {
List<Train> trains = generateRandomTrainList(10);
File file = createRandomTrainXML(fileName, trains);
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> source =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
- .withMinBundleSize(10);
- List<? extends FileBasedSource<Train>> splits = source.split(100, null);
+ .withMinBundleSize(10)
+ .createSource();
+ List<? extends BoundedSource<Train>> splits = source.split(100, null);
assertTrue(splits.size() > 2);
List<Train> results = new ArrayList<>();
- for (FileBasedSource<Train> split : splits) {
+ for (BoundedSource<Train> split : splits) {
results.addAll(readEverythingFromReader(split.createReader(null)));
}
@@ -704,19 +741,21 @@ public class XmlSourceTest {
List<Train> trains = generateRandomTrainList(100);
File file = createRandomTrainXML(fileName, trains);
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> source =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
- .withMinBundleSize(10);
- List<? extends FileBasedSource<Train>> splits = source.split(256, null);
+ .withMinBundleSize(10)
+ .createSource();
+ List<? extends BoundedSource<Train>> splits = source.split(256, null);
// Not a trivial split
assertTrue(splits.size() > 2);
List<Train> results = new ArrayList<>();
- for (FileBasedSource<Train> split : splits) {
+ for (BoundedSource<Train> split : splits) {
results.addAll(readEverythingFromReader(split.createReader(null)));
}
assertThat(trainsToStrings(trains), containsInAnyOrder(trainsToStrings(results).toArray()));
@@ -729,14 +768,16 @@ public class XmlSourceTest {
List<Train> trains = generateRandomTrainList(100);
File file = createRandomTrainXML(fileName, trains);
- XmlSource<Train> fileSource =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> fileSource =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(Train.class)
- .withMinBundleSize(10);
+ .withMinBundleSize(10)
+ .createSource();
- List<? extends FileBasedSource<Train>> splits =
+ List<? extends BoundedSource<Train>> splits =
fileSource.split(file.length() / 3, null);
for (BoundedSource<Train> splitSource : splits) {
int numItems = readEverythingFromReader(splitSource.createReader(null)).size();
@@ -771,11 +812,13 @@ public class XmlSourceTest {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXMLWithAllFeaturesSingleByte.getBytes(StandardCharsets.UTF_8));
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> source =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
- .withRecordClass(Train.class);
+ .withRecordClass(Train.class)
+ .createSource();
assertSplitAtFractionExhaustive(source, options);
}
@@ -788,11 +831,13 @@ public class XmlSourceTest {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXMLWithAllFeaturesMultiByte.getBytes(StandardCharsets.UTF_8));
- XmlSource<Train> source =
- XmlSource.<Train>from(file.toPath().toString())
+ BoundedSource<Train> source =
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
.withRootElement("\u0daf\u0dd4\u0db8\u0dca\u0dbb\u0dd2\u0dba\u0db1\u0dca")
.withRecordElement("\u0daf\u0dd4\u0db8\u0dca\u0dbb\u0dd2\u0dba")
- .withRecordClass(Train.class);
+ .withRecordClass(Train.class)
+ .createSource();
assertSplitAtFractionExhaustive(source, options);
}
@@ -808,13 +853,15 @@ public class XmlSourceTest {
generateRandomTrainList(8);
createRandomTrainXML("otherfile.xml", trains1);
- XmlSource<Train> source =
- XmlSource.<Train>from(file.getParent() + "/" + "temp*.xml")
- .withRootElement("trains")
- .withRecordElement("train")
- .withRecordClass(Train.class)
- .withMinBundleSize(1024);
- PCollection<Train> output = p.apply("ReadFileData", Read.from(source));
+ PCollection<Train> output =
+ p.apply(
+ "ReadFileData",
+ XmlIO.<Train>read()
+ .from(file.getParent() + "/" + "temp*.xml")
+ .withRootElement("trains")
+ .withRecordElement("train")
+ .withRecordClass(Train.class)
+ .withMinBundleSize(1024));
List<Train> expectedResults = new ArrayList<>();
expectedResults.addAll(trains1);
@@ -827,15 +874,14 @@ public class XmlSourceTest {
@Test
public void testDisplayData() {
-
-
- XmlSource<?> source = XmlSource
- .<Integer>from("foo.xml")
- .withRootElement("bird")
- .withRecordElement("cat")
- .withMinBundleSize(1234)
- .withRecordClass(Integer.class);
- DisplayData displayData = DisplayData.from(source);
+ DisplayData displayData =
+ DisplayData.from(
+ XmlIO.<Integer>read()
+ .from("foo.xml")
+ .withRootElement("bird")
+ .withRecordElement("cat")
+ .withMinBundleSize(1234)
+ .withRecordClass(Integer.class));
assertThat(displayData, hasDisplayItem("filePattern", "foo.xml"));
assertThat(displayData, hasDisplayItem("rootElement", "bird"));
http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
index c617f06..9b24b69 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
@@ -44,8 +44,10 @@ import static org.junit.Assert.fail;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.testing.EqualsTester;
import java.io.IOException;
@@ -1299,6 +1301,21 @@ public class DisplayDataTest implements Serializable {
DisplayData.from(component);
}
+ @AutoValue
+ abstract static class Foo implements HasDisplayData {
+ @Override
+ public void populateDisplayData(Builder builder) {
+ builder.add(DisplayData.item("someKey", "someValue"));
+ }
+ }
+
+ @Test
+ public void testAutoValue() {
+ DisplayData data = DisplayData.from(new AutoValue_DisplayDataTest_Foo());
+ Item item = Iterables.getOnlyElement(data.asMap().values());
+ assertEquals(Foo.class, item.getNamespace());
+ }
+
private String quoted(Object obj) {
return String.format("\"%s\"", obj);
}