You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/07 10:56:38 UTC
[2/7] drill git commit: DRILL-2006: Updated Text reader. Increases
variations of text files Drill can work with.
http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java
new file mode 100644
index 0000000..e411461
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.univocity.parsers.common.TextParsingException;
+
+public class TextParsingSettings {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextParsingSettings.class);
+
+  /** Instance holding the default settings. Note that it is mutable: its setters change it for every user. */
+  public static final TextParsingSettings DEFAULT = new TextParsingSettings();
+
+  private String emptyValue = null;
+  private boolean parseUnescapedQuotes = true;
+  private byte quote = b('"');
+  private byte quoteEscape = b('"');
+  private byte delimiter = b(',');
+  private byte comment = b('#');
+
+  private long maxCharsPerColumn = Character.MAX_VALUE;
+  private byte normalizedNewLine = b('\n');
+  private byte[] newLineDelimiter = {normalizedNewLine};
+  private boolean ignoreLeadingWhitespaces = false;
+  private boolean ignoreTrailingWhitespaces = false;
+  // textual form of the configured line delimiter; kept in sync with newLineDelimiter by set()
+  private String lineSeparatorString = "\n";
+  private boolean skipFirstLine = false;
+
+  // these options are not yet supported
+  private boolean headerExtractionEnabled = false;
+  private boolean useRepeatedVarChar = true;
+  private int numberOfRecordsToRead = -1;
+
+  /**
+   * Copies the quote, escape, line delimiter, field delimiter, comment and skip-first-line options
+   * from the given format configuration into these settings.
+   *
+   * @param config the text format configuration to read from
+   * @throws IllegalArgumentException if a single-character option is above 127, or if the line
+   *         delimiter does not encode to 1 or 2 UTF-8 bytes
+   */
+  public void set(TextFormatConfig config){
+    this.quote = bSafe(config.getQuote(), "quote");
+    this.quoteEscape = bSafe(config.getEscape(), "escape");
+
+    final String lineDelimiter = config.getLineDelimiter();
+    this.newLineDelimiter = lineDelimiter.getBytes(Charsets.UTF_8);
+    // lazy template: the message is only formatted when the check fails
+    Preconditions.checkArgument(newLineDelimiter.length == 1 || newLineDelimiter.length == 2,
+        "Line delimiter must be 1 or 2 bytes in length. The provided delimiter was %s bytes long.",
+        newLineDelimiter.length);
+    // keep the textual separator (reported in parse error messages) consistent with the bytes actually used
+    this.lineSeparatorString = lineDelimiter;
+
+    this.delimiter = bSafe(config.getFieldDelimiter(), "fieldDelimiter");
+    this.comment = bSafe(config.getComment(), "comment");
+    this.skipFirstLine = config.isSkipFirstLine();
+  }
+
+  /**
+   * Returns the byte that marks a comment line. Defaults to '#'.
+   * @return the comment byte
+   */
+  public byte getComment(){
+    return comment;
+  }
+
+  /**
+   * Indicates whether the first line of the input should be skipped. Defaults to {@code false}.
+   * @return true if the first line is skipped
+   */
+  public boolean isSkipFirstLine() {
+    return skipFirstLine;
+  }
+
+  /**
+   * Defines whether the first line of the input should be skipped.
+   * @param skipFirstLine true to skip the first line
+   */
+  public void setSkipFirstLine(boolean skipFirstLine) {
+    this.skipFirstLine = skipFirstLine;
+  }
+
+  /**
+   * Indicates whether output should use a repeated VarChar column. Defaults to {@code true}.
+   * @return the repeated-VarChar flag
+   */
+  public boolean isUseRepeatedVarChar() {
+    return useRepeatedVarChar;
+  }
+
+  /**
+   * Defines whether output should use a repeated VarChar column.
+   * @param useRepeatedVarChar the repeated-VarChar flag
+   */
+  public void setUseRepeatedVarChar(boolean useRepeatedVarChar) {
+    this.useRepeatedVarChar = useRepeatedVarChar;
+  }
+
+  /**
+   * Narrows a configuration character to a byte, rejecting values above {@link Byte#MAX_VALUE} (127).
+   * @param c the configured character
+   * @param name the option name, used in the error message
+   * @return the character as a byte
+   * @throws IllegalArgumentException if {@code c} is greater than 127
+   */
+  private static byte bSafe(char c, String name){
+    if(c > Byte.MAX_VALUE) {
+      throw new IllegalArgumentException(String.format("Failure validating configuration option %s. Expected a "
+          + "character between 0 and 127 but value was actually %d.", name, (int) c));
+    }
+    return (byte) c;
+  }
+
+  /** Narrows a character to a byte without validation; used for the built-in defaults. */
+  private static byte b(char c){
+    return (byte) c;
+  }
+
+  /**
+   * Returns the raw bytes of the configured line delimiter (1 or 2 bytes when configured through
+   * {@link #set(TextFormatConfig)}). Defaults to a single '\n'.
+   * @return the line delimiter bytes
+   */
+  public byte[] getNewLineDelimiter() {
+    return newLineDelimiter;
+  }
+
+  /**
+   * Returns the character used for escaping values where the field delimiter is part of the value. Defaults to '"'
+   * @return the quote character
+   */
+  public byte getQuote() {
+    return quote;
+  }
+
+  /**
+   * Defines the character used for escaping values where the field delimiter is part of the value. Defaults to '"'
+   * @param quote the quote character
+   */
+  public void setQuote(byte quote) {
+    this.quote = quote;
+  }
+
+  /**
+   * Returns the configured line delimiter as a String, for display purposes (e.g. parse error messages).
+   * Defaults to "\n" and is updated by {@link #set(TextFormatConfig)}.
+   * @return the line separator string
+   */
+  public String getLineSeparatorString(){
+    return lineSeparatorString;
+  }
+
+  /**
+   * Identifies whether or not a given character is used for escaping values where the field delimiter is part of the value
+   * @param ch the character to be verified
+   * @return true if the given character is the character used for escaping values, false otherwise
+   */
+  public boolean isQuote(byte ch) {
+    return this.quote == ch;
+  }
+
+  /**
+   * Returns the character used for escaping quotes inside an already quoted value. Defaults to '"'
+   * @return the quote escape character
+   */
+  public byte getQuoteEscape() {
+    return quoteEscape;
+  }
+
+  /**
+   * Defines the character used for escaping quotes inside an already quoted value. Defaults to '"'
+   * @param quoteEscape the quote escape character
+   */
+  public void setQuoteEscape(byte quoteEscape) {
+    this.quoteEscape = quoteEscape;
+  }
+
+  /**
+   * Identifies whether or not a given character is used for escaping quotes inside an already quoted value.
+   * @param ch the character to be verified
+   * @return true if the given character is the quote escape character, false otherwise
+   */
+  public boolean isQuoteEscape(byte ch) {
+    return this.quoteEscape == ch;
+  }
+
+  /**
+   * Returns the field delimiter character. Defaults to ','
+   * @return the field delimiter character
+   */
+  public byte getDelimiter() {
+    return delimiter;
+  }
+
+  /**
+   * Defines the field delimiter character. Defaults to ','
+   * @param delimiter the field delimiter character
+   */
+  public void setDelimiter(byte delimiter) {
+    this.delimiter = delimiter;
+  }
+
+  /**
+   * Identifies whether or not a given character represents a field delimiter
+   * @param ch the character to be verified
+   * @return true if the given character is the field delimiter character, false otherwise
+   */
+  public boolean isDelimiter(byte ch) {
+    return this.delimiter == ch;
+  }
+
+  /**
+   * Returns the String representation of an empty value (defaults to null)
+   *
+   * <p>When reading, if the parser does not read any character from the input, and the input is within quotes,
+   * the empty value is used instead of an empty string
+   *
+   * @return the String representation of an empty value
+   */
+  public String getEmptyValue() {
+    return emptyValue;
+  }
+
+  /**
+   * Sets the String representation of an empty value (defaults to null)
+   *
+   * <p>When reading, if the parser does not read any character from the input, and the input is within quotes,
+   * the empty value is used instead of an empty string
+   *
+   * @param emptyValue the String representation of an empty value
+   */
+  public void setEmptyValue(String emptyValue) {
+    this.emptyValue = emptyValue;
+  }
+
+  /**
+   * Indicates whether the CSV parser should accept unescaped quotes inside quoted values and parse them normally. Defaults to {@code true}.
+   * @return a flag indicating whether or not the CSV parser should accept unescaped quotes inside quoted values.
+   */
+  public boolean isParseUnescapedQuotes() {
+    return parseUnescapedQuotes;
+  }
+
+  /**
+   * Configures how to handle unescaped quotes inside quoted values. If set to {@code true}, the parser will parse the quote normally as part of the value.
+   * If set to {@code false}, a {@link TextParsingException} will be thrown. Defaults to {@code true}.
+   * @param parseUnescapedQuotes indicates whether or not the CSV parser should accept unescaped quotes inside quoted values.
+   */
+  public void setParseUnescapedQuotes(boolean parseUnescapedQuotes) {
+    this.parseUnescapedQuotes = parseUnescapedQuotes;
+  }
+
+  /**
+   * Indicates whether or not the first valid record parsed from the input should be considered as the row containing the names of each column
+   * @return true if the first valid record parsed from the input should be considered as the row containing the names of each column, false otherwise
+   */
+  public boolean isHeaderExtractionEnabled() {
+    return headerExtractionEnabled;
+  }
+
+  /**
+   * Defines whether or not the first valid record parsed from the input should be considered as the row containing the names of each column
+   * @param headerExtractionEnabled a flag indicating whether the first valid record parsed from the input should be considered as the row containing the names of each column
+   */
+  public void setHeaderExtractionEnabled(boolean headerExtractionEnabled) {
+    this.headerExtractionEnabled = headerExtractionEnabled;
+  }
+
+  /**
+   * The number of valid records to be parsed before the process is stopped. A negative value indicates there's no limit (defaults to -1).
+   * @return the number of records to read before stopping the parsing process.
+   */
+  public int getNumberOfRecordsToRead() {
+    return numberOfRecordsToRead;
+  }
+
+  /**
+   * Defines the number of valid records to be parsed before the process is stopped. A negative value indicates there's no limit (defaults to -1).
+   * @param numberOfRecordsToRead the number of records to read before stopping the parsing process.
+   */
+  public void setNumberOfRecordsToRead(int numberOfRecordsToRead) {
+    this.numberOfRecordsToRead = numberOfRecordsToRead;
+  }
+
+  /**
+   * Returns the maximum number of characters expected per column. Defaults to {@link Character#MAX_VALUE}.
+   * @return the per-column character limit
+   */
+  public long getMaxCharsPerColumn() {
+    return maxCharsPerColumn;
+  }
+
+  /**
+   * Defines the maximum number of characters expected per column.
+   * @param maxCharsPerColumn the per-column character limit
+   */
+  public void setMaxCharsPerColumn(long maxCharsPerColumn) {
+    this.maxCharsPerColumn = maxCharsPerColumn;
+  }
+
+  /**
+   * Defines the byte that marks a comment line.
+   * @param comment the comment byte
+   */
+  public void setComment(byte comment) {
+    this.comment = comment;
+  }
+
+  /**
+   * Returns the single byte the reader treats as end of record. Defaults to '\n'.
+   * NOTE(review): presumably the input layer maps the configured line delimiter onto this byte; confirm.
+   * @return the normalized newline byte
+   */
+  public byte getNormalizedNewLine() {
+    return normalizedNewLine;
+  }
+
+  /**
+   * Defines the single byte the reader treats as end of record.
+   * @param normalizedNewLine the normalized newline byte
+   */
+  public void setNormalizedNewLine(byte normalizedNewLine) {
+    this.normalizedNewLine = normalizedNewLine;
+  }
+
+  /**
+   * Indicates whether leading whitespace of values is skipped. Defaults to {@code false}.
+   * @return the leading-whitespace flag
+   */
+  public boolean isIgnoreLeadingWhitespaces() {
+    return ignoreLeadingWhitespaces;
+  }
+
+  /**
+   * Defines whether leading whitespace of values is skipped.
+   * @param ignoreLeadingWhitespaces the leading-whitespace flag
+   */
+  public void setIgnoreLeadingWhitespaces(boolean ignoreLeadingWhitespaces) {
+    this.ignoreLeadingWhitespaces = ignoreLeadingWhitespaces;
+  }
+
+  /**
+   * Indicates whether trailing whitespace of values is ignored. Defaults to {@code false}.
+   * @return the trailing-whitespace flag
+   */
+  public boolean isIgnoreTrailingWhitespaces() {
+    return ignoreTrailingWhitespaces;
+  }
+
+  /**
+   * Defines whether trailing whitespace of values is ignored.
+   * @param ignoreTrailingWhitespaces the trailing-whitespace flag
+   */
+  public void setIgnoreTrailingWhitespaces(boolean ignoreTrailingWhitespaces) {
+    this.ignoreTrailingWhitespaces = ignoreTrailingWhitespaces;
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
new file mode 100644
index 0000000..fec0ab4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import io.netty.buffer.DrillBuf;
+
+import java.io.IOException;
+
+import org.apache.drill.common.exceptions.UserException;
+
+import com.univocity.parsers.common.TextParsingException;
+import com.univocity.parsers.csv.CsvParserSettings;
+
+/*******************************************************************************
+ * Portions Copyright 2014 uniVocity Software Pty Ltd
+ ******************************************************************************/
+
+/**
+ * A byte-based Text parser implementation. Builds heavily upon the uniVocity parsers. Customized for UTF8 parsing and
+ * DrillBuf support.
+ */
+final class TextReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextReader.class);
+
+  private static final byte NULL_BYTE = (byte) '\0';
+
+  private final TextParsingContext context;
+
+  private final long recordsToRead;
+  private final TextParsingSettings settings;
+
+  private final TextInput input;
+  private final TextOutput output;
+  private final DrillBuf workBuf;
+
+  // the byte currently being examined by the parser
+  private byte ch;
+
+  // index of the field within this record
+  private int fieldIndex;
+
+  /** Behavior settings **/
+  private final boolean ignoreTrailingWhitespace;
+  private final boolean ignoreLeadingWhitespace;
+  private final boolean parseUnescapedQuotes;
+
+  /** Key Characters **/
+  private final byte comment;
+  private final byte delimiter;
+  private final byte quote;
+  private final byte quoteEscape;
+  private final byte newLine;
+
+  /**
+   * Creates a reader that parses bytes from {@code input} into {@code output}. The behavior settings and key
+   * characters are copied from {@code settings} at construction time (mirroring the uniVocity parser driven by
+   * {@link CsvParserSettings}); the settings object is also retained for error reporting.
+   * @param settings the parser configuration
+   * @param input input stream
+   * @param output interface to produce output record batch
+   * @param workBuf working buffer used to hold whitespace that follows a quoted value
+   */
+  public TextReader(TextParsingSettings settings, TextInput input, TextOutput output, DrillBuf workBuf) {
+    this.context = new TextParsingContext(input, output);
+    this.workBuf = workBuf;
+    this.settings = settings;
+
+    // -1 means "no limit"
+    this.recordsToRead = settings.getNumberOfRecordsToRead() == -1 ? Long.MAX_VALUE : settings.getNumberOfRecordsToRead();
+
+    this.ignoreTrailingWhitespace = settings.isIgnoreTrailingWhitespaces();
+    this.ignoreLeadingWhitespace = settings.isIgnoreLeadingWhitespaces();
+    this.parseUnescapedQuotes = settings.isParseUnescapedQuotes();
+    this.delimiter = settings.getDelimiter();
+    this.quote = settings.getQuote();
+    this.quoteEscape = settings.getQuoteEscape();
+    this.newLine = settings.getNormalizedNewLine();
+    this.comment = settings.getComment();
+
+    this.input = input;
+    this.output = output;
+  }
+
+  /** @return the output interface records are written to */
+  public TextOutput getOutput(){
+    return output;
+  }
+
+  /* Check if the given byte is a white space. As per the univocity text reader
+   * any ASCII <= ' ' is considered a white space. However since byte in JAVA is signed
+   * we have an additional check to make sure it's not negative (i.e. not part of a
+   * multi-byte UTF-8 sequence).
+   */
+  static final boolean isWhite(byte b){
+    return b <= ' ' && b > -1;
+  }
+
+  // Inform the output interface to indicate we are starting a new record batch
+  public void resetForNextBatch(){
+    output.startBatch();
+  }
+
+  /** @return the current position of the underlying input */
+  public long getPos(){
+    return input.getPos();
+  }
+
+  /**
+   * Parses an entire record, delegating the parsing of each field to {@link #parseField()}.
+   * The input is marked at the start of the record so that error reporting can show the text
+   * consumed since then.
+   * @return always {@code true} in the current implementation
+   * @throws IOException if reading the input fails
+   */
+  private boolean parseRecord() throws IOException {
+    final byte newLine = this.newLine;
+    final TextInput input = this.input;
+
+    input.mark();
+
+    fieldIndex = 0;
+    if (isWhite(ch) && ignoreLeadingWhitespace) {
+      skipWhitespace();
+    }
+
+    int fieldsWritten = 0;
+    try{
+      boolean earlyTerm = false;
+      while (ch != newLine) {
+        earlyTerm = !parseField();
+        fieldsWritten++;
+        if (ch != newLine) {
+          // ch is the delimiter; step past it
+          ch = input.nextChar();
+          if (ch == newLine) {
+            // delimiter directly followed by end of line: trailing empty field
+            output.endEmptyField();
+            break;
+          }
+        }
+        if(earlyTerm){
+          // the output asked to stop this record: discard the rest of the line
+          if(ch != newLine){
+            input.skipLines(1);
+          }
+          break;
+        }
+      }
+    }catch(StreamFinishedPseudoException e){
+      // if we've written part of a field or all of a field, we should send this row.
+      if(fieldsWritten == 0 && !output.rowHasData()){
+        throw e;
+      }
+    }
+
+    output.finishRecord();
+    return true;
+  }
+
+  /**
+   * Parses an unquoted field up to the delimiter (or newline), handing each byte to
+   * {@code output.appendIgnoringWhitespace}.
+   * @throws IOException if reading the input fails
+   */
+  private void parseValueIgnore() throws IOException {
+    final byte newLine = this.newLine;
+    final byte delimiter = this.delimiter;
+    final TextOutput output = this.output;
+    final TextInput input = this.input;
+
+    byte ch = this.ch;
+    while (ch != delimiter && ch != newLine) {
+      output.appendIgnoringWhitespace(ch);
+      ch = input.nextChar();
+    }
+    this.ch = ch;
+  }
+
+  /**
+   * Parses an unquoted field and appends all characters up to the delimiter (or newline)
+   * to the output, including white spaces.
+   * @throws IOException if reading the input fails
+   */
+  private void parseValueAll() throws IOException {
+    final byte newLine = this.newLine;
+    final byte delimiter = this.delimiter;
+    final TextOutput output = this.output;
+    final TextInput input = this.input;
+
+    byte ch = this.ch;
+    while (ch != delimiter && ch != newLine) {
+      output.append(ch);
+      ch = input.nextChar();
+    }
+    this.ch = ch;
+  }
+
+  /**
+   * Delegates the parsing of a single unquoted field to the implementation selected by the
+   * trailing-whitespace setting.
+   * @throws IOException if reading the input fails
+   */
+  private void parseValue() throws IOException {
+    if (ignoreTrailingWhitespace) {
+      parseValueIgnore();
+    }else{
+      parseValueAll();
+    }
+  }
+
+  /**
+   * Recursive function invoked when a quote is encountered. Also handles the case where there
+   * are non-white space characters in the field after the quoted value.
+   * @param prev previous byte read
+   * @throws IOException if reading the input fails
+   * @throws TextParsingException on an unescaped quote (when not allowed) or on unexpected content
+   *         after the closing quote
+   */
+  private void parseQuotedValue(byte prev) throws IOException {
+    final byte newLine = this.newLine;
+    final byte delimiter = this.delimiter;
+    final TextOutput output = this.output;
+    final TextInput input = this.input;
+    final byte quote = this.quote;
+
+    ch = input.nextChar();
+
+    // the value ends at a quote followed by a delimiter, a newline or whitespace
+    while (!(prev == quote && (ch == delimiter || ch == newLine || isWhite(ch)))) {
+      if (ch != quote) {
+        if (prev == quote) { // unescaped quote detected
+          if (parseUnescapedQuotes) {
+            output.append(quote);
+            output.append(ch);
+            parseQuotedValue(ch);
+            break;
+          } else {
+            throw new TextParsingException(
+                context,
+                "Unescaped quote character '"
+                    + (char) (quote & 0xFF)
+                    + "' inside quoted value of CSV field. To allow unescaped quotes, set 'parseUnescapedQuotes' to 'true' in the CSV parser settings. Cannot parse CSV input.");
+          }
+        }
+        output.append(ch);
+        prev = ch;
+      } else if (prev == quoteEscape) {
+        // escaped quote: emit a single quote and forget the escape character
+        output.append(quote);
+        prev = NULL_BYTE;
+      } else {
+        prev = ch;
+      }
+      ch = input.nextChar();
+    }
+
+    // Handles whitespace after the quoted value: whitespace is ignored. Content after the whitespace may be parsed
+    // if 'parseUnescapedQuotes' is enabled. The field delimiter is never consumed as whitespace (it may itself be a
+    // whitespace character such as a tab), and negative bytes (non-ASCII UTF-8 data) are not whitespace.
+    if (ch != newLine && ch != delimiter && isWhite(ch)) {
+      final DrillBuf workBuf = this.workBuf;
+      workBuf.resetWriterIndex();
+      do {
+        // saves whitespaces after value
+        workBuf.writeByte(ch);
+        ch = input.nextChar();
+        // found a new line, go to next record.
+        if (ch == newLine) {
+          return;
+        }
+      } while (ch != delimiter && isWhite(ch));
+
+      // there's more stuff after the quoted value, not only empty spaces.
+      if (!(ch == delimiter || ch == newLine) && parseUnescapedQuotes) {
+        output.append(quote);
+        for (int i = 0; i < workBuf.writerIndex(); i++) {
+          output.append(workBuf.getByte(i));
+        }
+        // the next character is not the escape character, put it there
+        if (ch != quoteEscape) {
+          output.append(ch);
+        }
+        // sets this character as the previous character (may be escaping)
+        // calls recursively to keep parsing potentially quoted content
+        parseQuotedValue(ch);
+      }
+    }
+
+    if (!(ch == delimiter || ch == newLine)) {
+      throw new TextParsingException(context, "Unexpected character '" + (char) (ch & 0xFF)
+          + "' following quoted value of CSV field. Expecting '" + (char) (delimiter & 0xFF)
+          + "'. Cannot parse CSV input.");
+    }
+  }
+
+  /**
+   * Parses a single field, delegating to the quoted or unquoted implementation based on the first byte.
+   * @return the value returned by the output's {@code endField()}/{@code endEmptyField()}; {@code false}
+   *         makes {@link #parseRecord()} stop the current record
+   * @throws IOException if reading the input fails
+   */
+  private final boolean parseField() throws IOException {
+
+    output.startField(fieldIndex++);
+
+    if (isWhite(ch) && ignoreLeadingWhitespace) {
+      skipWhitespace();
+    }
+
+    if (ch == delimiter) {
+      return output.endEmptyField();
+    } else {
+      if (ch == quote) {
+        parseQuotedValue(NULL_BYTE);
+      } else {
+        parseValue();
+      }
+
+      return output.endField();
+    }
+
+  }
+
+  /**
+   * Advances past white space at the current input position, stopping at the delimiter or newline.
+   * @throws IOException if reading the input fails
+   */
+  private void skipWhitespace() throws IOException {
+    final byte delimiter = this.delimiter;
+    final byte newLine = this.newLine;
+    final TextInput input = this.input;
+
+    while (isWhite(ch) && ch != delimiter && ch != newLine) {
+      ch = input.nextChar();
+    }
+  }
+
+  /**
+   * Starting point for the reader. Sets up the input interface.
+   * @throws IOException if the input cannot be started
+   */
+  public final void start() throws IOException {
+    context.stopped = false;
+    input.start();
+  }
+
+  /**
+   * Parses the next record from the input. Lines starting with the comment byte are skipped.
+   * @return true if a record was parsed, false once the stream is finished
+   * @throws IOException if reading the input fails
+   */
+  public final boolean parseNext() throws IOException {
+    try {
+      while (!context.stopped) {
+        ch = input.nextChar();
+        if (ch == comment) {
+          input.skipLines(1);
+          continue;
+        }
+        break;
+      }
+      final long initialLineNumber = input.lineCount();
+      boolean success = parseRecord();
+      if (initialLineNumber + 1 < input.lineCount()) {
+        throw new TextParsingException(context, "Cannot use newline character within quoted string");
+      }
+
+      if(success){
+        if (recordsToRead > 0 && context.currentRecord() >= recordsToRead) {
+          context.stop();
+        }
+        return true;
+      }else{
+        return false;
+      }
+
+    } catch (StreamFinishedPseudoException ex) {
+      stopParsing();
+      return false;
+    } catch (Exception ex) {
+      try {
+        throw handleException(ex);
+      } finally {
+        stopParsing();
+      }
+    }
+  }
+
+  // Hook invoked when parsing ends (normally or on error); currently a no-op.
+  private void stopParsing(){
+
+  }
+
+  /**
+   * Makes CR/LF characters visible in a string for error messages.
+   * @param str the string to transform
+   * @param addNewLine if true, keep the line breaks after the visible markers (multi-line display)
+   * @return the transformed string
+   */
+  private String displayLineSeparators(String str, boolean addNewLine) {
+    if (addNewLine) {
+      if (str.contains("\r\n")) {
+        str = str.replaceAll("\\r\\n", "[\\\\r\\\\n]\r\n\t");
+      } else if (str.contains("\n")) {
+        str = str.replaceAll("\\n", "[\\\\n]\n\t");
+      } else {
+        str = str.replaceAll("\\r", "[\\\\r]\r\t");
+      }
+    } else {
+      str = str.replaceAll("\\n", "\\\\n");
+      str = str.replaceAll("\\r", "\\\\r");
+    }
+    return str;
+  }
+
+  /**
+   * Helper method to handle exceptions caught while processing text files and generate better error messages
+   * associated with the exception. Diagnostics are derived from the text consumed since the current record's mark.
+   * @param ex Exception raised
+   * @return never returns normally; declared so callers can write {@code throw handleException(ex)}
+   * @throws TextParsingException always: the original one if {@code ex} already is one, otherwise a new
+   *         exception wrapping {@code ex}
+   * @throws IOException declared for callers; not thrown directly here
+   */
+  private TextParsingException handleException(Exception ex) throws IOException {
+
+    if (ex instanceof TextParsingException) {
+      throw (TextParsingException) ex;
+    }
+
+    if (ex instanceof ArrayIndexOutOfBoundsException) {
+      ex = UserException
+          .dataReadError(ex)
+          .message(
+              "Drill failed to read your text file. Drill supports up to %d columns in a text file. Your file appears to have more than that.",
+              RepeatedVarCharOutput.MAXIMUM_NUMBER_COLUMNS)
+          .build();
+    }
+
+    // accumulate diagnostics; previously a null String was concatenated, yielding a literal "null" prefix
+    final StringBuilder message = new StringBuilder();
+    String tmp = input.getStringSinceMarkForError();
+    if (tmp != null) {
+      final char[] chars = tmp.toCharArray();
+      final int length = chars.length;
+      if (length > settings.getMaxCharsPerColumn()) {
+        message.append("Length of parsed input (").append(length)
+            .append(") exceeds the maximum number of characters defined in your parser settings (")
+            .append(settings.getMaxCharsPerColumn()).append("). ");
+      }
+
+      if (tmp.contains("\n") || tmp.contains("\r")) {
+        tmp = displayLineSeparators(tmp, true);
+        String lineSeparator = displayLineSeparators(settings.getLineSeparatorString(), false);
+        message.append("\nIdentified line separator characters in the parsed content. This may be the cause of the error. The line separator in your parser settings is set to '")
+            .append(lineSeparator).append("'. Parsed content:\n\t").append(tmp);
+      }
+
+      int nullCharacterCount = 0;
+      // ensuring the StringBuilder won't grow over Integer.MAX_VALUE to avoid OutOfMemoryError
+      int maxLength = length > Integer.MAX_VALUE / 2 ? Integer.MAX_VALUE / 2 - 1 : length;
+      StringBuilder s = new StringBuilder(maxLength);
+      for (int i = 0; i < maxLength; i++) {
+        if (chars[i] == '\0') {
+          // render NUL characters visibly as the two characters \0
+          s.append('\\');
+          s.append('0');
+          nullCharacterCount++;
+        } else {
+          s.append(chars[i]);
+        }
+      }
+
+      if (nullCharacterCount > 0) {
+        message.append("\nIdentified ")
+            .append(nullCharacterCount)
+            .append(" null characters ('\\0') on parsed content. This may indicate the data is corrupt or its encoding is invalid. Parsed content:\n\t")
+            .append(s);
+      }
+    }
+
+    // keep the previous behavior of passing a null message when no diagnostic applies
+    throw new TextParsingException(context, message.length() == 0 ? null : message.toString(), ex);
+  }
+
+  /**
+   * Finish the processing of a batch, indicates to the output
+   * interface to wrap up the batch
+   */
+  public void finishBatch(){
+    output.finishBatch();
+  }
+
+  /**
+   * Invoked once there are no more records and we are done with the
+   * current record reader to clean up state.
+   * @throws IOException if closing the input fails
+   */
+  public void close() throws IOException{
+    input.close();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index 43e6416..fd97c48 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -82,7 +82,7 @@ public class MockRecordReader extends AbstractRecordReader {
}
@Override
- public void setup(OutputMutator output) throws ExecutionSetupException {
+ public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
try {
this.output = output;
int estimateRowSize = getEstimatedRecordSize(config.getTypes());
http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index d5b7303..5e9c4ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -165,9 +165,6 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
ScanBatch s =
new ScanBatch(rowGroupScan, context, oContext, readers.iterator(), partitionColumns, selectedPartitionColumns);
- for(RecordReader r : readers){
- r.setOperatorContext(s.getOperatorContext());
- }
return s;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 2072aae..ebfbd54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -204,10 +204,11 @@ public class ParquetRecordReader extends AbstractRecordReader {
}
@Override
- public void setup(OutputMutator output) throws ExecutionSetupException {
+ public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+ this.operatorContext = context;
if (!isStarQuery()) {
columnsFound = new boolean[getColumns().size()];
- nullFilledVectors = new ArrayList();
+ nullFilledVectors = new ArrayList<>();
}
columnStatuses = new ArrayList<>();
// totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount();
http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 07950df..8ad0d4e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -199,9 +199,10 @@ public class DrillParquetReader extends AbstractRecordReader {
}
@Override
- public void setup(OutputMutator output) throws ExecutionSetupException {
+ public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
try {
+ this.operatorContext = context;
schema = footer.getFileMetaData().getSchema();
MessageType projection = null;
http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index b7ffbf0..cf98b83 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -73,7 +73,7 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
}
@Override
- public void setup(OutputMutator output) throws ExecutionSetupException {
+ public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
try {
Field[] fields = pojoClass.getDeclaredFields();
List<PojoWriter> writers = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
index 04838bd..819f895 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
@@ -22,7 +22,7 @@ import org.apache.drill.exec.store.dfs.easy.FileWork;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-public class CompleteFileWork implements FileWork, CompleteWork{
+public class CompleteFileWork implements FileWork, CompleteWork {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompleteFileWork.class);
private long start;
@@ -40,7 +40,22 @@ public class CompleteFileWork implements FileWork, CompleteWork{
@Override
public int compareTo(CompleteWork o) {
+ if(o instanceof CompleteFileWork){ // both sides are file work: order by path first
+ CompleteFileWork c = (CompleteFileWork) o;
+ int cmp = path.compareTo(c.getPath());
+ if(cmp != 0){
+ return cmp;
+ }
+
+ cmp = Long.compare(start, c.getStart()); // same path: smaller start offset sorts first
+ if(cmp != 0){
+ return cmp;
+ }
+
+ } // different types, or same path and start: fall through to total-bytes comparison
+
return Long.compare(getTotalBytes(), o.getTotalBytes());
+
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index 87c78b2..0322f36 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -131,7 +131,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
}
@Override
- public void setup(OutputMutator output) throws ExecutionSetupException {
+ public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
MaterializedField field = MaterializedField.create(ref, Types.repeated(TypeProtos.MinorType.VARCHAR));
try {
vector = output.addField(field, RepeatedVarCharVector.class);
@@ -192,7 +192,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
v.getMutator().setValueCount(recordCount);
}
vector.getMutator().setValueCount(recordCount);
- logger.debug("text scan batch size {}", batchSize);
+// logger.debug("text scan batch size {}", batchSize);
return recordCount;
} catch(Exception e) {
cleanup();
http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 83a1cb8..8fdaa72 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -626,7 +626,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
ParquetRecordReader rr = new ParquetRecordReader(context, 256000, fileName, 0, fs,
new DirectCodecFactory(dfsConfig, allocator), f.getParquetMetadata(), columns);
TestOutputMutator mutator = new TestOutputMutator(allocator);
- rr.setup(mutator);
+ rr.setup(null, mutator);
Stopwatch watch = new Stopwatch();
watch.start();
http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java
new file mode 100644
index 0000000..76674f9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.text;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.Test;
+
+/**
+ * Tests for the new text reader: quoted values that contain the field
+ * delimiter, and rejection of newlines inside quoted values.
+ */
+public class TestNewTextReader extends BaseTestQuery {
+
+  /** A delimiter inside a quoted value is kept in the value; the quotes are not. */
+  @Test
+  public void fieldDelimiterWithinQuotes() throws Exception {
+    // The builder executes the query itself in go(); no separate test(...) run is needed.
+    testBuilder()
+        .sqlQuery("select columns[1] as col1 from cp.`textinput/input1.csv`")
+        .unOrdered()
+        .baselineColumns("col1")
+        .baselineValues("foo,bar")
+        .go();
+  }
+
+  /** A newline inside a quoted value must make the query fail with a descriptive error. */
+  @Test
+  public void ensureFailureOnNewLineDelimiterWithinQuotes() throws Exception {
+    try {
+      test("select columns[1] as col1 from cp.`textinput/input2.csv`");
+      fail("Expected exception not thrown.");
+    } catch (Exception e) {
+      // getMessage() may be null; report the actual message instead of throwing an NPE.
+      String message = e.getMessage();
+      assertTrue("Unexpected error message: " + message,
+          message != null && message.contains("Cannot use newline character within quoted string"));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
index f07cf3b..882033a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
@@ -45,11 +45,12 @@ public class TestTextColumn extends BaseTestQuery{
"from dfs_test.`[WORKING_PATH]/src/test/resources/store/text/data/letters.txt`");
List<List<String>> expectedOutput = Arrays.asList(
- Arrays.asList("\"a, b,\",\"c\",\"d,, \\n e\""),
- Arrays.asList("\"d, e,\",\"f\",\"g,, \\n h\""),
- Arrays.asList("\"g, h,\",\"i\",\"j,, \\n k\""));
+ Arrays.asList("a, b,\",\"c\",\"d,, \\n e"),
+ Arrays.asList("d, e,\",\"f\",\"g,, \\n h"),
+ Arrays.asList("g, h,\",\"i\",\"j,, \\n k"));
List<List<String>> actualOutput = getOutput(batches);
+ System.out.println(actualOutput);
validateOutput(expectedOutput, actualOutput);
}
@@ -59,9 +60,9 @@ public class TestTextColumn extends BaseTestQuery{
"columns[3] as col4 from dfs_test.`[WORKING_PATH]/src/test/resources/store/text/data/letters.csv`");
List<List<String>> expectedOutput = Arrays.asList(
- Arrays.asList("\"a, b,\"", "\"c\"", "\"d,, \\n e\"","\"f\\\"g\""),
- Arrays.asList("\"d, e,\"", "\"f\"", "\"g,, \\n h\"","\"i\\\"j\""),
- Arrays.asList("\"g, h,\"", "\"i\"", "\"j,, \\n k\"","\"l\\\"m\""));
+ Arrays.asList("a, b,", "c", "d,, \\n e","f\\\"g"),
+ Arrays.asList("d, e,", "f", "g,, \\n h","i\\\"j"),
+ Arrays.asList("g, h,", "i", "j,, \\n k","l\\\"m"));
List<List<String>> actualOutput = getOutput(batches);
validateOutput(expectedOutput, actualOutput);
@@ -81,7 +82,8 @@ public class TestTextColumn extends BaseTestQuery{
output.add(new ArrayList<String>());
for (VectorWrapper<?> vw: loader) {
ValueVector.Accessor accessor = vw.getValueVector().getAccessor();
- output.get(last).add(accessor.getObject(i).toString());
+ Object o = accessor.getObject(i);
+ output.get(last).add(o == null ? null: o.toString());
}
++last;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
index d4d81f6..4a7a53f 100644
--- a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
@@ -37,7 +37,8 @@
},
"txt" : {
type : "text",
- extensions: [ "txt" ]
+ extensions: [ "txt" ],
+ delimiter: "\u0000"
},
"avro" : {
type: "avro"
http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/resources/store/text/data/letters.txt
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/text/data/letters.txt b/exec/java-exec/src/test/resources/store/text/data/letters.txt
index 14b9cb6..5421114 100644
--- a/exec/java-exec/src/test/resources/store/text/data/letters.txt
+++ b/exec/java-exec/src/test/resources/store/text/data/letters.txt
@@ -1,3 +1,3 @@
-"a, b,","c","d,, \n e"
-"d, e,","f","g,, \n h"
-"g, h,","i","j,, \n k"
\ No newline at end of file
+"a, b,"",""c"",""d,, \n e"
+"d, e,"",""f"",""g,, \n h"
+"g, h,"",""i"",""j,, \n k"
http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/resources/textinput/input1.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/textinput/input1.csv b/exec/java-exec/src/test/resources/textinput/input1.csv
new file mode 100644
index 0000000..4b70783
--- /dev/null
+++ b/exec/java-exec/src/test/resources/textinput/input1.csv
@@ -0,0 +1 @@
+1,"foo,bar"
http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/resources/textinput/input2.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/textinput/input2.csv b/exec/java-exec/src/test/resources/textinput/input2.csv
new file mode 100644
index 0000000..20c8d3c
--- /dev/null
+++ b/exec/java-exec/src/test/resources/textinput/input2.csv
@@ -0,0 +1,2 @@
+1,"a
+b"