You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by uybhatti <gi...@git.apache.org> on 2017/09/08 13:11:38 UTC

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

GitHub user uybhatti opened a pull request:

    https://github.com/apache/flink/pull/4660

    [FLINK-7050][table] Add support of RFC compliant CSV parser for Table Source

    ## What is the purpose of the change
    
    Currently CsvInputFormat is not compliant with RFC 4180 standards and we can't correctly parse fields containing double quotes, line delimiter or field delimiter.
    
    
    ## Brief change log
    
      - RFCCsvInputFormat is added to support RFC 4180 standards.
      - Also we added RFCRowCsvInputFormat, RFCTupleCsvInputFormat and RFCCsvTableSource for this purpose.
      - New dependency for Jackson parser is added in pom.xml of flink-table. 
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
      - RFCCsvInputFormatTest and RFCRowCsvInputFormatTest are added to test the added CSV Input Format functionality.
      - TableSourceTest#testRFCCsvTableSourceBuilder is added for RFCCsvTableSource.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): **yes**
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
      - The serializers: **no**
      - The runtime per-record code paths (performance sensitive): **no**
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes**
    
    ## Documentation
    
      - Does this pull request introduce a new feature? **yes**
      - If yes, how is the feature documented? **not documented yet**


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uybhatti/flink FLINK-7050

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4660.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4660
    
----
commit 977da4649ab01b3c924640c03883312a5c2f1427
Author: uybhatti <uy...@gmail.com>
Date:   2017-09-07T14:53:24Z

    [FLINK-7050][table] Add support of RFC compliant CSV parser for Table Source

----


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139293761
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    --- End diff --
    
    do empty types make sense or must they always be configured? If yes, we don't need to initialize the variable.


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139296542
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    +
    +		Object[] resultantRecord = new Object[fieldTypes.length];
    +		int index = 0;
    +		for (int i = 0; i < this.fieldIncluded.length; i++) {
    +
    +			try {
    +				if (fieldIncluded[i]) {
    +					resultantRecord[index++] = record[i];
    +				}
    +			}
    +			catch (Exception e) {
    +				throw new IOException();
    +			}
    +		}
    +		return resultantRecord;
    +	}
    +
    +	private long findFirstDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.getSplitStart());
    +		if (this.stream.getPos() == 0) {
    +			return 0;
    +		}
    +		else {
    +			int pos = 1;
    +			while ((this.stream.read()) != this.recordDelimiter.charAt(0)) {
    +				pos++;
    +			}
    +			return pos;
    +		}
    +	}
    +
    +	private long findLastDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.splitStart + this.splitLength);
    +		int pos = 0;
    +		char c;
    +		int read = this.stream.read();
    +		while (read != -1) {
    +			c = (char) read;
    +			if (c == this.recordDelimiter.charAt(0)) {
    +				break;
    +			}
    +			else {
    +				read = this.stream.read();
    +			}
    +			pos++;
    +		}
    +		return pos;
    +	}
    +
    +	private Object[] castRecord(Object[] record)  throws IOException {
    +
    +		if (isLenient()) {
    +			for (int i = 0; i < this.fieldTypes.length; i++) {
    +				try {
    +					record[i] = dataTypeConversion(record[i], fieldTypes[i]);
    +				} catch (Exception e) {
    +					throw new IOException(e);
    +				}
    +			}
    +		}
    +		else {
    +			for (int i = 0; i < this.fieldTypes.length; i++) {
    +				try {
    +					record[i] = dataTypeConversion(record[i], fieldTypes[i]);
    +				} catch (Exception e) {
    +					throw new IOException(e);
    +				}
    +			}
    +		}
    +		return record;
    +	}
    +
    +	public <I, O> O dataTypeConversion(I input, Class<O> outputClass) throws Exception {
    +
    +		String type = outputClass.getSimpleName().toLowerCase();
    +		try {
    +			switch (type) {
    +				case "string":
    +					return (O) input.toString();
    +				case "boolean":
    +					if (input.toString().length() != 0) { // special case for boolean
    +						return (O) Boolean.valueOf(input.toString());
    +					}
    +					return null;
    +				case "byte":
    +					return (O) Byte.valueOf(input.toString());
    +				case "short":
    +					return (O) Short.valueOf(input.toString());
    +				case "integer":
    +					return (O) Integer.valueOf(input.toString());
    +				case "long":
    +					return (O) Long.valueOf(input.toString());
    +				case "float":
    +					return (O) Float.valueOf(input.toString());
    +				case "double":
    +					return (O) Double.valueOf(input.toString());
    +				case "decimal":
    +					return (O) Double.valueOf(input.toString());
    +				case "date":
    +					return (O) Date.valueOf(input.toString());
    +				case "time":
    +					return (O) Time.valueOf(input.toString());
    +				case "timestamp":
    +					return (O) Timestamp.valueOf(input.toString());
    +				default:
    +					return (O) input;
    +			}
    +		}
    +		catch (Exception e) {
    +			if (isLenient()) {
    +				return null;
    +			}
    +			else {
    +				throw new ParseException();
    +			}
    +		}
    +
    +	}
    +
    +	// create projection mask
    +
    +	protected static boolean[] createDefaultMask(int size) {
    +		boolean[] includedMask = new boolean[size];
    +		for (int x = 0; x < includedMask.length; x++) {
    +			includedMask[x] = true;
    +		}
    +		return includedMask;
    +	}
    +
    +	protected static boolean[] toBooleanMask(int[] sourceFieldIndices) {
    +		Preconditions.checkNotNull(sourceFieldIndices);
    +
    +		int max = 0;
    +		for (int i : sourceFieldIndices) {
    +			if (i < 0) {
    +				throw new IllegalArgumentException("Field indices must not be smaller than zero.");
    +			}
    +			max = Math.max(i, max);
    +		}
    +
    +		boolean[] includedMask = new boolean[max + 1];
    +
    +		// check if we support parsers for these types
    +		for (int i = 0; i < sourceFieldIndices.length; i++) {
    +			includedMask[sourceFieldIndices[i]] = true;
    +		}
    +
    +		return includedMask;
    +	}
    +
    +	// configuration setting
    +
    +	public String getDelimiter() {
    +		return recordDelimiter;
    +	}
    +
    +	public void setDelimiter(char delimiter) {
    +		setDelimiter(String.valueOf(delimiter));
    +	}
    +
    +	public void setDelimiter(String delimiter) {
    +		if (delimiter == null) {
    +			throw new IllegalArgumentException("Delimiter must not be null");
    +		}
    +		if (delimiter.length() == 0) {
    +			throw new IllegalArgumentException("Delimiter must not be empty");
    +		}
    +		this.recordDelimiter = delimiter;
    +	}
    +
    +	public void setFieldDelimiter(char delimiter) {
    +		if (String.valueOf(delimiter) == null) {
    +			throw new IllegalArgumentException("Delimiter must not be null");
    +		}
    +		if (String.valueOf(delimiter).length() == 0) {
    +			throw new IllegalArgumentException("Delimiter must not be empty");
    +		}
    +		this.fieldDelimiter = delimiter;
    +	}
    +
    +	public void setFieldDelimiter(String delimiter) {
    +		if (delimiter == null) {
    +			throw new IllegalArgumentException("Delimiter must not be null");
    +		}
    +		if (delimiter.length() == 0) {
    +			throw new IllegalArgumentException("Delimiter must not be empty");
    +		}
    +		this.fieldDelimiter = delimiter.charAt(0);
    +	}
    +
    +	public char getFieldDelimiter() {
    +		return this.fieldDelimiter;
    +	}
    +
    +	public void setSkipFirstLineAsHeader(boolean skipFirstLine) {
    +		this.skipFirstLineAsHeader = skipFirstLine;
    +	}
    +
    +	public void enableQuotedStringParsing(char quoteCharacter) {
    +		quotedStringParsing = true;
    +		this.quoteCharacter = quoteCharacter;
    +	}
    +
    +	public boolean isLenient() {
    +		return lenient;
    +	}
    +
    +	public void setLenient(boolean lenient) {
    +		this.lenient = lenient;
    +	}
    +
    +	public void setCommentPrefix(String commentPrefix) {
    +		this.commentPrefix = commentPrefix;
    +		this.allowComments = true;
    +	}
    +
    +	protected Class<?>[] getGenericFieldTypes() {
    +		// check if we are dense, i.e., we read all fields
    +		if (this.fieldIncluded.length == this.fieldTypes.length) {
    +			return this.fieldTypes;
    +		}
    +		else {
    +			// sparse type array which we made dense for internal book keeping.
    +			// create a sparse copy to return
    +			Class<?>[] types = new Class<?>[this.fieldIncluded.length];
    +
    +			for (int i = 0, k = 0; i < this.fieldIncluded.length; i++) {
    +				if (this.fieldIncluded[i]) {
    +					types[i] = this.fieldTypes[k++];
    +				}
    +			}
    +
    +			return types;
    +		}
    +	}
    +
    +	protected void setFieldsGeneric(boolean[] includedMask, Class<?>[] fieldTypes) {
    --- End diff --
    
    Add JavaDocs to explain what the method is doing



---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139293764
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    --- End diff --
    
    same as `fieldTypes`


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139296186
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    --- End diff --
    
    Change to `mapper.readerFor(String[].class).readValues(csvParser);`


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139294890
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    --- End diff --
    
    Add more inline comments to this class


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139294745
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    --- End diff --
    
    Move `mapper` down to the place where it is used


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139296322
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    +
    +		Object[] resultantRecord = new Object[fieldTypes.length];
    +		int index = 0;
    +		for (int i = 0; i < this.fieldIncluded.length; i++) {
    +
    +			try {
    +				if (fieldIncluded[i]) {
    +					resultantRecord[index++] = record[i];
    +				}
    +			}
    +			catch (Exception e) {
    +				throw new IOException();
    +			}
    +		}
    +		return resultantRecord;
    +	}
    +
    +	private long findFirstDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.getSplitStart());
    +		if (this.stream.getPos() == 0) {
    +			return 0;
    +		}
    +		else {
    +			int pos = 1;
    +			while ((this.stream.read()) != this.recordDelimiter.charAt(0)) {
    +				pos++;
    +			}
    +			return pos;
    +		}
    +	}
    +
    +	private long findLastDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.splitStart + this.splitLength);
    +		int pos = 0;
    +		char c;
    +		int read = this.stream.read();
    +		while (read != -1) {
    +			c = (char) read;
    +			if (c == this.recordDelimiter.charAt(0)) {
    +				break;
    +			}
    +			else {
    +				read = this.stream.read();
    +			}
    +			pos++;
    +		}
    +		return pos;
    +	}
    +
    +	private Object[] castRecord(Object[] record)  throws IOException {
    +
    +		if (isLenient()) {
    +			for (int i = 0; i < this.fieldTypes.length; i++) {
    +				try {
    +					record[i] = dataTypeConversion(record[i], fieldTypes[i]);
    +				} catch (Exception e) {
    +					throw new IOException(e);
    +				}
    +			}
    +		}
    +		else {
    +			for (int i = 0; i < this.fieldTypes.length; i++) {
    +				try {
    +					record[i] = dataTypeConversion(record[i], fieldTypes[i]);
    +				} catch (Exception e) {
    +					throw new IOException(e);
    +				}
    +			}
    +		}
    +		return record;
    +	}
    +
    +	public <I, O> O dataTypeConversion(I input, Class<O> outputClass) throws Exception {
    +
    +		String type = outputClass.getSimpleName().toLowerCase();
    +		try {
    +			switch (type) {
    +				case "string":
    +					return (O) input.toString();
    +				case "boolean":
    +					if (input.toString().length() != 0) { // special case for boolean
    +						return (O) Boolean.valueOf(input.toString());
    +					}
    +					return null;
    +				case "byte":
    +					return (O) Byte.valueOf(input.toString());
    +				case "short":
    +					return (O) Short.valueOf(input.toString());
    +				case "integer":
    +					return (O) Integer.valueOf(input.toString());
    +				case "long":
    +					return (O) Long.valueOf(input.toString());
    +				case "float":
    +					return (O) Float.valueOf(input.toString());
    +				case "double":
    +					return (O) Double.valueOf(input.toString());
    +				case "decimal":
    +					return (O) Double.valueOf(input.toString());
    +				case "date":
    +					return (O) Date.valueOf(input.toString());
    +				case "time":
    +					return (O) Time.valueOf(input.toString());
    +				case "timestamp":
    +					return (O) Timestamp.valueOf(input.toString());
    +				default:
    +					return (O) input;
    +			}
    +		}
    +		catch (Exception e) {
    +			if (isLenient()) {
    +				return null;
    --- End diff --
    
    This lenient case is not correctly forwarded. It cannot be distinguished from the case where a field is actually null


---

[GitHub] flink issue #4660: [FLINK-7050][table] Add support of RFC compliant CSV pars...

Posted by uybhatti <gi...@git.apache.org>.
Github user uybhatti commented on the issue:

    https://github.com/apache/flink/pull/4660
  
    Hi @fhueske , thanks for your feedback. I have addressed your feedback. Please take a look on changes.
    Thanks, Usman


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139293483
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    --- End diff --
    
    describe parameter


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139296209
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    +
    +		Object[] resultantRecord = new Object[fieldTypes.length];
    +		int index = 0;
    +		for (int i = 0; i < this.fieldIncluded.length; i++) {
    +
    +			try {
    +				if (fieldIncluded[i]) {
    +					resultantRecord[index++] = record[i];
    +				}
    +			}
    +			catch (Exception e) {
    +				throw new IOException();
    +			}
    +		}
    +		return resultantRecord;
    +	}
    +
    +	private long findFirstDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.getSplitStart());
    +		if (this.stream.getPos() == 0) {
    +			return 0;
    +		}
    +		else {
    +			int pos = 1;
    +			while ((this.stream.read()) != this.recordDelimiter.charAt(0)) {
    +				pos++;
    +			}
    +			return pos;
    +		}
    +	}
    +
    +	private long findLastDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.splitStart + this.splitLength);
    +		int pos = 0;
    +		char c;
    +		int read = this.stream.read();
    +		while (read != -1) {
    +			c = (char) read;
    +			if (c == this.recordDelimiter.charAt(0)) {
    +				break;
    +			}
    +			else {
    +				read = this.stream.read();
    +			}
    +			pos++;
    +		}
    +		return pos;
    +	}
    +
    +	private Object[] castRecord(Object[] record)  throws IOException {
    +
    +		if (isLenient()) {
    +			for (int i = 0; i < this.fieldTypes.length; i++) {
    +				try {
    +					record[i] = dataTypeConversion(record[i], fieldTypes[i]);
    +				} catch (Exception e) {
    +					throw new IOException(e);
    +				}
    +			}
    +		}
    +		else {
    +			for (int i = 0; i < this.fieldTypes.length; i++) {
    +				try {
    +					record[i] = dataTypeConversion(record[i], fieldTypes[i]);
    +				} catch (Exception e) {
    +					throw new IOException(e);
    +				}
    +			}
    +		}
    +		return record;
    +	}
    +
    +	public <I, O> O dataTypeConversion(I input, Class<O> outputClass) throws Exception {
    +
    +		String type = outputClass.getSimpleName().toLowerCase();
    +		try {
    +			switch (type) {
    +				case "string":
    +					return (O) input.toString();
    --- End diff --
    
    If we read as `String[]` we don't need to call `toString()`


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139293552
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    --- End diff --
    
    `fieldDelimiter = DEFAULT_FIELD_DELIMITER;`


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139293977
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    +
    +		Object[] resultantRecord = new Object[fieldTypes.length];
    +		int index = 0;
    +		for (int i = 0; i < this.fieldIncluded.length; i++) {
    +
    +			try {
    +				if (fieldIncluded[i]) {
    +					resultantRecord[index++] = record[i];
    +				}
    +			}
    +			catch (Exception e) {
    +				throw new IOException();
    +			}
    +		}
    +		return resultantRecord;
    +	}
    +
    +	private long findFirstDelimiterPosition() throws IOException{
    --- End diff --
    
    Please add JavaDocs for the method and inline comments.


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139296219
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    +
    +		Object[] resultantRecord = new Object[fieldTypes.length];
    +		int index = 0;
    +		for (int i = 0; i < this.fieldIncluded.length; i++) {
    +
    +			try {
    +				if (fieldIncluded[i]) {
    +					resultantRecord[index++] = record[i];
    +				}
    +			}
    +			catch (Exception e) {
    +				throw new IOException();
    +			}
    +		}
    +		return resultantRecord;
    +	}
    +
    +	private long findFirstDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.getSplitStart());
    +		if (this.stream.getPos() == 0) {
    +			return 0;
    +		}
    +		else {
    +			int pos = 1;
    +			while ((this.stream.read()) != this.recordDelimiter.charAt(0)) {
    +				pos++;
    +			}
    +			return pos;
    +		}
    +	}
    +
    +	private long findLastDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.splitStart + this.splitLength);
    +		int pos = 0;
    +		char c;
    +		int read = this.stream.read();
    +		while (read != -1) {
    +			c = (char) read;
    +			if (c == this.recordDelimiter.charAt(0)) {
    +				break;
    +			}
    +			else {
    +				read = this.stream.read();
    +			}
    +			pos++;
    +		}
    +		return pos;
    +	}
    +
    +	private Object[] castRecord(Object[] record)  throws IOException {
    +
    +		if (isLenient()) {
    +			for (int i = 0; i < this.fieldTypes.length; i++) {
    +				try {
    +					record[i] = dataTypeConversion(record[i], fieldTypes[i]);
    +				} catch (Exception e) {
    +					throw new IOException(e);
    +				}
    +			}
    +		}
    +		else {
    +			for (int i = 0; i < this.fieldTypes.length; i++) {
    +				try {
    +					record[i] = dataTypeConversion(record[i], fieldTypes[i]);
    +				} catch (Exception e) {
    +					throw new IOException(e);
    +				}
    +			}
    +		}
    +		return record;
    +	}
    +
    +	public <I, O> O dataTypeConversion(I input, Class<O> outputClass) throws Exception {
    --- End diff --
    
    `I` is always `String`. No need to parameterize.


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139293998
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    +
    +		Object[] resultantRecord = new Object[fieldTypes.length];
    +		int index = 0;
    +		for (int i = 0; i < this.fieldIncluded.length; i++) {
    +
    +			try {
    +				if (fieldIncluded[i]) {
    +					resultantRecord[index++] = record[i];
    +				}
    +			}
    +			catch (Exception e) {
    +				throw new IOException();
    +			}
    +		}
    +		return resultantRecord;
    +	}
    +
    +	private long findFirstDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.getSplitStart());
    +		if (this.stream.getPos() == 0) {
    --- End diff --
    
    check can be done before `seek`


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139294724
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    --- End diff --
    
    add `this.`


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139293983
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    +
    +		Object[] resultantRecord = new Object[fieldTypes.length];
    +		int index = 0;
    +		for (int i = 0; i < this.fieldIncluded.length; i++) {
    +
    +			try {
    +				if (fieldIncluded[i]) {
    +					resultantRecord[index++] = record[i];
    +				}
    +			}
    +			catch (Exception e) {
    +				throw new IOException();
    +			}
    +		}
    +		return resultantRecord;
    +	}
    +
    +	private long findFirstDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.getSplitStart());
    +		if (this.stream.getPos() == 0) {
    +			return 0;
    +		}
    +		else {
    +			int pos = 1;
    +			while ((this.stream.read()) != this.recordDelimiter.charAt(0)) {
    +				pos++;
    +			}
    +			return pos;
    +		}
    +	}
    +
    +	private long findLastDelimiterPosition() throws IOException{
    --- End diff --
    
    Please add JavaDocs for the method and inline comments.


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by uybhatti <gi...@git.apache.org>.
Github user uybhatti commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r141336157
  
    --- Diff: flink-libraries/flink-table/pom.xml ---
    @@ -114,6 +114,12 @@ under the License.
     			<groupId>joda-time</groupId>
     			<artifactId>joda-time</artifactId>
     		</dependency>
    +		<!-- FOR RFC 4180 Compliant CSV Parser -->
    +		<dependency>
    --- End diff --
    
    I think it's better to move this functionality into `flink-connectors` as  `flink-connector-csv`, what do you think?


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139293611
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    --- End diff --
    
    not used. please remove


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139293492
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    --- End diff --
    
    `InputFormat that reads CSV files that are compliant with the RFC 4180 standard.`


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139293818
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    --- End diff --
    
    Please add comments for the member variables


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139295101
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    +
    +		Object[] resultantRecord = new Object[fieldTypes.length];
    +		int index = 0;
    +		for (int i = 0; i < this.fieldIncluded.length; i++) {
    +
    +			try {
    +				if (fieldIncluded[i]) {
    +					resultantRecord[index++] = record[i];
    +				}
    +			}
    +			catch (Exception e) {
    +				throw new IOException();
    +			}
    +		}
    +		return resultantRecord;
    +	}
    +
    +	private long findFirstDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.getSplitStart());
    +		if (this.stream.getPos() == 0) {
    +			return 0;
    +		}
    +		else {
    +			int pos = 1;
    +			while ((this.stream.read()) != this.recordDelimiter.charAt(0)) {
    +				pos++;
    +			}
    +			return pos;
    +		}
    +	}
    +
    +	private long findLastDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.splitStart + this.splitLength);
    +		int pos = 0;
    +		char c;
    +		int read = this.stream.read();
    +		while (read != -1) {
    +			c = (char) read;
    +			if (c == this.recordDelimiter.charAt(0)) {
    +				break;
    +			}
    +			else {
    +				read = this.stream.read();
    +			}
    +			pos++;
    +		}
    +		return pos;
    +	}
    +
    +	private Object[] castRecord(Object[] record)  throws IOException {
    +
    +		if (isLenient()) {
    +			for (int i = 0; i < this.fieldTypes.length; i++) {
    +				try {
    +					record[i] = dataTypeConversion(record[i], fieldTypes[i]);
    +				} catch (Exception e) {
    +					throw new IOException(e);
    +				}
    +			}
    +		}
    +		else {
    +			for (int i = 0; i < this.fieldTypes.length; i++) {
    +				try {
    +					record[i] = dataTypeConversion(record[i], fieldTypes[i]);
    +				} catch (Exception e) {
    +					throw new IOException(e);
    +				}
    +			}
    +		}
    +		return record;
    +	}
    +
    +	public <I, O> O dataTypeConversion(I input, Class<O> outputClass) throws Exception {
    --- End diff --
    
    can be `private`. Please add JavaDocs and inline comments


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139296194
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    --- End diff --
    
    Change to `String[] record = `


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139296182
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    --- End diff --
    
    Can be a `MappingIterator<String[]>`


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139295183
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    --- End diff --
    
    rename to `projectFields()`


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by uybhatti <gi...@git.apache.org>.
Github user uybhatti commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139923268
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    --- End diff --
    
    Using `Builder.setColumnType()` we can only specify NUMBER, STRING and BOOLEAN. So, manual casting is required for other data types. Also in this way, we have to specify all columns in schema, and parser doesn't automatically skip fields which are not configured and return a map iterator as a result. 
    Also, I think conversion from Array(current implementation) to Row is more efficient rather than from Map to Row. What do you think?


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139295009
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    +
    +		Object[] resultantRecord = new Object[fieldTypes.length];
    +		int index = 0;
    +		for (int i = 0; i < this.fieldIncluded.length; i++) {
    +
    +			try {
    +				if (fieldIncluded[i]) {
    +					resultantRecord[index++] = record[i];
    +				}
    +			}
    +			catch (Exception e) {
    +				throw new IOException();
    +			}
    +		}
    +		return resultantRecord;
    +	}
    +
    +	private long findFirstDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.getSplitStart());
    +		if (this.stream.getPos() == 0) {
    +			return 0;
    +		}
    +		else {
    +			int pos = 1;
    +			while ((this.stream.read()) != this.recordDelimiter.charAt(0)) {
    --- End diff --
    
    The `CsvSchema` JavaDocs explain that only `"\r"`, `"\r\n"`, `"\n"` are supported line delimiters. We should check for these in the setter.


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139296445
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    +
    +		Object[] resultantRecord = new Object[fieldTypes.length];
    +		int index = 0;
    +		for (int i = 0; i < this.fieldIncluded.length; i++) {
    +
    +			try {
    +				if (fieldIncluded[i]) {
    +					resultantRecord[index++] = record[i];
    +				}
    +			}
    +			catch (Exception e) {
    +				throw new IOException();
    +			}
    +		}
    +		return resultantRecord;
    +	}
    +
    +	private long findFirstDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.getSplitStart());
    +		if (this.stream.getPos() == 0) {
    +			return 0;
    +		}
    +		else {
    +			int pos = 1;
    +			while ((this.stream.read()) != this.recordDelimiter.charAt(0)) {
    +				pos++;
    +			}
    +			return pos;
    +		}
    +	}
    +
    +	private long findLastDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.splitStart + this.splitLength);
    +		int pos = 0;
    +		char c;
    +		int read = this.stream.read();
    +		while (read != -1) {
    +			c = (char) read;
    +			if (c == this.recordDelimiter.charAt(0)) {
    +				break;
    +			}
    +			else {
    +				read = this.stream.read();
    +			}
    +			pos++;
    +		}
    +		return pos;
    +	}
    +
    +	private Object[] castRecord(Object[] record)  throws IOException {
    +
    +		if (isLenient()) {
    +			for (int i = 0; i < this.fieldTypes.length; i++) {
    +				try {
    +					record[i] = dataTypeConversion(record[i], fieldTypes[i]);
    +				} catch (Exception e) {
    +					throw new IOException(e);
    +				}
    +			}
    +		}
    +		else {
    +			for (int i = 0; i < this.fieldTypes.length; i++) {
    +				try {
    +					record[i] = dataTypeConversion(record[i], fieldTypes[i]);
    +				} catch (Exception e) {
    +					throw new IOException(e);
    +				}
    +			}
    +		}
    +		return record;
    +	}
    +
    +	public <I, O> O dataTypeConversion(I input, Class<O> outputClass) throws Exception {
    +
    +		String type = outputClass.getSimpleName().toLowerCase();
    +		try {
    +			switch (type) {
    +				case "string":
    +					return (O) input.toString();
    +				case "boolean":
    +					if (input.toString().length() != 0) { // special case for boolean
    +						return (O) Boolean.valueOf(input.toString());
    +					}
    +					return null;
    +				case "byte":
    +					return (O) Byte.valueOf(input.toString());
    +				case "short":
    +					return (O) Short.valueOf(input.toString());
    +				case "integer":
    +					return (O) Integer.valueOf(input.toString());
    +				case "long":
    +					return (O) Long.valueOf(input.toString());
    +				case "float":
    +					return (O) Float.valueOf(input.toString());
    +				case "double":
    +					return (O) Double.valueOf(input.toString());
    +				case "decimal":
    +					return (O) Double.valueOf(input.toString());
    +				case "date":
    +					return (O) Date.valueOf(input.toString());
    +				case "time":
    +					return (O) Time.valueOf(input.toString());
    +				case "timestamp":
    +					return (O) Timestamp.valueOf(input.toString());
    +				default:
    +					return (O) input;
    +			}
    +		}
    +		catch (Exception e) {
    +			if (isLenient()) {
    +				return null;
    +			}
    +			else {
    +				throw new ParseException();
    +			}
    +		}
    +
    +	}
    +
    +	// create projection mask
    +
    +	protected static boolean[] createDefaultMask(int size) {
    +		boolean[] includedMask = new boolean[size];
    +		for (int x = 0; x < includedMask.length; x++) {
    +			includedMask[x] = true;
    +		}
    +		return includedMask;
    +	}
    +
    +	protected static boolean[] toBooleanMask(int[] sourceFieldIndices) {
    +		Preconditions.checkNotNull(sourceFieldIndices);
    +
    +		int max = 0;
    +		for (int i : sourceFieldIndices) {
    +			if (i < 0) {
    +				throw new IllegalArgumentException("Field indices must not be smaller than zero.");
    +			}
    +			max = Math.max(i, max);
    +		}
    +
    +		boolean[] includedMask = new boolean[max + 1];
    +
    +		// check if we support parsers for these types
    +		for (int i = 0; i < sourceFieldIndices.length; i++) {
    +			includedMask[sourceFieldIndices[i]] = true;
    +		}
    +
    +		return includedMask;
    +	}
    +
    +	// configuration setting
    +
    +	public String getDelimiter() {
    +		return recordDelimiter;
    +	}
    +
    +	public void setDelimiter(char delimiter) {
    +		setDelimiter(String.valueOf(delimiter));
    +	}
    +
    +	public void setDelimiter(String delimiter) {
    +		if (delimiter == null) {
    +			throw new IllegalArgumentException("Delimiter must not be null");
    +		}
    +		if (delimiter.length() == 0) {
    +			throw new IllegalArgumentException("Delimiter must not be empty");
    +		}
    +		this.recordDelimiter = delimiter;
    +	}
    +
    +	public void setFieldDelimiter(char delimiter) {
    +		if (String.valueOf(delimiter) == null) {
    +			throw new IllegalArgumentException("Delimiter must not be null");
    +		}
    +		if (String.valueOf(delimiter).length() == 0) {
    +			throw new IllegalArgumentException("Delimiter must not be empty");
    +		}
    +		this.fieldDelimiter = delimiter;
    +	}
    +
    +	public void setFieldDelimiter(String delimiter) {
    +		if (delimiter == null) {
    +			throw new IllegalArgumentException("Delimiter must not be null");
    +		}
    +		if (delimiter.length() == 0) {
    +			throw new IllegalArgumentException("Delimiter must not be empty");
    +		}
    +		this.fieldDelimiter = delimiter.charAt(0);
    +	}
    +
    +	public char getFieldDelimiter() {
    --- End diff --
    
    consistently define either getter or setter first


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139293951
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    --- End diff --
    
    the name of the variable is misleading. 
    `this.splitLength + lastDelimiterPosition - firstDelimiterPosition` gives the read length, but not the position of the last delimiter.


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139294269
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    +
    +		Object[] resultantRecord = new Object[fieldTypes.length];
    +		int index = 0;
    +		for (int i = 0; i < this.fieldIncluded.length; i++) {
    +
    +			try {
    +				if (fieldIncluded[i]) {
    +					resultantRecord[index++] = record[i];
    +				}
    +			}
    +			catch (Exception e) {
    +				throw new IOException();
    +			}
    +		}
    +		return resultantRecord;
    +	}
    +
    +	private long findFirstDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.getSplitStart());
    +		if (this.stream.getPos() == 0) {
    +			return 0;
    +		}
    +		else {
    +			int pos = 1;
    +			while ((this.stream.read()) != this.recordDelimiter.charAt(0)) {
    --- End diff --
    
    You only check the first character of the delimiter. We do not enforce a single character delimiter in the setter and the CSV parser seems to support multiple characters as well. Hence, we must check for the full delimiter. It would also make sense to convert the delimiter into a `char[]` for more efficient access of the characters.


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139293744
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    --- End diff --
    
    no need for `private static` variables for initialization. We can initialize `fieldTypes` and `fieldIncluded` directly.


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139296037
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    --- End diff --
    
    try to reduce the number of method calls. can we fuse `projectedFields` and `castRecord`?


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139296493
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    +
    +		Object[] resultantRecord = new Object[fieldTypes.length];
    +		int index = 0;
    +		for (int i = 0; i < this.fieldIncluded.length; i++) {
    +
    +			try {
    +				if (fieldIncluded[i]) {
    +					resultantRecord[index++] = record[i];
    +				}
    +			}
    +			catch (Exception e) {
    +				throw new IOException();
    +			}
    +		}
    +		return resultantRecord;
    +	}
    +
    +	private long findFirstDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.getSplitStart());
    +		if (this.stream.getPos() == 0) {
    +			return 0;
    +		}
    +		else {
    +			int pos = 1;
    +			while ((this.stream.read()) != this.recordDelimiter.charAt(0)) {
    +				pos++;
    +			}
    +			return pos;
    +		}
    +	}
    +
    +	private long findLastDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.splitStart + this.splitLength);
    +		int pos = 0;
    +		char c;
    +		int read = this.stream.read();
    +		while (read != -1) {
    +			c = (char) read;
    +			if (c == this.recordDelimiter.charAt(0)) {
    +				break;
    +			}
    +			else {
    +				read = this.stream.read();
    +			}
    +			pos++;
    +		}
    +		return pos;
    +	}
    +
    +	private Object[] castRecord(Object[] record)  throws IOException {
    +
    +		if (isLenient()) {
    +			for (int i = 0; i < this.fieldTypes.length; i++) {
    +				try {
    +					record[i] = dataTypeConversion(record[i], fieldTypes[i]);
    +				} catch (Exception e) {
    +					throw new IOException(e);
    +				}
    +			}
    +		}
    +		else {
    +			for (int i = 0; i < this.fieldTypes.length; i++) {
    +				try {
    +					record[i] = dataTypeConversion(record[i], fieldTypes[i]);
    +				} catch (Exception e) {
    +					throw new IOException(e);
    +				}
    +			}
    +		}
    +		return record;
    +	}
    +
    +	public <I, O> O dataTypeConversion(I input, Class<O> outputClass) throws Exception {
    +
    +		String type = outputClass.getSimpleName().toLowerCase();
    +		try {
    +			switch (type) {
    +				case "string":
    +					return (O) input.toString();
    +				case "boolean":
    +					if (input.toString().length() != 0) { // special case for boolean
    +						return (O) Boolean.valueOf(input.toString());
    +					}
    +					return null;
    +				case "byte":
    +					return (O) Byte.valueOf(input.toString());
    +				case "short":
    +					return (O) Short.valueOf(input.toString());
    +				case "integer":
    +					return (O) Integer.valueOf(input.toString());
    +				case "long":
    +					return (O) Long.valueOf(input.toString());
    +				case "float":
    +					return (O) Float.valueOf(input.toString());
    +				case "double":
    +					return (O) Double.valueOf(input.toString());
    +				case "decimal":
    +					return (O) Double.valueOf(input.toString());
    +				case "date":
    +					return (O) Date.valueOf(input.toString());
    +				case "time":
    +					return (O) Time.valueOf(input.toString());
    +				case "timestamp":
    +					return (O) Timestamp.valueOf(input.toString());
    +				default:
    +					return (O) input;
    +			}
    +		}
    +		catch (Exception e) {
    +			if (isLenient()) {
    +				return null;
    +			}
    +			else {
    +				throw new ParseException();
    +			}
    +		}
    +
    +	}
    +
    +	// create projection mask
    +
    +	protected static boolean[] createDefaultMask(int size) {
    +		boolean[] includedMask = new boolean[size];
    +		for (int x = 0; x < includedMask.length; x++) {
    +			includedMask[x] = true;
    +		}
    +		return includedMask;
    +	}
    +
    +	protected static boolean[] toBooleanMask(int[] sourceFieldIndices) {
    +		Preconditions.checkNotNull(sourceFieldIndices);
    +
    +		int max = 0;
    +		for (int i : sourceFieldIndices) {
    +			if (i < 0) {
    +				throw new IllegalArgumentException("Field indices must not be smaller than zero.");
    +			}
    +			max = Math.max(i, max);
    +		}
    +
    +		boolean[] includedMask = new boolean[max + 1];
    +
    +		// check if we support parsers for these types
    +		for (int i = 0; i < sourceFieldIndices.length; i++) {
    +			includedMask[sourceFieldIndices[i]] = true;
    +		}
    +
    +		return includedMask;
    +	}
    +
    +	// configuration setting
    +
    +	public String getDelimiter() {
    +		return recordDelimiter;
    +	}
    +
    +	public void setDelimiter(char delimiter) {
    +		setDelimiter(String.valueOf(delimiter));
    +	}
    +
    +	public void setDelimiter(String delimiter) {
    +		if (delimiter == null) {
    +			throw new IllegalArgumentException("Delimiter must not be null");
    +		}
    +		if (delimiter.length() == 0) {
    +			throw new IllegalArgumentException("Delimiter must not be empty");
    +		}
    +		this.recordDelimiter = delimiter;
    +	}
    +
    +	public void setFieldDelimiter(char delimiter) {
    +		if (String.valueOf(delimiter) == null) {
    +			throw new IllegalArgumentException("Delimiter must not be null");
    +		}
    +		if (String.valueOf(delimiter).length() == 0) {
    +			throw new IllegalArgumentException("Delimiter must not be empty");
    +		}
    +		this.fieldDelimiter = delimiter;
    +	}
    +
    +	public void setFieldDelimiter(String delimiter) {
    +		if (delimiter == null) {
    +			throw new IllegalArgumentException("Delimiter must not be null");
    +		}
    +		if (delimiter.length() == 0) {
    +			throw new IllegalArgumentException("Delimiter must not be empty");
    +		}
    +		this.fieldDelimiter = delimiter.charAt(0);
    +	}
    +
    +	public char getFieldDelimiter() {
    +		return this.fieldDelimiter;
    +	}
    +
    +	public void setSkipFirstLineAsHeader(boolean skipFirstLine) {
    +		this.skipFirstLineAsHeader = skipFirstLine;
    +	}
    +
    +	public void enableQuotedStringParsing(char quoteCharacter) {
    +		quotedStringParsing = true;
    +		this.quoteCharacter = quoteCharacter;
    +	}
    +
    +	public boolean isLenient() {
    +		return lenient;
    +	}
    +
    +	public void setLenient(boolean lenient) {
    +		this.lenient = lenient;
    +	}
    +
    +	public void setCommentPrefix(String commentPrefix) {
    +		this.commentPrefix = commentPrefix;
    +		this.allowComments = true;
    +	}
    +
    +	protected Class<?>[] getGenericFieldTypes() {
    --- End diff --
    
    why "generic" field types?


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139295032
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    --- End diff --
    
    Does setting column types (`Builder.setColumnType()`) improve the performance or has other benefits? Does it for instance check some types during parsing or can the parser automatically skip fields which are not configured (to avoid the manual projection)?


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139294708
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    --- End diff --
    
    add `this.` for consistency


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139293599
  
    --- Diff: flink-libraries/flink-table/pom.xml ---
    @@ -114,6 +114,12 @@ under the License.
     			<groupId>joda-time</groupId>
     			<artifactId>joda-time</artifactId>
     		</dependency>
    +		<!-- FOR RFC 4180 Compliant CSV Parser -->
    +		<dependency>
    --- End diff --
    
    Maybe we should move this to a separate module.


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139293651
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    --- End diff --
    
    not used, remove


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139293548
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    --- End diff --
    
    `recordDelimiter = DEFAULT_LINE_DELIMITER;`


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139293809
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    --- End diff --
    
    can be `private`


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139294921
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    --- End diff --
    
    Throw an `IllegalStateException` and request to call `open()` first instead returning `null`


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139295179
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    +
    +		Object[] resultantRecord = new Object[fieldTypes.length];
    --- End diff --
    
    rename to `projectedFields`


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139295169
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    +
    +		Object[] resultantRecord = new Object[fieldTypes.length];
    --- End diff --
    
    I think this array can be reused. We don't need to create a new one for each record


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139294098
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    +
    +		Object[] resultantRecord = new Object[fieldTypes.length];
    +		int index = 0;
    +		for (int i = 0; i < this.fieldIncluded.length; i++) {
    +
    +			try {
    +				if (fieldIncluded[i]) {
    +					resultantRecord[index++] = record[i];
    +				}
    +			}
    +			catch (Exception e) {
    +				throw new IOException();
    +			}
    +		}
    +		return resultantRecord;
    +	}
    +
    +	private long findFirstDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.getSplitStart());
    +		if (this.stream.getPos() == 0) {
    +			return 0;
    +		}
    +		else {
    +			int pos = 1;
    +			while ((this.stream.read()) != this.recordDelimiter.charAt(0)) {
    --- End diff --
    
    reading a stream character by character is inefficient. Read a larger chunk of data into a buffer of 4KB and search on this buffer for the record delimiter.


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139296534
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    +
    +		Object[] resultantRecord = new Object[fieldTypes.length];
    +		int index = 0;
    +		for (int i = 0; i < this.fieldIncluded.length; i++) {
    +
    +			try {
    +				if (fieldIncluded[i]) {
    +					resultantRecord[index++] = record[i];
    +				}
    +			}
    +			catch (Exception e) {
    +				throw new IOException();
    +			}
    +		}
    +		return resultantRecord;
    +	}
    +
    +	private long findFirstDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.getSplitStart());
    +		if (this.stream.getPos() == 0) {
    +			return 0;
    +		}
    +		else {
    +			int pos = 1;
    +			while ((this.stream.read()) != this.recordDelimiter.charAt(0)) {
    +				pos++;
    +			}
    +			return pos;
    +		}
    +	}
    +
    +	private long findLastDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.splitStart + this.splitLength);
    +		int pos = 0;
    +		char c;
    +		int read = this.stream.read();
    +		while (read != -1) {
    +			c = (char) read;
    +			if (c == this.recordDelimiter.charAt(0)) {
    +				break;
    +			}
    +			else {
    +				read = this.stream.read();
    +			}
    +			pos++;
    +		}
    +		return pos;
    +	}
    +
    +	private Object[] castRecord(Object[] record)  throws IOException {
    +
    +		if (isLenient()) {
    +			for (int i = 0; i < this.fieldTypes.length; i++) {
    +				try {
    +					record[i] = dataTypeConversion(record[i], fieldTypes[i]);
    +				} catch (Exception e) {
    +					throw new IOException(e);
    +				}
    +			}
    +		}
    +		else {
    +			for (int i = 0; i < this.fieldTypes.length; i++) {
    +				try {
    +					record[i] = dataTypeConversion(record[i], fieldTypes[i]);
    +				} catch (Exception e) {
    +					throw new IOException(e);
    +				}
    +			}
    +		}
    +		return record;
    +	}
    +
    +	public <I, O> O dataTypeConversion(I input, Class<O> outputClass) throws Exception {
    +
    +		String type = outputClass.getSimpleName().toLowerCase();
    +		try {
    +			switch (type) {
    +				case "string":
    +					return (O) input.toString();
    +				case "boolean":
    +					if (input.toString().length() != 0) { // special case for boolean
    +						return (O) Boolean.valueOf(input.toString());
    +					}
    +					return null;
    +				case "byte":
    +					return (O) Byte.valueOf(input.toString());
    +				case "short":
    +					return (O) Short.valueOf(input.toString());
    +				case "integer":
    +					return (O) Integer.valueOf(input.toString());
    +				case "long":
    +					return (O) Long.valueOf(input.toString());
    +				case "float":
    +					return (O) Float.valueOf(input.toString());
    +				case "double":
    +					return (O) Double.valueOf(input.toString());
    +				case "decimal":
    +					return (O) Double.valueOf(input.toString());
    +				case "date":
    +					return (O) Date.valueOf(input.toString());
    +				case "time":
    +					return (O) Time.valueOf(input.toString());
    +				case "timestamp":
    +					return (O) Timestamp.valueOf(input.toString());
    +				default:
    +					return (O) input;
    +			}
    +		}
    +		catch (Exception e) {
    +			if (isLenient()) {
    +				return null;
    +			}
    +			else {
    +				throw new ParseException();
    +			}
    +		}
    +
    +	}
    +
    +	// create projection mask
    +
    +	protected static boolean[] createDefaultMask(int size) {
    +		boolean[] includedMask = new boolean[size];
    +		for (int x = 0; x < includedMask.length; x++) {
    +			includedMask[x] = true;
    +		}
    +		return includedMask;
    +	}
    +
    +	protected static boolean[] toBooleanMask(int[] sourceFieldIndices) {
    +		Preconditions.checkNotNull(sourceFieldIndices);
    +
    +		int max = 0;
    +		for (int i : sourceFieldIndices) {
    +			if (i < 0) {
    +				throw new IllegalArgumentException("Field indices must not be smaller than zero.");
    +			}
    +			max = Math.max(i, max);
    +		}
    +
    +		boolean[] includedMask = new boolean[max + 1];
    +
    +		// check if we support parsers for these types
    +		for (int i = 0; i < sourceFieldIndices.length; i++) {
    +			includedMask[sourceFieldIndices[i]] = true;
    +		}
    +
    +		return includedMask;
    +	}
    +
    +	// configuration setting
    +
    +	public String getDelimiter() {
    +		return recordDelimiter;
    +	}
    +
    +	public void setDelimiter(char delimiter) {
    +		setDelimiter(String.valueOf(delimiter));
    +	}
    +
    +	public void setDelimiter(String delimiter) {
    +		if (delimiter == null) {
    +			throw new IllegalArgumentException("Delimiter must not be null");
    +		}
    +		if (delimiter.length() == 0) {
    +			throw new IllegalArgumentException("Delimiter must not be empty");
    +		}
    +		this.recordDelimiter = delimiter;
    +	}
    +
    +	public void setFieldDelimiter(char delimiter) {
    +		if (String.valueOf(delimiter) == null) {
    +			throw new IllegalArgumentException("Delimiter must not be null");
    +		}
    +		if (String.valueOf(delimiter).length() == 0) {
    +			throw new IllegalArgumentException("Delimiter must not be empty");
    +		}
    +		this.fieldDelimiter = delimiter;
    +	}
    +
    +	public void setFieldDelimiter(String delimiter) {
    +		if (delimiter == null) {
    +			throw new IllegalArgumentException("Delimiter must not be null");
    +		}
    +		if (delimiter.length() == 0) {
    +			throw new IllegalArgumentException("Delimiter must not be empty");
    +		}
    +		this.fieldDelimiter = delimiter.charAt(0);
    +	}
    +
    +	public char getFieldDelimiter() {
    +		return this.fieldDelimiter;
    +	}
    +
    +	public void setSkipFirstLineAsHeader(boolean skipFirstLine) {
    +		this.skipFirstLineAsHeader = skipFirstLine;
    +	}
    +
    +	public void enableQuotedStringParsing(char quoteCharacter) {
    +		quotedStringParsing = true;
    +		this.quoteCharacter = quoteCharacter;
    +	}
    +
    +	public boolean isLenient() {
    +		return lenient;
    +	}
    +
    +	public void setLenient(boolean lenient) {
    +		this.lenient = lenient;
    +	}
    +
    +	public void setCommentPrefix(String commentPrefix) {
    +		this.commentPrefix = commentPrefix;
    +		this.allowComments = true;
    +	}
    +
    +	protected Class<?>[] getGenericFieldTypes() {
    +		// check if we are dense, i.e., we read all fields
    +		if (this.fieldIncluded.length == this.fieldTypes.length) {
    +			return this.fieldTypes;
    +		}
    +		else {
    +			// sparse type array which we made dense for internal book keeping.
    +			// create a sparse copy to return
    +			Class<?>[] types = new Class<?>[this.fieldIncluded.length];
    +
    +			for (int i = 0, k = 0; i < this.fieldIncluded.length; i++) {
    +				if (this.fieldIncluded[i]) {
    +					types[i] = this.fieldTypes[k++];
    +				}
    +			}
    +
    +			return types;
    +		}
    +	}
    +
    +	protected void setFieldsGeneric(boolean[] includedMask, Class<?>[] fieldTypes) {
    --- End diff --
    
    please add in-line comments


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139296016
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    +
    +		Object[] resultantRecord = new Object[fieldTypes.length];
    +		int index = 0;
    +		for (int i = 0; i < this.fieldIncluded.length; i++) {
    +
    +			try {
    +				if (fieldIncluded[i]) {
    +					resultantRecord[index++] = record[i];
    +				}
    +			}
    +			catch (Exception e) {
    +				throw new IOException();
    +			}
    +		}
    +		return resultantRecord;
    +	}
    +
    +	private long findFirstDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.getSplitStart());
    +		if (this.stream.getPos() == 0) {
    +			return 0;
    +		}
    +		else {
    +			int pos = 1;
    +			while ((this.stream.read()) != this.recordDelimiter.charAt(0)) {
    +				pos++;
    +			}
    +			return pos;
    +		}
    +	}
    +
    +	private long findLastDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.splitStart + this.splitLength);
    +		int pos = 0;
    +		char c;
    +		int read = this.stream.read();
    +		while (read != -1) {
    +			c = (char) read;
    +			if (c == this.recordDelimiter.charAt(0)) {
    +				break;
    +			}
    +			else {
    +				read = this.stream.read();
    +			}
    +			pos++;
    +		}
    +		return pos;
    +	}
    +
    +	private Object[] castRecord(Object[] record)  throws IOException {
    +
    +		if (isLenient()) {
    +			for (int i = 0; i < this.fieldTypes.length; i++) {
    +				try {
    +					record[i] = dataTypeConversion(record[i], fieldTypes[i]);
    +				} catch (Exception e) {
    +					throw new IOException(e);
    +				}
    +			}
    +		}
    +		else {
    --- End diff --
    
    I think both cases are exactly the same. We should not throw an exception in the lenient case


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139294380
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    +
    +		Object[] resultantRecord = new Object[fieldTypes.length];
    +		int index = 0;
    +		for (int i = 0; i < this.fieldIncluded.length; i++) {
    +
    +			try {
    +				if (fieldIncluded[i]) {
    +					resultantRecord[index++] = record[i];
    +				}
    +			}
    +			catch (Exception e) {
    +				throw new IOException();
    +			}
    +		}
    +		return resultantRecord;
    +	}
    +
    +	private long findFirstDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.getSplitStart());
    +		if (this.stream.getPos() == 0) {
    +			return 0;
    +		}
    +		else {
    +			int pos = 1;
    +			while ((this.stream.read()) != this.recordDelimiter.charAt(0)) {
    +				pos++;
    +			}
    +			return pos;
    +		}
    +	}
    +
    +	private long findLastDelimiterPosition() throws IOException{
    --- End diff --
    
    Same comments as for `findFirstDelimiterPosition()` apply


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139296200
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    +
    +			if (record.length < fieldTypes.length) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException();
    +				}
    +			}
    +
    +			try {
    +				return fillRecord(reuse, castRecord(projectedFields(record)));
    +			}
    +			catch (IOException e) {
    +				if (isLenient()) {
    +					return nextRecord(reuse);
    +				}
    +				else {
    +					throw new ParseException(e);
    +				}
    +			}
    +		}
    +		endOfSplit = true;
    +		return null;
    +	}
    +
    +	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +	@Override
    +	public String toString() {
    +		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + getFilePath();
    +	}
    +
    +	@Override
    +	public boolean reachedEnd() throws IOException {
    +		return endOfSplit;
    +	}
    +
    +	private Object[] projectedFields(Object[] record) throws IOException {
    +
    +		Object[] resultantRecord = new Object[fieldTypes.length];
    +		int index = 0;
    +		for (int i = 0; i < this.fieldIncluded.length; i++) {
    +
    +			try {
    +				if (fieldIncluded[i]) {
    +					resultantRecord[index++] = record[i];
    +				}
    +			}
    +			catch (Exception e) {
    +				throw new IOException();
    +			}
    +		}
    +		return resultantRecord;
    +	}
    +
    +	private long findFirstDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.getSplitStart());
    +		if (this.stream.getPos() == 0) {
    +			return 0;
    +		}
    +		else {
    +			int pos = 1;
    +			while ((this.stream.read()) != this.recordDelimiter.charAt(0)) {
    +				pos++;
    +			}
    +			return pos;
    +		}
    +	}
    +
    +	private long findLastDelimiterPosition() throws IOException{
    +
    +		this.stream.seek(this.splitStart + this.splitLength);
    +		int pos = 0;
    +		char c;
    +		int read = this.stream.read();
    +		while (read != -1) {
    +			c = (char) read;
    +			if (c == this.recordDelimiter.charAt(0)) {
    +				break;
    +			}
    +			else {
    +				read = this.stream.read();
    +			}
    +			pos++;
    +		}
    +		return pos;
    +	}
    +
    +	private Object[] castRecord(Object[] record)  throws IOException {
    +
    +		if (isLenient()) {
    +			for (int i = 0; i < this.fieldTypes.length; i++) {
    +				try {
    +					record[i] = dataTypeConversion(record[i], fieldTypes[i]);
    +				} catch (Exception e) {
    +					throw new IOException(e);
    +				}
    +			}
    +		}
    +		else {
    +			for (int i = 0; i < this.fieldTypes.length; i++) {
    +				try {
    +					record[i] = dataTypeConversion(record[i], fieldTypes[i]);
    +				} catch (Exception e) {
    +					throw new IOException(e);
    +				}
    +			}
    +		}
    +		return record;
    +	}
    +
    +	public <I, O> O dataTypeConversion(I input, Class<O> outputClass) throws Exception {
    +
    +		String type = outputClass.getSimpleName().toLowerCase();
    --- End diff --
    
    Don't do this for every record. This can be precomputed. Moreover String comparisons are not very efficient. Rather check directly the class.


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139295981
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	private String recordDelimiter = "\n";
    +	private char fieldDelimiter = ',';
    +
    +	private boolean skipFirstLineAsHeader;
    +	private boolean skipFirstLine = false; // only for first split
    +
    +	private boolean quotedStringParsing = false;
    +	private char quoteCharacter;
    +
    +	private boolean lenient;
    +
    +	private String commentPrefix = null;
    +	private boolean allowComments = false;
    +
    +	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +	private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +	private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +	MappingIterator<Object[]> recordIterator = null;
    +
    +	private boolean endOfSplit = false;
    +
    +	protected RFCCsvInputFormat(Path filePath) {
    +		super(filePath);
    +	}
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +
    +		super.open(split);
    +
    +		CsvMapper mapper = new CsvMapper();
    +		mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +		long firstDelimiterPosition = findFirstDelimiterPosition();
    +		long lastDelimiterPosition = findLastDelimiterPosition();
    +		long startPos = this.splitStart + firstDelimiterPosition;
    +		long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +		this.stream.seek(startPos);
    +		BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
    +		if (skipFirstLineAsHeader && startPos == 0) {
    +			skipFirstLine = true;
    +		}
    +
    +		CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +		CsvSchema csvSchema = configureParserSettings();
    +		csvParser.setSchema(csvSchema);
    +
    +		recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +	}
    +
    +	private CsvSchema configureParserSettings() {
    +
    +		CsvSchema csvSchema = CsvSchema.builder()
    +			.setLineSeparator(this.recordDelimiter)
    +			.setColumnSeparator(this.fieldDelimiter)
    +			.setSkipFirstDataRow(skipFirstLine)
    +			.setQuoteChar(this.quoteCharacter)
    +			.setAllowComments(allowComments)
    +			.build();
    +		return  csvSchema;
    +	}
    +
    +	public OUT nextRecord(OUT reuse) throws IOException {
    +
    +		if (recordIterator == null) {
    +			return null;
    +		}
    +
    +		if (recordIterator.hasNext()) {
    +
    +			Object[] record = recordIterator.next();
    --- End diff --
    
    does this call never fail or should we add a `lenient` check here as well?


---

[GitHub] flink pull request #4660: [FLINK-7050][table] Add support of RFC compliant C...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139293524
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads csv files and compliant with RFC 4180 standards.
    + *
    + * @param <OUT>
    + */
    +@Internal
    --- End diff --
    
    The annotations `@Internal`, `@PublicEvolving`, and `@Public` are not used in `flink-table`


---