You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/12/12 16:50:05 UTC

incubator-flink git commit: [FLINK-1208] Enable CsvInputFormats to ignore invalid lines and lines starting with comments

Repository: incubator-flink
Updated Branches:
  refs/heads/master e0a4ee070 -> e47365bc0


[FLINK-1208] Enable CsvInputFormats to ignore invalid lines and lines starting with comments

This closes #201


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e47365bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e47365bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e47365bc

Branch: refs/heads/master
Commit: e47365bc0eed07a57682ea75f71991e35981cc82
Parents: e0a4ee0
Author: fel <ne...@googlemail.com>
Authored: Fri Nov 14 12:48:24 2014 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Dec 12 16:41:02 2014 +0100

----------------------------------------------------------------------
 .../flink/api/java/io/CsvInputFormat.java       | 106 +++++++++++++-
 .../org/apache/flink/api/java/io/CsvReader.java |  37 +++++
 .../apache/flink/api/java/io/CSVReaderTest.java |  19 +++
 .../flink/api/java/io/CsvInputFormatTest.java   | 137 +++++++++++++++++++
 .../scala/operators/ScalaCsvInputFormat.java    | 101 +++++++++++++-
 .../flink/api/scala/ExecutionEnvironment.scala  |   3 +
 .../flink/api/scala/io/CsvInputFormatTest.scala |  86 +++++++++++-
 7 files changed, 485 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e47365bc/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
index a851517..fe2ae14 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
@@ -20,6 +20,9 @@ package org.apache.flink.api.java.io;
 
 
 import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.IllegalCharsetNameException;
+import java.nio.charset.UnsupportedCharsetException;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -29,7 +32,10 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 
 
@@ -37,6 +43,11 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
 
 	private static final long serialVersionUID = 1L;
 	
+	/**
+	 * The log.
+	 */
+	private static final Logger LOG = LoggerFactory.getLogger(CsvInputFormat.class);
+	
 	public static final String DEFAULT_LINE_DELIMITER = "\n";
 	
 	public static final char DEFAULT_FIELD_DELIMITER = ',';
@@ -44,10 +55,16 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
 
 	private transient Object[] parsedValues;
 	
+	private byte[] commentPrefix = null;
+	
 	// To speed up readRecord processing. Used to find windows line endings.
 	// It is set when open so that readRecord does not have to evaluate it
 	private boolean lineDelimiterIsLinebreak = false;
 	
+	private transient int commentCount;
+
+	private transient int invalidLineCount;
+	
 	
 	public CsvInputFormat(Path filePath) {
 		super(filePath);
@@ -66,6 +83,48 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
 		setFieldTypes(types);
 	}
 	
+	
+	public byte[] getCommentPrefix() {
+		return commentPrefix;
+	}
+	
+	public void setCommentPrefix(byte[] commentPrefix) {
+		this.commentPrefix = commentPrefix;
+	}
+	
+	public void setCommentPrefix(char commentPrefix) {
+		setCommentPrefix(String.valueOf(commentPrefix));
+	}
+	
+	public void setCommentPrefix(String commentPrefix) {
+		setCommentPrefix(commentPrefix, Charsets.UTF_8);
+	}
+	
+	public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
+		if (charsetName == null) {
+			throw new IllegalArgumentException("Charset name must not be null");
+		}
+		
+		if (commentPrefix != null) {
+			Charset charset = Charset.forName(charsetName);
+			setCommentPrefix(commentPrefix, charset);
+		} else {
+			this.commentPrefix = null;
+		}
+	}
+	
+	public void setCommentPrefix(String commentPrefix, Charset charset) {
+		if (charset == null) {
+			throw new IllegalArgumentException("Charset must not be null");
+		}
+		if (commentPrefix != null) {
+			this.commentPrefix = commentPrefix.getBytes(charset);
+		} else {
+			this.commentPrefix = null;
+		}
+	}
+	
+	
 	public void setFieldTypes(Class<?> ... fieldTypes) {
 		if (fieldTypes == null || fieldTypes.length == 0) {
 			throw new IllegalArgumentException("Field types must not be null or empty.");
@@ -117,10 +176,39 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
 		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
 			this.lineDelimiterIsLinebreak = true;
 		}
+		
+		this.commentCount = 0;
+		this.invalidLineCount = 0;
+	}
+	
+	@Override
+	public void close() throws IOException {
+		if (this.invalidLineCount > 0) {
+			if (LOG.isWarnEnabled()) {
+					LOG.warn("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid line(s) were skipped.");
+			}
+		}
+		
+		if (this.commentCount > 0) {
+			if (LOG.isInfoEnabled()) {
+				LOG.info("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) were skipped.");
+			}
+		}
+		super.close();
+	}
+	
+	@Override
+	public OUT nextRecord(OUT record) throws IOException {
+		OUT returnRecord = null;
+		do {
+			returnRecord = super.nextRecord(record);
+		} while (returnRecord == null && !reachedEnd());
+		
+		return returnRecord;
 	}
 
 	@Override
-	public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
+	public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
 		/*
 		 * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
 		 */
@@ -130,6 +218,21 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
 			numBytes--;
 		}
 		
+		if (commentPrefix != null && commentPrefix.length <= numBytes) {
+			//check record for comments
+			boolean isComment = true;
+			for (int i = 0; i < commentPrefix.length; i++) {
+				if (commentPrefix[i] != bytes[offset + i]) {
+					isComment = false;
+					break;
+				}
+			}
+			if (isComment) {
+				this.commentCount++;
+				return null;
+			}
+		}
+		
 		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
 			// valid parse, map values into pact record
 			for (int i = 0; i < parsedValues.length; i++) {
@@ -137,6 +240,7 @@ public class CsvInputFormat<OUT extends Tuple> extends GenericCsvInputFormat<OUT
 			}
 			return reuse;
 		} else {
+			this.invalidLineCount++;
 			return null;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e47365bc/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index 014a465..f4c9e32 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -50,9 +50,13 @@ public class CsvReader {
 	protected String lineDelimiter = CsvInputFormat.DEFAULT_LINE_DELIMITER;
 	
 	protected char fieldDelimiter = CsvInputFormat.DEFAULT_FIELD_DELIMITER;
+	
+	protected String commentPrefix = null; //default: no comments
 
 	protected boolean skipFirstLineAsHeader = false;
 	
+	protected boolean ignoreInvalidLines = false;
+	
 	// --------------------------------------------------------------------------------------------
 	
 	public CsvReader(Path filePath, ExecutionEnvironment executionContext) {
@@ -102,6 +106,23 @@ public class CsvReader {
 	}
 	
 	/**
+	 * Configures the string that starts comments.
+	 * By default, no comment prefix is set and comment lines are treated as invalid lines.
+	 * Only comments that start at the beginning of a line are recognized.
+	 * 
+	 * @param commentPrefix The string that starts the comments.
+	 * @return The CSV reader instance itself, to allow for fluent function chaining.
+	 */
+	public CsvReader ignoreComments(String commentPrefix) {
+		if (commentPrefix == null || commentPrefix.length() == 0) {
+			throw new IllegalArgumentException("The comment prefix must not be null or an empty string");
+		}
+		
+		this.commentPrefix = commentPrefix;
+		return this;
+	}
+	
+	/**
 	 * Configures which fields of the CSV file should be included and which should be skipped. The
 	 * parser will look at the first {@code n} fields, where {@code n} is the length of the boolean
 	 * array. The parser will skip over all fields where the boolean value at the corresponding position
@@ -212,6 +233,20 @@ public class CsvReader {
 		skipFirstLineAsHeader = true;
 		return this;
 	}
+	
+	/**
+	 * Sets the CSV reader to ignore any invalid lines. 
+	 * This is useful for files that contain an empty line at the end, multiple header lines or comments; without this setting, such lines cause an exception.
+	 * 
+	 * @return The CSV reader instance itself, to allow for fluent function chaining.
+	 */
+	public CsvReader ignoreInvalidLines(){
+		ignoreInvalidLines = true;
+		return this;
+	}
+	
+	
+	
 	/**
 	 * Configures the reader to read the CSV data and parse it to the given type. The type must be a subclass of
 	 * {@link Tuple}. The type information for the fields is obtained from the type class. The type
@@ -246,7 +281,9 @@ public class CsvReader {
 	private void configureInputFormat(CsvInputFormat<?> format, Class<?>... types) {
 		format.setDelimiter(this.lineDelimiter);
 		format.setFieldDelimiter(this.fieldDelimiter);
+		format.setCommentPrefix(this.commentPrefix);
 		format.setSkipFirstLineAsHeader(skipFirstLineAsHeader);
+		format.setLenient(ignoreInvalidLines);
 		if (this.includedMask == null) {
 			format.setFieldTypes(types);
 		} else {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e47365bc/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
index 1217b3d..6676cd1 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.api.java.io;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
 import java.util.Arrays;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -46,6 +49,22 @@ public class CSVReaderTest {
 	}
 	
 	@Test
+	public void testIgnoreInvalidLinesConfigure() {
+		CsvReader reader = getCsvReader();
+		Assert.assertFalse(reader.ignoreInvalidLines);
+		reader.ignoreInvalidLines();
+		Assert.assertTrue(reader.ignoreInvalidLines);
+	}
+	
+	@Test
+	public void testIgnoreComments() {
+		CsvReader reader = getCsvReader();
+		assertNull(reader.commentPrefix);
+		reader.ignoreComments("#");
+		assertEquals("#", reader.commentPrefix);
+	}
+	
+	@Test
 	public void testIncludeFieldsDense() {
 		CsvReader reader = getCsvReader();
 		reader.includeFields(true, true, true);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e47365bc/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 5f10a2b..c335db1 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -53,6 +53,143 @@ public class CsvInputFormatTest {
 	private static final String FIRST_PART = "That is the first part";
 	
 	private static final String SECOND_PART = "That is the second part";
+	
+	@Test
+	public void ignoreInvalidLines() {
+		try {
+			
+			
+			final String fileContent =  "#description of the data\n" + 
+										"header1|header2|header3|\n"+
+										"this is|1|2.0|\n"+
+										"//a comment\n" +
+										"a test|3|4.0|\n" +
+										"#next|5|6.0|\n";
+			
+			final FileInputSplit split = createTempFile(fileContent);
+			
+			CsvInputFormat<Tuple3<String, Integer, Double>> format = 
+					new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", '|',  String.class, Integer.class, Double.class);
+			format.setLenient(true);
+		
+			final Configuration parameters = new Configuration();
+			format.configure(parameters);
+			format.open(split);
+			
+			
+			Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
+			result = format.nextRecord(result);
+			assertNotNull(result);
+			assertEquals("this is", result.f0);
+			assertEquals(new Integer(1), result.f1);
+			assertEquals(new Double(2.0), result.f2);
+			
+			result = format.nextRecord(result);
+			assertNotNull(result);
+			assertEquals("a test", result.f0);
+			assertEquals(new Integer(3), result.f1);
+			assertEquals(new Double(4.0), result.f2);
+			
+			result = format.nextRecord(result);
+			assertNotNull(result);
+			assertEquals("#next", result.f0);
+			assertEquals(new Integer(5), result.f1);
+			assertEquals(new Double(6.0), result.f2);
+
+			result = format.nextRecord(result);
+			assertNull(result);
+		}
+		catch (Exception ex) {
+			ex.printStackTrace();
+			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+		}
+	}
+	
+	@Test
+	public void ignoreSingleCharPrefixComments() {
+		try {
+			final String fileContent = "#description of the data\n" +
+									   "#successive commented line\n" +
+									   "this is|1|2.0|\n" +
+									   "a test|3|4.0|\n" +
+									   "#next|5|6.0|\n";
+			
+			final FileInputSplit split = createTempFile(fileContent);
+			
+			CsvInputFormat<Tuple3<String, Integer, Double>> format = 
+					new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", '|', String.class, Integer.class, Double.class);
+			format.setCommentPrefix("#");
+		
+			final Configuration parameters = new Configuration();
+			format.configure(parameters);
+			format.open(split);
+			
+			Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
+			
+			result = format.nextRecord(result);
+			assertNotNull(result);
+			assertEquals("this is", result.f0);
+			assertEquals(new Integer(1), result.f1);
+			assertEquals(new Double(2.0), result.f2);
+			
+			result = format.nextRecord(result);
+			assertNotNull(result);
+			assertEquals("a test", result.f0);
+			assertEquals(new Integer(3), result.f1);
+			assertEquals(new Double(4.0), result.f2);
+
+			result = format.nextRecord(result);
+			assertNull(result);
+		}
+		catch (Exception ex) {
+			ex.printStackTrace();
+			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+		}
+	}
+	
+	@Test
+	public void ignoreMultiCharPrefixComments() {
+		try {
+			
+			
+			final String fileContent = "//description of the data\n" +
+									   "//successive commented line\n" +
+									   "this is|1|2.0|\n"+
+									   "a test|3|4.0|\n" +
+									   "//next|5|6.0|\n";
+			
+			final FileInputSplit split = createTempFile(fileContent);
+			
+			CsvInputFormat<Tuple3<String, Integer, Double>> format = 
+					new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", '|', String.class, Integer.class, Double.class);
+			format.setCommentPrefix("//");
+		
+			final Configuration parameters = new Configuration();
+			format.configure(parameters);
+			format.open(split);
+			
+			Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
+			
+			result = format.nextRecord(result);
+			assertNotNull(result);
+			assertEquals("this is", result.f0);
+			assertEquals(new Integer(1), result.f1);
+			assertEquals(new Double(2.0), result.f2);
+			
+			result = format.nextRecord(result);
+			assertNotNull(result);
+			assertEquals("a test", result.f0);
+			assertEquals(new Integer(3), result.f1);
+			assertEquals(new Double(4.0), result.f2);
+			
+			result = format.nextRecord(result);
+			assertNull(result);
+		}
+		catch (Exception ex) {
+			ex.printStackTrace();
+			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+		}
+	}
 
 	@Test
 	public void readStringFields() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e47365bc/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
index abe46a0..97cbd5c 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.scala.operators;
 
 
+import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 
 import org.apache.flink.api.common.io.GenericCsvInputFormat;
@@ -30,7 +31,13 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.util.StringUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.IllegalCharsetNameException;
+import java.nio.charset.UnsupportedCharsetException;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -39,6 +46,8 @@ import scala.Product;
 public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFormat<OUT> {
 
 	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ScalaCsvInputFormat.class);
 	
 	private transient Object[] parsedValues;
 	
@@ -47,7 +56,12 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor
 	private boolean lineDelimiterIsLinebreak = false;
 
 	private final TupleSerializerBase<OUT> serializer;
-	
+
+	private byte[] commentPrefix = null;
+
+	private transient int commentCount;
+	private transient int invalidLineCount;
+
 	public ScalaCsvInputFormat(Path filePath, TypeInformation<OUT> typeInfo) {
 		super(filePath);
 
@@ -80,6 +94,72 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor
 
 		setFieldsGeneric(sourceFieldIndices, fieldTypes);
 	}
+
+	public byte[] getCommentPrefix() {
+		return commentPrefix;
+	}
+
+	public void setCommentPrefix(byte[] commentPrefix) {
+		this.commentPrefix = commentPrefix;
+	}
+
+	public void setCommentPrefix(char commentPrefix) {
+		setCommentPrefix(String.valueOf(commentPrefix));
+	}
+
+	public void setCommentPrefix(String commentPrefix) {
+		setCommentPrefix(commentPrefix, Charsets.UTF_8);
+	}
+
+	public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
+		if (charsetName == null) {
+			throw new IllegalArgumentException("Charset name must not be null");
+		}
+
+		if (commentPrefix != null) {
+			Charset charset = Charset.forName(charsetName);
+			setCommentPrefix(commentPrefix, charset);
+		} else {
+			this.commentPrefix = null;
+		}
+	}
+
+	public void setCommentPrefix(String commentPrefix, Charset charset) {
+		if (charset == null) {
+			throw new IllegalArgumentException("Charset must not be null");
+		}
+		if (commentPrefix != null) {
+			this.commentPrefix = commentPrefix.getBytes(charset);
+		} else {
+			this.commentPrefix = null;
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		if (this.invalidLineCount > 0) {
+			if (LOG.isWarnEnabled()) {
+				LOG.warn("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid line(s) were skipped.");
+			}
+		}
+
+		if (this.commentCount > 0) {
+			if (LOG.isInfoEnabled()) {
+				LOG.info("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) were skipped.");
+			}
+		}
+		super.close();
+	}
+
+	@Override
+	public OUT nextRecord(OUT record) throws IOException {
+		OUT returnRecord = null;
+		do {
+			returnRecord = super.nextRecord(record);
+		} while (returnRecord == null && !reachedEnd());
+
+		return returnRecord;
+	}
 	
 	@Override
 	public void open(FileInputSplit split) throws IOException {
@@ -98,6 +178,9 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor
 		for (int i = 0; i < fieldParsers.length; i++) {
 			this.parsedValues[i] = fieldParsers[i].createValue();
 		}
+
+		this.commentCount = 0;
+		this.invalidLineCount = 0;
 		
 		// left to right evaluation makes access [0] okay
 		// this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
@@ -116,11 +199,27 @@ public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFor
 			//reduce the number of bytes so that the Carriage return is not taken as data
 			numBytes--;
 		}
+
+		if (commentPrefix != null && commentPrefix.length <= numBytes) {
+			//check record for comments
+			boolean isComment = true;
+			for (int i = 0; i < commentPrefix.length; i++) {
+				if (commentPrefix[i] != bytes[offset + i]) {
+					isComment = false;
+					break;
+				}
+			}
+			if (isComment) {
+				this.commentCount++;
+				return null;
+			}
+		}
 		
 		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
 			OUT result = serializer.createInstance(parsedValues);
 			return result;
 		} else {
+			this.invalidLineCount++;
 			return null;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e47365bc/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 4792e58..e756e78 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -155,6 +155,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * @param lineDelimiter The string that separates lines, defaults to newline.
    * @param fieldDelimiter The char that separates individual fields, defaults to ','.
    * @param ignoreFirstLine Whether the first line in the file should be ignored.
+   * @param ignoreComments Lines that start with the given String are ignored, disabled by default.
    * @param lenient Whether the parser should silently ignore malformed lines.
    * @param includedFields The fields in the file that should be read. Per default all fields
    *                       are read.
@@ -164,6 +165,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
       lineDelimiter: String = "\n",
       fieldDelimiter: Char = ',',
       ignoreFirstLine: Boolean = false,
+      ignoreComments: String = null,
       lenient: Boolean = false,
       includedFields: Array[Int] = null): DataSet[T] = {
 
@@ -174,6 +176,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     inputFormat.setFieldDelimiter(fieldDelimiter)
     inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
     inputFormat.setLenient(lenient)
+    inputFormat.setCommentPrefix(ignoreComments)
 
     val classes: Array[Class[_]] = new Array[Class[_]](typeInfo.getArity)
     for (i <- 0 until typeInfo.getArity) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e47365bc/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
index 9c90788..54314f7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.api.scala.io
 
 import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
+import org.junit.Assert._
 import org.junit.Assert.assertEquals
 import org.junit.Assert.assertNotNull
 import org.junit.Assert.assertNull
@@ -28,8 +29,7 @@ import java.io.FileOutputStream
 import java.io.FileWriter
 import java.io.OutputStreamWriter
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.core.fs.FileInputSplit
-import org.apache.flink.core.fs.Path
+import org.apache.flink.core.fs.{FileInputSplit, Path}
 import org.junit.Test
 import org.apache.flink.api.scala._
 
@@ -39,6 +39,88 @@ class CsvInputFormatTest {
   private final val FIRST_PART: String = "That is the first part"
   private final val SECOND_PART: String = "That is the second part"
 
+
+
+  @Test
+  def ignoreSingleCharPrefixComments():Unit = {
+    try {
+      val fileContent = "#description of the data\n" +
+                        "#successive commented line\n" +
+                        "this is|1|2.0|\n" +
+                        "a test|3|4.0|\n" +
+                        "#next|5|6.0|\n"
+      val split = createTempFile(fileContent)
+      val format = new ScalaCsvInputFormat[(String, Integer, Double)](
+        PATH, createTypeInformation[(String, Integer, Double)])
+      format.setDelimiter("\n")
+      format.setFieldDelimiter('|')
+      format.setCommentPrefix("#")
+      val parameters = new Configuration
+      format.configure(parameters)
+      format.open(split)
+      var result: (String, Integer, Double) = null
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals("this is", result._1)
+      assertEquals(new Integer(1), result._2)
+      assertEquals(2.0, result._3, 0.0001)
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals("a test", result._1)
+      assertEquals(new Integer(3), result._2)
+      assertEquals(4.0, result._3, 0.0001)
+      result = format.nextRecord(result)
+      assertNull(result)
+      assertTrue(format.reachedEnd)
+    }
+    catch {
+      case ex: Exception => {
+        ex.printStackTrace
+        fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
+      }
+    }
+  }
+
+  @Test
+  def ignoreMultiCharPrefixComments():Unit = {
+    try {
+      val fileContent = "//description of the data\n" +
+                        "//successive commented line\n" +
+                        "this is|1|2.0|\n" +
+                        "a test|3|4.0|\n" +
+                        "//next|5|6.0|\n"
+      val split = createTempFile(fileContent)
+      val format = new ScalaCsvInputFormat[(String, Integer, Double)](
+        PATH, createTypeInformation[(String, Integer, Double)])
+      format.setDelimiter("\n")
+      format.setFieldDelimiter('|')
+      format.setCommentPrefix("//")
+      val parameters = new Configuration
+      format.configure(parameters)
+      format.open(split)
+      var result: (String, Integer, Double) = null
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals("this is", result._1)
+      assertEquals(new Integer(1), result._2)
+      assertEquals(2.0, result._3, 0.0001)
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals("a test", result._1)
+      assertEquals(new Integer(3), result._2)
+      assertEquals(4.0, result._3, 0.0001)
+      result = format.nextRecord(result)
+      assertNull(result)
+      assertTrue(format.reachedEnd)
+    }
+    catch {
+      case ex: Exception => {
+        ex.printStackTrace
+        fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
+      }
+    }
+  }
+
   @Test
   def readStringFields():Unit = {
     try {