You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/08 20:23:25 UTC
[3/4] flink git commit: [FLINK-3921] Add support to set encoding in
CsvReader and StringParser.
[FLINK-3921] Add support to set encoding in CsvReader and StringParser.
This closes #2060.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2186af6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2186af6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2186af6
Branch: refs/heads/master
Commit: f2186af6702c9fe48c91d5c2d7748378984cd29b
Parents: 2d8f03e
Author: Joshi <re...@gmail.com>
Authored: Wed Jun 1 13:26:47 2016 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 8 18:47:36 2016 +0100
----------------------------------------------------------------------
.../api/common/io/GenericCsvInputFormat.java | 64 ++++++++++----------
.../apache/flink/types/parser/FieldParser.java | 38 +++++++++---
.../apache/flink/types/parser/StringParser.java | 8 +--
.../common/io/GenericCsvInputFormatTest.java | 40 ++++++++++--
.../types/parser/VarLengthStringParserTest.java | 20 ++++++
.../org/apache/flink/api/java/io/CsvReader.java | 24 +++++++-
.../apache/flink/api/java/io/CSVReaderTest.java | 9 +++
7 files changed, 155 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 85d9cd8..0ced22b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -25,14 +25,12 @@ import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.types.parser.StringParser;
import org.apache.flink.types.parser.StringValueParser;
import org.apache.flink.util.InstantiationUtil;
-
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.Charset;
-import java.nio.charset.IllegalCharsetNameException;
-import java.nio.charset.UnsupportedCharsetException;
import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;
@@ -48,9 +46,9 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);
- /** The default charset to convert strings to bytes */
- private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
-
+ /** The charset used to convert strings to bytes */
+ private Charset charset = Charset.forName("UTF-8");
+
private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
private static final boolean[] EMPTY_INCLUDED = new boolean[0];
@@ -107,6 +105,11 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
super(filePath, null);
}
+ protected GenericCsvInputFormat(Path filePath, Charset charset) {
+ this(filePath);
+ this.charset = Preconditions.checkNotNull(charset);
+ }
+
// --------------------------------------------------------------------------------------------
public int getNumberOfFieldsTotal() {
@@ -121,32 +124,11 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
return commentPrefix;
}
- public void setCommentPrefix(byte[] commentPrefix) {
- this.commentPrefix = commentPrefix;
- }
-
- public void setCommentPrefix(char commentPrefix) {
- setCommentPrefix(String.valueOf(commentPrefix));
- }
-
public void setCommentPrefix(String commentPrefix) {
- setCommentPrefix(commentPrefix, UTF_8_CHARSET);
+ setCommentPrefix(commentPrefix, charset);
}
- public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
- if (charsetName == null) {
- throw new IllegalArgumentException("Charset name must not be null");
- }
-
- if (commentPrefix != null) {
- Charset charset = Charset.forName(charsetName);
- setCommentPrefix(commentPrefix, charset);
- } else {
- this.commentPrefix = null;
- }
- }
-
- public void setCommentPrefix(String commentPrefix, Charset charset) {
+ private void setCommentPrefix(String commentPrefix, Charset charset) {
if (charset == null) {
throw new IllegalArgumentException("Charset must not be null");
}
@@ -174,7 +156,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
}
public void setFieldDelimiter(String delimiter) {
- this.fieldDelim = delimiter.getBytes(UTF_8_CHARSET);
+ this.fieldDelim = delimiter.getBytes(charset);
}
public boolean isLenient() {
@@ -314,6 +296,25 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
this.fieldIncluded = includedMask;
}
+ /**
+ * Gets the character set for the parser. Default is set to UTF-8.
+ *
+ * @return The charset for the parser.
+ */
+ Charset getCharset() {
+ return this.charset;
+ }
+
+ /**
+ * Sets the charset of the parser. Called by subclasses of the parser to set the type of charset
+ * when doing a parse.
+ *
+ * @param charset The character set to set.
+ */
+ public void setCharset(Charset charset) {
+ this.charset = Preconditions.checkNotNull(charset);
+ }
+
// --------------------------------------------------------------------------------------------
// Runtime methods
// --------------------------------------------------------------------------------------------
@@ -334,6 +335,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
FieldParser<?> p = InstantiationUtil.instantiate(parserType, FieldParser.class);
+ p.setCharset(this.getCharset());
if (this.quotedStringParsing) {
if (p instanceof StringParser) {
((StringParser)p).enableQuotedStringParsing(this.quoteCharacter);
@@ -449,7 +451,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
// search for ending quote character, continue when it is escaped
i++;
- while (i < limit && (bytes[i] != quoteCharacter || bytes[i-1] == BACKSLASH)){
+ while (i < limit && (bytes[i] != quoteCharacter || bytes[i-1] == BACKSLASH)) {
i++;
}
i++;
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index 200d239..d9eeecc 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -19,11 +19,6 @@
package org.apache.flink.types.parser;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.types.BooleanValue;
import org.apache.flink.types.ByteValue;
@@ -34,6 +29,12 @@ import org.apache.flink.types.LongValue;
import org.apache.flink.types.ShortValue;
import org.apache.flink.types.StringValue;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* A FieldParser is used parse a field from a sequence of bytes. Fields occur in a byte sequence and are terminated
* by the end of the byte sequence or a delimiter.
@@ -77,9 +78,11 @@ public abstract class FieldParser<T> {
/** Invalid Boolean value **/
BOOLEAN_INVALID
}
-
+
+ private Charset charset = Charset.forName("UTF-8");
+
private ParseErrorState errorState = ParseErrorState.NONE;
-
+
/**
* Parses the value of a field from the byte array, taking care of properly reset
* the state of this parser.
@@ -217,7 +220,26 @@ public abstract class FieldParser<T> {
return limitedLength;
}
-
+
+ /**
+ * Gets the character set for the parser. Default is set to UTF-8.
+ *
+ * @return The charset for the parser.
+ */
+ public Charset getCharset() {
+ return this.charset;
+ }
+
+ /**
+ * Sets the charset of the parser. Called by subclasses of the parser to set the type of charset
+ * when doing a parse.
+ *
+ * @param charset The charset to set.
+ */
+ public void setCharset(Charset charset) {
+ this.charset = charset;
+ }
+
// --------------------------------------------------------------------------------------------
// Mapping from types to parsers
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
index 1a2c7e3..7b46a7e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
@@ -63,11 +63,11 @@ public class StringParser extends FieldParser<String> {
// check for proper termination
if (i == limit) {
// either by end of line
- this.result = new String(bytes, startPos + 1, i - startPos - 2);
+ this.result = new String(bytes, startPos + 1, i - startPos - 2, getCharset());
return limit;
} else if ( i < delimLimit && delimiterNext(bytes, i, delimiter)) {
// or following field delimiter
- this.result = new String(bytes, startPos + 1, i - startPos - 2);
+ this.result = new String(bytes, startPos + 1, i - startPos - 2, getCharset());
return i + delimiter.length;
} else {
// no proper termination
@@ -87,14 +87,14 @@ public class StringParser extends FieldParser<String> {
if (limit == startPos) {
setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
}
- this.result = new String(bytes, startPos, limit - startPos);
+ this.result = new String(bytes, startPos, limit - startPos, getCharset());
return limit;
} else {
// delimiter found.
if (i == startPos) {
setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
}
- this.result = new String(bytes, startPos, i - startPos);
+ this.result = new String(bytes, startPos, i - startPos, getCharset());
return i + delimiter.length;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
index e3215c6..d063ddc 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
@@ -28,6 +28,7 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.GZIPOutputStream;
@@ -485,7 +486,7 @@ public class GenericCsvInputFormatTest {
fail("Input format accepted on invalid input.");
}
catch (ParseException e) {
- ; // all good
+ // all good
}
}
catch (Exception ex) {
@@ -547,7 +548,38 @@ public class GenericCsvInputFormatTest {
fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
}
}
-
+
+ @Test
+ public void testReadWithCharset() throws IOException {
+ try {
+ final String fileContent = "\u00bf|Flink|\u00f1";
+ final FileInputSplit split = createTempFile(fileContent);
+
+ final Configuration parameters = new Configuration();
+
+ format.setCharset(Charset.forName("UTF-8"));
+ format.setFieldDelimiter("|");
+ format.setFieldTypesGeneric(StringValue.class, StringValue.class, StringValue.class);
+
+ format.configure(parameters);
+ format.open(split);
+
+ Value[] values = new Value[] { new StringValue(), new StringValue(), new StringValue()};
+
+ values = format.nextRecord(values);
+ assertNotNull(values);
+ assertEquals("\u00bf", ((StringValue) values[0]).getValue());
+ assertEquals("Flink", ((StringValue) values[1]).getValue());
+ assertEquals("\u00f1", ((StringValue) values[2]).getValue());
+
+ assertNull(format.nextRecord(values));
+ assertTrue(format.reachedEnd());
+ }
+ catch (Exception ex) {
+ fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
+ }
+ }
+
@Test
public void readWithEmptyField() {
try {
@@ -722,7 +754,7 @@ public class GenericCsvInputFormatTest {
return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
}
- private final Value[] createIntValues(int num) {
+ private Value[] createIntValues(int num) {
Value[] v = new Value[num];
for (int i = 0; i < num; i++) {
@@ -732,7 +764,7 @@ public class GenericCsvInputFormatTest {
return v;
}
- private final Value[] createLongValues(int num) {
+ private Value[] createLongValues(int num) {
Value[] v = new Value[num];
for (int i = 0; i < num; i++) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
index 1fe8850..1c5579e 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
import org.junit.Test;
+import java.nio.charset.Charset;
+
public class VarLengthStringParserTest {
public StringValueParser parser = new StringValueParser();
@@ -194,4 +196,22 @@ public class VarLengthStringParserTest {
startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
assertTrue(startPos < 0);
}
+
+ @Test
+ public void testParseValidMixedStringsWithCharset() {
+
+ Charset charset = Charset.forName("US-ASCII");
+ this.parser = new StringValueParser();
+ this.parser.enableQuotedStringParsing((byte) '@');
+
+ // check valid strings without whitespace or trailing delimiter
+ byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes();
+ StringValue s = new StringValue();
+
+ int startPos = 0;
+ parser.setCharset(charset);
+ startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[]{'|'}, s);
+ assertTrue(startPos == 11);
+ assertTrue(s.getValue().equals("abcde|gh"));
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index 8be5dc2..b13b8aa 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java.io;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
@@ -64,6 +65,8 @@ public class CsvReader {
protected boolean skipFirstLineAsHeader = false;
protected boolean ignoreInvalidLines = false;
+
+ private Charset charset = Charset.forName("UTF-8");
// --------------------------------------------------------------------------------------------
@@ -157,7 +160,25 @@ public class CsvReader {
this.commentPrefix = commentPrefix;
return this;
}
-
+
+ /**
+ * Gets the character set for the reader. Default is set to UTF-8.
+ *
+ * @return The charset for the reader.
+ */
+ public Charset getCharset() {
+ return this.charset;
+ }
+
+ /**
+ * Sets the charset of the reader.
+ *
+ * @param charset The character set to set.
+ */
+ public void setCharset(Charset charset) {
+ this.charset = Preconditions.checkNotNull(charset);
+ }
+
/**
* Configures which fields of the CSV file should be included and which should be skipped. The
* parser will look at the first {@code n} fields, where {@code n} is the length of the boolean
@@ -340,6 +361,7 @@ public class CsvReader {
format.setCommentPrefix(this.commentPrefix);
format.setSkipFirstLineAsHeader(skipFirstLineAsHeader);
format.setLenient(ignoreInvalidLines);
+ format.setCharset(this.charset);
if (this.parseQuotedStrings) {
format.enableQuotedStringParsing(this.quoteCharacter);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
index 8b12315..e1c8023 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.Arrays;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -75,6 +76,14 @@ public class CSVReaderTest {
reader.ignoreComments("#");
assertEquals("#", reader.commentPrefix);
}
+
+ @Test
+ public void testCharset() {
+ CsvReader reader = getCsvReader();
+ assertEquals(reader.getCharset(), Charset.forName("UTF-8"));
+ reader.setCharset(Charset.forName("US-ASCII"));
+ assertEquals(reader.getCharset(), Charset.forName("US-ASCII"));
+ }
@Test
public void testIncludeFieldsDense() {