You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/12/12 16:50:05 UTC
incubator-flink git commit: [FLINK-1208] Enable CsvInputFormats to ignore invalid lines and lines starting with comments
Repository: incubator-flink
Updated Branches:
refs/heads/master e0a4ee070 -> e47365bc0
[FLINK-1208] Enable CsvInputFormats to ignore invalid lines and lines starting with comments
This closes #201
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e47365bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e47365bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e47365bc
Branch: refs/heads/master
Commit: e47365bc0eed07a57682ea75f71991e35981cc82
Parents: e0a4ee0
Author: fel <ne...@googlemail.com>
Authored: Fri Nov 14 12:48:24 2014 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Dec 12 16:41:02 2014 +0100
----------------------------------------------------------------------
.../flink/api/java/io/CsvInputFormat.java | 106 +++++++++++++-
.../org/apache/flink/api/java/io/CsvReader.java | 37 +++++
.../apache/flink/api/java/io/CSVReaderTest.java | 19 +++
.../flink/api/java/io/CsvInputFormatTest.java | 137 +++++++++++++++++++
.../scala/operators/ScalaCsvInputFormat.java | 101 +++++++++++++-
.../flink/api/scala/ExecutionEnvironment.scala | 3 +
.../flink/api/scala/io/CsvInputFormatTest.scala | 86 +++++++++++-
7 files changed, 485 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e47365bc/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
index a851517..fe2ae14 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
@@ -20,6 +20,9 @@ package org.apache.flink.api.java.io;
import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.IllegalCharsetNameException;
+import java.nio.charset.UnsupportedCharsetException;
import java.util.Map;
import java.util.TreeMap;
@@ -29,7 +32,10 @@ import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
@@ -37,6 +43,11 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
private static final long serialVersionUID = 1L;
+ /**
+ * The log.
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(CsvInputFormat.class);
+
public static final String DEFAULT_LINE_DELIMITER = "\n";
public static final char DEFAULT_FIELD_DELIMITER = ',';
@@ -44,10 +55,16 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
private transient Object[] parsedValues;
+ private byte[] commentPrefix = null;
+
// To speed up readRecord processing. Used to find windows line endings.
// It is set when open so that readRecord does not have to evaluate it
private boolean lineDelimiterIsLinebreak = false;
+ private transient int commentCount;
+
+ private transient int invalidLineCount;
+
public CsvInputFormat(Path filePath) {
super(filePath);
@@ -66,6 +83,48 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
setFieldTypes(types);
}
+
+ public byte[] getCommentPrefix() {
+ return commentPrefix;
+ }
+
+ public void setCommentPrefix(byte[] commentPrefix) {
+ this.commentPrefix = commentPrefix;
+ }
+
+ public void setCommentPrefix(char commentPrefix) {
+ setCommentPrefix(String.valueOf(commentPrefix));
+ }
+
+ public void setCommentPrefix(String commentPrefix) {
+ setCommentPrefix(commentPrefix, Charsets.UTF_8);
+ }
+
+ public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
+ if (charsetName == null) {
+ throw new IllegalArgumentException("Charset name must not be null");
+ }
+
+ if (commentPrefix != null) {
+ Charset charset = Charset.forName(charsetName);
+ setCommentPrefix(commentPrefix, charset);
+ } else {
+ this.commentPrefix = null;
+ }
+ }
+
+ public void setCommentPrefix(String commentPrefix, Charset charset) {
+ if (charset == null) {
+ throw new IllegalArgumentException("Charset must not be null");
+ }
+ if (commentPrefix != null) {
+ this.commentPrefix = commentPrefix.getBytes(charset);
+ } else {
+ this.commentPrefix = null;
+ }
+ }
+
+
public void setFieldTypes(Class<?> ... fieldTypes) {
if (fieldTypes == null || fieldTypes.length == 0) {
throw new IllegalArgumentException("Field types must not be null or empty.");
@@ -117,10 +176,39 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
this.lineDelimiterIsLinebreak = true;
}
+
+ this.commentCount = 0;
+ this.invalidLineCount = 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.invalidLineCount > 0) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid line(s) were skipped.");
+ }
+ }
+
+ if (this.commentCount > 0) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) were skipped.");
+ }
+ }
+ super.close();
+ }
+
+ @Override
+ public OUT nextRecord(OUT record) throws IOException {
+ OUT returnRecord = null;
+ do {
+ returnRecord = super.nextRecord(record);
+ } while (returnRecord == null && !reachedEnd());
+
+ return returnRecord;
}
@Override
- public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
+ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
/*
* Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
*/
@@ -130,6 +218,21 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
numBytes--;
}
+ if (commentPrefix != null && commentPrefix.length <= numBytes) {
+ //check record for comments
+ boolean isComment = true;
+ for (int i = 0; i < commentPrefix.length; i++) {
+ if (commentPrefix[i] != bytes[offset + i]) {
+ isComment = false;
+ break;
+ }
+ }
+ if (isComment) {
+ this.commentCount++;
+ return null;
+ }
+ }
+
if (parseRecord(parsedValues, bytes, offset, numBytes)) {
// valid parse, map values into pact record
for (int i = 0; i < parsedValues.length; i++) {
@@ -137,6 +240,7 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
}
return reuse;
} else {
+ this.invalidLineCount++;
return null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e47365bc/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index 014a465..f4c9e32 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -50,9 +50,13 @@ public class CsvReader {
protected String lineDelimiter = CsvInputFormat.DEFAULT_LINE_DELIMITER;
protected char fieldDelimiter = CsvInputFormat.DEFAULT_FIELD_DELIMITER;
+
+ protected String commentPrefix = null; //default: no comments
protected boolean skipFirstLineAsHeader = false;
+ protected boolean ignoreInvalidLines = false;
+
// --------------------------------------------------------------------------------------------
public CsvReader(Path filePath, ExecutionEnvironment executionContext) {
@@ -102,6 +106,23 @@ public class CsvReader {
}
/**
+ * Configures the string that starts comments.
+ * By default comments will be treated as invalid lines.
+ * This function only recognizes comments which start at the beginning of the line!
+ *
+ * @param commentPrefix The string that starts the comments.
+ * @return The CSV reader instance itself, to allow for fluent function chaining.
+ */
+ public CsvReader ignoreComments(String commentPrefix) {
+ if (commentPrefix == null || commentPrefix.length() == 0) {
+ throw new IllegalArgumentException("The comment prefix must not be null or an empty string");
+ }
+
+ this.commentPrefix = commentPrefix;
+ return this;
+ }
+
+ /**
* Configures which fields of the CSV file should be included and which should be skipped. The
* parser will look at the first {@code n} fields, where {@code n} is the length of the boolean
* array. The parser will skip over all fields where the boolean value at the corresponding position
@@ -212,6 +233,20 @@ public class CsvReader {
skipFirstLineAsHeader = true;
return this;
}
+
+ /**
+ * Sets the CSV reader to ignore any invalid lines.
+ * This is useful for files that contain an empty line at the end, multiple header lines or comments. This would throw an exception otherwise.
+ *
+ * @return The CSV reader instance itself, to allow for fluent function chaining.
+ */
+ public CsvReader ignoreInvalidLines(){
+ ignoreInvalidLines = true;
+ return this;
+ }
+
+
+
/**
* Configures the reader to read the CSV data and parse it to the given type. The type must be a subclass of
* {@link Tuple}. The type information for the fields is obtained from the type class. The type
@@ -246,7 +281,9 @@ public class CsvReader {
private void configureInputFormat(CsvInputFormat<?> format, Class<?>... types) {
format.setDelimiter(this.lineDelimiter);
format.setFieldDelimiter(this.fieldDelimiter);
+ format.setCommentPrefix(this.commentPrefix);
format.setSkipFirstLineAsHeader(skipFirstLineAsHeader);
+ format.setLenient(ignoreInvalidLines);
if (this.includedMask == null) {
format.setFieldTypes(types);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e47365bc/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
index 1217b3d..6676cd1 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
@@ -18,6 +18,9 @@
package org.apache.flink.api.java.io;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
import java.util.Arrays;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -46,6 +49,22 @@ public class CSVReaderTest {
}
@Test
+ public void testIgnoreInvalidLinesConfigure() {
+ CsvReader reader = getCsvReader();
+ Assert.assertFalse(reader.ignoreInvalidLines);
+ reader.ignoreInvalidLines();
+ Assert.assertTrue(reader.ignoreInvalidLines);
+ }
+
+ @Test
+ public void testIgnoreComments() {
+ CsvReader reader = getCsvReader();
+ assertNull(reader.commentPrefix);
+ reader.ignoreComments("#");
+ assertEquals("#", reader.commentPrefix);
+ }
+
+ @Test
public void testIncludeFieldsDense() {
CsvReader reader = getCsvReader();
reader.includeFields(true, true, true);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e47365bc/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 5f10a2b..c335db1 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -53,6 +53,143 @@ public class CsvInputFormatTest {
private static final String FIRST_PART = "That is the first part";
private static final String SECOND_PART = "That is the second part";
+
+ @Test
+ public void ignoreInvalidLines() {
+ try {
+
+
+ final String fileContent = "#description of the data\n" +
+ "header1|header2|header3|\n"+
+ "this is|1|2.0|\n"+
+ "//a comment\n" +
+ "a test|3|4.0|\n" +
+ "#next|5|6.0|\n";
+
+ final FileInputSplit split = createTempFile(fileContent);
+
+ CsvInputFormat<Tuple3<String, Integer, Double>> format =
+ new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", '|', String.class, Integer.class, Double.class);
+ format.setLenient(true);
+
+ final Configuration parameters = new Configuration();
+ format.configure(parameters);
+ format.open(split);
+
+
+ Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("this is", result.f0);
+ assertEquals(new Integer(1), result.f1);
+ assertEquals(new Double(2.0), result.f2);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("a test", result.f0);
+ assertEquals(new Integer(3), result.f1);
+ assertEquals(new Double(4.0), result.f2);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("#next", result.f0);
+ assertEquals(new Integer(5), result.f1);
+ assertEquals(new Double(6.0), result.f2);
+
+ result = format.nextRecord(result);
+ assertNull(result);
+ }
+ catch (Exception ex) {
+ ex.printStackTrace();
+ fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+ }
+ }
+
+ @Test
+ public void ignoreSingleCharPrefixComments() {
+ try {
+ final String fileContent = "#description of the data\n" +
+ "#successive commented line\n" +
+ "this is|1|2.0|\n" +
+ "a test|3|4.0|\n" +
+ "#next|5|6.0|\n";
+
+ final FileInputSplit split = createTempFile(fileContent);
+
+ CsvInputFormat<Tuple3<String, Integer, Double>> format =
+ new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", '|', String.class, Integer.class, Double.class);
+ format.setCommentPrefix("#");
+
+ final Configuration parameters = new Configuration();
+ format.configure(parameters);
+ format.open(split);
+
+ Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("this is", result.f0);
+ assertEquals(new Integer(1), result.f1);
+ assertEquals(new Double(2.0), result.f2);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("a test", result.f0);
+ assertEquals(new Integer(3), result.f1);
+ assertEquals(new Double(4.0), result.f2);
+
+ result = format.nextRecord(result);
+ assertNull(result);
+ }
+ catch (Exception ex) {
+ ex.printStackTrace();
+ fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+ }
+ }
+
+ @Test
+ public void ignoreMultiCharPrefixComments() {
+ try {
+
+
+ final String fileContent = "//description of the data\n" +
+ "//successive commented line\n" +
+ "this is|1|2.0|\n"+
+ "a test|3|4.0|\n" +
+ "//next|5|6.0|\n";
+
+ final FileInputSplit split = createTempFile(fileContent);
+
+ CsvInputFormat<Tuple3<String, Integer, Double>> format =
+ new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", '|', String.class, Integer.class, Double.class);
+ format.setCommentPrefix("//");
+
+ final Configuration parameters = new Configuration();
+ format.configure(parameters);
+ format.open(split);
+
+ Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("this is", result.f0);
+ assertEquals(new Integer(1), result.f1);
+ assertEquals(new Double(2.0), result.f2);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("a test", result.f0);
+ assertEquals(new Integer(3), result.f1);
+ assertEquals(new Double(4.0), result.f2);
+
+ result = format.nextRecord(result);
+ assertNull(result);
+ }
+ catch (Exception ex) {
+ ex.printStackTrace();
+ fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+ }
+ }
@Test
public void readStringFields() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e47365bc/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
index abe46a0..97cbd5c 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.scala.operators;
+import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import org.apache.flink.api.common.io.GenericCsvInputFormat;
@@ -30,7 +31,13 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.IllegalCharsetNameException;
+import java.nio.charset.UnsupportedCharsetException;
import java.util.Map;
import java.util.TreeMap;
@@ -39,6 +46,8 @@ import scala.Product;
public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFormat<OUT> {
private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(ScalaCsvInputFormat.class);
private transient Object[] parsedValues;
@@ -47,7 +56,12 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor
private boolean lineDelimiterIsLinebreak = false;
private final TupleSerializerBase<OUT> serializer;
-
+
+ private byte[] commentPrefix = null;
+
+ private transient int commentCount;
+ private transient int invalidLineCount;
+
public ScalaCsvInputFormat(Path filePath, TypeInformation<OUT> typeInfo) {
super(filePath);
@@ -80,6 +94,72 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor
setFieldsGeneric(sourceFieldIndices, fieldTypes);
}
+
+ public byte[] getCommentPrefix() {
+ return commentPrefix;
+ }
+
+ public void setCommentPrefix(byte[] commentPrefix) {
+ this.commentPrefix = commentPrefix;
+ }
+
+ public void setCommentPrefix(char commentPrefix) {
+ setCommentPrefix(String.valueOf(commentPrefix));
+ }
+
+ public void setCommentPrefix(String commentPrefix) {
+ setCommentPrefix(commentPrefix, Charsets.UTF_8);
+ }
+
+ public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
+ if (charsetName == null) {
+ throw new IllegalArgumentException("Charset name must not be null");
+ }
+
+ if (commentPrefix != null) {
+ Charset charset = Charset.forName(charsetName);
+ setCommentPrefix(commentPrefix, charset);
+ } else {
+ this.commentPrefix = null;
+ }
+ }
+
+ public void setCommentPrefix(String commentPrefix, Charset charset) {
+ if (charset == null) {
+ throw new IllegalArgumentException("Charset must not be null");
+ }
+ if (commentPrefix != null) {
+ this.commentPrefix = commentPrefix.getBytes(charset);
+ } else {
+ this.commentPrefix = null;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.invalidLineCount > 0) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid line(s) were skipped.");
+ }
+ }
+
+ if (this.commentCount > 0) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) were skipped.");
+ }
+ }
+ super.close();
+ }
+
+ @Override
+ public OUT nextRecord(OUT record) throws IOException {
+ OUT returnRecord = null;
+ do {
+ returnRecord = super.nextRecord(record);
+ } while (returnRecord == null && !reachedEnd());
+
+ return returnRecord;
+ }
@Override
public void open(FileInputSplit split) throws IOException {
@@ -98,6 +178,9 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor
for (int i = 0; i < fieldParsers.length; i++) {
this.parsedValues[i] = fieldParsers[i].createValue();
}
+
+ this.commentCount = 0;
+ this.invalidLineCount = 0;
// left to right evaluation makes access [0] okay
// this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
@@ -116,11 +199,27 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor
//reduce the number of bytes so that the Carriage return is not taken as data
numBytes--;
}
+
+ if (commentPrefix != null && commentPrefix.length <= numBytes) {
+ //check record for comments
+ boolean isComment = true;
+ for (int i = 0; i < commentPrefix.length; i++) {
+ if (commentPrefix[i] != bytes[offset + i]) {
+ isComment = false;
+ break;
+ }
+ }
+ if (isComment) {
+ this.commentCount++;
+ return null;
+ }
+ }
if (parseRecord(parsedValues, bytes, offset, numBytes)) {
OUT result = serializer.createInstance(parsedValues);
return result;
} else {
+ this.invalidLineCount++;
return null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e47365bc/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 4792e58..e756e78 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -155,6 +155,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
* @param lineDelimiter The string that separates lines, defaults to newline.
* @param fieldDelimiter The char that separates individual fields, defaults to ','.
* @param ignoreFirstLine Whether the first line in the file should be ignored.
+ * @param ignoreComments Lines that start with the given String are ignored, disabled by default.
* @param lenient Whether the parser should silently ignore malformed lines.
* @param includedFields The fields in the file that should be read. Per default all fields
* are read.
@@ -164,6 +165,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
lineDelimiter: String = "\n",
fieldDelimiter: Char = ',',
ignoreFirstLine: Boolean = false,
+ ignoreComments: String = null,
lenient: Boolean = false,
includedFields: Array[Int] = null): DataSet[T] = {
@@ -174,6 +176,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
inputFormat.setFieldDelimiter(fieldDelimiter)
inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
inputFormat.setLenient(lenient)
+ inputFormat.setCommentPrefix(ignoreComments)
val classes: Array[Class[_]] = new Array[Class[_]](typeInfo.getArity)
for (i <- 0 until typeInfo.getArity) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e47365bc/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
index 9c90788..54314f7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
@@ -18,6 +18,7 @@
package org.apache.flink.api.scala.io
import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
+import org.junit.Assert._
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNotNull
import org.junit.Assert.assertNull
@@ -28,8 +29,7 @@ import java.io.FileOutputStream
import java.io.FileWriter
import java.io.OutputStreamWriter
import org.apache.flink.configuration.Configuration
-import org.apache.flink.core.fs.FileInputSplit
-import org.apache.flink.core.fs.Path
+import org.apache.flink.core.fs.{FileInputSplit, Path}
import org.junit.Test
import org.apache.flink.api.scala._
@@ -39,6 +39,88 @@ class CsvInputFormatTest {
private final val FIRST_PART: String = "That is the first part"
private final val SECOND_PART: String = "That is the second part"
+
+
+ @Test
+ def ignoreSingleCharPrefixComments():Unit = {
+ try {
+ val fileContent = "#description of the data\n" +
+ "#successive commented line\n" +
+ "this is|1|2.0|\n" +
+ "a test|3|4.0|\n" +
+ "#next|5|6.0|\n"
+ val split = createTempFile(fileContent)
+ val format = new ScalaCsvInputFormat[(String, Integer, Double)](
+ PATH, createTypeInformation[(String, Integer, Double)])
+ format.setDelimiter("\n")
+ format.setFieldDelimiter('|')
+ format.setCommentPrefix("#")
+ val parameters = new Configuration
+ format.configure(parameters)
+ format.open(split)
+ var result: (String, Integer, Double) = null
+ result = format.nextRecord(result)
+ assertNotNull(result)
+ assertEquals("this is", result._1)
+ assertEquals(new Integer(1), result._2)
+ assertEquals(2.0, result._3, 0.0001)
+ result = format.nextRecord(result)
+ assertNotNull(result)
+ assertEquals("a test", result._1)
+ assertEquals(new Integer(3), result._2)
+ assertEquals(4.0, result._3, 0.0001)
+ result = format.nextRecord(result)
+ assertNull(result)
+ assertTrue(format.reachedEnd)
+ }
+ catch {
+ case ex: Exception => {
+ ex.printStackTrace
+ fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
+ }
+ }
+ }
+
+ @Test
+ def ignoreMultiCharPrefixComments():Unit = {
+ try {
+ val fileContent = "//description of the data\n" +
+ "//successive commented line\n" +
+ "this is|1|2.0|\n" +
+ "a test|3|4.0|\n" +
+ "//next|5|6.0|\n"
+ val split = createTempFile(fileContent)
+ val format = new ScalaCsvInputFormat[(String, Integer, Double)](
+ PATH, createTypeInformation[(String, Integer, Double)])
+ format.setDelimiter("\n")
+ format.setFieldDelimiter('|')
+ format.setCommentPrefix("//")
+ val parameters = new Configuration
+ format.configure(parameters)
+ format.open(split)
+ var result: (String, Integer, Double) = null
+ result = format.nextRecord(result)
+ assertNotNull(result)
+ assertEquals("this is", result._1)
+ assertEquals(new Integer(1), result._2)
+ assertEquals(2.0, result._3, 0.0001)
+ result = format.nextRecord(result)
+ assertNotNull(result)
+ assertEquals("a test", result._1)
+ assertEquals(new Integer(3), result._2)
+ assertEquals(4.0, result._3, 0.0001)
+ result = format.nextRecord(result)
+ assertNull(result)
+ assertTrue(format.reachedEnd)
+ }
+ catch {
+ case ex: Exception => {
+ ex.printStackTrace
+ fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
+ }
+ }
+ }
+
@Test
def readStringFields():Unit = {
try {