Posted to issues@flink.apache.org by fpompermaier <gi...@git.apache.org> on 2016/05/13 16:21:29 UTC

[GitHub] flink pull request: FLINK-3901 - Added CsvRowInputFormat

GitHub user fpompermaier opened a pull request:

    https://github.com/apache/flink/pull/1989

    FLINK-3901 - Added CsvRowInputFormat

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [x] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fpompermaier/flink FLINK-3901

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1989.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1989
    
----
commit 4f8288f205629340aa46cad50ae232b9c2fc7439
Author: Flavio Pompermaier <f....@gmail.com>
Date:   2016-05-13T16:19:51Z

    FLINK-3901 - Added CsvRowInputFormat

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67257499
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowSerializer;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.types.parser.FieldParser.ParseErrorState;
    +
    +@Internal
    +public class RowCsvInputFormat extends CsvInputFormat<Row> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private RowSerializer rowSerializer;
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo) {
    +		this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, createDefaultMask(rowTypeInfo.getArity()));
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			int[] includedFieldsMask) {
    +		super(filePath);
    +		boolean[] mask = (includedFieldsMask == null) ? createDefaultMask(rowTypeInfo.getArity())
    +				: toBooleanMask(includedFieldsMask);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			boolean[] includedFieldsMask) {
    +		super(filePath);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	private void configure(String lineDelimiter, String fieldDelimiter,
    +			RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +
    +		if (rowTypeInfo.getArity() == 0) {
    +			throw new IllegalArgumentException("Row arity must be greater than 0.");
    +		}
    +
    +		if (includedFieldsMask == null) {
    +			includedFieldsMask = createDefaultMask(rowTypeInfo.getArity());
    +		}
    +
    +		rowSerializer = (RowSerializer) rowTypeInfo.createSerializer(new ExecutionConfig());
    +
    +		setDelimiter(lineDelimiter);
    +		setFieldDelimiter(fieldDelimiter);
    +
    +		Class<?>[] classes = new Class<?>[rowTypeInfo.getArity()];
    +
    +		for (int i = 0; i < rowTypeInfo.getArity(); i++) {
    +			classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
    +		}
    +
    +		setFieldsGeneric(includedFieldsMask, classes);
    +	}
    +
    +	@Override
    +	public Row fillRecord(Row reuse, Object[] parsedValues) {
    +		if (reuse == null) {
    +			reuse = new Row(rowSerializer.getLength());
    +		}
    +		for (int i = 0; i < parsedValues.length; i++) {
    +			reuse.setField(i, parsedValues[i]);
    +		}
    +		return reuse;
    +	}
    +
    +	@Override
    +	protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int numBytes) throws ParseException {
    +		boolean[] fieldIncluded = this.fieldIncluded;
    +
    +		int startPos = offset;
    +		final int limit = offset + numBytes;
    +
    +		for (int field = 0, output = 0; field < fieldIncluded.length; field++) {
    +
    +			// check valid start position
    +			if (startPos >= limit) {
    +				if (isLenient()) {
    +					return false;
    +				} else {
    +					throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
    +				}
    +			}
    +
    +			if (fieldIncluded[field]) {
    +				// parse field
    +				@SuppressWarnings("unchecked")
    +				FieldParser<Object> parser = (FieldParser<Object>) this.getFieldParsers()[output];
    +				int latestValidPos = startPos;
    +				startPos = parser.parseField(bytes, startPos, limit, this.getFieldDelimiter(), holders[output]);
    +				if (!isLenient() && parser.getErrorState() != ParseErrorState.NONE) {
    --- End diff --
    
    Shouldn't this be something like the following?
    
    ```
    Object[] fieldVals = new Object[numFields];
    
    if (startPos < 0) { 
      // value was not correctly parsed
      if (parser.getErrorState() == ParseErrorState.NONE) {
        // set field value to null
        fieldVals[output] = null;
        // skip field
        ...
      } else if (isLenient()) {
        // skip this Row
        return false;
      } else {
        throw new ParseException(...)
      }
    } else {
      fieldVals[output] = parser.getLastResult();
    }
    ```
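
The three outcomes in the suggestion above (store a null, skip the row when lenient, throw otherwise) can be tried out in isolation. This is a minimal sketch, not Flink code: `NullAwareParseSketch`, `RowParseException` and `parseRow` are hypothetical names, the fields are all integers, and it assumes that an empty field is the case that should become a null.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Hypothetical stand-alone sketch; none of these names exist in Flink.
final class NullAwareParseSketch {

    // Local stand-in for a parse exception thrown in strict (non-lenient) mode.
    static final class RowParseException extends RuntimeException {
        RowParseException(String message) {
            super(message);
        }
    }

    /**
     * Parses a delimited line of integer fields into fieldVals.
     * Returns true if the row was parsed, false if it was skipped (lenient mode only).
     */
    static boolean parseRow(String line, String delimiter, boolean lenient, Integer[] fieldVals) {
        // Limit -1 keeps trailing empty fields instead of dropping them.
        String[] raw = line.split(Pattern.quote(delimiter), -1);
        if (raw.length < fieldVals.length) {
            if (lenient) {
                return false; // skip this row
            }
            throw new RowParseException("Row too short: " + line);
        }
        for (int i = 0; i < fieldVals.length; i++) {
            String token = raw[i];
            if (token.isEmpty()) {
                // Assumption: an empty field is a null value, not an error.
                fieldVals[i] = null;
                continue;
            }
            try {
                fieldVals[i] = Integer.valueOf(token);
            } catch (NumberFormatException e) {
                if (lenient) {
                    return false; // skip this row
                }
                throw new RowParseException(
                        "Parsing error for column " + i + " of row '" + line + "'");
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Integer[] vals = new Integer[3];
        // Empty middle field: the row is kept and the field is null.
        System.out.println(parseRow("1||3", "|", false, vals) + " " + Arrays.toString(vals));
        // Invalid middle field in lenient mode: the row is skipped.
        System.out.println(parseRow("1|x|3", "|", true, vals));
    }
}
```

In strict mode the same invalid line throws instead of returning false, which corresponds to the last branch of the suggestion.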
      




[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier closed the pull request at:

    https://github.com/apache/flink/pull/1989



[GitHub] flink issue #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on the issue:

    https://github.com/apache/flink/pull/1989
  
    Hi @twalthr, any news about this? Are you going to merge this PR in the upcoming 1.1 release?
    If you were waiting for feedback, you have my +1 on your strategy: go ahead and convert the RowCsvInputFormat class to Scala and keep the test code in Java!



[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67257606
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowSerializer;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.types.parser.FieldParser.ParseErrorState;
    +
    +@Internal
    +public class RowCsvInputFormat extends CsvInputFormat<Row> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private RowSerializer rowSerializer;
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo) {
    +		this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, createDefaultMask(rowTypeInfo.getArity()));
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			int[] includedFieldsMask) {
    +		super(filePath);
    +		boolean[] mask = (includedFieldsMask == null) ? createDefaultMask(rowTypeInfo.getArity())
    +				: toBooleanMask(includedFieldsMask);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			boolean[] includedFieldsMask) {
    +		super(filePath);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	private void configure(String lineDelimiter, String fieldDelimiter,
    +			RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +
    +		if (rowTypeInfo.getArity() == 0) {
    +			throw new IllegalArgumentException("Row arity must be greater than 0.");
    +		}
    +
    +		if (includedFieldsMask == null) {
    +			includedFieldsMask = createDefaultMask(rowTypeInfo.getArity());
    +		}
    +
    +		rowSerializer = (RowSerializer) rowTypeInfo.createSerializer(new ExecutionConfig());
    +
    +		setDelimiter(lineDelimiter);
    +		setFieldDelimiter(fieldDelimiter);
    +
    +		Class<?>[] classes = new Class<?>[rowTypeInfo.getArity()];
    +
    +		for (int i = 0; i < rowTypeInfo.getArity(); i++) {
    +			classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
    +		}
    +
    +		setFieldsGeneric(includedFieldsMask, classes);
    +	}
    +
    +	@Override
    +	public Row fillRecord(Row reuse, Object[] parsedValues) {
    +		if (reuse == null) {
    +			reuse = new Row(rowSerializer.getLength());
    +		}
    +		for (int i = 0; i < parsedValues.length; i++) {
    +			reuse.setField(i, parsedValues[i]);
    +		}
    +		return reuse;
    +	}
    +
    +	@Override
    +	protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int numBytes) throws ParseException {
    +		boolean[] fieldIncluded = this.fieldIncluded;
    +
    +		int startPos = offset;
    +		final int limit = offset + numBytes;
    +
    +		for (int field = 0, output = 0; field < fieldIncluded.length; field++) {
    +
    +			// check valid start position
    +			if (startPos >= limit) {
    +				if (isLenient()) {
    +					return false;
    +				} else {
    +					throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
    +				}
    +			}
    +
    +			if (fieldIncluded[field]) {
    +				// parse field
    +				@SuppressWarnings("unchecked")
    +				FieldParser<Object> parser = (FieldParser<Object>) this.getFieldParsers()[output];
    +				int latestValidPos = startPos;
    +				startPos = parser.parseField(bytes, startPos, limit, this.getFieldDelimiter(), holders[output]);
    +				if (!isLenient() && parser.getErrorState() != ParseErrorState.NONE) {
    +					// Row is able to handle null values
    +					if (parser.getErrorState() != ParseErrorState.EMPTY_STRING) {
    +						throw new ParseException(
    +								String.format("Parsing error for column %s of row '%s' originated by %s: %s.", field,
    +										new String(bytes, offset, numBytes),
    +										parser.getClass().getSimpleName(), parser.getErrorState()));
    +					}
    +				}
    +				holders[output] = parser.getLastResult();
    +
    +				// check parse result
    +				if (startPos < 0) {
    +					holders[output] = null;
    --- End diff --
    
    Let's use a separate array `fieldVals` to store the result of the parser. Otherwise we lose the reuse objects whenever we have a null value.
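
To illustrate the point about losing reuse objects, here is a small sketch under stated assumptions: `ReuseHoldersSketch`, `IntHolder` and `parseInto` are hypothetical names, and `IntHolder` only stands in for whatever mutable object a field parser would reuse. Nulls are written to a separate `fieldVals` array, so the entries of `holders` are never overwritten and stay available for the next row.

```java
// Hypothetical stand-alone sketch; none of these names exist in Flink.
final class ReuseHoldersSketch {

    // Mutable holder standing in for a reusable per-field object.
    static final class IntHolder {
        int value;
    }

    /**
     * Parses tokens into fieldVals, reusing the objects in holders.
     * An empty token yields a null in fieldVals but leaves holders[i] untouched.
     */
    static void parseInto(String[] tokens, IntHolder[] holders, Object[] fieldVals) {
        for (int i = 0; i < tokens.length; i++) {
            if (tokens[i].isEmpty()) {
                // The null goes to the result array only; the holder survives.
                fieldVals[i] = null;
            } else {
                holders[i].value = Integer.parseInt(tokens[i]);
                fieldVals[i] = holders[i];
            }
        }
    }

    public static void main(String[] args) {
        IntHolder[] holders = {new IntHolder(), new IntHolder()};
        Object[] fieldVals = new Object[2];
        IntHolder second = holders[1];

        // First row: the second field is empty, so fieldVals[1] becomes null.
        parseInto(new String[] {"7", ""}, holders, fieldVals);
        // Second row: the same holder object is reused for the second field.
        parseInto(new String[] {"8", "9"}, holders, fieldVals);

        System.out.println(fieldVals[1] == second);
    }
}
```

If the null were written into `holders[1]` instead, the second call would have no object left to reuse for that field and would have to allocate a new one.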



[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67296019
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowSerializer;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.types.parser.FieldParser.ParseErrorState;
    +
    +@Internal
    +public class RowCsvInputFormat extends CsvInputFormat<Row> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private RowSerializer rowSerializer;
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo) {
    +		this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, createDefaultMask(rowTypeInfo.getArity()));
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			int[] includedFieldsMask) {
    +		super(filePath);
    +		boolean[] mask = (includedFieldsMask == null) ? createDefaultMask(rowTypeInfo.getArity())
    +				: toBooleanMask(includedFieldsMask);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			boolean[] includedFieldsMask) {
    +		super(filePath);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	private void configure(String lineDelimiter, String fieldDelimiter,
    +			RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +
    +		if (rowTypeInfo.getArity() == 0) {
    +			throw new IllegalArgumentException("Row arity must be greater than 0.");
    +		}
    +
    +		if (includedFieldsMask == null) {
    +			includedFieldsMask = createDefaultMask(rowTypeInfo.getArity());
    +		}
    +
    +		rowSerializer = (RowSerializer) rowTypeInfo.createSerializer(new ExecutionConfig());
    +
    +		setDelimiter(lineDelimiter);
    +		setFieldDelimiter(fieldDelimiter);
    +
    +		Class<?>[] classes = new Class<?>[rowTypeInfo.getArity()];
    +
    +		for (int i = 0; i < rowTypeInfo.getArity(); i++) {
    +			classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
    +		}
    +
    +		setFieldsGeneric(includedFieldsMask, classes);
    +	}
    +
    +	@Override
    +	public Row fillRecord(Row reuse, Object[] parsedValues) {
    +		if (reuse == null) {
    +			reuse = new Row(rowSerializer.getLength());
    +		}
    +		for (int i = 0; i < parsedValues.length; i++) {
    +			reuse.setField(i, parsedValues[i]);
    +		}
    +		return reuse;
    +	}
    +
    +	@Override
    +	protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int numBytes) throws ParseException {
    +		boolean[] fieldIncluded = this.fieldIncluded;
    +
    +		int startPos = offset;
    +		final int limit = offset + numBytes;
    +
    +		for (int field = 0, output = 0; field < fieldIncluded.length; field++) {
    +
    +			// check valid start position
    +			if (startPos >= limit) {
    +				if (isLenient()) {
    +					return false;
    +				} else {
    +					throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
    +				}
    +			}
    +
    +			if (fieldIncluded[field]) {
    +				// parse field
    +				@SuppressWarnings("unchecked")
    +				FieldParser<Object> parser = (FieldParser<Object>) this.getFieldParsers()[output];
    +				int latestValidPos = startPos;
    +				startPos = parser.parseField(bytes, startPos, limit, this.getFieldDelimiter(), holders[output]);
    +				if (!isLenient() && parser.getErrorState() != ParseErrorState.NONE) {
    +					// Row is able to handle null values
    +					if (parser.getErrorState() != ParseErrorState.EMPTY_STRING) {
    +						throw new ParseException(
    +								String.format("Parsing error for column %s of row '%s' originated by %s: %s.", field,
    +										new String(bytes, offset, numBytes),
    +										parser.getClass().getSimpleName(), parser.getErrorState()));
    +					}
    +				}
    +				holders[output] = parser.getLastResult();
    +
    +				// check parse result
    +				if (startPos < 0) {
    +					holders[output] = null;
    --- End diff --
    
    How can I pass null values back then?



[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67257812
  
    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java ---
    @@ -0,0 +1,1075 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io;
    +
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.OutputStreamWriter;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.PojoTypeInfo;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.types.parser.StringParser;
    +import org.junit.Test;
    +
    +public class RowCsvInputFormatTest {
    +
    +	private static final Path PATH = new Path("an/ignored/file/");
    +
    +	//Static variables for testing the removal of \r\n to \n
    +	private static final String FIRST_PART = "That is the first part";
    +
    +	private static final String SECOND_PART = "That is the second part";
    +
    +	@Test
    +	public void ignoreInvalidLines() {
    +		try {
    +			String fileContent =
    +					"header1|header2|header3|\n"+
    +					"this is|1|2.0|\n"+
    +					"//a comment\n" +
    +					"a test|3|4.0|\n" +
    +					"#next|5|6.0|\n";
    +
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.INT_TYPE_INFO,
    +				BasicTypeInfo.DOUBLE_TYPE_INFO
    +			});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +			format.setLenient(false);
    +
    +			Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			try {
    +				result = format.nextRecord(result);
    +				fail("Parse Exception was not thrown! (Invalid int value)");
    +			} catch (ParseException ex) {
    +			}
    +
    +			// if format has lenient == false this can be asserted only after FLINK-3908
    +//			result = format.nextRecord(result);
    --- End diff --
    
    Enable after FLINK-3908 is fixed.



[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67257863
  
    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java ---
    @@ -0,0 +1,1075 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io;
    +
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.OutputStreamWriter;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.PojoTypeInfo;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.types.parser.StringParser;
    +import org.junit.Test;
    +
    +public class RowCsvInputFormatTest {
    +
    +	private static final Path PATH = new Path("an/ignored/file/");
    +
    +	//Static variables for testing the removal of \r\n to \n
    +	private static final String FIRST_PART = "That is the first part";
    +
    +	private static final String SECOND_PART = "That is the second part";
    +
    +	@Test
    +	public void ignoreInvalidLines() {
    +		try {
    +			String fileContent =
    +					"header1|header2|header3|\n"+
    +					"this is|1|2.0|\n"+
    +					"//a comment\n" +
    +					"a test|3|4.0|\n" +
    +					"#next|5|6.0|\n";
    +
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.INT_TYPE_INFO,
    +				BasicTypeInfo.DOUBLE_TYPE_INFO
    +			});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +			format.setLenient(false);
    +
    +			Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			try {
    +				result = format.nextRecord(result);
    +				fail("Parse Exception was not thrown! (Invalid int value)");
    +			} catch (ParseException ex) {
    +			}
    +
    +			// if format has lenient == false this can be asserted only after FLINK-3908
    +//			result = format.nextRecord(result);
    +//			assertNotNull(result);
    +//			assertEquals("this is", result.productElement(0));
    +//			assertEquals(new Integer(1), result.productElement(1));
    +//			assertEquals(new Double(2.0), result.productElement(2));
    +//
    +//			result = format.nextRecord(result);
    +//			assertNotNull(result);
    +//			assertEquals("a test", result.productElement(0));
    +//			assertEquals(new Integer(3), result.productElement(1));
    +//			assertEquals(new Double(4.0), result.productElement(2));
    +//
    +//			result = format.nextRecord(result);
    +//			assertNotNull(result);
    +//			assertEquals("#next", result.productElement(0));
    +//			assertEquals(new Integer(5), result.productElement(1));
    +//			assertEquals(new Double(6.0), result.productElement(2));
    +//
    +//			result = format.nextRecord(result);
    +//			assertNull(result);
    +			
    +			//re-open with lenient = true
    +			format.setLenient(true);
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("header1", result.productElement(0));
    +			assertNull(result.productElement(1));
    +			assertNull(result.productElement(2));
    +			
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("this is", result.productElement(0));
    +			assertEquals(new Integer(1), result.productElement(1));
    +			assertEquals(new Double(2.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("a test", result.productElement(0));
    +			assertEquals(new Integer(3), result.productElement(1));
    +			assertEquals(new Double(4.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("#next", result.productElement(0));
    +			assertEquals(new Integer(5), result.productElement(1));
    +			assertEquals(new Double(6.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void ignoreSingleCharPrefixComments() {
    +		try {
    +			final String fileContent = 
    +					"#description of the data\n" +
    +					"#successive commented line\n" +
    +					"this is|1|2.0|\n" +
    +					"a test|3|4.0|\n" +
    +					"#next|5|6.0|\n";
    +
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.INT_TYPE_INFO,
    +				BasicTypeInfo.DOUBLE_TYPE_INFO });
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +			format.setCommentPrefix("#");
    +
    +			Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("this is", result.productElement(0));
    +			assertEquals(new Integer(1), result.productElement(1));
    +			assertEquals(new Double(2.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("a test", result.productElement(0));
    +			assertEquals(new Integer(3), result.productElement(1));
    +			assertEquals(new Double(4.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void ignoreMultiCharPrefixComments() {
    +		try {
    +
    +
    +			final String fileContent = "//description of the data\n" +
    +					"//successive commented line\n" +
    +					"this is|1|2.0|\n"+
    +					"a test|3|4.0|\n" +
    +					"//next|5|6.0|\n";
    +
    +			final FileInputSplit split = createTempFile(fileContent);
    +
    +			final RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,	BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO });
    +			final CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +			format.setCommentPrefix("//");
    +
    +			final Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("this is", result.productElement(0));
    +			assertEquals(new Integer(1), result.productElement(1));
    +			assertEquals(new Double(2.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("a test", result.productElement(0));
    +			assertEquals(new Integer(3), result.productElement(1));
    +			assertEquals(new Double(4.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void readStringFields() {
    +		try {
    +			String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO
    +			});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +
    +			final Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("def", result.productElement(1));
    +			assertEquals("ghijk", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("hhg", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +			assertTrue(format.reachedEnd());
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void readMixedQuotedStringFields() {
    +		try {
    +			String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||";
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO
    +			});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +
    +			Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.enableQuotedStringParsing('@');
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("a|b|c", result.productElement(0));
    +			assertEquals("def", result.productElement(1));
    +			assertEquals("ghijk", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("|hhg", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +			assertTrue(format.reachedEnd());
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void readStringFieldsWithTrailingDelimiters() {
    +		try {
    +			String fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n";
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] { 
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO
    +			});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +
    +			format.setFieldDelimiter("|-");
    +
    +			format.configure(new Configuration());
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("def", result.productElement(1));
    +			assertEquals("ghijk", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("hhg", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +			assertTrue(format.reachedEnd());
    +		}
    +		catch (Exception ex) {
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testIntegerFields() throws IOException {
    +		try {
    +			String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n";
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(
    +					new TypeInformation<?>[] {
    +						BasicTypeInfo.INT_TYPE_INFO,
    +						BasicTypeInfo.INT_TYPE_INFO,
    +						BasicTypeInfo.INT_TYPE_INFO,
    +						BasicTypeInfo.INT_TYPE_INFO,
    +						BasicTypeInfo.INT_TYPE_INFO
    +					});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +
    +			format.setFieldDelimiter("|");
    +
    +			format.configure(new Configuration());
    +			format.open(split);
    +
    +			Row result = new Row(5);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals(Integer.valueOf(111), result.productElement(0));
    +			assertEquals(Integer.valueOf(222), result.productElement(1));
    +			assertEquals(Integer.valueOf(333), result.productElement(2));
    +			assertEquals(Integer.valueOf(444), result.productElement(3));
    +			assertEquals(Integer.valueOf(555), result.productElement(4));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals(Integer.valueOf(666), result.productElement(0));
    +			assertEquals(Integer.valueOf(777), result.productElement(1));
    +			assertEquals(Integer.valueOf(888), result.productElement(2));
    +			assertEquals(Integer.valueOf(999), result.productElement(3));
    +			assertEquals(Integer.valueOf(000), result.productElement(4));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +			assertTrue(format.reachedEnd());
    +		}
    +		catch (Exception ex) {
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testEmptyFields() throws IOException {
    +		try{
    +			String fileContent = 
    +					"|0|0|0|0|0|\n" +
    +					"1||1|1|1|1|\n" +
    +					"2|2||2|2|2|\n" +
    +					"3|3|3||3|3|\n" +
    +					"4|4|4|4||4|\n" +
    +					"5|5|5|5|5||\n";
    +			
    +			FileInputSplit split = createTempFile(fileContent);
    +			
    +			//TODO: FLOAT_TYPE_INFO and DOUBLE_TYPE_INFO don't handle null values correctly
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.SHORT_TYPE_INFO,
    +				BasicTypeInfo.INT_TYPE_INFO,
    +				BasicTypeInfo.LONG_TYPE_INFO,
    +				BasicTypeInfo.INT_TYPE_INFO,
    +//				BasicTypeInfo.FLOAT_TYPE_INFO,
    --- End diff --
    
    Can these be removed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fpompermaier <gi...@git.apache.org>.
GitHub user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67295348
  
    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java ---
    @@ -0,0 +1,1075 @@
    +			//TODO: FLOAT_TYPE_INFO and DOUBLE_TYPE_INFO don't handle null values correctly
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.SHORT_TYPE_INFO,
    +				BasicTypeInfo.INT_TYPE_INFO,
    +				BasicTypeInfo.LONG_TYPE_INFO,
    +				BasicTypeInfo.INT_TYPE_INFO,
    +//				BasicTypeInfo.FLOAT_TYPE_INFO,
    --- End diff --
    
    Actually, this is a problem with those parsers. Since the code in those functions is quite complicated, I think it would be better if whoever wrote the parsers could have a look at them.
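
For illustration only, here is a minimal sketch in plain Java of the behaviour the commented-out FLOAT/DOUBLE columns would need: an empty field yields null instead of a parse error. It does not use or reproduce Flink's parser classes. `NullableFieldSketch`, `splitFields` and `parseNullableDouble` are hypothetical names, and mapping an empty field to null is an assumption about the desired result, not a description of how the existing parsers work.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of Flink.
public class NullableFieldSketch {

    /** Splits a line on a single-character delimiter, keeping empty fields. */
    public static List<String> splitFields(String line, char delimiter) {
        List<String> fields = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < line.length(); i++) {
            if (line.charAt(i) == delimiter) {
                fields.add(line.substring(start, i));
                start = i + 1;
            }
        }
        // Text after the last delimiter; nothing is added when the line ends with one.
        if (start < line.length()) {
            fields.add(line.substring(start));
        }
        return fields;
    }

    /** Assumption: an empty field means "no value" and is returned as null. */
    public static Double parseNullableDouble(String field) {
        return field.isEmpty() ? null : Double.valueOf(field);
    }

    public static void main(String[] args) {
        // Fifth sample line from testEmptyFields: the fifth column is empty.
        for (String field : splitFields("4|4|4|4||4|", '|')) {
            System.out.println(parseNullableDouble(field));
        }
    }
}
```

Running `main` on that sample line prints `null` for the empty fifth column and `4.0` for the other five.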


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67299396
  
    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java ---
    @@ -0,0 +1,1075 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io;
    +
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.OutputStreamWriter;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.PojoTypeInfo;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.types.parser.StringParser;
    +import org.junit.Test;
    +
    +public class RowCsvInputFormatTest {
    +
    +	private static final Path PATH = new Path("an/ignored/file/");
    +
    +	//Static variables for testing the removal of \r\n to \n
    +	private static final String FIRST_PART = "That is the first part";
    +
    +	private static final String SECOND_PART = "That is the second part";
    +
    +	@Test
    +	public void ignoreInvalidLines() {
    +		try {
    +			String fileContent =
    +					"header1|header2|header3|\n"+
    +					"this is|1|2.0|\n"+
    +					"//a comment\n" +
    +					"a test|3|4.0|\n" +
    +					"#next|5|6.0|\n";
    +
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.INT_TYPE_INFO,
    +				BasicTypeInfo.DOUBLE_TYPE_INFO
    +			});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +			format.setLenient(false);
    +
    +			Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			try {
    +				result = format.nextRecord(result);
    +				fail("Parse Exception was not thrown! (Invalid int value)");
    +			} catch (ParseException ex) {
    +			}
    +
    +			// if format has lenient == false this can be asserted only after FLINK-3908
    +//			result = format.nextRecord(result);
    +//			assertNotNull(result);
    +//			assertEquals("this is", result.productElement(0));
    +//			assertEquals(new Integer(1), result.productElement(1));
    +//			assertEquals(new Double(2.0), result.productElement(2));
    +//
    +//			result = format.nextRecord(result);
    +//			assertNotNull(result);
    +//			assertEquals("a test", result.productElement(0));
    +//			assertEquals(new Integer(3), result.productElement(1));
    +//			assertEquals(new Double(4.0), result.productElement(2));
    +//
    +//			result = format.nextRecord(result);
    +//			assertNotNull(result);
    +//			assertEquals("#next", result.productElement(0));
    +//			assertEquals(new Integer(5), result.productElement(1));
    +//			assertEquals(new Double(6.0), result.productElement(2));
    +//
    +//			result = format.nextRecord(result);
    +//			assertNull(result);
    +			
    +			//re-open with lenient = true
    +			format.setLenient(true);
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("header1", result.productElement(0));
    +			assertNull(result.productElement(1));
    +			assertNull(result.productElement(2));
    +			
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("this is", result.productElement(0));
    +			assertEquals(new Integer(1), result.productElement(1));
    +			assertEquals(new Double(2.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("a test", result.productElement(0));
    +			assertEquals(new Integer(3), result.productElement(1));
    +			assertEquals(new Double(4.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("#next", result.productElement(0));
    +			assertEquals(new Integer(5), result.productElement(1));
    +			assertEquals(new Double(6.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void ignoreSingleCharPrefixComments() {
    +		try {
    +			final String fileContent = 
    +					"#description of the data\n" +
    +					"#successive commented line\n" +
    +					"this is|1|2.0|\n" +
    +					"a test|3|4.0|\n" +
    +					"#next|5|6.0|\n";
    +
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.INT_TYPE_INFO,
    +				BasicTypeInfo.DOUBLE_TYPE_INFO });
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +			format.setCommentPrefix("#");
    +
    +			Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("this is", result.productElement(0));
    +			assertEquals(new Integer(1), result.productElement(1));
    +			assertEquals(new Double(2.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("a test", result.productElement(0));
    +			assertEquals(new Integer(3), result.productElement(1));
    +			assertEquals(new Double(4.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void ignoreMultiCharPrefixComments() {
    +		try {
    +
    +
    +			final String fileContent = "//description of the data\n" +
    +					"//successive commented line\n" +
    +					"this is|1|2.0|\n"+
    +					"a test|3|4.0|\n" +
    +					"//next|5|6.0|\n";
    +
    +			final FileInputSplit split = createTempFile(fileContent);
    +
    +			final RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,	BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO });
    +			final CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +			format.setCommentPrefix("//");
    +
    +			final Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("this is", result.productElement(0));
    +			assertEquals(new Integer(1), result.productElement(1));
    +			assertEquals(new Double(2.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("a test", result.productElement(0));
    +			assertEquals(new Integer(3), result.productElement(1));
    +			assertEquals(new Double(4.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void readStringFields() {
    +		try {
    +			String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO
    +			});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +
    +			final Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("def", result.productElement(1));
    +			assertEquals("ghijk", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("hhg", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +			assertTrue(format.reachedEnd());
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void readMixedQuotedStringFields() {
    +		try {
    +			String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||";
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO
    +			});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +
    +			Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.enableQuotedStringParsing('@');
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("a|b|c", result.productElement(0));
    +			assertEquals("def", result.productElement(1));
    +			assertEquals("ghijk", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("|hhg", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +			assertTrue(format.reachedEnd());
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void readStringFieldsWithTrailingDelimiters() {
    +		try {
    +			String fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n";
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] { 
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO
    +			});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +
    +			format.setFieldDelimiter("|-");
    +
    +			format.configure(new Configuration());
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("def", result.productElement(1));
    +			assertEquals("ghijk", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("hhg", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +			assertTrue(format.reachedEnd());
    +		}
    +		catch (Exception ex) {
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testIntegerFields() throws IOException {
    +		try {
    +			String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n";
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(
    +					new TypeInformation<?>[] {
    +						BasicTypeInfo.INT_TYPE_INFO,
    +						BasicTypeInfo.INT_TYPE_INFO,
    +						BasicTypeInfo.INT_TYPE_INFO,
    +						BasicTypeInfo.INT_TYPE_INFO,
    +						BasicTypeInfo.INT_TYPE_INFO
    +					});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +
    +			format.setFieldDelimiter("|");
    +
    +			format.configure(new Configuration());
    +			format.open(split);
    +
    +			Row result = new Row(5);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals(Integer.valueOf(111), result.productElement(0));
    +			assertEquals(Integer.valueOf(222), result.productElement(1));
    +			assertEquals(Integer.valueOf(333), result.productElement(2));
    +			assertEquals(Integer.valueOf(444), result.productElement(3));
    +			assertEquals(Integer.valueOf(555), result.productElement(4));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals(Integer.valueOf(666), result.productElement(0));
    +			assertEquals(Integer.valueOf(777), result.productElement(1));
    +			assertEquals(Integer.valueOf(888), result.productElement(2));
    +			assertEquals(Integer.valueOf(999), result.productElement(3));
    +			assertEquals(Integer.valueOf(000), result.productElement(4));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +			assertTrue(format.reachedEnd());
    +		}
    +		catch (Exception ex) {
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testEmptyFields() throws IOException {
    +		try{
    +			String fileContent = 
    +					"|0|0|0|0|0|\n" +
    +					"1||1|1|1|1|\n" +
    +					"2|2||2|2|2|\n" +
    +					"3|3|3||3|3|\n" +
    +					"4|4|4|4||4|\n" +
    +					"5|5|5|5|5||\n";
    +			
    +			FileInputSplit split = createTempFile(fileContent);
    +			
    +			//TODO: FLOAT_TYPE_INFO and DOUBLE_TYPE_INFO don't handle correctly null values
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.SHORT_TYPE_INFO,
    +				BasicTypeInfo.INT_TYPE_INFO,
    +				BasicTypeInfo.LONG_TYPE_INFO,
    +				BasicTypeInfo.INT_TYPE_INFO,
    +//				BasicTypeInfo.FLOAT_TYPE_INFO,
    --- End diff --
    
    ok



[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67254028
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowSerializer;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.types.parser.FieldParser.ParseErrorState;
    +
    +@Internal
    +public class RowCsvInputFormat extends CsvInputFormat<Row> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private RowSerializer rowSerializer;
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo) {
    +		this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, createDefaultMask(rowTypeInfo.getArity()));
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			int[] includedFieldsMask) {
    +		super(filePath);
    +		boolean[] mask = (includedFieldsMask == null) ? createDefaultMask(rowTypeInfo.getArity())
    +				: toBooleanMask(includedFieldsMask);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			boolean[] includedFieldsMask) {
    +		super(filePath);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	private void configure(String lineDelimiter, String fieldDelimiter,
    +			RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +
    +		if (rowTypeInfo.getArity() == 0) {
    +			throw new IllegalArgumentException("Row arity must be greater than 0.");
    +		}
    +
    +		if (includedFieldsMask == null) {
    +			includedFieldsMask = createDefaultMask(rowTypeInfo.getArity());
    +		}
    +
    +		rowSerializer = (RowSerializer) rowTypeInfo.createSerializer(new ExecutionConfig());
    --- End diff --
    
    We do not need the `RowSerializer`. It is sufficient to remember the number of `Row` fields (`rowTypeInfo.getArity()`).
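
A rough sketch of what this suggestion could look like, written without Flink dependencies so it runs on its own. `SimpleRow` and `RowFormatSketch` are hypothetical stand-ins for `Row` and `RowCsvInputFormat`, and `fillRecord` is an invented method name; in the actual class the arity would come from `rowTypeInfo.getArity()` and new rows from `new Row(arity)`.

```java
public class AritySketch {

    // Hypothetical stand-in for Flink's Row: a fixed-size array of fields.
    static final class SimpleRow {
        final Object[] fields;

        SimpleRow(int arity) {
            this.fields = new Object[arity];
        }
    }

    // Hypothetical stand-in for the input format. It remembers only the
    // number of fields; no serializer is created or stored.
    static final class RowFormatSketch {
        private final int arity;

        RowFormatSketch(int arity) {
            if (arity <= 0) {
                throw new IllegalArgumentException("Row arity must be greater than 0.");
            }
            this.arity = arity;
        }

        // Copies the parsed values into the reuse object, or into a new row
        // of the remembered arity when no reuse object is given.
        SimpleRow fillRecord(SimpleRow reuse, Object[] parsedValues) {
            SimpleRow target = (reuse != null) ? reuse : new SimpleRow(arity);
            for (int i = 0; i < arity; i++) {
                target.fields[i] = parsedValues[i];
            }
            return target;
        }
    }

    public static void main(String[] args) {
        RowFormatSketch format = new RowFormatSketch(3);
        SimpleRow row = format.fillRecord(null, new Object[] {"this is", 1, 2.0});
        System.out.println(row.fields.length); // prints 3
    }
}
```

Keeping a plain `int` also avoids constructing an `ExecutionConfig` just to obtain a serializer that is then used only to create empty rows.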



[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67314234
  
    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java ---
    @@ -0,0 +1,1075 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io;
    +
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.OutputStreamWriter;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.PojoTypeInfo;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.types.parser.StringParser;
    +import org.junit.Test;
    +
    +public class RowCsvInputFormatTest {
    +
    +	private static final Path PATH = new Path("an/ignored/file/");
    +
    +	//Static variables for testing the removal of \r\n to \n
    +	private static final String FIRST_PART = "That is the first part";
    +
    +	private static final String SECOND_PART = "That is the second part";
    +
    +	@Test
    +	public void ignoreInvalidLines() {
    +		try {
    +			String fileContent =
    +					"header1|header2|header3|\n"+
    +					"this is|1|2.0|\n"+
    +					"//a comment\n" +
    +					"a test|3|4.0|\n" +
    +					"#next|5|6.0|\n";
    +
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.INT_TYPE_INFO,
    +				BasicTypeInfo.DOUBLE_TYPE_INFO
    +			});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +			format.setLenient(false);
    +
    +			Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			try {
    +				result = format.nextRecord(result);
    +				fail("Parse Exception was not thrown! (Invalid int value)");
    +			} catch (ParseException ex) {
    +			}
    +
    +			// if format has lenient == false this can be asserted only after FLINK-3908
    +//			result = format.nextRecord(result);
    +//			assertNotNull(result);
    +//			assertEquals("this is", result.productElement(0));
    +//			assertEquals(new Integer(1), result.productElement(1));
    +//			assertEquals(new Double(2.0), result.productElement(2));
    +//
    +//			result = format.nextRecord(result);
    +//			assertNotNull(result);
    +//			assertEquals("a test", result.productElement(0));
    +//			assertEquals(new Integer(3), result.productElement(1));
    +//			assertEquals(new Double(4.0), result.productElement(2));
    +//
    +//			result = format.nextRecord(result);
    +//			assertNotNull(result);
    +//			assertEquals("#next", result.productElement(0));
    +//			assertEquals(new Integer(5), result.productElement(1));
    +//			assertEquals(new Double(6.0), result.productElement(2));
    +//
    +//			result = format.nextRecord(result);
    +//			assertNull(result);
    +			
    +			//re-open with lenient = true
    +			format.setLenient(true);
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("header1", result.productElement(0));
    +			assertNull(result.productElement(1));
    +			assertNull(result.productElement(2));
    +			
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("this is", result.productElement(0));
    +			assertEquals(new Integer(1), result.productElement(1));
    +			assertEquals(new Double(2.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("a test", result.productElement(0));
    +			assertEquals(new Integer(3), result.productElement(1));
    +			assertEquals(new Double(4.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("#next", result.productElement(0));
    +			assertEquals(new Integer(5), result.productElement(1));
    +			assertEquals(new Double(6.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void ignoreSingleCharPrefixComments() {
    +		try {
    +			final String fileContent = 
    +					"#description of the data\n" +
    +					"#successive commented line\n" +
    +					"this is|1|2.0|\n" +
    +					"a test|3|4.0|\n" +
    +					"#next|5|6.0|\n";
    +
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.INT_TYPE_INFO,
    +				BasicTypeInfo.DOUBLE_TYPE_INFO });
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +			format.setCommentPrefix("#");
    +
    +			Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("this is", result.productElement(0));
    +			assertEquals(new Integer(1), result.productElement(1));
    +			assertEquals(new Double(2.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("a test", result.productElement(0));
    +			assertEquals(new Integer(3), result.productElement(1));
    +			assertEquals(new Double(4.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void ignoreMultiCharPrefixComments() {
    +		try {
    +
    +
    +			final String fileContent = "//description of the data\n" +
    +					"//successive commented line\n" +
    +					"this is|1|2.0|\n"+
    +					"a test|3|4.0|\n" +
    +					"//next|5|6.0|\n";
    +
    +			final FileInputSplit split = createTempFile(fileContent);
    +
    +			final RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,	BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO });
    +			final CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +			format.setCommentPrefix("//");
    +
    +			final Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("this is", result.productElement(0));
    +			assertEquals(new Integer(1), result.productElement(1));
    +			assertEquals(new Double(2.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("a test", result.productElement(0));
    +			assertEquals(new Integer(3), result.productElement(1));
    +			assertEquals(new Double(4.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void readStringFields() {
    +		try {
    +			String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO
    +			});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +
    +			final Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("def", result.productElement(1));
    +			assertEquals("ghijk", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("hhg", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +			assertTrue(format.reachedEnd());
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void readMixedQuotedStringFields() {
    +		try {
    +			String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||";
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO
    +			});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +
    +			Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.enableQuotedStringParsing('@');
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("a|b|c", result.productElement(0));
    +			assertEquals("def", result.productElement(1));
    +			assertEquals("ghijk", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("|hhg", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +			assertTrue(format.reachedEnd());
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void readStringFieldsWithTrailingDelimiters() {
    +		try {
    +			String fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n";
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] { 
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO
    +			});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +
    +			format.setFieldDelimiter("|-");
    +
    +			format.configure(new Configuration());
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("def", result.productElement(1));
    +			assertEquals("ghijk", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("hhg", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +			assertTrue(format.reachedEnd());
    +		}
    +		catch (Exception ex) {
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testIntegerFields() throws IOException {
    +		try {
    +			String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n";
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(
    +					new TypeInformation<?>[] {
    +						BasicTypeInfo.INT_TYPE_INFO,
    +						BasicTypeInfo.INT_TYPE_INFO,
    +						BasicTypeInfo.INT_TYPE_INFO,
    +						BasicTypeInfo.INT_TYPE_INFO,
    +						BasicTypeInfo.INT_TYPE_INFO
    +					});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +
    +			format.setFieldDelimiter("|");
    +
    +			format.configure(new Configuration());
    +			format.open(split);
    +
    +			Row result = new Row(5);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals(Integer.valueOf(111), result.productElement(0));
    +			assertEquals(Integer.valueOf(222), result.productElement(1));
    +			assertEquals(Integer.valueOf(333), result.productElement(2));
    +			assertEquals(Integer.valueOf(444), result.productElement(3));
    +			assertEquals(Integer.valueOf(555), result.productElement(4));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals(Integer.valueOf(666), result.productElement(0));
    +			assertEquals(Integer.valueOf(777), result.productElement(1));
    +			assertEquals(Integer.valueOf(888), result.productElement(2));
    +			assertEquals(Integer.valueOf(999), result.productElement(3));
    +			assertEquals(Integer.valueOf(000), result.productElement(4));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +			assertTrue(format.reachedEnd());
    +		}
    +		catch (Exception ex) {
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testEmptyFields() throws IOException {
    +		try{
    +			String fileContent = 
    +					"|0|0|0|0|0|\n" +
    +					"1||1|1|1|1|\n" +
    +					"2|2||2|2|2|\n" +
    +					"3|3|3||3|3|\n" +
    +					"4|4|4|4||4|\n" +
    +					"5|5|5|5|5||\n";
    +			
    +			FileInputSplit split = createTempFile(fileContent);
    +			
    +			//TODO: FLOAT_TYPE_INFO and DOUBLE_TYPE_INFO don't handle correctly null values
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.SHORT_TYPE_INFO,
    +				BasicTypeInfo.INT_TYPE_INFO,
    +				BasicTypeInfo.LONG_TYPE_INFO,
    +				BasicTypeInfo.INT_TYPE_INFO,
    +//				BasicTypeInfo.FLOAT_TYPE_INFO,
    --- End diff --
    
    I opened FLINK-4081 for this purpose.
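
The TODO in the diff above notes that empty fields are not handled correctly for `FLOAT_TYPE_INFO` and `DOUBLE_TYPE_INFO`. As a rough illustration only, the sketch below assumes the desired behaviour is that an empty field becomes a `null` entry. `EmptyFieldDemo` and `parseDoubles` are hypothetical names and are not part of Flink or of this PR.

```java
public class EmptyFieldDemo {

    // Hypothetical helper (not a Flink API): parses the first `arity` fields of a
    // '|'-delimited line as Doubles and maps empty fields to null.
    static Double[] parseDoubles(String line, int arity) {
        // A limit of -1 keeps trailing empty strings instead of dropping them.
        String[] parts = line.split("\\|", -1);
        if (parts.length < arity) {
            throw new IllegalArgumentException("Row too short: " + line);
        }
        Double[] result = new Double[arity];
        for (int i = 0; i < arity; i++) {
            // Assumption: an empty field means "no value", represented as null.
            result[i] = parts[i].isEmpty() ? null : Double.valueOf(parts[i]);
        }
        return result;
    }

    public static void main(String[] args) {
        // Second line of the test data above: the second field is empty.
        Double[] row = parseDoubles("1||1|1|1|1|", 6);
        System.out.println(java.util.Arrays.toString(row)); // [1.0, null, 1.0, 1.0, 1.0, 1.0]
    }
}
```

The arity is passed explicitly because the test lines end with a trailing delimiter, which would otherwise produce one extra empty field.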


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on the issue:

    https://github.com/apache/flink/pull/1989
  
    OK with me.



[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67299477
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowSerializer;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.types.parser.FieldParser.ParseErrorState;
    +
    +@Internal
    +public class RowCsvInputFormat extends CsvInputFormat<Row> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private RowSerializer rowSerializer;
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo) {
    +		this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, createDefaultMask(rowTypeInfo.getArity()));
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			int[] includedFieldsMask) {
    +		super(filePath);
    +		boolean[] mask = (includedFieldsMask == null) ? createDefaultMask(rowTypeInfo.getArity())
    +				: toBooleanMask(includedFieldsMask);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			boolean[] includedFieldsMask) {
    +		super(filePath);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	private void configure(String lineDelimiter, String fieldDelimiter,
    +			RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +
    +		if (rowTypeInfo.getArity() == 0) {
    +			throw new IllegalArgumentException("Row arity must be greater than 0.");
    +		}
    +
    +		if (includedFieldsMask == null) {
    +			includedFieldsMask = createDefaultMask(rowTypeInfo.getArity());
    +		}
    +
    +		rowSerializer = (RowSerializer) rowTypeInfo.createSerializer(new ExecutionConfig());
    +
    +		setDelimiter(lineDelimiter);
    +		setFieldDelimiter(fieldDelimiter);
    +
    +		Class<?>[] classes = new Class<?>[rowTypeInfo.getArity()];
    +
    +		for (int i = 0; i < rowTypeInfo.getArity(); i++) {
    +			classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
    +		}
    +
    +		setFieldsGeneric(includedFieldsMask, classes);
    +	}
    +
    +	@Override
    +	public Row fillRecord(Row reuse, Object[] parsedValues) {
    +		if (reuse == null) {
    +			reuse = new Row(rowSerializer.getLength());
    +		}
    +		for (int i = 0; i < parsedValues.length; i++) {
    +			reuse.setField(i, parsedValues[i]);
    +		}
    +		return reuse;
    +	}
    +
    +	@Override
    +	protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int numBytes) throws ParseException {
    +		boolean[] fieldIncluded = this.fieldIncluded;
    +
    +		int startPos = offset;
    +		final int limit = offset + numBytes;
    +
    +		for (int field = 0, output = 0; field < fieldIncluded.length; field++) {
    +
    +			// check valid start position
    +			if (startPos >= limit) {
    +				if (isLenient()) {
    +					return false;
    +				} else {
    +					throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
    +				}
    +			}
    +
    +			if (fieldIncluded[field]) {
    +				// parse field
    +				@SuppressWarnings("unchecked")
    +				FieldParser<Object> parser = (FieldParser<Object>) this.getFieldParsers()[output];
    +				int latestValidPos = startPos;
    +				startPos = parser.parseField(bytes, startPos, limit, this.getFieldDelimiter(), holders[output]);
    +				if (!isLenient() && parser.getErrorState() != ParseErrorState.NONE) {
    +					// Row is able to handle null values
    +					if (parser.getErrorState() != ParseErrorState.EMPTY_STRING) {
    +						throw new ParseException(
    +								String.format("Parsing error for column %s of row '%s' originated by %s: %s.", field,
    +										new String(bytes, offset, numBytes),
    +										parser.getClass().getSimpleName(), parser.getErrorState()));
    +					}
    +				}
    +				holders[output] = parser.getLastResult();
    +
    +				// check parse result
    +				if (startPos < 0) {
    +					holders[output] = null;
    --- End diff --
    
    Hmm, that's a good point. For boxed primitives it won't make a difference anyway since these are immutable.
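
To illustrate the point about immutability, here is a minimal, self-contained sketch. `BoxedReuseDemo` and `parseInto` are hypothetical names and are not part of Flink. The sketch only shows that a "reuse" argument of an immutable type such as `Integer` cannot be updated in place, so the caller always has to take the returned value.

```java
public class BoxedReuseDemo {

    // Hypothetical stand-in for a parser that is handed a reusable holder.
    // Integer is immutable, so the holder cannot be modified in place;
    // the parsed value has to be returned as a separate object.
    static Integer parseInto(Integer reuse, String text) {
        return Integer.valueOf(text.trim());
    }

    public static void main(String[] args) {
        Integer holder = Integer.valueOf(7);
        Integer parsed = parseInto(holder, "42");
        System.out.println(holder); // 7, the original holder is unchanged
        System.out.println(parsed); // 42
    }
}
```

For mutable value types the situation is different, because a parser could write into the passed-in object instead of allocating a new one.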



[GitHub] flink issue #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/1989
  
    @fpompermaier I started working on this, but I cannot guarantee that it will be part of the release.



[GitHub] flink issue #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/1989
  
    I would assign FLINK-4081 to myself and try to fix it this week. I also think that a mixed project makes development more complicated. I scanned through your code. Maybe you could convert the 143 lines of `RowCsvInputFormat` to Scala and leave the 1000+ lines of test code in Java.
    
    What do you think @fpompermaier @fhueske?
    Maybe we can get this PR into the 1.1 release. It is a nice feature which might make a lot of users happy.



[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67295515
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowSerializer;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.types.parser.FieldParser.ParseErrorState;
    +
    +@Internal
    +public class RowCsvInputFormat extends CsvInputFormat<Row> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private RowSerializer rowSerializer;
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo) {
    +		this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, createDefaultMask(rowTypeInfo.getArity()));
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			int[] includedFieldsMask) {
    +		super(filePath);
    +		boolean[] mask = (includedFieldsMask == null) ? createDefaultMask(rowTypeInfo.getArity())
    +				: toBooleanMask(includedFieldsMask);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			boolean[] includedFieldsMask) {
    +		super(filePath);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	private void configure(String lineDelimiter, String fieldDelimiter,
    +			RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +
    +		if (rowTypeInfo.getArity() == 0) {
    +			throw new IllegalArgumentException("Row arity must be greater than 0.");
    +		}
    +
    +		if (includedFieldsMask == null) {
    +			includedFieldsMask = createDefaultMask(rowTypeInfo.getArity());
    +		}
    +
    +		rowSerializer = (RowSerializer) rowTypeInfo.createSerializer(new ExecutionConfig());
    --- End diff --
    
    ok



[GitHub] flink issue #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/1989
  
    @fpompermaier I think you can close this PR. I will take your commit, put the Scala conversion on top of it and open a new PR.



[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67295853
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowSerializer;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.types.parser.FieldParser.ParseErrorState;
    +
    +@Internal
    +public class RowCsvInputFormat extends CsvInputFormat<Row> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private RowSerializer rowSerializer;
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo) {
    +		this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, createDefaultMask(rowTypeInfo.getArity()));
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			int[] includedFieldsMask) {
    +		super(filePath);
    +		boolean[] mask = (includedFieldsMask == null) ? createDefaultMask(rowTypeInfo.getArity())
    +				: toBooleanMask(includedFieldsMask);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			boolean[] includedFieldsMask) {
    +		super(filePath);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	private void configure(String lineDelimiter, String fieldDelimiter,
    +			RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +
    +		if (rowTypeInfo.getArity() == 0) {
    +			throw new IllegalArgumentException("Row arity must be greater than 0.");
    +		}
    +
    +		if (includedFieldsMask == null) {
    +			includedFieldsMask = createDefaultMask(rowTypeInfo.getArity());
    +		}
    +
    +		rowSerializer = (RowSerializer) rowTypeInfo.createSerializer(new ExecutionConfig());
    +
    +		setDelimiter(lineDelimiter);
    +		setFieldDelimiter(fieldDelimiter);
    +
    +		Class<?>[] classes = new Class<?>[rowTypeInfo.getArity()];
    +
    +		for (int i = 0; i < rowTypeInfo.getArity(); i++) {
    +			classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
    +		}
    +
    +		setFieldsGeneric(includedFieldsMask, classes);
    +	}
    +
    +	@Override
    +	public Row fillRecord(Row reuse, Object[] parsedValues) {
    +		if (reuse == null) {
    +			reuse = new Row(rowSerializer.getLength());
    +		}
    +		for (int i = 0; i < parsedValues.length; i++) {
    +			reuse.setField(i, parsedValues[i]);
    +		}
    +		return reuse;
    +	}
    +
    +	@Override
    +	protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int numBytes) throws ParseException {
    +		boolean[] fieldIncluded = this.fieldIncluded;
    +
    +		int startPos = offset;
    +		final int limit = offset + numBytes;
    +
    +		for (int field = 0, output = 0; field < fieldIncluded.length; field++) {
    +
    +			// check valid start position
    +			if (startPos >= limit) {
    +				if (isLenient()) {
    +					return false;
    +				} else {
    +					throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
    +				}
    +			}
    +
    +			if (fieldIncluded[field]) {
    +				// parse field
    +				@SuppressWarnings("unchecked")
    +				FieldParser<Object> parser = (FieldParser<Object>) this.getFieldParsers()[output];
    +				int latestValidPos = startPos;
    +				startPos = parser.parseField(bytes, startPos, limit, this.getFieldDelimiter(), holders[output]);
    +				if (!isLenient() && parser.getErrorState() != ParseErrorState.NONE) {
    --- End diff --
    
    Actually, I started from the TupleCsvInputFormat... I have to check this.



[GitHub] flink issue #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/1989
  
    Hi @fpompermaier, thanks for the PR. The `flink-table` module is almost completely implemented in Scala. We have not had good experiences with mixed Scala/Java modules, so I would like to ask if you could port the Java classes to Scala.
    I haven't had a detailed look at the test yet.



[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67298280
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowSerializer;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.types.parser.FieldParser.ParseErrorState;
    +
    +@Internal
    +public class RowCsvInputFormat extends CsvInputFormat<Row> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private RowSerializer rowSerializer;
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo) {
    +		this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, createDefaultMask(rowTypeInfo.getArity()));
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			int[] includedFieldsMask) {
    +		super(filePath);
    +		boolean[] mask = (includedFieldsMask == null) ? createDefaultMask(rowTypeInfo.getArity())
    +				: toBooleanMask(includedFieldsMask);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			boolean[] includedFieldsMask) {
    +		super(filePath);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	private void configure(String lineDelimiter, String fieldDelimiter,
    +			RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +
    +		if (rowTypeInfo.getArity() == 0) {
    +			throw new IllegalArgumentException("Row arity must be greater than 0.");
    +		}
    +
    +		if (includedFieldsMask == null) {
    +			includedFieldsMask = createDefaultMask(rowTypeInfo.getArity());
    +		}
    +
    +		rowSerializer = (RowSerializer) rowTypeInfo.createSerializer(new ExecutionConfig());
    +
    +		setDelimiter(lineDelimiter);
    +		setFieldDelimiter(fieldDelimiter);
    +
    +		Class<?>[] classes = new Class<?>[rowTypeInfo.getArity()];
    +
    +		for (int i = 0; i < rowTypeInfo.getArity(); i++) {
    +			classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
    +		}
    +
    +		setFieldsGeneric(includedFieldsMask, classes);
    +	}
    +
    +	@Override
    +	public Row fillRecord(Row reuse, Object[] parsedValues) {
    +		if (reuse == null) {
    +			reuse = new Row(rowSerializer.getLength());
    +		}
    +		for (int i = 0; i < parsedValues.length; i++) {
    +			reuse.setField(i, parsedValues[i]);
    +		}
    +		return reuse;
    +	}
    +
    +	@Override
    +	protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int numBytes) throws ParseException {
    +		boolean[] fieldIncluded = this.fieldIncluded;
    +
    +		int startPos = offset;
    +		final int limit = offset + numBytes;
    +
    +		for (int field = 0, output = 0; field < fieldIncluded.length; field++) {
    +
    +			// check valid start position
    +			if (startPos >= limit) {
    +				if (isLenient()) {
    +					return false;
    +				} else {
    +					throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
    +				}
    +			}
    +
    +			if (fieldIncluded[field]) {
    +				// parse field
    +				@SuppressWarnings("unchecked")
    +				FieldParser<Object> parser = (FieldParser<Object>) this.getFieldParsers()[output];
    +				int latestValidPos = startPos;
    +				startPos = parser.parseField(bytes, startPos, limit, this.getFieldDelimiter(), holders[output]);
    +				if (!isLenient() && parser.getErrorState() != ParseErrorState.NONE) {
    --- End diff --
    
    Yes, I had a look at this class as well. The difference here is that we do not want to stop parsing a line when we encounter a parse error (an error state other than `ParseErrorState.NONE`); we set a `null` value instead, no?
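
The behavior discussed in this comment (keep parsing the line and store `null` for a field that cannot be parsed) can be illustrated with a small, Flink-free sketch. The class `NullOnErrorParsing` and its method `parseInts` are hypothetical names invented for this illustration; they are not part of the pull request or of the Flink API, and the real format works on byte arrays with `FieldParser` instances rather than on `String.split`.

```java
import java.util.Arrays;

// Hypothetical, Flink-free sketch: parse every field of a delimited line and
// store null for fields that fail to parse, instead of aborting the whole line.
final class NullOnErrorParsing {

    /** Parses a '|'-separated line into Integers; unparsable or missing fields stay null. */
    static Integer[] parseInts(String line, int arity) {
        String[] raw = line.split("\\|", -1); // limit -1 keeps trailing empty fields
        Integer[] out = new Integer[arity];   // every slot starts out as null
        for (int i = 0; i < arity && i < raw.length; i++) {
            try {
                out[i] = Integer.valueOf(raw[i].trim());
            } catch (NumberFormatException e) {
                out[i] = null; // parse error: record null and continue with the next field
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parseInts("1|x|3", 3))); // prints [1, null, 3]
    }
}
```

Whether a strict (non-lenient) format should still throw for some error states, as the quoted diff does for everything except `EMPTY_STRING`, is a separate design decision that this sketch does not cover.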


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67257728
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala ---
    @@ -25,6 +25,8 @@ import org.apache.flink.api.java.typeutils.{TupleTypeInfoBase, TupleTypeInfo}
     import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
     import org.apache.flink.api.table.Row
     import org.apache.flink.core.fs.Path
    +import org.apache.flink.api.table.typeutils.RowTypeInfo
    +import org.apache.flink.api.java.io.RowCsvInputFormat
     
     /**
       * A [[TableSource]] for simple CSV files with up to 25 fields.
    --- End diff --
    
    Remove the 25-field limitation.



[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67252744
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowSerializer;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.types.parser.FieldParser.ParseErrorState;
    +
    +@Internal
    +public class RowCsvInputFormat extends CsvInputFormat<Row> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private RowSerializer rowSerializer;
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo) {
    +		this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, createDefaultMask(rowTypeInfo.getArity()));
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			int[] includedFieldsMask) {
    +		super(filePath);
    +		boolean[] mask = (includedFieldsMask == null) ? createDefaultMask(rowTypeInfo.getArity())
    +				: toBooleanMask(includedFieldsMask);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			boolean[] includedFieldsMask) {
    +		super(filePath);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, includedFieldsMask);
    --- End diff --
    
    Let all constructors (transitively) call this constructor, and move the functionality of `configure` into it.
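
The suggested refactoring could look roughly like the following sketch. `ChainedFormat` is a hypothetical stand-in for `RowCsvInputFormat` with the Flink types removed; the default delimiters, the `arity` parameter (standing in for `rowTypeInfo.getArity()`), and the index-based behavior of `toBooleanMask` are assumptions made for illustration only.

```java
import java.util.Arrays;

// Hypothetical stand-in for the input format; all names are illustrative.
final class ChainedFormat {
    static final String DEFAULT_LINE_DELIMITER = "\n";  // assumed default
    static final String DEFAULT_FIELD_DELIMITER = ",";  // assumed default

    final String lineDelimiter;
    final String fieldDelimiter;
    final boolean[] includedFields;

    ChainedFormat(int arity) {
        // The cast picks the boolean[] overload for the null argument.
        this(DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, arity, (boolean[]) null);
    }

    ChainedFormat(String lineDelimiter, String fieldDelimiter, int arity, int[] includedIndices) {
        this(lineDelimiter, fieldDelimiter, arity,
                includedIndices == null ? null : toBooleanMask(includedIndices));
    }

    // Primary constructor: the former configure() logic lives here, so validation
    // and defaulting happen in exactly one place.
    ChainedFormat(String lineDelimiter, String fieldDelimiter, int arity, boolean[] mask) {
        if (arity <= 0) {
            throw new IllegalArgumentException("Row arity must be greater than 0.");
        }
        this.lineDelimiter = lineDelimiter;
        this.fieldDelimiter = fieldDelimiter;
        this.includedFields = (mask == null) ? createDefaultMask(arity) : mask.clone();
    }

    /** Default mask: include every field. */
    static boolean[] createDefaultMask(int arity) {
        boolean[] mask = new boolean[arity];
        Arrays.fill(mask, true);
        return mask;
    }

    /** Assumed semantics: the ints are the indices of the fields to include. */
    static boolean[] toBooleanMask(int[] indices) {
        int max = -1;
        for (int i : indices) {
            max = Math.max(max, i);
        }
        boolean[] mask = new boolean[max + 1];
        for (int i : indices) {
            mask[i] = true;
        }
        return mask;
    }
}
```

With this shape, `new ChainedFormat("\n", "|", 3, new int[] {0, 2})` and `new ChainedFormat(2)` both end up in the primary constructor, so the arity check and the default mask cannot be skipped by any overload.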



[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67256445
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowSerializer;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.types.parser.FieldParser.ParseErrorState;
    +
    +@Internal
    +public class RowCsvInputFormat extends CsvInputFormat<Row> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private RowSerializer rowSerializer;
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo) {
    +		this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, createDefaultMask(rowTypeInfo.getArity()));
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			int[] includedFieldsMask) {
    +		super(filePath);
    +		boolean[] mask = (includedFieldsMask == null) ? createDefaultMask(rowTypeInfo.getArity())
    +				: toBooleanMask(includedFieldsMask);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			boolean[] includedFieldsMask) {
    +		super(filePath);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	private void configure(String lineDelimiter, String fieldDelimiter,
    +			RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +
    +		if (rowTypeInfo.getArity() == 0) {
    +			throw new IllegalArgumentException("Row arity must be greater than 0.");
    +		}
    +
    +		if (includedFieldsMask == null) {
    +			includedFieldsMask = createDefaultMask(rowTypeInfo.getArity());
    +		}
    +
    +		rowSerializer = (RowSerializer) rowTypeInfo.createSerializer(new ExecutionConfig());
    +
    +		setDelimiter(lineDelimiter);
    +		setFieldDelimiter(fieldDelimiter);
    +
    +		Class<?>[] classes = new Class<?>[rowTypeInfo.getArity()];
    +
    +		for (int i = 0; i < rowTypeInfo.getArity(); i++) {
    +			classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
    +		}
    +
    +		setFieldsGeneric(includedFieldsMask, classes);
    +	}
    +
    +	@Override
    +	public Row fillRecord(Row reuse, Object[] parsedValues) {
    +		if (reuse == null) {
    +			reuse = new Row(rowSerializer.getLength());
    +		}
    +		for (int i = 0; i < parsedValues.length; i++) {
    +			reuse.setField(i, parsedValues[i]);
    +		}
    +		return reuse;
    +	}
    +
    +	@Override
    +	protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int numBytes) throws ParseException {
    +		boolean[] fieldIncluded = this.fieldIncluded;
    +
    +		int startPos = offset;
    +		final int limit = offset + numBytes;
    +
    +		for (int field = 0, output = 0; field < fieldIncluded.length; field++) {
    +
    +			// check valid start position
    +			if (startPos >= limit) {
    +				if (isLenient()) {
    +					return false;
    +				} else {
    +					throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
    +				}
    +			}
    +
    +			if (fieldIncluded[field]) {
    +				// parse field
    +				@SuppressWarnings("unchecked")
    +				FieldParser<Object> parser = (FieldParser<Object>) this.getFieldParsers()[output];
    +				int latestValidPos = startPos;
    +				startPos = parser.parseField(bytes, startPos, limit, this.getFieldDelimiter(), holders[output]);
    +				if (!isLenient() && parser.getErrorState() != ParseErrorState.NONE) {
    +					// Row is able to handle null values
    +					if (parser.getErrorState() != ParseErrorState.EMPTY_STRING) {
    +						throw new ParseException(
    +								String.format("Parsing error for column %s of row '%s' originated by %s: %s.", field,
    +										new String(bytes, offset, numBytes),
    +										parser.getClass().getSimpleName(), parser.getErrorState()));
    +					}
    +				}
    +				holders[output] = parser.getLastResult();
    +
    +				// check parse result
    +				if (startPos < 0) {
    +					holders[output] = null;
    +					startPos = skipFields(bytes, latestValidPos, limit, this.getFieldDelimiter());
    +				}
    +				output++;
    +			} else {
    +				// skip field
    +				startPos = skipFields(bytes, startPos, limit, this.getFieldDelimiter());
    --- End diff --
    
    Add a check for `startPos < 0`?
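
A minimal sketch of why such a check matters, assuming a skip routine that signals failure with a negative position. The class `SkipFieldCheck`, its `skipField` method, and the rule that an unterminated quoted field yields `-1` are hypothetical and written for this illustration; they are not the Flink implementation of `skipFields`.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: a skip routine that reports failure with -1, and a caller
// that checks for the negative position before continuing.
final class SkipFieldCheck {

    /**
     * Returns the position just after the next delimiter, or limit if the field is
     * the last one, or -1 if the field opens a quote that is never closed.
     */
    static int skipField(byte[] bytes, int startPos, int limit, byte delimiter, byte quote) {
        int i = startPos;
        if (i < limit && bytes[i] == quote) {
            i++;
            while (i < limit && bytes[i] != quote) {
                i++;
            }
            if (i >= limit) {
                return -1; // unterminated quoted field
            }
            i++; // step past the closing quote
        }
        while (i < limit && bytes[i] != delimiter) {
            i++;
        }
        return (i < limit) ? i + 1 : limit;
    }

    /** Counts the fields of a '|'-separated line by skipping over each one. */
    static int countFields(String line) {
        byte[] bytes = line.getBytes(StandardCharsets.UTF_8);
        int pos = 0;
        int fields = 0;
        while (pos < bytes.length) {
            pos = skipField(bytes, pos, bytes.length, (byte) '|', (byte) '"');
            if (pos < 0) {
                // Without this check the next iteration would index the array at -1.
                throw new IllegalArgumentException("Unterminated quoted field in: " + line);
            }
            fields++;
        }
        return fields;
    }
}
```

In this sketch the caller turns the negative position into an exception; a lenient variant could instead return `false` for the record, mirroring the `isLenient()` branch at the top of the quoted loop.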



[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67295498
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowSerializer;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.types.parser.FieldParser.ParseErrorState;
    +
    +@Internal
    +public class RowCsvInputFormat extends CsvInputFormat<Row> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private RowSerializer rowSerializer;
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo) {
    +		this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, createDefaultMask(rowTypeInfo.getArity()));
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			int[] includedFieldsMask) {
    +		super(filePath);
    +		boolean[] mask = (includedFieldsMask == null) ? createDefaultMask(rowTypeInfo.getArity())
    +				: toBooleanMask(includedFieldsMask);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
    +		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
    +	}
    +
    +	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
    +			boolean[] includedFieldsMask) {
    +		super(filePath);
    +		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, includedFieldsMask);
    --- End diff --
    
    ok



[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1989#discussion_r67299243
  
    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java ---
    @@ -0,0 +1,1075 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.io;
    +
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.OutputStreamWriter;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.PojoTypeInfo;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.types.parser.StringParser;
    +import org.junit.Test;
    +
    +public class RowCsvInputFormatTest {
    +
    +	private static final Path PATH = new Path("an/ignored/file/");
    +
    +	//Static variables for testing the removal of \r\n to \n
    +	private static final String FIRST_PART = "That is the first part";
    +
    +	private static final String SECOND_PART = "That is the second part";
    +
    +	@Test
    +	public void ignoreInvalidLines() {
    +		try {
    +			String fileContent =
    +					"header1|header2|header3|\n"+
    +					"this is|1|2.0|\n"+
    +					"//a comment\n" +
    +					"a test|3|4.0|\n" +
    +					"#next|5|6.0|\n";
    +
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.INT_TYPE_INFO,
    +				BasicTypeInfo.DOUBLE_TYPE_INFO
    +			});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +			format.setLenient(false);
    +
    +			Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			try {
    +				result = format.nextRecord(result);
    +				fail("Parse Exception was not thrown! (Invalid int value)");
    +			} catch (ParseException ex) {
    +			}
    +
    +			// if format has lenient == false this can be asserted only after FLINK-3908
    +//			result = format.nextRecord(result);
    +//			assertNotNull(result);
    +//			assertEquals("this is", result.productElement(0));
    +//			assertEquals(new Integer(1), result.productElement(1));
    +//			assertEquals(new Double(2.0), result.productElement(2));
    +//
    +//			result = format.nextRecord(result);
    +//			assertNotNull(result);
    +//			assertEquals("a test", result.productElement(0));
    +//			assertEquals(new Integer(3), result.productElement(1));
    +//			assertEquals(new Double(4.0), result.productElement(2));
    +//
    +//			result = format.nextRecord(result);
    +//			assertNotNull(result);
    +//			assertEquals("#next", result.productElement(0));
    +//			assertEquals(new Integer(5), result.productElement(1));
    +//			assertEquals(new Double(6.0), result.productElement(2));
    +//
    +//			result = format.nextRecord(result);
    +//			assertNull(result);
    +			
    +			//re-open with lenient = true
    +			format.setLenient(true);
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("header1", result.productElement(0));
    +			assertNull(result.productElement(1));
    +			assertNull(result.productElement(2));
    +			
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("this is", result.productElement(0));
    +			assertEquals(new Integer(1), result.productElement(1));
    +			assertEquals(new Double(2.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("a test", result.productElement(0));
    +			assertEquals(new Integer(3), result.productElement(1));
    +			assertEquals(new Double(4.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("#next", result.productElement(0));
    +			assertEquals(new Integer(5), result.productElement(1));
    +			assertEquals(new Double(6.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void ignoreSingleCharPrefixComments() {
    +		try {
    +			final String fileContent = 
    +					"#description of the data\n" +
    +					"#successive commented line\n" +
    +					"this is|1|2.0|\n" +
    +					"a test|3|4.0|\n" +
    +					"#next|5|6.0|\n";
    +
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.INT_TYPE_INFO,
    +				BasicTypeInfo.DOUBLE_TYPE_INFO });
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +			format.setCommentPrefix("#");
    +
    +			Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("this is", result.productElement(0));
    +			assertEquals(new Integer(1), result.productElement(1));
    +			assertEquals(new Double(2.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("a test", result.productElement(0));
    +			assertEquals(new Integer(3), result.productElement(1));
    +			assertEquals(new Double(4.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void ignoreMultiCharPrefixComments() {
    +		try {
    +
    +
    +			final String fileContent = "//description of the data\n" +
    +					"//successive commented line\n" +
    +					"this is|1|2.0|\n"+
    +					"a test|3|4.0|\n" +
    +					"//next|5|6.0|\n";
    +
    +			final FileInputSplit split = createTempFile(fileContent);
    +
    +			final RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,	BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO });
    +			final CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +			format.setCommentPrefix("//");
    +
    +			final Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("this is", result.productElement(0));
    +			assertEquals(new Integer(1), result.productElement(1));
    +			assertEquals(new Double(2.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("a test", result.productElement(0));
    +			assertEquals(new Integer(3), result.productElement(1));
    +			assertEquals(new Double(4.0), result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void readStringFields() {
    +		try {
    +			String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO
    +			});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +
    +			final Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("def", result.productElement(1));
    +			assertEquals("ghijk", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("hhg", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +			assertTrue(format.reachedEnd());
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void readMixedQuotedStringFields() {
    +		try {
    +			String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||";
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO
    +			});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +
    +			Configuration parameters = new Configuration();
    +			format.configure(parameters);
    +			format.enableQuotedStringParsing('@');
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("a|b|c", result.productElement(0));
    +			assertEquals("def", result.productElement(1));
    +			assertEquals("ghijk", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("|hhg", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +			assertTrue(format.reachedEnd());
    +		}
    +		catch (Exception ex) {
    +			ex.printStackTrace();
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void readStringFieldsWithTrailingDelimiters() {
    +		try {
    +			String fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n";
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] { 
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO,
    +				BasicTypeInfo.STRING_TYPE_INFO
    +			});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +
    +			format.setFieldDelimiter("|-");
    +
    +			format.configure(new Configuration());
    +			format.open(split);
    +
    +			Row result = new Row(3);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("def", result.productElement(1));
    +			assertEquals("ghijk", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("abc", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("hhg", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals("", result.productElement(0));
    +			assertEquals("", result.productElement(1));
    +			assertEquals("", result.productElement(2));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +			assertTrue(format.reachedEnd());
    +		}
    +		catch (Exception ex) {
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testIntegerFields() throws IOException {
    +		try {
    +			String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n";
    +			FileInputSplit split = createTempFile(fileContent);
    +
    +			RowTypeInfo typeInfo = new RowTypeInfo(
    +					new TypeInformation<?>[] {
    +						BasicTypeInfo.INT_TYPE_INFO,
    +						BasicTypeInfo.INT_TYPE_INFO,
    +						BasicTypeInfo.INT_TYPE_INFO,
    +						BasicTypeInfo.INT_TYPE_INFO,
    +						BasicTypeInfo.INT_TYPE_INFO
    +					});
    +			CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
    +
    +			format.setFieldDelimiter("|");
    +
    +			format.configure(new Configuration());
    +			format.open(split);
    +
    +			Row result = new Row(5);
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals(Integer.valueOf(111), result.productElement(0));
    +			assertEquals(Integer.valueOf(222), result.productElement(1));
    +			assertEquals(Integer.valueOf(333), result.productElement(2));
    +			assertEquals(Integer.valueOf(444), result.productElement(3));
    +			assertEquals(Integer.valueOf(555), result.productElement(4));
    +
    +			result = format.nextRecord(result);
    +			assertNotNull(result);
    +			assertEquals(Integer.valueOf(666), result.productElement(0));
    +			assertEquals(Integer.valueOf(777), result.productElement(1));
    +			assertEquals(Integer.valueOf(888), result.productElement(2));
    +			assertEquals(Integer.valueOf(999), result.productElement(3));
    +			assertEquals(Integer.valueOf(0), result.productElement(4));
    +
    +			result = format.nextRecord(result);
    +			assertNull(result);
    +			assertTrue(format.reachedEnd());
    +		}
    +		catch (Exception ex) {
    +			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testEmptyFields() throws IOException {
    +		try{
    +			String fileContent = 
    +					"|0|0|0|0|0|\n" +
    +					"1||1|1|1|1|\n" +
    +					"2|2||2|2|2|\n" +
    +					"3|3|3||3|3|\n" +
    +					"4|4|4|4||4|\n" +
    +					"5|5|5|5|5||\n";
    +			
    +			FileInputSplit split = createTempFile(fileContent);
    +			
    +			// TODO: FLOAT_TYPE_INFO and DOUBLE_TYPE_INFO don't handle null values correctly
    +			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] {
    +				BasicTypeInfo.SHORT_TYPE_INFO,
    +				BasicTypeInfo.INT_TYPE_INFO,
    +				BasicTypeInfo.LONG_TYPE_INFO,
    +				BasicTypeInfo.INT_TYPE_INFO,
    +//				BasicTypeInfo.FLOAT_TYPE_INFO,
    --- End diff --
    
    Can you open a JIRA issue for this? Nobody will notice this issue if it is hidden in a comment in a test class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #1989: FLINK-3901 - Added CsvRowInputFormat

Posted by fpompermaier <gi...@git.apache.org>.
Github user fpompermaier commented on the issue:

    https://github.com/apache/flink/pull/1989
  
    Don't worry @twalthr, it would be nice to have it in 1.1, but I understand that it's not a priority :)
    Thanks a lot for the update!

