Posted to issues@flink.apache.org by FelixNeutatz <gi...@git.apache.org> on 2014/11/14 12:55:40 UTC

[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

GitHub user FelixNeutatz opened a pull request:

    https://github.com/apache/incubator-flink/pull/201

    enable CSV Reader to ignore invalid lines like an empty line at the end and comments 

    Corresponding to issue FLINK-1208:
    enable CSV Reader to ignore invalid lines like an empty line at the end, multiple header lines or comments without throwing an exception

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/FelixNeutatz/incubator-flink csvreader

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/201.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #201
    
----
commit 66727a4085c917d5a8a77e11bb5560d1b338f38a
Author: fel <ne...@googlemail.com>
Date:   2014-11-14T11:48:24Z

    enable CSV Reader to ignore invalid lines like an empty line at the end, multiple header lines or comments without throwing an exception

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by FelixNeutatz <gi...@git.apache.org>.
Github user FelixNeutatz commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-63943006
  
    Any further remarks?



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20606237
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java ---
    @@ -269,6 +283,21 @@ public void open(FileInputSplit split) throws IOException {
     	
     	protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int numBytes) throws ParseException {
     		
    +		if (commentPrefix != null) {
    +			//check record for comments
    --- End diff --
    
    Your logic checks for comments anywhere in a line. It would even identify comments which are contained in an escaped String. 
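    Below is a minimal sketch of a quote-aware check. It assumes that double quotes delimit quoted fields, scans the line, and reports a comment prefix only when it appears outside a quoted region. `CommentScanner` and `findCommentStart` are hypothetical names used for illustration. They are not Flink API.

    ```java
    /**
     * Hypothetical helper (not part of Flink): finds where a comment starts in a
     * CSV line, ignoring comment prefixes that occur inside quoted fields.
     */
    final class CommentScanner {

        private CommentScanner() {}

        /**
         * Returns the index at which {@code prefix} starts outside of any quoted
         * region, or -1 if the line contains no such comment.
         */
        static int findCommentStart(String line, String prefix, char quote) {
            boolean inQuotes = false;
            for (int i = 0; i < line.length(); i++) {
                char c = line.charAt(i);
                if (c == quote) {
                    // Toggle on every quote character; an escaped quote ("") inside a
                    // quoted field toggles twice and leaves the state unchanged.
                    inQuotes = !inQuotes;
                } else if (!inQuotes && line.startsWith(prefix, i)) {
                    return i;
                }
            }
            return -1;
        }
    }
    ```

    A result of 0 means the whole line is a comment. A positive result is also the length of the record part that precedes the comment.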



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-64355493
  
    I don't like the name of the method "enableComments()". It does not enable comments; what would that even mean? In my opinion, it should be called "ignoreComments()".
    
    Also, there is no support for this in the Scala API. We could add it via an optional parameter to readCsvFile(), like this:
    ```scala
    def readCsvFile[T <: Product : ClassTag : TypeInformation](
          filePath: String,
          lineDelimiter: String = "\n",
          fieldDelimiter: Char = ',',
          ignoreFirstLine: Boolean = false,
          ignoreComments: String = null,
          lenient: Boolean = false,
          includedFields: Array[Int] = null): DataSet[T] = {
    ```



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20606791
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java ---
    @@ -271,6 +271,138 @@ public void testSparseParseWithIndices() {
     	}
     	
     	@Test
    +	public void testIgnoreInvalidInput() throws IOException {
    +		try {
    +			final String fileContent = "#description of the data\n" + 
    +									   "header1|header2|header3|\n"+
    +									   "this is|1|2.0|\n"+
    +									   "//a comment\n" +
    +									   "a test|3|4.0|\n" +
    +									   "#next|5|6.0|\n";
    +			
    +			final FileInputSplit split = createTempFile(fileContent);	
    +		
    +			final Configuration parameters = new Configuration();
    +			format.setFieldDelimiter('|');
    +			format.setFieldTypesGeneric(StringValue.class, IntValue.class, DoubleValue.class);
    +			format.setLenient(true);
    +			
    +			format.configure(parameters);
    +			format.open(split);
    +			
    +			Value[] values;
    +			
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertNull(values);
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertNull(values);
    +			
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertEquals("this is", ((StringValue) values[0]).getValue());
    +			assertEquals(1, ((IntValue) values[1]).getValue());
    +			assertEquals(2.0, ((DoubleValue) values[2]).getValue(), 0.001);
    +			
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertNull(values);
    +			
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertEquals("a test", ((StringValue) values[0]).getValue());
    +			assertEquals(3, ((IntValue) values[1]).getValue());
    +			assertEquals(4.0, ((DoubleValue) values[2]).getValue(), 0.001);
    +			
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertEquals("#next", ((StringValue) values[0]).getValue());
    +			assertEquals(5, ((IntValue) values[1]).getValue());
    +			assertEquals(6.0, ((DoubleValue) values[2]).getValue(), 0.001);
    +		}
    +		catch (Exception ex) {
    +			fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
    +		}
    +	}
    +	
    +	@Test
    +	public void testIgnoreSingleCharPrefixComments() throws IOException {
    +		try {
    +			final String fileContent = "#description of the data\n" + 
    +									   "this is|1|2.0|\n"+
    +									   "a test|3|4.0|#comment after record\n" +
    +									   "#next|5|6.0|\n";
    +			
    +			final FileInputSplit split = createTempFile(fileContent);	
    +		
    +			final Configuration parameters = new Configuration();
    +			format.setFieldDelimiter('|');
    +			format.setFieldTypesGeneric(StringValue.class, IntValue.class, DoubleValue.class);
    +			format.setCommentPrefix("#");
    +			
    +			format.configure(parameters);
    +			format.open(split);
    +			
    +			Value[] values;
    +			
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertNull(values);
    +			
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertEquals("this is", ((StringValue) values[0]).getValue());
    +			assertEquals(1, ((IntValue) values[1]).getValue());
    +			assertEquals(2.0, ((DoubleValue) values[2]).getValue(), 0.001);
    +			
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertEquals("a test", ((StringValue) values[0]).getValue());
    +			assertEquals(3, ((IntValue) values[1]).getValue());
    +			assertEquals(4.0, ((DoubleValue) values[2]).getValue(), 0.001);
    +			
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertNull(values);
    +		}
    +		catch (Exception ex) {
    +			fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
    +		}
    +	}
    +	
    +	@Test
    +	public void testIgnoreMultiCharPrefixComments() throws IOException {
    +		try {
    --- End diff --
    
    Can you also check multiple successive commented lines?



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20431709
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java ---
    @@ -41,6 +41,8 @@
     	
     	private static final char DEFAULT_FIELD_DELIMITER = ',';
     	
    +	private static final String DEFAULT_COMMENT_START = "#";
    --- End diff --
    
    The default value is not used. Can it be removed?



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by FelixNeutatz <gi...@git.apache.org>.
Github user FelixNeutatz commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-64327364
  
    There are already tests for this case in GenericCsvInputFormatTest: testReadInvalidContents()



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by FelixNeutatz <gi...@git.apache.org>.
Github user FelixNeutatz commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-63105660
  
    I have added one minor fix in the runtime of Flink and some tests which show that it works (CsvInputFormatTest.java).
    
    Now the question is which you prefer: either my solution, which enables skipping any invalid input (all kinds of comments, headers, invalid entries), or a dedicated function that skips certain comment structures. It's up to you.



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by FelixNeutatz <gi...@git.apache.org>.
Github user FelixNeutatz commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-63704747
  
    Any other suggestions for improvement?



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by FelixNeutatz <gi...@git.apache.org>.
Github user FelixNeutatz commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-63951010
  
    It's up to you; I can also put the check into the CSVIF class.



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20382601
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java ---
    @@ -211,8 +213,9 @@ public void invoke() throws Exception {
     							// as long as there is data to read
     							while (!this.taskCanceled && !format.reachedEnd()) {
     								// build next pair and ship pair if it is valid
    -								if ((record = format.nextRecord(record)) != null) {
    -									output.collect(record);
    +								OT r;
    --- End diff --
    
    see above



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-63620115
  
    Looks good so far. Minor comments inline...



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20707584
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---
    @@ -486,8 +536,42 @@ public boolean reachedEnd() {
     	@Override
     	public OT nextRecord(OT record) throws IOException {
     		if (readLine()) {
    -			return readRecord(record, this.currBuffer, this.currOffset, this.currLen);
    +			if (commentPrefix != null) {
    +				//check record for comments
    +				Boolean isComment = true;
    +				for (int i = 0; i < commentPrefix.length; i++) {
    +					if (commentPrefix[i] != this.currBuffer[this.currOffset + i]) {
    +						isComment = false;
    +						break;
    +					}
    +				}
    +				if (isComment) {
    +					this.commentCount++;
    +					return nextRecord(record);
    +				}
    +			}
    +			
    +			OT returnRecord = readRecord(record, this.currBuffer, this.currOffset, this.currLen);
    +			
    +			if (returnRecord == null) {
    --- End diff --
    
    Yes, you're right, sorry. Can you update the JavaDoc of `readRecord()` and document how `null` returns are handled (the line is ignored and the next line is read)?
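    Here is one possible wording for the requested JavaDoc, sketched on a hypothetical base class. `DocumentedLineFormat` is not Flink's DelimitedInputFormat, and the text is only a suggestion.

    ```java
    /**
     * Hypothetical base class used only to show how the readRecord() contract
     * could be documented; it is not Flink's DelimitedInputFormat.
     */
    abstract class DocumentedLineFormat<OT> {

        /**
         * Parses one line (the bytes between two line delimiters) into a record.
         *
         * <p>Returning {@code null} marks the line as invalid: the caller does not
         * emit it, but skips the line and reads the next one instead.
         *
         * @param reuse    an object that may be reused to hold the result
         * @param bytes    the buffer that contains the line
         * @param offset   the position of the first byte of the line in {@code bytes}
         * @param numBytes the number of bytes in the line
         * @return the parsed record, or {@code null} if the line should be skipped
         */
        public abstract OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes);
    }
    ```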



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20567213
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java ---
    @@ -269,6 +281,23 @@ public void open(FileInputSplit split) throws IOException {
     	
     	protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int numBytes) throws ParseException {
     		
    +		if(commentStart != null) {
    +			//check record for comments
    +			String s = new String(bytes, offset, numBytes).trim();
    +			int commentStartIndex = s.indexOf(commentStart);
    +			
    +			//delete record if the whole line was a comment
    +			if(commentStartIndex == 0) {
    +				return false;
    +			}
    +			//delete comment at the end of a valid record
    +			if(commentStartIndex != -1) {
    +				numBytes = numBytes - commentStartIndex + 1;
    +			}
    +		}
    +		
    +		Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);
    --- End diff --
    
    It is better to initialize the logger once, statically, per class.
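    Below is a minimal sketch of the pattern. Flink uses SLF4J, where the field would be `private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);`. This sketch uses java.util.logging instead so that it compiles without dependencies. `CsvFormatWithLogger` is a hypothetical class.

    ```java
    import java.util.logging.Logger;

    class CsvFormatWithLogger {

        // Created once, when the class is initialized, and shared by all instances,
        // instead of being looked up again on every parseRecord() call.
        private static final Logger LOG = Logger.getLogger(CsvFormatWithLogger.class.getName());

        static Logger logger() {
            return LOG;
        }

        boolean parseRecord(String line) {
            if (line.isEmpty()) {
                LOG.fine("Skipping empty line"); // only written if FINE is enabled
                return false;
            }
            return true;
        }
    }
    ```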



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-66785727
  
    Looks good. Thanks!
    Will merge this now.



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by FelixNeutatz <gi...@git.apache.org>.
Github user FelixNeutatz commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20849398
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
    @@ -137,6 +239,7 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
     			}
     			return reuse;
     		} else {
    +			this.invalidLineCount++;
    --- End diff --
    
    The input is already checked for correctness in GenericCsvInputFormat.parseRecord(), and the exception is thrown there, so this line is never reached:
    
    14/11/25 09:46:36 INFO taskmanager.TaskManager: Shutting down TaskManager
    Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: org.apache.flink.api.common.io.ParseException: Row too short: invalid line
    	at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRecord(GenericCsvInputFormat.java:284)
    	at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:235)
    	at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:1)
    	at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:489)
    	at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:203)
    	at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:1)
    	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:195)
    	at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:245)
    	at java.lang.Thread.run(Thread.java:745)
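    Here is a minimal sketch of one way to make the counter reachable. It assumes that a parse error should be treated as an invalid line: `readRecord()` catches the exception, counts the line, and returns null. `CountingCsvReader` and `LineParseException` are hypothetical stand-ins for CsvInputFormat and Flink's ParseException.

    ```java
    /** Hypothetical stand-in for Flink's ParseException. */
    class LineParseException extends Exception {
        LineParseException(String message) {
            super(message);
        }
    }

    class CountingCsvReader {

        private final int numFields;
        private long invalidLineCount;

        CountingCsvReader(int numFields) {
            this.numFields = numFields;
        }

        /** Splits on '|' and fails if the row has fewer fields than expected. */
        String[] parseRecord(String line) throws LineParseException {
            String[] fields = line.split("\\|");
            if (fields.length < numFields) {
                throw new LineParseException("Row too short: " + line);
            }
            return fields;
        }

        /** Returns the parsed fields, or null (after counting the line) if it is invalid. */
        String[] readRecord(String line) {
            try {
                return parseRecord(line);
            } catch (LineParseException e) {
                invalidLineCount++;
                return null;
            }
        }

        long getInvalidLineCount() {
            return invalidLineCount;
        }
    }
    ```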



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20706010
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---
    @@ -486,8 +536,42 @@ public boolean reachedEnd() {
     	@Override
     	public OT nextRecord(OT record) throws IOException {
     		if (readLine()) {
    -			return readRecord(record, this.currBuffer, this.currOffset, this.currLen);
    +			if (commentPrefix != null) {
    +				//check record for comments
    +				Boolean isComment = true;
    +				for (int i = 0; i < commentPrefix.length; i++) {
    +					if (commentPrefix[i] != this.currBuffer[this.currOffset + i]) {
    +						isComment = false;
    +						break;
    +					}
    +				}
    +				if (isComment) {
    +					this.commentCount++;
    +					return nextRecord(record);
    +				}
    +			}
    +			
    +			OT returnRecord = readRecord(record, this.currBuffer, this.currOffset, this.currLen);
    +			
    +			if (returnRecord == null) {
    +				this.invalidLineCount++;
    +				return nextRecord(record);
    +			}
    +			
    +			return returnRecord;
     		} else {
    +			if (this.invalidLineCount > 0) {
    +				if (LOG.isWarnEnabled()) {
    +					LOG.warn("In the current split " + this.invalidLineCount +" invalid line(s) were skipped.");
    --- End diff --
    
    I'd log the filename and startPos of the FileInputSplit instead of "current".
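    Below is a minimal sketch of such a message. It assumes that the split's path and start offset are available; in Flink they would come from FileInputSplit's getPath() and getStart(). The helper `SplitWarnings.invalidLinesMessage` is hypothetical, and it takes the values as plain parameters to keep the example self-contained.

    ```java
    final class SplitWarnings {

        private SplitWarnings() {}

        /** Builds a warning that identifies the split by file and start offset. */
        static String invalidLinesMessage(String path, long start, long invalidLineCount) {
            return String.format("In split (file: %s, start: %d) %d invalid line(s) were skipped.",
                    path, start, invalidLineCount);
        }
    }
    ```

    As in the diff, the warning would only be emitted when the counter is greater than zero.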



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by FelixNeutatz <gi...@git.apache.org>.
Github user FelixNeutatz commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20760662
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
    @@ -130,6 +216,21 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
     			numBytes--;
     		}
     		
    +		if (commentPrefix != null && commentPrefix.length <= numBytes) {
    +			//check record for comments
    +			Boolean isComment = true;
    +			for (int i = 0; i < commentPrefix.length; i++) {
    +				if (commentPrefix[i] != bytes[offset + i]) {
    +					isComment = false;
    +					break;
    +				}
    +			}
    +			if (isComment) {
    +				this.commentCount++;
    +				return nextRecord(reuse);
    --- End diff --
    
    Fabian told me not to return null: "That's what I meant by letting the DelimitedInputFormat handle invalid lines. I would not give the null value back to the DataSourceTask, but instead let the DelimitedInputFormat catch this and try to call readRecord() until a valid record is returned, and hand that to the DataSourceTask.
    I am actually surprised that giving a null value to the data source does not cause an NPE." (quote from Fabian)



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20759430
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
    @@ -130,6 +216,21 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
     			numBytes--;
     		}
     		
    +		if (commentPrefix != null && commentPrefix.length <= numBytes) {
    +			//check record for comments
    +			Boolean isComment = true;
    +			for (int i = 0; i < commentPrefix.length; i++) {
    +				if (commentPrefix[i] != bytes[offset + i]) {
    +					isComment = false;
    +					break;
    +				}
    +			}
    +			if (isComment) {
    +				this.commentCount++;
    +				return nextRecord(reuse);
    --- End diff --
    
    This call results in recursive calls. For files with a lot of successive comments, this will result in a stack overflow.
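    Here is a minimal sketch of the iterative alternative: comment lines are skipped inside a loop, so the stack depth stays constant no matter how many successive comment lines a file contains. An Iterator stands in for DelimitedInputFormat.readLine(), and `CommentSkippingReader` is a hypothetical class.

    ```java
    import java.util.Iterator;

    class CommentSkippingReader {

        private final Iterator<String> lines;
        private final String commentPrefix;
        private long commentCount;

        CommentSkippingReader(Iterator<String> lines, String commentPrefix) {
            this.lines = lines;
            this.commentPrefix = commentPrefix;
        }

        /** Returns the next non-comment line, or null when the input is exhausted. */
        String nextRecord() {
            while (lines.hasNext()) {
                String line = lines.next();
                if (commentPrefix != null && line.startsWith(commentPrefix)) {
                    commentCount++;
                    continue; // try the next line without a recursive call
                }
                return line;
            }
            return null;
        }

        long getCommentCount() {
            return commentCount;
        }
    }
    ```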



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by FelixNeutatz <gi...@git.apache.org>.
Github user FelixNeutatz commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-64171804
  
    @fhueske: You were right, it was possible without accessing the variables within the CsvInputFormat



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20760689
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
    @@ -130,6 +216,21 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
     			numBytes--;
     		}
     		
    +		if (commentPrefix != null && commentPrefix.length <= numBytes) {
    +			//check record for comments
    +			Boolean isComment = true;
    +			for (int i = 0; i < commentPrefix.length; i++) {
    +				if (commentPrefix[i] != bytes[offset + i]) {
    +					isComment = false;
    +					break;
    +				}
    +			}
    +			if (isComment) {
    +				this.commentCount++;
    +				return nextRecord(reuse);
    --- End diff --
    
    My intention was to capture the null in the DelimitedInputFormat before it is given to the DataSourceTask.



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-63979680
  
    Looks good to me. Thank you!



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by FelixNeutatz <gi...@git.apache.org>.
Github user FelixNeutatz commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-65870781
  
    Unfortunately not. There are a bunch of conflicts across all the different commits, and I realized I have to sync my repo with upstream more often. I hope I can figure out a way to get back to a clean repo ^^



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20382587
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java ---
    @@ -192,8 +192,9 @@ public void invoke() throws Exception {
     							// as long as there is data to read
     							while (!this.taskCanceled && !format.reachedEnd()) {
     								// build next pair and ship pair if it is valid
    -								if ((record = format.nextRecord(record)) != null) {
    -									output.collect(record);
    +								OT r;
    --- End diff --
    
    These modifications change Flink's behavior for every type of InputFormat, not just CSVInputFormats.
    If we want to allow skipping invalid lines, this is not the right place to add the feature.




[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-63122212
  
    Thanks for your PR!
    
    I think there are a few issues with your approach. For example, if the first field of a CSV file is a String, a line that starts with a comment character such as '#' or '//' will not be skipped, because it still parses as a valid record. Also, your changes to the DataSourceTask affect all InputFormats, which is definitely not desired.
    
    IMO, it is necessary to explicitly specify a comment string and check for it at the beginning of each line.
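    Below is a minimal sketch of that check on the raw line buffer. It assumes that the comment prefix has been configured explicitly and converted to bytes. `CommentPrefix.isCommentLine` is a hypothetical helper. Unlike the prefix loop in the DelimitedInputFormat diff, it first checks that the line is at least as long as the prefix, so it never compares bytes beyond the end of the line.

    ```java
    final class CommentPrefix {

        private CommentPrefix() {}

        /** True if the line bytes[offset, offset + length) begins with the prefix. */
        static boolean isCommentLine(byte[] buffer, int offset, int length, byte[] prefix) {
            if (prefix == null || prefix.length == 0 || length < prefix.length) {
                return false; // no prefix configured, or the line is too short to match
            }
            for (int i = 0; i < prefix.length; i++) {
                if (buffer[offset + i] != prefix[i]) {
                    return false;
                }
            }
            return true;
        }
    }
    ```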
    
    Skipping invalid lines is also a good feature in my opinion. It would be good to inform the user about invalid lines, for example by counting the number of invalid lines per split and emitting a log statement.



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-63054779
  
    Thank you for the pull request.
    I think the issue's author also wanted to have a way to specify a comment character (for example #). I think the pull request does not cover such a feature.
    
    Can you add a test case for the change?



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-64274077
  
    Thanks for the quick update cycles! 
    There is one more thing. I think the `lenient = false` case (strict checking) does not work correctly yet. Neither the DelimitedIF nor the DataSourceTask checks for it.
    
    Apart from that, it looks good to me.



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20675590
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
    @@ -130,6 +132,13 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
     			numBytes--;
     		}
     		
    +		//help to prevent null pointer exception in the case of invalid input
    --- End diff --
    
    That's what I meant by letting the DelimitedInputFormat handle invalid lines. I would not give the null value back to the DataSourceTask, but instead let the DelimitedInputFormat catch it, call `readRecord()` until a valid record is returned, and hand that record to the DataSourceTask.
    I am actually surprised that giving a null value to the data source does not cause a NPE.



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-64166021
  
    Why should CsvInputFormat require access to currBuffer, etc?
    
    If configured lenient, `DelimitedInputFormat.nextRecord()` calls `readLine()` and `readRecord()` until `readRecord()` returns a non-null value (or `readLine()` is false). This way, `CsvInputFormat.readRecord()` is called with the correct buffer, offset, and length information. In both cases of comment and invalid lines, `CsvInputFormat.readRecord()` returns null.
    
    If not configured lenient, DelimitedInputFormat raises an exception if a null value is returned. 
    
    Logging should still happen in CsvInputFormat, because DelimitedIF cannot distinguish between invalid and comment lines.
    
    Would that work?
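    A simplified, self-contained model of this proposal, as a sketch only: `SketchDelimitedFormat`, its list-backed `readLine()`, and the `lenient` constructor flag are hypothetical stand-ins, not Flink's `DelimitedInputFormat` API. In lenient mode `nextRecord()` keeps calling `readLine()` and `readRecord()` until a non-null record appears or the input ends; in strict mode a `null` from `readRecord()` becomes an exception. A loop is used instead of a recursive `nextRecord()` call, so a long run of skipped lines cannot grow the stack.

    ```java
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.List;

    // Hypothetical model of a delimited input format; not Flink's class.
    abstract class SketchDelimitedFormat<OT> {
        private final Iterator<String> lines;
        private final boolean lenient;
        private String currLine;
        private boolean end;

        SketchDelimitedFormat(List<String> lines, boolean lenient) {
            this.lines = lines.iterator();
            this.lenient = lenient;
        }

        // Implemented by the concrete format: the parsed record, or null for comment/invalid lines.
        abstract OT readRecord(OT reuse, String line);

        private boolean readLine() {
            if (!lines.hasNext()) {
                return false;
            }
            currLine = lines.next();
            return true;
        }

        boolean reachedEnd() {
            return end;
        }

        OT nextRecord(OT reuse) throws IOException {
            while (readLine()) {
                OT rec = readRecord(reuse, currLine);
                if (rec != null) {
                    return rec;
                }
                if (!lenient) {
                    // strict mode: a line the format could not turn into a record is an error
                    throw new IOException("Invalid line: " + currLine);
                }
                // lenient mode: skip this line and try the next one
            }
            end = true;
            return null;
        }
    }
    ```

    Taken literally, the proposal also makes comment lines fail in strict mode, because `readRecord()` returns null for them as well; if comments should always be skipped, the concrete format needs a separate way to signal them (an assumption beyond what is discussed here).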



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-63620906
  
    You have added the option to use multi-char comment prefixes (like `//`). This should also be tested; right now all tests use single-char comment prefixes.



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20382695
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
    @@ -130,6 +130,8 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
     			numBytes--;
     		}
     		
    +		
    +		
    --- End diff --
    
    It's not a big deal in this case, but try to avoid unnecessary reformatting to keep diffs clean and lean ;-)



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by FelixNeutatz <gi...@git.apache.org>.
Github user FelixNeutatz commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-63970474
  
    done



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-65894376
  
    If you need help, feel free to contact me ;)



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20706013
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---
    @@ -486,8 +536,42 @@ public boolean reachedEnd() {
     	@Override
     	public OT nextRecord(OT record) throws IOException {
     		if (readLine()) {
    -			return readRecord(record, this.currBuffer, this.currOffset, this.currLen);
    +			if (commentPrefix != null) {
    +				//check record for comments
    +				Boolean isComment = true;
    +				for (int i = 0; i < commentPrefix.length; i++) {
    +					if (commentPrefix[i] != this.currBuffer[this.currOffset + i]) {
    +						isComment = false;
    +						break;
    +					}
    +				}
    +				if (isComment) {
    +					this.commentCount++;
    +					return nextRecord(record);
    +				}
    +			}
    +			
    +			OT returnRecord = readRecord(record, this.currBuffer, this.currOffset, this.currLen);
    +			
    +			if (returnRecord == null) {
    +				this.invalidLineCount++;
    +				return nextRecord(record);
    +			}
    +			
    +			return returnRecord;
     		} else {
    +			if (this.invalidLineCount > 0) {
    +				if (LOG.isWarnEnabled()) {
    +					LOG.warn("In the current split " + this.invalidLineCount +" invalid line(s) were skipped.");
    +				}
    +			}
    +			
    +			if (this.commentCount > 0) {
    +				if (LOG.isInfoEnabled()) {
    +					LOG.info("In the current split " + this.commentCount +" comment line(s) were skipped.");
    --- End diff --
    
    Same here



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20382595
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java ---
    @@ -202,8 +203,9 @@ public void invoke() throws Exception {
     							// as long as there is data to read
     							while (!this.taskCanceled && !format.reachedEnd()) {
     								// build next pair and ship pair if it is valid
    -								if ((record = format.nextRecord(record)) != null) {
    -									output.collect(record);
    +								OT r;
    --- End diff --
    
    see above



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-63295734
  
    The update looks good.
    
    The `CSVReaderTest` is still failing - see Travis build for details...



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20706109
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---
    @@ -486,8 +536,42 @@ public boolean reachedEnd() {
     	@Override
     	public OT nextRecord(OT record) throws IOException {
     		if (readLine()) {
    -			return readRecord(record, this.currBuffer, this.currOffset, this.currLen);
    +			if (commentPrefix != null) {
    +				//check record for comments
    +				Boolean isComment = true;
    +				for (int i = 0; i < commentPrefix.length; i++) {
    +					if (commentPrefix[i] != this.currBuffer[this.currOffset + i]) {
    +						isComment = false;
    +						break;
    +					}
    +				}
    +				if (isComment) {
    +					this.commentCount++;
    +					return nextRecord(record);
    +				}
    +			}
    +			
    +			OT returnRecord = readRecord(record, this.currBuffer, this.currOffset, this.currLen);
    +			
    +			if (returnRecord == null) {
    --- End diff --
    
    I wouldn't skip invalid lines by default. Add a parameter to enable this feature.



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by FelixNeutatz <gi...@git.apache.org>.
Github user FelixNeutatz commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20670967
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
    @@ -130,6 +132,13 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
     			numBytes--;
     		}
     		
    +		//help to prevent null pointer exception in the case of invalid input
    --- End diff --
    
    In DataSourceTask, the previous record is reused to read the next record:
    
    ```java
    while (!this.taskCanceled && !format.reachedEnd()) {
    	// build next pair and ship pair if it is valid
    	if ((record = format.nextRecord(record)) != null) {
    		output.collect(record);
    	}
    }
    ```
    The problem is: when a record is invalid, nextRecord() returns null, which overwrites `record`. That null is then passed back in as the reuse object and stops the loop from getting more records...
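    A self-contained sketch of the problem described above, using hypothetical stand-ins (`ReuseHolder`, `ReuseToyFormat`, `ReuseLoops`) rather than Flink's `InputFormat`, tuple types, or `DataSourceTask`. The toy format writes into the reuse object the way a CSV format fills a reused tuple, so once a `null` result has overwritten `record`, the next valid line fails with a NullPointerException. Keeping the reuse object and the result in separate variables avoids that; the reviewers nevertheless preferred to handle skipped lines inside the format rather than change the task loop for all InputFormats.

    ```java
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    // Hypothetical mutable record, standing in for a reused tuple.
    class ReuseHolder {
        int value;
    }

    // Hypothetical format that fills the reuse object, like a CSV format filling a tuple.
    class ReuseToyFormat {
        private final Iterator<String> lines;

        ReuseToyFormat(List<String> lines) {
            this.lines = lines.iterator();
        }

        boolean reachedEnd() {
            return !lines.hasNext();
        }

        // Returns the filled reuse object, or null if the line is not an integer.
        ReuseHolder nextRecord(ReuseHolder reuse) {
            String line = lines.next();
            int v;
            try {
                v = Integer.parseInt(line);
            } catch (NumberFormatException e) {
                return null;
            }
            reuse.value = v; // NullPointerException if reuse is null
            return reuse;
        }
    }

    class ReuseLoops {
        // Original loop shape: a null result overwrites the reuse object.
        static List<Integer> overwriteLoop(ReuseToyFormat format) {
            List<Integer> out = new ArrayList<>();
            ReuseHolder record = new ReuseHolder();
            while (!format.reachedEnd()) {
                if ((record = format.nextRecord(record)) != null) {
                    out.add(record.value);
                }
            }
            return out;
        }

        // Reuse object and result kept apart: a null result leaves the reuse object intact.
        static List<Integer> separateLoop(ReuseToyFormat format) {
            List<Integer> out = new ArrayList<>();
            ReuseHolder reuse = new ReuseHolder();
            while (!format.reachedEnd()) {
                ReuseHolder r = format.nextRecord(reuse);
                if (r != null) {
                    out.add(r.value);
                    reuse = r;
                }
            }
            return out;
        }
    }
    ```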



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-flink/pull/201



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-63952053
  
    I'd move the check to the CSVFormat. Thanks!



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-63385024
  
    Unfortunately, tests sometimes fail unpredictably on Travis. Three different errors in one build is quite rare, though...
    Whoever merges this PR will double-check that everything is fine.



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20567310
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java ---
    @@ -90,6 +94,10 @@ public int getNumberOfNonNullFields() {
     	public char getFieldDelimiter() {
     		return fieldDelim;
     	}
    +	
    +	public String getCommentStart() {
    --- End diff --
    
    Maybe "comment prefix" would be nicer than "comment start"



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20759425
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
    @@ -117,10 +177,36 @@ public void open(FileInputSplit split) throws IOException {
     		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
     			this.lineDelimiterIsLinebreak = true;
     		}
    +		
    +		this.commentCount = 0;
    +		this.invalidLineCount = 0;
    +		
    +		isLogged = false;
    +	}
    +	
    +	@Override
    +	public OUT nextRecord(OUT record) throws IOException {
    +		OUT returnrecord = super.nextRecord(record);
    +		if (reachedEnd() && !isLogged) {
    --- End diff --
    
    Can we move this to the `close()` method? That would keep this method a bit tighter...
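    A sketch of per-split counters that are reset in `open()` and reported once in `close()`. `SplitLineCounters` is hypothetical and its `open()`/`close()` take no split argument; it uses `java.util.logging` only to stay self-contained, whereas Flink's formats log through their own `LOG` instance.

    ```java
    import java.util.logging.Logger;

    // Hypothetical per-split bookkeeping; not Flink's CsvInputFormat.
    class SplitLineCounters {
        private static final Logger LOG = Logger.getLogger(SplitLineCounters.class.getName());

        private int commentCount;
        private int invalidLineCount;

        // Start of a split: reset the counters.
        void open() {
            commentCount = 0;
            invalidLineCount = 0;
        }

        void countComment() {
            commentCount++;
        }

        void countInvalidLine() {
            invalidLineCount++;
        }

        String summary() {
            return invalidLineCount + " invalid line(s) and " + commentCount
                    + " comment line(s) were skipped in the current split.";
        }

        // End of a split: one aggregated log statement instead of one per skipped line.
        void close() {
            if (invalidLineCount > 0 || commentCount > 0) {
                LOG.info(summary());
            }
        }
    }
    ```

    With this split, `nextRecord()` only increments counters, and the reporting logic lives in a single place that runs once per split.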



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-63054970
  
    Also, have a look here: http://flink.incubator.apache.org/coding_guidelines.html
    The commit message should contain the JIRA issue number.
    You can update the commit name and the pull request by force-pushing into the branch.



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20826354
  
    --- Diff: flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java ---
    @@ -48,6 +48,143 @@
     	private static final String FIRST_PART = "That is the first part";
     	
     	private static final String SECOND_PART = "That is the second part";
    +	
    +	@Test
    +	public void ignoreInvalidLines() {
    +		try {
    +			
    +			
    +			final String fileContent =  "#description of the data\n" + 
    +										"header1|header2|header3|\n"+
    +										"this is|1|2.0|\n"+
    +										"//a comment\n" +
    +										"a test|3|4.0|\n" +
    +										"#next|5|6.0|\n";
    +			
    +			final FileInputSplit split = createTempFile(fileContent);
    +			
    +			CsvInputFormat<Tuple3<String, Integer, Double>> format = 
    +					new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", '|',  String.class, Integer.class, Double.class);
    +			format.setLenient(true);
    +		
    +			final Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +			
    +			
    +			Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("this is", result.f0);
    +			assertEquals(new Integer(1), result.f1);
    +			assertEquals(new Double(2.0), result.f2);
    +			
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("a test", result.f0);
    +			assertEquals(new Integer(3), result.f1);
    +			assertEquals(new Double(4.0), result.f2);
    +			
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("#next", result.f0);
    +			assertEquals(new Integer(5), result.f1);
    +			assertEquals(new Double(6.0), result.f2);
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +	
    +	@Test
    +	public void ignoreSingleCharPrefixComments() {
    +		try {
    +			final String fileContent = "#description of the data\n" +
    +									   "#successive commented line\n" +
    +									   "this is|1|2.0|\n" +
    +									   "a test|3|4.0|\n" +
    +									   "#next|5|6.0|\n";
    +			
    +			final FileInputSplit split = createTempFile(fileContent);
    +			
    +			CsvInputFormat<Tuple3<String, Integer, Double>> format = 
    +					new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", '|', String.class, Integer.class, Double.class);
    +			format.setCommentPrefix("#");
    +		
    +			final Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +			
    +			Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
    +			
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("this is", result.f0);
    +			assertEquals(new Integer(1), result.f1);
    +			assertEquals(new Double(2.0), result.f2);
    +			
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("a test", result.f0);
    +			assertEquals(new Integer(3), result.f1);
    +			assertEquals(new Double(4.0), result.f2);
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +	
    +	@Test
    +	public void ignoreMultiCharPrefixComments() {
    +		try {
    +			
    +			
    +			final String fileContent = "//description of the data\n" +
    +									   "//successive commented line\n" +
    +									   "this is|1|2.0|\n"+
    +									   "a test|3|4.0|\n" +
    +									   "//next|5|6.0|\n";
    +			
    +			final FileInputSplit split = createTempFile(fileContent);
    +			
    +			CsvInputFormat<Tuple3<String, Integer, Double>> format = 
    +					new CsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", '|', String.class, Integer.class, Double.class);
    +			format.setCommentPrefix("//");
    +		
    +			final Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +			
    +			Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
    +			
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("this is", result.f0);
    +			assertEquals(new Integer(1), result.f1);
    +			assertEquals(new Double(2.0), result.f2);
    +			
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("a test", result.f0);
    +			assertEquals(new Integer(3), result.f1);
    +			assertEquals(new Double(4.0), result.f2);
    +			
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    --- End diff --
    
    Can you add a test case that checks for correct behavior of `lenient = false`?



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20826143
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
    @@ -137,6 +239,7 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
     			}
     			return reuse;
     		} else {
    +			this.invalidLineCount++;
    --- End diff --
    
    The DelimitedIF forwards the returned `null` to the DataSourceTask, where it is ignored. Hence, input correctness is never checked. If the CsvIF is configured with lenient = false, we need to raise an exception here instead of returning null.



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by FelixNeutatz <gi...@git.apache.org>.
Github user FelixNeutatz commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-64100176
  
    So I would need to iterate over the content of DelimitedInputFormat.nextRecord():
    
    ```java
    if (readLine()) {
    	return readRecord(record, this.currBuffer, this.currOffset, this.currLen);
    } else {
    	this.end = true;
    	return null;
    }
    ```
    The problem is that CsvInputFormat has no access to this.currBuffer, this.currOffset, and this.currLen. Therefore I see only two alternatives:
    1) copy the functionality of readLine() into CsvInputFormat, or
    2) change the access of the private fields this.currBuffer, this.currOffset, and this.currLen in DelimitedInputFormat to protected, or add corresponding getter methods.
    
    Which solution do you think is better?



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-65861146
  
    Did you manage to solve the merge conflicts?



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by FelixNeutatz <gi...@git.apache.org>.
Github user FelixNeutatz commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-63382456
  
    Can anyone tell me why my latest commit failed?
    I only removed two unused constants, and the previous commit passed.



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20606989
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java ---
    @@ -279,6 +308,9 @@ protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int nu
     			// check valid start position
     			if (startPos >= limit) {
     				if (lenient) {
    +					if (LOG.isWarnEnabled()) {
    --- End diff --
    
    I think it might be even better to handle invalid lines in the DelimitedInputFormat, as it delegates the parsing of lines and could read the next line if something fails.



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20705816
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---
    @@ -486,8 +536,42 @@ public boolean reachedEnd() {
     	@Override
     	public OT nextRecord(OT record) throws IOException {
     		if (readLine()) {
    -			return readRecord(record, this.currBuffer, this.currOffset, this.currLen);
    +			if (commentPrefix != null) {
    +				//check record for comments
    +				Boolean isComment = true;
    +				for (int i = 0; i < commentPrefix.length; i++) {
    +					if (commentPrefix[i] != this.currBuffer[this.currOffset + i]) {
    --- End diff --
    
    Check that `this.currBuffer[this.currOffset + i]` cannot cause an IndexOutOfBoundsException.
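    A sketch of a bounds-safe prefix match, assuming the current record occupies `buffer[offset, offset + length)` and lies inside the array; `PrefixMatch.startsWith` is a hypothetical helper, not an existing Flink method. Comparing the prefix length with the record length first means a short last line such as `/` with the prefix `//` is simply not a comment, instead of reading past the record or the end of the array.

    ```java
    // Hypothetical helper: does the record in buffer[offset, offset + length) start with prefix?
    class PrefixMatch {
        static boolean startsWith(byte[] buffer, int offset, int length, byte[] prefix) {
            // A record shorter than the prefix cannot be a comment. Checking this first
            // keeps buffer[offset + i] inside the record (and therefore inside the array).
            if (prefix.length > length) {
                return false;
            }
            for (int i = 0; i < prefix.length; i++) {
                if (buffer[offset + i] != prefix[i]) {
                    return false;
                }
            }
            return true;
        }
    }
    ```

    The same check covers single-char and multi-char prefixes, so a test with `//` on a one-byte final line exercises exactly the boundary case raised here.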



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by FelixNeutatz <gi...@git.apache.org>.
Github user FelixNeutatz commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-66564141
  
    done :)



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20606714
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java ---
    @@ -279,6 +308,9 @@ protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int nu
     			// check valid start position
     			if (startPos >= limit) {
     				if (lenient) {
    +					if (LOG.isWarnEnabled()) {
    --- End diff --
    
    I'd lower this to INFO, or even DEBUG. Logging every invalid line can become quite expensive.
    I'd also prefer to log this in aggregate (a count of invalid lines) at the end of each input split, to reduce the logging effort.
    
    Maybe you can override `open()`, call `super.open()` and log after that...
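A minimal sketch of the aggregated variant, assuming the format keeps per-split counters. `java.util.logging` stands in here for the project's logger, and `Level.FINE` roughly corresponds to DEBUG. `SplitStatistics`, `countInvalidLine()`, `countComment()` and `finishSplit()` are hypothetical names. The sketch emits one guarded log statement per split, no matter how many lines were skipped.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical per-split counters with a single summary log line (not Flink code).
final class SplitStatistics {

    private static final Logger LOG = Logger.getLogger(SplitStatistics.class.getName());

    private long invalidLineCount;
    private long commentCount;

    void countInvalidLine() {
        invalidLineCount++;
    }

    void countComment() {
        commentCount++;
    }

    /**
     * Logs one summary for the finished split at FINE (roughly DEBUG), resets
     * the counters for the next split, and returns the number of invalid lines.
     */
    long finishSplit(String splitName) {
        long skipped = invalidLineCount;
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine(splitName + ": skipped " + skipped + " invalid line(s) and "
                    + commentCount + " comment line(s)");
        }
        invalidLineCount = 0;
        commentCount = 0;
        return skipped;
    }

    public static void main(String[] args) {
        SplitStatistics stats = new SplitStatistics();
        stats.countInvalidLine();
        stats.countInvalidLine();
        stats.countComment();
        System.out.println(stats.finishSplit("split-0")); // 2
        System.out.println(stats.finishSplit("split-1")); // 0, counters were reset
    }
}
```

The parse loop would call only `countInvalidLine()` where the diff currently logs each skipped line. `finishSplit()` would run once per split; the comment above suggests hooking this into an overridden `open()`.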



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by FelixNeutatz <gi...@git.apache.org>.
Github user FelixNeutatz commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-65086896
  
    How can I resolve the merge conflicts?



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20849730
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
    @@ -137,6 +239,7 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
     			}
     			return reuse;
     		} else {
    +			this.invalidLineCount++;
    --- End diff --
    
    Ah, yes, you are right!



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20759427
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
    @@ -130,6 +216,21 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
     			numBytes--;
     		}
     		
    +		if (commentPrefix != null && commentPrefix.length <= numBytes) {
    +			//check record for comments
    +			Boolean isComment = true;
    --- End diff --
    
    I would use a primitive type (`boolean`) here; it is more efficient.



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20431715
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
    @@ -41,6 +41,7 @@
     	
     	public static final char DEFAULT_FIELD_DELIMITER = ',';
     
    +	public static final String DEFAULT_COMMENT_START = "#";
    --- End diff --
    
    The default value is not used. Can it be removed?



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20567259
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java ---
    @@ -279,6 +308,7 @@ protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int nu
     			// check valid start position
     			if (startPos >= limit) {
     				if (lenient) {
    +					LOG.warn("The invalid line: \"" + new String(bytes, offset, numBytes) + "\" was skipped.");
    --- End diff --
    
    To keep this efficient when logging is deactivated, we guard such statements with `if (LOG.isWarnEnabled()) { LOG.warn(...); }`. Otherwise, the string concatenation happens even when logging is deactivated.
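A minimal sketch of the guard pattern, with `java.util.logging` as a self-contained stand-in: `isLoggable(Level.WARNING)` plays the role of `LOG.isWarnEnabled()`. The class, its methods, and the `messagesBuilt` counter are hypothetical. The counter exists only to make the effect observable: when the level is disabled, neither the `new String(...)` nor the concatenation runs.

```java
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical illustration of a guarded log statement (not Flink code).
final class GuardedLogging {

    private static final Logger LOG = Logger.getLogger(GuardedLogging.class.getName());

    // Counts how often the log message was actually built; for illustration only.
    static int messagesBuilt = 0;

    static void reportSkippedLine(byte[] bytes, int offset, int numBytes) {
        // The guard makes the String allocation and concatenation conditional:
        // with WARNING disabled, the body is never executed.
        if (LOG.isLoggable(Level.WARNING)) {
            messagesBuilt++;
            LOG.warning("The invalid line: \""
                    + new String(bytes, offset, numBytes, StandardCharsets.UTF_8)
                    + "\" was skipped.");
        }
    }

    static void setLevel(Level level) {
        LOG.setLevel(level);
    }

    public static void main(String[] args) {
        byte[] line = "broken|line".getBytes(StandardCharsets.UTF_8);
        setLevel(Level.OFF);
        reportSkippedLine(line, 0, line.length);
        System.out.println(messagesBuilt); // 0: message never built
        setLevel(Level.WARNING);
        reportSkippedLine(line, 0, line.length);
        System.out.println(messagesBuilt); // 1
    }
}
```

SLF4J's parameterized form, `LOG.warn("The invalid line: \"{}\" was skipped.", line)`, defers the message formatting. The argument expression is still evaluated eagerly, though, so an explicit guard remains useful when building the argument is expensive, as with `new String(...)` here.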



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20606157
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
    @@ -130,6 +132,13 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
     			numBytes--;
     		}
     		
    +		//help to prevent null pointer exception in the case of invalid input
    --- End diff --
    
    Why is this necessary? Why would `reuse` be null?



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-64327879
  
    Thanks for the correction!
    PR looks good to me :-)



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20606461
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java ---
    @@ -269,6 +283,21 @@ public void open(FileInputSplit split) throws IOException {
     	
     	protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int numBytes) throws ParseException {
     		
    +		if (commentPrefix != null) {
    +			//check record for comments
    +			String s = new String(bytes, offset, numBytes).trim();
    --- End diff --
    
    We try to avoid object instantiations as much as possible and parse the data as a `byte[]`. This effort is voided if we make the line a String for the purpose of comment checking.
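A minimal sketch of the same check done directly on the `byte[]`, assuming the purpose of `trim()` was to accept whitespace before the comment prefix. `String.trim()` strips characters up to U+0020. For ASCII or UTF-8 input, skipping leading bytes `<= 0x20` has the same effect on the front of the line, because such bytes never occur inside a UTF-8 multi-byte sequence. `ByteCommentCheck` and `isComment` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not Flink API): byte-level counterpart of
// new String(bytes, offset, numBytes).trim().startsWith(prefix) for ASCII/UTF-8 data.
final class ByteCommentCheck {

    static boolean isComment(byte[] bytes, int offset, int numBytes, byte[] commentPrefix) {
        int pos = offset;
        final int limit = offset + numBytes;
        // Skip leading bytes <= 0x20 (space, tab, CR, other control bytes), as trim() would.
        while (pos < limit && (bytes[pos] & 0xFF) <= 0x20) {
            pos++;
        }
        // Fewer remaining bytes than the prefix length: cannot be a comment,
        // and the loop below can never index past limit.
        if (limit - pos < commentPrefix.length) {
            return false;
        }
        for (int i = 0; i < commentPrefix.length; i++) {
            if (bytes[pos + i] != commentPrefix[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] prefix = "//".getBytes(StandardCharsets.US_ASCII);
        byte[] line = "  //a comment".getBytes(StandardCharsets.US_ASCII);
        System.out.println(isComment(line, 0, line.length, prefix)); // true
    }
}
```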



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-65072823
  
    Good point. I'd also prefer `ignoreComments()`.



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-65092024
  
    I have set `meld` as the default merge tool.
    What I do is `git rebase origin/master` and then use `git mergetool` to resolve the conflicts that occur on the way.



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by FelixNeutatz <gi...@git.apache.org>.
Github user FelixNeutatz commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20706975
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---
    @@ -486,8 +536,42 @@ public boolean reachedEnd() {
     	@Override
     	public OT nextRecord(OT record) throws IOException {
     		if (readLine()) {
    -			return readRecord(record, this.currBuffer, this.currOffset, this.currLen);
    +			if (commentPrefix != null) {
    +				//check record for comments
    +				Boolean isComment = true;
    +				for (int i = 0; i < commentPrefix.length; i++) {
    +					if (commentPrefix[i] != this.currBuffer[this.currOffset + i]) {
    +						isComment = false;
    +						break;
    +					}
    +				}
    +				if (isComment) {
    +					this.commentCount++;
    +					return nextRecord(record);
    +				}
    +			}
    +			
    +			OT returnRecord = readRecord(record, this.currBuffer, this.currOffset, this.currLen);
    +			
    +			if (returnRecord == null) {
    --- End diff --
    
    Invalid lines are not skipped by default; see `CsvReader`:
    
    ```java
    protected boolean ignoreInvalidLines = false;
    ```
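A minimal sketch of how a reader might handle a `null` parse result under that default: skip the line only when `ignoreInvalidLines` is set, and fail otherwise. `SkippingReader` and its members are hypothetical. The sketch works on `String` lines for brevity, where a real input format would work on `byte[]`. It uses a loop where the diff calls `nextRecord(record)` recursively, so a long run of comment or invalid lines cannot deepen the stack.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

// Hypothetical stand-in for a delimited reader (not Flink's classes).
final class SkippingReader<T> {

    private final Iterator<String> lines;
    private final Function<String, T> parser; // returns null for an invalid line
    private final String commentPrefix;       // null: comments are not skipped
    private final boolean ignoreInvalidLines; // mirrors CsvReader's flag, which defaults to false

    int commentCount;
    int invalidLineCount;

    SkippingReader(Iterator<String> lines, Function<String, T> parser,
                   String commentPrefix, boolean ignoreInvalidLines) {
        this.lines = lines;
        this.parser = parser;
        this.commentPrefix = commentPrefix;
        this.ignoreInvalidLines = ignoreInvalidLines;
    }

    /** Returns the next valid record, or null once the input is exhausted. */
    T nextRecord() {
        // Loop rather than recursing into nextRecord(), so skipped lines add no stack frames.
        while (lines.hasNext()) {
            String line = lines.next();
            if (commentPrefix != null && line.startsWith(commentPrefix)) {
                commentCount++;
                continue;
            }
            T record = parser.apply(line);
            if (record != null) {
                return record;
            }
            if (!ignoreInvalidLines) {
                throw new IllegalStateException("Invalid line: " + line);
            }
            invalidLineCount++;
        }
        return null;
    }

    public static void main(String[] args) {
        SkippingReader<Integer> reader = new SkippingReader<Integer>(
                Arrays.asList("#header", "1", "oops", "2").iterator(),
                s -> s.matches("\\d+") ? Integer.valueOf(s) : null,
                "#", true);
        Integer r;
        while ((r = reader.nextRecord()) != null) {
            System.out.println(r); // prints 1, then 2
        }
        System.out.println(reader.commentCount + " comment(s), "
                + reader.invalidLineCount + " invalid line(s)"); // 1 comment(s), 1 invalid line(s)
    }
}
```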



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/201#discussion_r20706556
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java ---
    @@ -271,6 +271,138 @@ public void testSparseParseWithIndices() {
     	}
     	
     	@Test
    +	public void testIgnoreInvalidInput() throws IOException {
    +		try {
    +			final String fileContent = "#description of the data\n" + 
    +									   "header1|header2|header3|\n"+
    +									   "this is|1|2.0|\n"+
    +									   "//a comment\n" +
    +									   "a test|3|4.0|\n" +
    +									   "#next|5|6.0|\n";
    +			
    +			final FileInputSplit split = createTempFile(fileContent);	
    +		
    +			final Configuration parameters = new Configuration();
    +			format.setFieldDelimiter('|');
    +			format.setFieldTypesGeneric(StringValue.class, IntValue.class, DoubleValue.class);
    +			format.setLenient(true);
    +			
    +			format.configure(parameters);
    +			format.open(split);
    +			
    +			Value[] values;
    +			
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertNull(values);
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertNull(values);
    +			
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertEquals("this is", ((StringValue) values[0]).getValue());
    +			assertEquals(1, ((IntValue) values[1]).getValue());
    +			assertEquals(2.0, ((DoubleValue) values[2]).getValue(), 0.001);
    +			
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertNull(values);
    +			
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertEquals("a test", ((StringValue) values[0]).getValue());
    +			assertEquals(3, ((IntValue) values[1]).getValue());
    +			assertEquals(4.0, ((DoubleValue) values[2]).getValue(), 0.001);
    +			
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertEquals("#next", ((StringValue) values[0]).getValue());
    +			assertEquals(5, ((IntValue) values[1]).getValue());
    +			assertEquals(6.0, ((DoubleValue) values[2]).getValue(), 0.001);
    +		}
    +		catch (Exception ex) {
    +			fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
    +		}
    +	}
    +	
    +	@Test
    +	public void testIgnoreSingleCharPrefixComments() throws IOException {
    +		try {
    +			final String fileContent = "#description of the data\n" + 
    +									   "this is|1|2.0|\n"+
    +									   "a test|3|4.0|#comment after record\n" +
    +									   "#next|5|6.0|\n";
    +			
    +			final FileInputSplit split = createTempFile(fileContent);	
    +		
    +			final Configuration parameters = new Configuration();
    +			format.setFieldDelimiter('|');
    +			format.setFieldTypesGeneric(StringValue.class, IntValue.class, DoubleValue.class);
    +			format.setCommentPrefix("#");
    +			
    +			format.configure(parameters);
    +			format.open(split);
    +			
    +			Value[] values;
    +			
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertNull(values);
    +			
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertEquals("this is", ((StringValue) values[0]).getValue());
    +			assertEquals(1, ((IntValue) values[1]).getValue());
    +			assertEquals(2.0, ((DoubleValue) values[2]).getValue(), 0.001);
    +			
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertEquals("a test", ((StringValue) values[0]).getValue());
    +			assertEquals(3, ((IntValue) values[1]).getValue());
    +			assertEquals(4.0, ((DoubleValue) values[2]).getValue(), 0.001);
    +			
    +			values = format.nextRecord(new Value[] { new StringValue(), new IntValue(), new DoubleValue() });
    +			assertNull(values);
    +		}
    +		catch (Exception ex) {
    +			fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
    +		}
    +	}
    +	
    +	@Test
    +	public void testIgnoreMultiCharPrefixComments() throws IOException {
    +		try {
    --- End diff --
    
    What about this test case?



[GitHub] incubator-flink pull request: enable CSV Reader to ignore invalid ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/201#issuecomment-63948901
  
    Looks quite good. A few comments inline.
    I'm not sure whether we should do the comment check in the DelimitedIF or in the CSVIF. Doing it in the DelimitedIF adds overhead for all other IFs that extend it.
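A minimal sketch of the alternative placement, mirroring the check that the `CsvInputFormat` diff above performs in `readRecord()`. The generic delimited base class contains no comment logic, so only the CSV subclass pays for the prefix test. `DelimitedFormatSketch` and `CsvFormatSketch` are hypothetical simplifications, and field splitting is reduced to `String.split` for brevity.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

// Hypothetical, heavily simplified stand-ins for DelimitedInputFormat / CsvInputFormat.
abstract class DelimitedFormatSketch<T> {

    /** Parses one delimited record; returning null means "skip this record". */
    abstract T readRecord(byte[] bytes, int offset, int numBytes);
}

final class CsvFormatSketch extends DelimitedFormatSketch<String[]> {

    private final byte[] commentPrefix; // null: comment lines are not ignored
    private final String fieldDelimiterRegex;

    CsvFormatSketch(byte[] commentPrefix, char fieldDelimiter) {
        this.commentPrefix = commentPrefix;
        this.fieldDelimiterRegex = Pattern.quote(String.valueOf(fieldDelimiter));
    }

    @Override
    String[] readRecord(byte[] bytes, int offset, int numBytes) {
        // The comment check lives only in the CSV subclass.
        if (commentPrefix != null && commentPrefix.length <= numBytes) {
            boolean isComment = true; // primitive boolean
            for (int i = 0; i < commentPrefix.length; i++) {
                if (bytes[offset + i] != commentPrefix[i]) {
                    isComment = false;
                    break;
                }
            }
            if (isComment) {
                return null;
            }
        }
        // Simplified field parsing for the sketch; -1 keeps trailing empty fields.
        return new String(bytes, offset, numBytes, StandardCharsets.UTF_8).split(fieldDelimiterRegex, -1);
    }

    public static void main(String[] args) {
        CsvFormatSketch format = new CsvFormatSketch("#".getBytes(StandardCharsets.UTF_8), '|');
        byte[] comment = "#description|of the data".getBytes(StandardCharsets.UTF_8);
        byte[] data = "this is|1|2.0".getBytes(StandardCharsets.UTF_8);
        System.out.println(format.readRecord(comment, 0, comment.length)); // null
        System.out.println(String.join(",", format.readRecord(data, 0, data.length))); // this is,1,2.0
    }
}
```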

