You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/07 10:56:38 UTC

[2/7] drill git commit: DRILL-2006: Updated Text reader. Increases variations of text files Drill can work with.

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java
new file mode 100644
index 0000000..e411461
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.univocity.parsers.common.TextParsingException;
+
+public class TextParsingSettings {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextParsingSettings.class);
+
+  public static final TextParsingSettings DEFAULT = new TextParsingSettings();
+
+  private String emptyValue = null;
+  private boolean parseUnescapedQuotes = true;
+  private byte quote = b('"');
+  private byte quoteEscape = b('"');
+  private byte delimiter = b(',');
+  private byte comment = b('#');
+
+  private long maxCharsPerColumn = Character.MAX_VALUE;
+  private byte normalizedNewLine = b('\n');
+  private byte[] newLineDelimiter = {normalizedNewLine};
+  private boolean ignoreLeadingWhitespaces = false;
+  private boolean ignoreTrailingWhitespaces = false;
+  private String lineSeparatorString = "\n";
+  private boolean skipFirstLine = false;
+
+  // these options are not yet supported
+  private boolean headerExtractionEnabled = false;
+  private boolean useRepeatedVarChar = true;
+  private int numberOfRecordsToRead = -1;
+
+  public void set(TextFormatConfig config){
+    this.quote = bSafe(config.getQuote(), "quote");
+    this.quoteEscape = bSafe(config.getEscape(), "escape");
+    this.newLineDelimiter = config.getLineDelimiter().getBytes(Charsets.UTF_8);
+    Preconditions.checkArgument(newLineDelimiter.length == 1 || newLineDelimiter.length == 2,
+        String.format("Line delimiter must be 1 or 2 bytes in length.  The provided delimiter was %d bytes long.", newLineDelimiter.length));
+    this.delimiter = bSafe(config.getFieldDelimiter(), "fieldDelimiter");
+    this.comment = bSafe(config.getComment(), "comment");
+    this.skipFirstLine = config.isSkipFirstLine();
+  }
+
+  public byte getComment(){
+    return comment;
+  }
+
+  public boolean isSkipFirstLine() {
+    return skipFirstLine;
+  }
+
+  public void setSkipFirstLine(boolean skipFirstLine) {
+    this.skipFirstLine = skipFirstLine;
+  }
+
+  public boolean isUseRepeatedVarChar() {
+    return useRepeatedVarChar;
+  }
+
+  public void setUseRepeatedVarChar(boolean useRepeatedVarChar) {
+    this.useRepeatedVarChar = useRepeatedVarChar;
+  }
+
+
+  private static byte bSafe(char c, String name){
+    if(c > Byte.MAX_VALUE) {
+      throw new IllegalArgumentException(String.format("Failure validating configuration option %s.  Expected a "
+          + "character between 0 and 127 but value was actually %d.", name, (int) c));
+    }
+    return (byte) c;
+  }
+
+  private static byte b(char c){
+    return (byte) c;
+  }
+
+  public byte[] getNewLineDelimiter() {
+    return newLineDelimiter;
+  }
+
+  /**
+   * Returns the character used for escaping values where the field delimiter is part of the value. Defaults to '"'
+   * @return the quote character
+   */
+  public byte getQuote() {
+    return quote;
+  }
+
+  /**
+   * Defines the character used for escaping values where the field delimiter is part of the value. Defaults to '"'
+   * @param quote the quote character
+   */
+  public void setQuote(byte quote) {
+    this.quote = quote;
+  }
+
+  public String getLineSeparatorString(){
+    return lineSeparatorString;
+  }
+
+
+  /**
+   * Identifies whether or not a given character is used for escaping values where the field delimiter is part of the value
+   * @param ch the character to be verified
+   * @return true if the given character is the character used for escaping values, false otherwise
+   */
+  public boolean isQuote(byte ch) {
+    return this.quote == ch;
+  }
+
+  /**
+   * Returns the character used for escaping quotes inside an already quoted value. Defaults to '"'
+   * @return the quote escape character
+   */
+  public byte getQuoteEscape() {
+    return quoteEscape;
+  }
+
+  /**
+   * Defines the character used for escaping quotes inside an already quoted value. Defaults to '"'
+   * @param quoteEscape the quote escape character
+   */
+  public void setQuoteEscape(byte quoteEscape) {
+    this.quoteEscape = quoteEscape;
+  }
+
+  /**
+   * Identifies whether or not a given character is used for escaping quotes inside an already quoted value.
+   * @param ch the character to be verified
+   * @return true if the given character is the quote escape character, false otherwise
+   */
+  public boolean isQuoteEscape(byte ch) {
+    return this.quoteEscape == ch;
+  }
+
+  /**
+   * Returns the field delimiter character. Defaults to ','
+   * @return the field delimiter character
+   */
+  public byte getDelimiter() {
+    return delimiter;
+  }
+
+  /**
+   * Defines the field delimiter character. Defaults to ','
+   * @param delimiter the field delimiter character
+   */
+  public void setDelimiter(byte delimiter) {
+    this.delimiter = delimiter;
+  }
+
+  /**
+   * Identifies whether or not a given character represents a field delimiter
+   * @param ch the character to be verified
+   * @return true if the given character is the field delimiter character, false otherwise
+   */
+  public boolean isDelimiter(byte ch) {
+    return this.delimiter == ch;
+  }
+
+  /**
+   * Returns the String representation of an empty value (defaults to null)
+   *
+   * <p>When reading, if the parser does not read any character from the input, and the input is within quotes, the empty is used instead of an empty string
+   *
+   * @return the String representation of an empty value
+   */
+  public String getEmptyValue() {
+    return emptyValue;
+  }
+
+  /**
+   * Sets the String representation of an empty value (defaults to null)
+   *
+   * <p>When reading, if the parser does not read any character from the input, and the input is within quotes, the empty is used instead of an empty string
+   *
+   * @param emptyValue the String representation of an empty value
+   */
+  public void setEmptyValue(String emptyValue) {
+    this.emptyValue = emptyValue;
+  }
+
+
+  /**
+   * Indicates whether the CSV parser should accept unescaped quotes inside quoted values and parse them normally. Defaults to {@code true}.
+   * @return a flag indicating whether or not the CSV parser should accept unescaped quotes inside quoted values.
+   */
+  public boolean isParseUnescapedQuotes() {
+    return parseUnescapedQuotes;
+  }
+
+  /**
+   * Configures how to handle unescaped quotes inside quoted values. If set to {@code true}, the parser will parse the quote normally as part of the value.
+   * If set the {@code false}, a {@link TextParsingException} will be thrown. Defaults to {@code true}.
+   * @param parseUnescapedQuotes indicates whether or not the CSV parser should accept unescaped quotes inside quoted values.
+   */
+  public void setParseUnescapedQuotes(boolean parseUnescapedQuotes) {
+    this.parseUnescapedQuotes = parseUnescapedQuotes;
+  }
+
+  /**
+   * Indicates whether or not the first valid record parsed from the input should be considered as the row containing the names of each column
+   * @return true if the first valid record parsed from the input should be considered as the row containing the names of each column, false otherwise
+   */
+  public boolean isHeaderExtractionEnabled() {
+    return headerExtractionEnabled;
+  }
+
+  /**
+   * Defines whether or not the first valid record parsed from the input should be considered as the row containing the names of each column
+   * @param headerExtractionEnabled a flag indicating whether the first valid record parsed from the input should be considered as the row containing the names of each column
+   */
+  public void setHeaderExtractionEnabled(boolean headerExtractionEnabled) {
+    this.headerExtractionEnabled = headerExtractionEnabled;
+  }
+
+  /**
+   * The number of valid records to be parsed before the process is stopped. A negative value indicates there's no limit (defaults to -1).
+   * @return the number of records to read before stopping the parsing process.
+   */
+  public int getNumberOfRecordsToRead() {
+    return numberOfRecordsToRead;
+  }
+
+  /**
+   * Defines the number of valid records to be parsed before the process is stopped. A negative value indicates there's no limit (defaults to -1).
+   * @param numberOfRecordsToRead the number of records to read before stopping the parsing process.
+   */
+  public void setNumberOfRecordsToRead(int numberOfRecordsToRead) {
+    this.numberOfRecordsToRead = numberOfRecordsToRead;
+  }
+
+  public long getMaxCharsPerColumn() {
+    return maxCharsPerColumn;
+  }
+
+  public void setMaxCharsPerColumn(long maxCharsPerColumn) {
+    this.maxCharsPerColumn = maxCharsPerColumn;
+  }
+
+  public void setComment(byte comment) {
+    this.comment = comment;
+  }
+
+  public byte getNormalizedNewLine() {
+    return normalizedNewLine;
+  }
+
+  public void setNormalizedNewLine(byte normalizedNewLine) {
+    this.normalizedNewLine = normalizedNewLine;
+  }
+
+  public boolean isIgnoreLeadingWhitespaces() {
+    return ignoreLeadingWhitespaces;
+  }
+
+  public void setIgnoreLeadingWhitespaces(boolean ignoreLeadingWhitespaces) {
+    this.ignoreLeadingWhitespaces = ignoreLeadingWhitespaces;
+  }
+
+  public boolean isIgnoreTrailingWhitespaces() {
+    return ignoreTrailingWhitespaces;
+  }
+
+  public void setIgnoreTrailingWhitespaces(boolean ignoreTrailingWhitespaces) {
+    this.ignoreTrailingWhitespaces = ignoreTrailingWhitespaces;
+  }
+
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
new file mode 100644
index 0000000..fec0ab4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import io.netty.buffer.DrillBuf;
+
+import java.io.IOException;
+
+import org.apache.drill.common.exceptions.UserException;
+
+import com.univocity.parsers.common.TextParsingException;
+import com.univocity.parsers.csv.CsvParserSettings;
+
+/*******************************************************************************
+ * Portions Copyright 2014 uniVocity Software Pty Ltd
+ ******************************************************************************/
+
+/**
+ * A byte-based Text parser implementation. Builds heavily upon the uniVocity parsers. Customized for UTF8 parsing and
+ * DrillBuf support.
+ */
+final class TextReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextReader.class);
+
+  private static final byte NULL_BYTE = (byte) '\0';
+
+  private final TextParsingContext context;
+
+  private final long recordsToRead;
+  private final TextParsingSettings settings;
+
+  private final TextInput input;
+  private final TextOutput output;
+  private final DrillBuf workBuf;
+
+  private byte ch;
+
+  // index of the field within this record
+  private int fieldIndex;
+
+  /** Behavior settings **/
+  private final boolean ignoreTrailingWhitespace;
+  private final boolean ignoreLeadingWhitespace;
+  private final boolean parseUnescapedQuotes;
+
+  /** Key Characters **/
+  private final byte comment;
+  private final byte delimiter;
+  private final byte quote;
+  private final byte quoteEscape;
+  private final byte newLine;
+
+  /**
+   * The CsvParser supports all settings provided by {@link CsvParserSettings}, and requires this configuration to be
+   * properly initialized.
+   * @param settings  the parser configuration
+   * @param input  input stream
+   * @param output  interface to produce output record batch
+   * @param workBuf  working buffer to handle whitespaces
+   */
+  public TextReader(TextParsingSettings settings, TextInput input, TextOutput output, DrillBuf workBuf) {
+    this.context = new TextParsingContext(input, output);
+    this.workBuf = workBuf;
+    this.settings = settings;
+
+    this.recordsToRead = settings.getNumberOfRecordsToRead() == -1 ? Long.MAX_VALUE : settings.getNumberOfRecordsToRead();
+
+    this.ignoreTrailingWhitespace = settings.isIgnoreTrailingWhitespaces();
+    this.ignoreLeadingWhitespace = settings.isIgnoreLeadingWhitespaces();
+    this.parseUnescapedQuotes = settings.isParseUnescapedQuotes();
+    this.delimiter = settings.getDelimiter();
+    this.quote = settings.getQuote();
+    this.quoteEscape = settings.getQuoteEscape();
+    this.newLine = settings.getNormalizedNewLine();
+    this.comment = settings.getComment();
+
+    this.input = input;
+    this.output = output;
+
+  }
+
+  public TextOutput getOutput(){
+    return output;
+  }
+
+  /* Check if the given byte is a white space. As per the univocity text reader
+   * any ASCII <= ' ' is considered a white space. However since byte in JAVA is signed
+   * we have an additional check to make sure its not negative
+   */
+  static final boolean isWhite(byte b){
+    return b <= ' ' && b > -1;
+  }
+
+  // Inform the output interface to indicate we are starting a new record batch
+  public void resetForNextBatch(){
+    output.startBatch();
+  }
+
+  public long getPos(){
+    return input.getPos();
+  }
+
+  /**
+   * Function encapsulates parsing an entire record, delegates parsing of the
+   * fields to parseField() function.
+   * We mark the start of the record and if there are any failures encountered (OOM for eg)
+   * then we reset the input stream to the marked position
+   * @return  true if parsing this record was successful; false otherwise
+   * @throws IOException
+   */
+  private boolean parseRecord() throws IOException {
+    final byte newLine = this.newLine;
+    final TextInput input = this.input;
+
+    input.mark();
+
+    fieldIndex = 0;
+    if (isWhite(ch) && ignoreLeadingWhitespace) {
+      skipWhitespace();
+    }
+
+    int fieldsWritten = 0;
+    try{
+      boolean earlyTerm = false;
+      while (ch != newLine) {
+        earlyTerm = !parseField();
+        fieldsWritten++;
+        if (ch != newLine) {
+          ch = input.nextChar();
+          if (ch == newLine) {
+            output.endEmptyField();
+            break;
+          }
+        }
+        if(earlyTerm){
+          if(ch != newLine){
+            input.skipLines(1);
+          }
+          break;
+        }
+      }
+    }catch(StreamFinishedPseudoException e){
+      // if we've written part of a field or all of a field, we should send this row.
+      if(fieldsWritten == 0 && !output.rowHasData()){
+        throw e;
+      }
+    }
+
+    output.finishRecord();
+    return true;
+  }
+
+  /**
+   * Function parses an individual field and ignores any white spaces encountered
+   * by not appending it to the output vector
+   * @throws IOException
+   */
+  private void parseValueIgnore() throws IOException {
+    final byte newLine = this.newLine;
+    final byte delimiter = this.delimiter;
+    final TextOutput output = this.output;
+    final TextInput input = this.input;
+
+    byte ch = this.ch;
+    while (ch != delimiter && ch != newLine) {
+      output.appendIgnoringWhitespace(ch);
+//      fieldSize++;
+      ch = input.nextChar();
+    }
+    this.ch = ch;
+  }
+
+  /**
+   * Function parses an individual field and appends all characters till the delimeter (or newline)
+   * to the output, including white spaces
+   * @throws IOException
+   */
+  private void parseValueAll() throws IOException {
+    final byte newLine = this.newLine;
+    final byte delimiter = this.delimiter;
+    final TextOutput output = this.output;
+    final TextInput input = this.input;
+
+    byte ch = this.ch;
+    while (ch != delimiter && ch != newLine) {
+      output.append(ch);
+      ch = input.nextChar();
+    }
+    this.ch = ch;
+  }
+
+  /**
+   * Function simply delegates the parsing of a single field to the actual implementation based on parsing config
+   * @throws IOException
+   */
+  private void parseValue() throws IOException {
+    if (ignoreTrailingWhitespace) {
+      parseValueIgnore();
+    }else{
+      parseValueAll();
+    }
+  }
+
+  /**
+   * Recursive function invoked when a quote is encountered. Function also
+   * handles the case when there are non-white space characters in the field
+   * after the quoted value.
+   * @param prev  previous byte read
+   * @throws IOException
+   */
+  private void parseQuotedValue(byte prev) throws IOException {
+    final byte newLine = this.newLine;
+    final byte delimiter = this.delimiter;
+    final TextOutput output = this.output;
+    final TextInput input = this.input;
+    final byte quote = this.quote;
+
+    ch = input.nextChar();
+
+    while (!(prev == quote && (ch == delimiter || ch == newLine || isWhite(ch)))) {
+      if (ch != quote) {
+        if (prev == quote) { // unescaped quote detected
+          if (parseUnescapedQuotes) {
+            output.append(quote);
+            output.append(ch);
+            parseQuotedValue(ch);
+            break;
+          } else {
+            throw new TextParsingException(
+                context,
+                "Unescaped quote character '"
+                    + quote
+                    + "' inside quoted value of CSV field. To allow unescaped quotes, set 'parseUnescapedQuotes' to 'true' in the CSV parser settings. Cannot parse CSV input.");
+          }
+        }
+        output.append(ch);
+        prev = ch;
+      } else if (prev == quoteEscape) {
+        output.append(quote);
+        prev = NULL_BYTE;
+      } else {
+        prev = ch;
+      }
+      ch = input.nextChar();
+    }
+
+    // handles whitespaces after quoted value: whitespaces are ignored. Content after whitespaces may be parsed if
+    // 'parseUnescapedQuotes' is enabled.
+    if (ch != newLine && ch <= ' ') {
+      final DrillBuf workBuf = this.workBuf;
+      workBuf.resetWriterIndex();
+      do {
+        // saves whitespaces after value
+        workBuf.writeByte(ch);
+        ch = input.nextChar();
+        // found a new line, go to next record.
+        if (ch == newLine) {
+          return;
+        }
+      } while (ch <= ' ');
+
+      // there's more stuff after the quoted value, not only empty spaces.
+      if (!(ch == delimiter || ch == newLine) && parseUnescapedQuotes) {
+
+        output.append(quote);
+        for(int i =0; i < workBuf.writerIndex(); i++){
+          output.append(workBuf.getByte(i));
+        }
+        // the next character is not the escape character, put it there
+        if (ch != quoteEscape) {
+          output.append(ch);
+        }
+        // sets this character as the previous character (may be escaping)
+        // calls recursively to keep parsing potentially quoted content
+        parseQuotedValue(ch);
+      }
+    }
+
+    if (!(ch == delimiter || ch == newLine)) {
+      throw new TextParsingException(context, "Unexpected character '" + ch
+          + "' following quoted value of CSV field. Expecting '" + delimiter + "'. Cannot parse CSV input.");
+    }
+  }
+
+  /**
+   * Captures the entirety of parsing a single field and based on the input delegates to the appropriate function
+   * @return
+   * @throws IOException
+   */
+  private final boolean parseField() throws IOException {
+
+    output.startField(fieldIndex++);
+
+    if (isWhite(ch) && ignoreLeadingWhitespace) {
+      skipWhitespace();
+    }
+
+    if (ch == delimiter) {
+      return output.endEmptyField();
+    } else {
+      if (ch == quote) {
+        parseQuotedValue(NULL_BYTE);
+      } else {
+        parseValue();
+      }
+
+      return output.endField();
+    }
+
+  }
+
+  /**
+   * Helper function to skip white spaces occurring at the current input stream.
+   * @throws IOException
+   */
+  private void skipWhitespace() throws IOException {
+    final byte delimiter = this.delimiter;
+    final byte newLine = this.newLine;
+    final TextInput input = this.input;
+
+    while (isWhite(ch) && ch != delimiter && ch != newLine) {
+      ch = input.nextChar();
+    }
+  }
+
+  /**
+   * Starting point for the reader. Sets up the input interface.
+   * @throws IOException
+   */
+  public final void start() throws IOException {
+    context.stopped = false;
+    input.start();
+  }
+
+
+  /**
+   * Parses the next record from the input. Will skip the line if its a comment,
+   * this is required when the file contains headers
+   * @throws IOException
+   */
+  public final boolean parseNext() throws IOException {
+    try {
+      while (!context.stopped) {
+        ch = input.nextChar();
+        if (ch == comment) {
+          input.skipLines(1);
+          continue;
+        }
+        break;
+      }
+      final long initialLineNumber = input.lineCount();
+      boolean success = parseRecord();
+      if (initialLineNumber + 1 < input.lineCount()) {
+        throw new TextParsingException(context, "Cannot use newline character within quoted string");
+      }
+
+      if(success){
+        if (recordsToRead > 0 && context.currentRecord() >= recordsToRead) {
+          context.stop();
+        }
+        return true;
+      }else{
+        return false;
+      }
+
+    } catch (StreamFinishedPseudoException ex) {
+      stopParsing();
+      return false;
+    } catch (Exception ex) {
+      try {
+        throw handleException(ex);
+      } finally {
+        stopParsing();
+      }
+    }
+  }
+
+  private void stopParsing(){
+
+  }
+
+  private String displayLineSeparators(String str, boolean addNewLine) {
+    if (addNewLine) {
+      if (str.contains("\r\n")) {
+        str = str.replaceAll("\\r\\n", "[\\\\r\\\\n]\r\n\t");
+      } else if (str.contains("\n")) {
+        str = str.replaceAll("\\n", "[\\\\n]\n\t");
+      } else {
+        str = str.replaceAll("\\r", "[\\\\r]\r\t");
+      }
+    } else {
+      str = str.replaceAll("\\n", "\\\\n");
+      str = str.replaceAll("\\r", "\\\\r");
+    }
+    return str;
+  }
+
+  /**
+   * Helper method to handle exceptions caught while processing text files and generate better error messages associated with
+   * the exception.
+   * @param ex  Exception raised
+   * @return
+   * @throws IOException
+   */
+  private TextParsingException handleException(Exception ex) throws IOException {
+
+    if (ex instanceof TextParsingException) {
+      throw (TextParsingException) ex;
+    }
+
+    if (ex instanceof ArrayIndexOutOfBoundsException) {
+      ex = UserException
+          .dataReadError(ex)
+          .message(
+              "Drill failed to read your text file.  Drill supports up to %d columns in a text file.  Your file appears to have more than that.",
+              RepeatedVarCharOutput.MAXIMUM_NUMBER_COLUMNS)
+          .build();
+    }
+
+    String message = null;
+    String tmp = input.getStringSinceMarkForError();
+    char[] chars = tmp.toCharArray();
+    if (chars != null) {
+      int length = chars.length;
+      if (length > settings.getMaxCharsPerColumn()) {
+        message = "Length of parsed input (" + length
+            + ") exceeds the maximum number of characters defined in your parser settings ("
+            + settings.getMaxCharsPerColumn() + "). ";
+      }
+
+      if (tmp.contains("\n") || tmp.contains("\r")) {
+        tmp = displayLineSeparators(tmp, true);
+        String lineSeparator = displayLineSeparators(settings.getLineSeparatorString(), false);
+        message += "\nIdentified line separator characters in the parsed content. This may be the cause of the error. The line separator in your parser settings is set to '"
+            + lineSeparator + "'. Parsed content:\n\t" + tmp;
+      }
+
+      int nullCharacterCount = 0;
+      // ensuring the StringBuilder won't grow over Integer.MAX_VALUE to avoid OutOfMemoryError
+      int maxLength = length > Integer.MAX_VALUE / 2 ? Integer.MAX_VALUE / 2 - 1 : length;
+      StringBuilder s = new StringBuilder(maxLength);
+      for (int i = 0; i < maxLength; i++) {
+        if (chars[i] == '\0') {
+          s.append('\\');
+          s.append('0');
+          nullCharacterCount++;
+        } else {
+          s.append(chars[i]);
+        }
+      }
+      tmp = s.toString();
+
+      if (nullCharacterCount > 0) {
+        message += "\nIdentified "
+            + nullCharacterCount
+            + " null characters ('\0') on parsed content. This may indicate the data is corrupt or its encoding is invalid. Parsed content:\n\t"
+            + tmp;
+      }
+
+    }
+
+    throw new TextParsingException(context, message, ex);
+  }
+
+  /**
+   * Finish the processing of a batch, indicates to the output
+   * interface to wrap up the batch
+   */
+  public void finishBatch(){
+    output.finishBatch();
+//    System.out.println(String.format("line %d, cnt %d", input.getLineCount(), output.getRecordCount()));
+  }
+
+  /**
+   * Invoked once there are no more records and we are done with the
+   * current record reader to clean up state.
+   * @throws IOException
+   */
+  public void close() throws IOException{
+    input.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index 43e6416..fd97c48 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -82,7 +82,7 @@ public class MockRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
     try {
       this.output = output;
       int estimateRowSize = getEstimatedRecordSize(config.getTypes());

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index d5b7303..5e9c4ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -165,9 +165,6 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
     ScanBatch s =
         new ScanBatch(rowGroupScan, context, oContext, readers.iterator(), partitionColumns, selectedPartitionColumns);
 
-    for(RecordReader r  : readers){
-      r.setOperatorContext(s.getOperatorContext());
-    }
 
     return s;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 2072aae..ebfbd54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -204,10 +204,11 @@ public class ParquetRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+    this.operatorContext = context;
     if (!isStarQuery()) {
       columnsFound = new boolean[getColumns().size()];
-      nullFilledVectors = new ArrayList();
+      nullFilledVectors = new ArrayList<>();
     }
     columnStatuses = new ArrayList<>();
 //    totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount();

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 07950df..8ad0d4e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -199,9 +199,10 @@ public class DrillParquetReader extends AbstractRecordReader {
   }
 
   @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
 
     try {
+      this.operatorContext = context;
       schema = footer.getFileMetaData().getSchema();
       MessageType projection = null;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index b7ffbf0..cf98b83 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -73,7 +73,7 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
   }
 
   @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
     try {
       Field[] fields = pojoClass.getDeclaredFields();
       List<PojoWriter> writers = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
index 04838bd..819f895 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
@@ -22,7 +22,7 @@ import org.apache.drill.exec.store.dfs.easy.FileWork;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class CompleteFileWork implements FileWork, CompleteWork{
+public class CompleteFileWork implements FileWork, CompleteWork {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompleteFileWork.class);
 
   private long start;
@@ -40,7 +40,22 @@ public class CompleteFileWork implements FileWork, CompleteWork{
 
   @Override
   public int compareTo(CompleteWork o) {
+    if(o instanceof CompleteFileWork){
+      CompleteFileWork c = (CompleteFileWork) o;
+      int cmp = path.compareTo(c.getPath());
+      if(cmp != 0){
+        return cmp;
+      }
+
+      cmp = Long.compare(start,  c.getStart());
+      if(cmp != 0){
+        return cmp;
+      }
+
+    }
+
     return Long.compare(getTotalBytes(), o.getTotalBytes());
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index 87c78b2..0322f36 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -131,7 +131,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
     MaterializedField field = MaterializedField.create(ref, Types.repeated(TypeProtos.MinorType.VARCHAR));
     try {
       vector = output.addField(field, RepeatedVarCharVector.class);
@@ -192,7 +192,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
         v.getMutator().setValueCount(recordCount);
       }
       vector.getMutator().setValueCount(recordCount);
-      logger.debug("text scan batch size {}", batchSize);
+//      logger.debug("text scan batch size {}", batchSize);
       return recordCount;
     } catch(Exception e) {
       cleanup();

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 83a1cb8..8fdaa72 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -626,7 +626,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
       ParquetRecordReader rr = new ParquetRecordReader(context, 256000, fileName, 0, fs,
           new DirectCodecFactory(dfsConfig, allocator), f.getParquetMetadata(), columns);
       TestOutputMutator mutator = new TestOutputMutator(allocator);
-      rr.setup(mutator);
+      rr.setup(null, mutator);
       Stopwatch watch = new Stopwatch();
       watch.start();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java
new file mode 100644
index 0000000..76674f9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.text;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestNewTextReader extends BaseTestQuery {
+
+  @Test
+  public void fieldDelimiterWithinQuotes() throws Exception {
+    test("select columns[1] as col1 from cp.`textinput/input1.csv`");
+    testBuilder()
+        .sqlQuery("select columns[1] as col1 from cp.`textinput/input1.csv`")
+        .unOrdered()
+        .baselineColumns("col1")
+        .baselineValues("foo,bar")
+        .go();
+  }
+
+  @Test
+  public void ensureFailureOnNewLineDelimiterWithinQuotes() throws Exception {
+    try {
+      test("select columns[1] as col1 from cp.`textinput/input2.csv`");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("Cannot use newline character within quoted string"));
+      return;
+    }
+    Assert.fail("Expected exception not thrown.");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
index f07cf3b..882033a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
@@ -45,11 +45,12 @@ public class TestTextColumn extends BaseTestQuery{
       "from dfs_test.`[WORKING_PATH]/src/test/resources/store/text/data/letters.txt`");
 
     List<List<String>> expectedOutput = Arrays.asList(
-      Arrays.asList("\"a, b,\",\"c\",\"d,, \\n e\""),
-      Arrays.asList("\"d, e,\",\"f\",\"g,, \\n h\""),
-      Arrays.asList("\"g, h,\",\"i\",\"j,, \\n k\""));
+      Arrays.asList("a, b,\",\"c\",\"d,, \\n e"),
+      Arrays.asList("d, e,\",\"f\",\"g,, \\n h"),
+      Arrays.asList("g, h,\",\"i\",\"j,, \\n k"));
 
     List<List<String>> actualOutput = getOutput(batches);
+    System.out.println(actualOutput);
     validateOutput(expectedOutput, actualOutput);
   }
 
@@ -59,9 +60,9 @@ public class TestTextColumn extends BaseTestQuery{
       "columns[3] as col4 from dfs_test.`[WORKING_PATH]/src/test/resources/store/text/data/letters.csv`");
 
     List<List<String>> expectedOutput = Arrays.asList(
-      Arrays.asList("\"a, b,\"", "\"c\"", "\"d,, \\n e\"","\"f\\\"g\""),
-      Arrays.asList("\"d, e,\"", "\"f\"", "\"g,, \\n h\"","\"i\\\"j\""),
-      Arrays.asList("\"g, h,\"", "\"i\"", "\"j,, \\n k\"","\"l\\\"m\""));
+      Arrays.asList("a, b,", "c", "d,, \\n e","f\\\"g"),
+      Arrays.asList("d, e,", "f", "g,, \\n h","i\\\"j"),
+      Arrays.asList("g, h,", "i", "j,, \\n k","l\\\"m"));
 
     List<List<String>> actualOutput = getOutput(batches);
     validateOutput(expectedOutput, actualOutput);
@@ -81,7 +82,8 @@ public class TestTextColumn extends BaseTestQuery{
           output.add(new ArrayList<String>());
           for (VectorWrapper<?> vw: loader) {
             ValueVector.Accessor accessor = vw.getValueVector().getAccessor();
-            output.get(last).add(accessor.getObject(i).toString());
+            Object o = accessor.getObject(i);
+            output.get(last).add(o == null ? null: o.toString());
           }
           ++last;
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
index d4d81f6..4a7a53f 100644
--- a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
@@ -37,7 +37,8 @@
         },
         "txt" : {
           type : "text",
-          extensions: [ "txt" ]
+          extensions: [ "txt" ],
+          delimiter: "\u0000"
         },
         "avro" : {
           type: "avro"

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/resources/store/text/data/letters.txt
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/text/data/letters.txt b/exec/java-exec/src/test/resources/store/text/data/letters.txt
index 14b9cb6..5421114 100644
--- a/exec/java-exec/src/test/resources/store/text/data/letters.txt
+++ b/exec/java-exec/src/test/resources/store/text/data/letters.txt
@@ -1,3 +1,3 @@
-"a, b,","c","d,, \n e"
-"d, e,","f","g,, \n h"
-"g, h,","i","j,, \n k"
\ No newline at end of file
+"a, b,"",""c"",""d,, \n e"
+"d, e,"",""f"",""g,, \n h"
+"g, h,"",""i"",""j,, \n k"

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/resources/textinput/input1.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/textinput/input1.csv b/exec/java-exec/src/test/resources/textinput/input1.csv
new file mode 100644
index 0000000..4b70783
--- /dev/null
+++ b/exec/java-exec/src/test/resources/textinput/input1.csv
@@ -0,0 +1 @@
+1,"foo,bar"

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/resources/textinput/input2.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/textinput/input2.csv b/exec/java-exec/src/test/resources/textinput/input2.csv
new file mode 100644
index 0000000..20c8d3c
--- /dev/null
+++ b/exec/java-exec/src/test/resources/textinput/input2.csv
@@ -0,0 +1,2 @@
+1,"a
+b"