You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/24 16:21:56 UTC
[2/3] beam git commit: [BEAM-2060] Allow to specify charset in XmlIO
[BEAM-2060] Allow to specify charset in XmlIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ffc77813
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ffc77813
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ffc77813
Branch: refs/heads/master
Commit: ffc77813bb7883d894f65d1a70a88f6c2f56ca89
Parents: 9c39682
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Mon Apr 24 16:37:40 2017 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Apr 24 09:21:20 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/xml/XmlIO.java | 44 +++++++++++++++++-
.../org/apache/beam/sdk/io/xml/XmlSource.java | 8 ++--
.../apache/beam/sdk/io/xml/XmlSourceTest.java | 49 ++++++++++++++++++++
3 files changed, 97 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ffc77813/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
index bf0e1b5..ef07925 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
+
import javax.annotation.Nullable;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
@@ -77,6 +78,29 @@ public class XmlIO {
* .withRecordClass(Record.class));
* }</pre>
*
+ * <p>By default, the UTF-8 charset is used. If your file uses a different charset, you have to
+ * specify it as follows:
+ *
+ * <pre>{@code
+ * PCollection<String> output = p.apply(XmlIO.<Record>read()
+ * .from(file.toPath().toString())
+ * .withRootElement("root")
+ * .withRecordElement("record")
+ * .withRecordClass(Record.class)
+ * .withCharset("ISO-8859-1"));
+ * }</pre>
+ *
+ * <p>Or:
+ *
+ * <pre>{@code
+ * PCollection<String> output = p.apply(XmlIO.<Record>read()
+ * .from(file.toPath().toString())
+ * .withRootElement("root")
+ * .withRecordElement("record")
+ * .withRecordClass(Record.class)
+ * .withCharset(StandardCharsets.ISO_8859_1.name()));
+ * }</pre>
+ *
* <p>Currently, only XML files that use single-byte characters are supported. Using a file that
* contains multi-byte characters may result in data loss or duplication.
*
@@ -94,6 +118,7 @@ public class XmlIO {
return new AutoValue_XmlIO_Read.Builder<T>()
.setMinBundleSize(Read.DEFAULT_MIN_BUNDLE_SIZE)
.setCompressionType(Read.CompressionType.AUTO)
+ .setCharset("UTF-8")
.build();
}
@@ -220,6 +245,9 @@ public class XmlIO {
abstract long getMinBundleSize();
+ @Nullable
+ abstract String getCharset();
+
abstract Builder<T> toBuilder();
@AutoValue.Builder
@@ -236,6 +264,8 @@ public class XmlIO {
abstract Builder<T> setCompressionType(CompressionType compressionType);
+ abstract Builder<T> setCharset(String charset);
+
abstract Read<T> build();
}
@@ -325,6 +355,13 @@ public class XmlIO {
return toBuilder().setCompressionType(compressionType).build();
}
+ /**
+ * Sets the XML file charset.
+ */
+ public Read<T> withCharset(String charset) {
+ return toBuilder().setCharset(charset).build();
+ }
+
@Override
public void validate(PBegin input) {
checkNotNull(
@@ -336,6 +373,9 @@ public class XmlIO {
checkNotNull(
getRecordClass(),
"recordClass is null. Use builder method withRecordClass() to set this.");
+ checkNotNull(
+ getCharset(),
+ "charset is null. Use builder method withCharset() to set this.");
}
@Override
@@ -351,7 +391,9 @@ public class XmlIO {
.addIfNotNull(
DisplayData.item("recordElement", getRecordElement()).withLabel("XML Record Element"))
.addIfNotNull(
- DisplayData.item("recordClass", getRecordClass()).withLabel("XML Record Class"));
+ DisplayData.item("recordClass", getRecordClass()).withLabel("XML Record Class"))
+ .addIfNotNull(
+ DisplayData.item("charset", getCharset()).withLabel("Charset"));
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/beam/blob/ffc77813/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
index 876c782..1eb0e06 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
@@ -185,9 +185,11 @@ public class XmlSource<T> extends FileBasedSource<T> {
byte[] dummyStartDocumentBytes =
(String.format(
- "<?xml version=\"%s\" encoding=\"UTF-8\" ?><%s>",
+ "<?xml version=\"%s\" encoding=\""
+ + getCurrentSource().spec.getCharset()
+ + "\"?><%s>",
XML_VERSION, getCurrentSource().spec.getRootElement()))
- .getBytes(StandardCharsets.UTF_8);
+ .getBytes(getCurrentSource().spec.getCharset());
preambleByteBuffer.write(dummyStartDocumentBytes);
// Gets the byte offset (in the input file) of the first record in ReadableByteChannel. This
// method returns the offset and stores any bytes that should be used when creating the XML
@@ -339,7 +341,7 @@ public class XmlSource<T> extends FileBasedSource<T> {
this.parser = xmlInputFactory.createXMLStreamReader(
new SequenceInputStream(
new ByteArrayInputStream(lookAhead), Channels.newInputStream(channel)),
- "UTF-8");
+ getCurrentSource().spec.getCharset());
// Current offset should be the offset before reading the record element.
while (true) {
http://git-wip-us.apache.org/repos/asf/beam/blob/ffc77813/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
index 5b33be3..9321ac3 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
@@ -158,6 +158,11 @@ public class XmlSourceTest {
+ "</train>"
+ "</trains>";
+ String trainXMLWithISO88591 =
+ "<trains>"
+ + "<train size=\"small\"><name>Cédric</name><number>7</number><color>blue</color></train>"
+ + "</trains>";
+
@XmlRootElement
static class Train {
public static final int TRAIN_NUMBER_UNDEFINED = -1;
@@ -594,6 +599,50 @@ public class XmlSourceTest {
}
@Test
+ public void testReadXMLWithCharset() throws IOException {
+ File file = tempFolder.newFile("trainXMLISO88591");
+ Files.write(file.toPath(), trainXMLWithISO88591.getBytes(StandardCharsets.ISO_8859_1));
+
+ PCollection<Train> output =
+ p.apply("ReadFileData",
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
+ .withRootElement("trains")
+ .withRecordElement("train")
+ .withRecordClass(Train.class)
+ .withMinBundleSize(1024)
+ .withCharset(StandardCharsets.ISO_8859_1.name()));
+
+ List<Train> expectedResults =
+ ImmutableList.of(new Train("Cédric", 7, "blue", "small"));
+
+ PAssert.that(output).containsInAnyOrder(expectedResults);
+ p.run();
+ }
+
+ @Test
+ public void testReadXMLWithCharsetAsString() throws IOException {
+ File file = tempFolder.newFile("trainXMLISO88591");
+ Files.write(file.toPath(), trainXMLWithISO88591.getBytes(StandardCharsets.ISO_8859_1));
+
+ PCollection<Train> output =
+ p.apply("ReadFileData",
+ XmlIO.<Train>read()
+ .from(file.toPath().toString())
+ .withRootElement("trains")
+ .withRecordElement("train")
+ .withRecordClass(Train.class)
+ .withMinBundleSize(1024)
+ .withCharset("ISO-8859-1"));
+
+ List<Train> expectedResults =
+ ImmutableList.of(new Train("Cédric", 7, "blue", "small"));
+
+ PAssert.that(output).containsInAnyOrder(expectedResults);
+ p.run();
+ }
+
+ @Test
@Category(NeedsRunner.class)
public void testReadXMLSmallPipeline() throws IOException {
File file = tempFolder.newFile("trainXMLSmall");