Posted to commits@drill.apache.org by vo...@apache.org on 2019/06/18 18:22:52 UTC

[drill] 03/03: DRILL-7292: Remove V1 and V2 text readers

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit f3f7dbd40f5e899f2aacba35db8f50ffedfa9d3d
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Wed Jun 12 17:31:56 2019 -0700

    DRILL-7292: Remove V1 and V2 text readers
    
    Drill 1.16 introduced the "V3" text reader, based on the row set
    and provided-schema mechanisms. V3 was available only via a system/session
    option because the functionality was considered experimental.
    
    The functionality has now undergone thorough testing. This commit makes
    the V3 text reader the default and removes the code for the original
    ("V1") and the "new" (compliant, "V2") text readers.
    
    The system/session options that controlled reader selection are retained
    for backward compatibility, but they no longer do anything.
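
    For reference, these options were toggled with standard ALTER SESSION /
    ALTER SYSTEM statements such as the following (illustrative only; both
    statements are now no-ops):

        ALTER SESSION SET `exec.storage.enable_new_text_reader` = true;
        ALTER SESSION SET `exec.storage.enable_v3_text_reader` = true;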
    
    Specific changes:
    
    * Removed the V2 "compliant" text reader.
    * Moved the "V3" to replace the "compliant" version.
    * Renamed the "complaint" package to "reader."
    * Removed the V1 text reader.
    * Moved the V1 text writer (still used with the V2 and V3 readers)
      into a new "writer" package adjacent to the reader.
    * Removed the CSV tests for the V2 reader, including those that
      demonstrated bugs in V2.
    * V2 did not properly handle the quote escape character. One or two unit
      tests depended on the broken behavior; they were fixed to expect the
      correct behavior.
    * Behavior of "messy quotes" (quotes that appear within a non-quoted
      field; see the illustrative sample after this list) was undefined for
      the text reader. Added a test to clearly demonstrate the (somewhat odd)
      behavior. The behavior itself was not changed.
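
    For illustration (a hypothetical sample, not a file from this commit):
    "messy quotes" are quote characters inside an unquoted field, while the
    quote-escape case doubles the quote inside a quoted field per RFC 4180:

        id,name,comment
        1,Bob,say "hi" please          <-- messy quotes in an unquoted field
        2,Ann,"she said ""hi"" twice"  <-- escaped (doubled) quote in a quoted field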
    
    Reran all unit tests to ensure that they work with the now-default V3
    text reader.
    
    closes #1806
---
 .../java/org/apache/drill/exec/ExecConstants.java  |  19 +-
 .../impl/scan/columns/ColumnsArrayParser.java      |   2 +-
 .../exec/store/easy/text/TextFormatPlugin.java     |  57 +-
 .../text/compliant/CompliantTextRecordReader.java  | 256 -------
 .../easy/text/compliant/FieldVarCharOutput.java    | 234 ------
 .../store/easy/text/compliant/HeaderBuilder.java   | 263 -------
 .../easy/text/compliant/RepeatedVarCharOutput.java | 346 ---------
 .../exec/store/easy/text/compliant/TextInput.java  | 365 ---------
 .../exec/store/easy/text/compliant/TextOutput.java |  87 ---
 .../easy/text/compliant/TextParsingContext.java    | 122 ---
 .../easy/text/compliant/TextParsingSettings.java   | 291 -------
 .../exec/store/easy/text/compliant/TextReader.java | 511 -------------
 .../store/easy/text/compliant/package-info.java    |  27 -
 .../v3/StreamFinishedPseudoException.java          |  29 -
 .../{compliant/v3 => reader}/BaseFieldOutput.java  |   2 +-
 .../v3 => reader}/CompliantTextBatchReader.java    |   6 +-
 .../v3 => reader}/FieldVarCharOutput.java          |   2 +-
 .../{compliant/v3 => reader}/HeaderBuilder.java    |   2 +-
 .../v3 => reader}/RepeatedVarCharOutput.java       |   2 +-
 .../StreamFinishedPseudoException.java             |   9 +-
 .../text/{compliant/v3 => reader}/TextInput.java   |   6 +-
 .../text/{compliant/v3 => reader}/TextOutput.java  |   2 +-
 .../v3 => reader}/TextParsingContext.java          |   2 +-
 .../TextParsingSettings.java}                      |   7 +-
 .../text/{compliant/v3 => reader}/TextReader.java  |  27 +-
 .../{compliant/v3 => reader}/package-info.java     |   2 +-
 .../text/writer/TextRecordWriter.java}             |  14 +-
 .../exec/store/text/DrillTextRecordReader.java     | 244 ------
 .../org/apache/drill/TestSelectWithOption.java     |  13 +-
 .../drill/exec/store/TestImplicitFileColumns.java  |  16 +-
 .../store/easy/text/compliant/BaseCsvTest.java     |  10 -
 .../text/compliant}/TestCsvHeader.java             |   2 +-
 .../easy/text/compliant/TestCsvIgnoreHeaders.java  |  25 +-
 .../text/compliant/TestCsvTableProperties.java     |  67 ++
 .../easy/text/compliant/TestCsvWithHeaders.java    | 846 ++++++---------------
 .../easy/text/compliant/TestCsvWithSchema.java     | 188 +----
 .../easy/text/compliant/TestCsvWithoutHeaders.java | 344 +++------
 .../easy/text/compliant/TestHeaderBuilder.java     |   2 +-
 .../easy/text/compliant/TestPartitionRace.java     | 227 +-----
 .../text/compliant}/TestTextColumn.java            |  15 +-
 .../text/compliant/TestTextReader.java}            |  11 +-
 .../text/compliant}/TextRecordReaderTest.java      |   6 +-
 .../src/test/resources/store/text/data/letters.csv |   9 +-
 43 files changed, 517 insertions(+), 4200 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 93c9902..a7bfd96 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -713,13 +713,28 @@ public final class ExecConstants {
   public static final OptionValidator ENABLE_VERBOSE_ERRORS = new BooleanValidator(ENABLE_VERBOSE_ERRORS_KEY,
       new OptionDescription("Toggles verbose output of executable error messages"));
 
+  /**
+   * Key used in earlier versions to use the original ("V1") text reader. Since at least Drill 1.8
+   * users have used the ("compliant") ("V2") version. Deprecated in Drill 1.17; the "V3" reader
+   * with schema support is always used. Retained for backward compatibility, but does
+   * nothing.
+   */
+  @Deprecated
   public static final String ENABLE_NEW_TEXT_READER_KEY = "exec.storage.enable_new_text_reader";
+  @Deprecated
   public static final OptionValidator ENABLE_NEW_TEXT_READER = new BooleanValidator(ENABLE_NEW_TEXT_READER_KEY,
-      new OptionDescription("Enables the text reader that complies with the RFC 4180 standard for text/csv files."));
+      new OptionDescription("Deprecated. Drill's text reader complies with the RFC 4180 standard for text/csv files."));
 
+  /**
+   * Flag used in Drill 1.16 to select the row-set based ("V3") or the original
+   * "compliant" ("V2") text reader. In Drill 1.17, the "V3" version is always
+   * used. Retained for backward compatibility, but does nothing.
+   */
+  @Deprecated
   public static final String ENABLE_V3_TEXT_READER_KEY = "exec.storage.enable_v3_text_reader";
+  @Deprecated
   public static final OptionValidator ENABLE_V3_TEXT_READER = new BooleanValidator(ENABLE_V3_TEXT_READER_KEY,
-      new OptionDescription("Enables the row set based version of the text/csv reader."));
+      new OptionDescription("Deprecated. The \"V3\" text reader is always used."));
 
   public static final String MIN_READER_WIDTH_KEY = "exec.storage.min_width";
   public static final OptionValidator MIN_READER_WIDTH = new LongValidator(MIN_READER_WIDTH_KEY,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
index 1737271..24a332d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
@@ -24,7 +24,7 @@ import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
 import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
 import org.apache.drill.exec.physical.rowSet.project.RequestedColumnImpl;
 import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
-import org.apache.drill.exec.store.easy.text.compliant.v3.TextReader;
+import org.apache.drill.exec.store.easy.text.reader.TextReader;
 
 /**
  * Parses the `columns` array. Doing so is surprisingly complex.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index f12fffb..89ef8f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -47,27 +47,19 @@ import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.drill.exec.store.dfs.easy.EasyWriter;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.drill.exec.store.easy.text.compliant.CompliantTextRecordReader;
-import org.apache.drill.exec.store.easy.text.compliant.TextParsingSettings;
-import org.apache.drill.exec.store.easy.text.compliant.v3.CompliantTextBatchReader;
-import org.apache.drill.exec.store.easy.text.compliant.v3.TextParsingSettingsV3;
+import org.apache.drill.exec.store.easy.text.reader.CompliantTextBatchReader;
+import org.apache.drill.exec.store.easy.text.reader.TextParsingSettings;
+import org.apache.drill.exec.store.easy.text.writer.TextRecordWriter;
 import org.apache.drill.exec.store.schedule.CompleteFileWork;
-import org.apache.drill.exec.store.text.DrillTextRecordReader;
-import org.apache.drill.exec.store.text.DrillTextRecordWriter;
 import org.apache.drill.exec.vector.accessor.convert.AbstractConvertFromString;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
@@ -212,9 +204,9 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
    */
   private static class ColumnsReaderFactory extends FileReaderFactory {
 
-    private final TextParsingSettingsV3 settings;
+    private final TextParsingSettings settings;
 
-    public ColumnsReaderFactory(TextParsingSettingsV3 settings) {
+    public ColumnsReaderFactory(TextParsingSettings settings) {
       this.settings = settings;
     }
 
@@ -246,9 +238,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     config.defaultName = PLUGIN_NAME;
     config.readerOperatorType = CoreOperatorType.TEXT_SUB_SCAN_VALUE;
     config.writerOperatorType = CoreOperatorType.TEXT_WRITER_VALUE;
-
-    // Uncomment this, and remove useEnhancedScan(), when v2 is retired
-    //config.useEnhancedScan = true;
+    config.useEnhancedScan = true;
     return config;
   }
 
@@ -271,40 +261,11 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
   }
 
   @Override
-  protected boolean useEnhancedScan(OptionManager options) {
-    // Create the "legacy", "V2" reader or the new "V3" version based on
-    // the result set loader. This code should be temporary: the two
-    // readers provide identical functionality for the user; only the
-    // internals differ.
-    return options.getBoolean(ExecConstants.ENABLE_V3_TEXT_READER_KEY);
-  }
-
-  // TODO: Remove this once the V2 reader is removed.
-  @Override
-  public RecordReader getRecordReader(FragmentContext context,
-                                      DrillFileSystem dfs,
-                                      FileWork fileWork,
-                                      List<SchemaPath> columns,
-                                      String userName) {
-    Path path = dfs.makeQualified(fileWork.getPath());
-    FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
-
-    if (context.getOptions().getBoolean(ExecConstants.ENABLE_NEW_TEXT_READER_KEY)) {
-      TextParsingSettings settings = new TextParsingSettings();
-      settings.set(formatConfig);
-      return new CompliantTextRecordReader(split, dfs, settings, columns);
-    } else {
-      char delim = formatConfig.getFieldDelimiter();
-      return new DrillTextRecordReader(split, dfs.getConf(), context, delim, columns);
-    }
-  }
-
-  @Override
   protected FileScanBuilder frameworkBuilder(
       OptionManager options, EasySubScan scan) throws ExecutionSetupException {
     ColumnsScanBuilder builder = new ColumnsScanBuilder();
-    TextParsingSettingsV3 settings =
-        new TextParsingSettingsV3(getConfig(), scan, options);
+    TextParsingSettings settings =
+        new TextParsingSettings(getConfig(), scan, options);
     builder.setReaderFactory(new ColumnsReaderFactory(settings));
 
     // If this format has no headers, or wants to skip them,
@@ -359,7 +320,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     options.put("separator", getConfig().getFieldDelimiterAsString());
     options.put("extension", getConfig().getExtensions().get(0));
 
-    RecordWriter recordWriter = new DrillTextRecordWriter(
+    RecordWriter recordWriter = new TextRecordWriter(
         context.getAllocator(), writer.getStorageStrategy(), writer.getFormatPlugin().getFsConf());
     recordWriter.init(options);
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
deleted file mode 100644
index 0b091f8..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.easy.text.compliant;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.hadoop.mapred.FileSplit;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Predicate;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
-import com.univocity.parsers.common.TextParsingException;
-
-import io.netty.buffer.DrillBuf;
-
-// New text reader, complies with the RFC 4180 standard for text/csv files
-public class CompliantTextRecordReader extends AbstractRecordReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompliantTextRecordReader.class);
-
-  private static final int MAX_RECORDS_PER_BATCH = 8096;
-  private static final int READ_BUFFER = 1024*1024;
-  private static final int WHITE_SPACE_BUFFER = 64*1024;
-  // When no named column is required, ask SCAN to return a DEFAULT column.
-  // If such column does not exist, it will be returned as a nullable-int column.
-  private static final List<SchemaPath> DEFAULT_NAMED_TEXT_COLS_TO_READ =
-      ImmutableList.of(SchemaPath.getSimplePath("_DEFAULT_COL_TO_READ_"));
-
-  // settings to be used while parsing
-  private TextParsingSettings settings;
-  // Chunk of the file to be read by this reader
-  private FileSplit split;
-  // text reader implementation
-  private TextReader reader;
-  // input buffer
-  private DrillBuf readBuffer;
-  // working buffer to handle whitespace
-  private DrillBuf whitespaceBuffer;
-  private DrillFileSystem dfs;
-  // operator context for OutputMutator
-  private OperatorContext oContext;
-
-  public CompliantTextRecordReader(FileSplit split, DrillFileSystem dfs, TextParsingSettings settings, List<SchemaPath> columns) {
-    this.split = split;
-    this.settings = settings;
-    this.dfs = dfs;
-    setColumns(columns);
-  }
-
-  // checks to see if we are querying all columns(star) or individual columns
-  @Override
-  public boolean isStarQuery() {
-    if(settings.isUseRepeatedVarChar()) {
-      return super.isStarQuery() || Iterables.tryFind(getColumns(), new Predicate<SchemaPath>() {
-        @Override
-        public boolean apply(@Nullable SchemaPath path) {
-          return path.equals(RepeatedVarCharOutput.COLUMNS);
-        }
-      }).isPresent();
-    }
-    return super.isStarQuery();
-  }
-
-  /**
-   * Returns list of default columns to read to replace empty list of columns.
-   * For text files without headers returns "columns[0]".
-   * Text files with headers do not support columns syntax,
-   * so when header extraction is enabled, returns fake named column "_DEFAULT_COL_TO_READ_".
-   *
-   * @return list of default columns to read
-   */
-  @Override
-  protected List<SchemaPath> getDefaultColumnsToRead() {
-    if (settings.isHeaderExtractionEnabled()) {
-      return DEFAULT_NAMED_TEXT_COLS_TO_READ;
-    }
-    return DEFAULT_TEXT_COLS_TO_READ;
-  }
-
-  /**
-   * Performs the initial setup required for the record reader.
-   * Initializes the input stream, handling of the output record batch
-   * and the actual reader to be used.
-   * @param context  operator context from which buffer's will be allocated and managed
-   * @param outputMutator  Used to create the schema in the output record batch
-   * @throws ExecutionSetupException
-   */
-  @Override
-  public void setup(OperatorContext context, OutputMutator outputMutator) throws ExecutionSetupException {
-
-    oContext = context;
-    // Note: DO NOT use managed buffers here. They remain in existence
-    // until the fragment is shut down. The buffers here are large.
-    // If we scan 1000 files, and allocate 1 MB for each, we end up
-    // holding onto 1 GB of memory in managed buffers.
-    // Instead, we allocate the buffers explicitly, and must free
-    // them.
-//    readBuffer = context.getManagedBuffer(READ_BUFFER);
-//    whitespaceBuffer = context.getManagedBuffer(WHITE_SPACE_BUFFER);
-    readBuffer = context.getAllocator().buffer(READ_BUFFER);
-    whitespaceBuffer = context.getAllocator().buffer(WHITE_SPACE_BUFFER);
-
-    // setup Output, Input, and Reader
-    try {
-      TextOutput output = null;
-      TextInput input = null;
-      InputStream stream = null;
-
-      // setup Output using OutputMutator
-      if (settings.isHeaderExtractionEnabled()){
-        //extract header and use that to setup a set of VarCharVectors
-        String [] fieldNames = extractHeader();
-        output = new FieldVarCharOutput(outputMutator, fieldNames, getColumns(), isStarQuery());
-      } else {
-        //simply use RepeatedVarCharVector
-        output = new RepeatedVarCharOutput(outputMutator, getColumns(), isStarQuery());
-      }
-
-      // setup Input using InputStream
-      logger.trace("Opening file {}", split.getPath());
-      stream = dfs.openPossiblyCompressedStream(split.getPath());
-      input = new TextInput(settings, stream, readBuffer, split.getStart(), split.getStart() + split.getLength());
-
-      // setup Reader using Input and Output
-      reader = new TextReader(settings, input, output, whitespaceBuffer);
-      reader.start();
-
-    } catch (SchemaChangeException | IOException e) {
-      throw new ExecutionSetupException(String.format("Failure while setting up text reader for file %s", split.getPath()), e);
-    } catch (IllegalArgumentException e) {
-      throw UserException.dataReadError(e).addContext("File Path", split.getPath().toString()).build(logger);
-    }
-  }
-
-  /**
-   * This method is responsible to implement logic for extracting header from text file
-   * Currently it is assumed to be first line if headerExtractionEnabled is set to true
-   * TODO: enhance to support more common header patterns
-   * @return field name strings
-   */
-  private String [] extractHeader() throws SchemaChangeException, IOException, ExecutionSetupException{
-    assert (settings.isHeaderExtractionEnabled());
-    assert (oContext != null);
-
-    // don't skip header in case skipFirstLine is set true
-    settings.setSkipFirstLine(false);
-
-    HeaderBuilder hOutput = new HeaderBuilder();
-
-    // setup Input using InputStream
-    // we should read file header irrespective of split given given to this reader
-    InputStream hStream = dfs.openPossiblyCompressedStream(split.getPath());
-    TextInput hInput = new TextInput(settings,  hStream, oContext.getManagedBuffer(READ_BUFFER), 0, split.getLength());
-
-    // setup Reader using Input and Output
-    this.reader = new TextReader(settings, hInput, hOutput, oContext.getManagedBuffer(WHITE_SPACE_BUFFER));
-    reader.start();
-
-    // extract first row only
-    reader.parseNext();
-
-    // grab the field names from output
-    String [] fieldNames = hOutput.getHeaders();
-
-    // cleanup and set to skip the first line next time we read input
-    reader.close();
-    settings.setSkipFirstLine(true);
-
-    return fieldNames;
-  }
-
-  /**
-   * Generates the next record batch
-   * @return  number of records in the batch
-   *
-   */
-  @Override
-  public int next() {
-    reader.resetForNextBatch();
-    int cnt = 0;
-
-    try{
-      while(cnt < MAX_RECORDS_PER_BATCH && reader.parseNext()){
-        cnt++;
-      }
-      reader.finishBatch();
-      return cnt;
-    } catch (IOException | TextParsingException e) {
-      throw UserException.dataReadError(e)
-          .addContext("Failure while reading file %s. Happened at or shortly before byte position %d.",
-              split.getPath(), reader.getPos())
-          .build(logger);
-    }
-  }
-
-  /**
-   * Cleanup state once we are finished processing all the records.
-   * This would internally close the input stream we are reading from.
-   */
-  @Override
-  public void close() {
-
-    // Release the buffers allocated above. Double-check to handle
-    // unexpected multiple calls to close().
-
-    if (readBuffer != null) {
-      readBuffer.release();
-      readBuffer = null;
-    }
-    if (whitespaceBuffer != null) {
-      whitespaceBuffer.release();
-      whitespaceBuffer = null;
-    }
-    try {
-      if (reader != null) {
-        reader.close();
-        reader = null;
-      }
-    } catch (IOException e) {
-      logger.warn("Exception while closing stream.", e);
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "CompliantTextRecordReader[File=" + split.getPath()
-        + ", reader=" + reader
-        + "]";
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java
deleted file mode 100644
index 9734c88..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.easy.text.compliant;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.map.CaseInsensitiveMap;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.vector.VarCharVector;
-
-/**
- * Class is responsible for generating record batches for text file inputs. We generate
- * a record batch with a set of varchar vectors. A varchar vector contains all the field
- * values for a given column. Each record is a single value within each vector of the set.
- */
-class FieldVarCharOutput extends TextOutput {
-
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FieldVarCharOutput.class);
-  static final String COL_NAME = "columns";
-
-  // array of output vector
-  private final VarCharVector [] vectors;
-  // boolean array indicating which fields are selected (if star query entire array is set to true)
-  private final boolean[] selectedFields;
-  // current vector to which field will be added
-  private VarCharVector currentVector;
-  // track which field is getting appended
-  private int currentFieldIndex = -1;
-  // track chars within field
-  private int currentDataPointer = 0;
-  // track if field is still getting appended
-  private boolean fieldOpen = true;
-  // holds chars for a field
-  private byte[] fieldBytes;
-
-  private boolean collect = true;
-  private boolean rowHasData= false;
-  private static final int MAX_FIELD_LENGTH = 1024 * 64;
-  private int recordCount = 0;
-  private int maxField = 0;
-  private int[] nullCols;
-  private byte nullValue[] = new byte[0];
-
-  /**
-   * We initialize and add the varchar vector for each incoming field in this
-   * constructor.
-   * @param outputMutator  Used to create/modify schema
-   * @param fieldNames Incoming field names
-   * @param columns  List of columns selected in the query
-   * @param isStarQuery  boolean to indicate if all fields are selected or not
-   * @throws SchemaChangeException
-   */
-  public FieldVarCharOutput(OutputMutator outputMutator, String [] fieldNames, Collection<SchemaPath> columns, boolean isStarQuery) throws SchemaChangeException {
-
-    int totalFields = fieldNames.length;
-    List<String> outputColumns = new ArrayList<>(Arrays.asList(fieldNames));
-    List<Integer> nullColumns = new ArrayList<>();
-
-    if (isStarQuery) {
-      maxField = totalFields - 1;
-      this.selectedFields = new boolean[totalFields];
-      Arrays.fill(selectedFields, true);
-    } else {
-      List<Integer> columnIds = new ArrayList<Integer>();
-      Map<String, Integer> headers = CaseInsensitiveMap.newHashMap();
-      for (int i = 0; i < fieldNames.length; i++) {
-        headers.put(fieldNames[i], i);
-      }
-
-      for (SchemaPath path : columns) {
-        int index;
-        String pathStr = path.getRootSegment().getPath();
-        if (pathStr.equals(COL_NAME) && path.getRootSegment().getChild() != null) {
-          //TODO: support both field names and columns index along with predicate pushdown
-          throw UserException
-              .unsupportedError()
-              .message("With extractHeader enabled, only header names are supported")
-              .addContext("column name", pathStr)
-              .addContext("column index", path.getRootSegment().getChild())
-              .build(logger);
-        } else {
-          Integer value = headers.get(pathStr);
-          if (value == null) {
-            // found col that is not a part of fieldNames, add it
-            // this col might be part of some another scanner
-            index = totalFields++;
-            outputColumns.add(pathStr);
-            nullColumns.add(index);
-          } else {
-            index = value;
-          }
-        }
-        columnIds.add(index);
-      }
-      Collections.sort(columnIds);
-
-      this.selectedFields = new boolean[totalFields];
-      for(Integer i : columnIds) {
-        selectedFields[i] = true;
-        maxField = i;
-      }
-    }
-
-    this.vectors = new VarCharVector[totalFields];
-
-    for (int i = 0; i <= maxField; i++) {
-      if (selectedFields[i]) {
-        MaterializedField field = MaterializedField.create(outputColumns.get(i), Types.required(TypeProtos.MinorType.VARCHAR));
-        this.vectors[i] = outputMutator.addField(field, VarCharVector.class);
-      }
-    }
-
-    this.fieldBytes = new byte[MAX_FIELD_LENGTH];
-
-    // Keep track of the null columns to be filled in.
-
-    nullCols = new int[nullColumns.size()];
-    for (int i = 0; i < nullCols.length; i++) {
-      nullCols[i] = nullColumns.get(i);
-    }
-  }
-
-  /**
-   * Start a new record batch. Resets all pointers
-   */
-  @Override
-  public void startBatch() {
-    recordCount = 0;
-    currentFieldIndex= -1;
-    collect = true;
-    fieldOpen = false;
-  }
-
-  @Override
-  public void startField(int index) {
-    currentFieldIndex = index;
-    currentDataPointer = 0;
-    fieldOpen = true;
-    collect = selectedFields[index];
-    currentVector = vectors[index];
-  }
-
-  @Override
-  public void append(byte data) {
-    if (!collect) {
-      return;
-    }
-
-    if (currentDataPointer >= MAX_FIELD_LENGTH -1) {
-      throw UserException
-          .unsupportedError()
-          .message("Trying to write something big in a column")
-          .addContext("columnIndex", currentFieldIndex)
-          .addContext("Limit", MAX_FIELD_LENGTH)
-          .build(logger);
-    }
-
-    fieldBytes[currentDataPointer++] = data;
-  }
-
-  @Override
-  public boolean endField() {
-    fieldOpen = false;
-
-    if (collect) {
-      assert currentVector != null;
-      currentVector.getMutator().setSafe(recordCount, fieldBytes, 0, currentDataPointer);
-    }
-
-    if (currentDataPointer > 0) {
-      this.rowHasData = true;
-    }
-
-    return currentFieldIndex < maxField;
-  }
-
-  @Override
-  public boolean endEmptyField() {
-    return endField();
-  }
-
- @Override
-  public void finishRecord() {
-    if (fieldOpen){
-      endField();
-    }
-
-    // Fill in null (really empty) values.
-
-    for (int i = 0; i < nullCols.length; i++) {
-      vectors[nullCols[i]].getMutator().setSafe(recordCount, nullValue, 0, 0);
-    }
-    recordCount++;
-  }
-
-  @Override
-  public void finishBatch() { }
-
-  @Override
-  public long getRecordCount() {
-    return recordCount;
-  }
-
-  @Override
-  public boolean rowHasData() {
-    return this.rowHasData;
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/HeaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/HeaderBuilder.java
deleted file mode 100644
index 6dfc528..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/HeaderBuilder.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.easy.text.compliant;
-
-import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.drill.common.exceptions.UserException;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-
-/**
- * Text output that implements a header reader/parser.
- * The caller parses out the characters of each header;
- * this class assembles UTF-8 bytes into Unicode characters,
- * fixes invalid characters (those not legal for SQL symbols),
- * and maps duplicate names to unique names.
- * <p>
- * That is, this class is as permissive as possible with file
- * headers to avoid spurious query failures for trivial reasons.
- */
-
-// Note: this class uses Java heap strings and the usual Java
-// convenience classes. Since we do heavy Unicode string operations,
-// and read a single row, there is no good reason to try to use
-// value vectors and direct memory for this task.
-
-public class HeaderBuilder extends TextOutput {
-
-  /**
-   * Maximum Drill symbol length, as enforced for headers.
-   * @see <a href="https://drill.apache.org/docs/lexical-structure/#identifier">
-   * identifier documentation</a>
-   */
-  // TODO: Replace with the proper constant, if available
-  public static final int MAX_HEADER_LEN = 1024;
-
-  /**
-   * Prefix used to replace non-alphabetic characters at the start of
-   * a column name. For example, $foo becomes col_foo. Used
-   * because SQL does not allow _foo.
-   */
-
-  public static final String COLUMN_PREFIX = "col_";
-
-  /**
-   * Prefix used to create numbered columns for missing
-   * headers. Typical names: column_1, column_2, ...
-   */
-
-  public static final String ANONYMOUS_COLUMN_PREFIX = "column_";
-
-  public final List<String> headers = new ArrayList<>();
-  public final ByteBuffer currentField = ByteBuffer.allocate(MAX_HEADER_LEN);
-
-  @Override
-  public void startField(int index) {
-    currentField.clear();
-  }
-
-  @Override
-  public boolean endField() {
-    String header = new String(currentField.array(), 0, currentField.position(), Charsets.UTF_8);
-    header = validateSymbol(header);
-    headers.add(header);
-    return true;
-  }
-
-  @Override
-  public boolean endEmptyField() {
-
-    // Empty header will be rewritten to "column_<n>".
-
-    return endField();
-  }
-
-  /**
-   * Validate the header name according to the SQL lexical rules.
-   * @see <a href="https://drill.apache.org/docs/lexical-structure/#identifier">
-   * identifier documentation</a>
-   * @param header the header name to validate
-   */
-
-  // TODO: Replace with existing code, if any.
-  private String validateSymbol(String header) {
-    header = header.trim();
-
-    // To avoid unnecessary query failures, just make up a column name
-    // if the name is missing or all blanks.
-
-    if (header.isEmpty()) {
-      return ANONYMOUS_COLUMN_PREFIX + (headers.size() + 1);
-    }
-    if (! Character.isAlphabetic(header.charAt(0))) {
-      return rewriteHeader(header);
-    }
-    for (int i = 1; i < header.length(); i++) {
-      char ch = header.charAt(i);
-      if (! Character.isAlphabetic(ch)  &&
-          ! Character.isDigit(ch)  &&  ch != '_') {
-        return rewriteHeader(header);
-      }
-    }
-    return header;
-  }
-
-  /**
-   * Given an invalid header, rewrite it to replace illegal characters
-   * with valid ones. The header won't be what the user specified,
-   * but it will be a valid SQL identifier. This solution avoids failing
-   * queries due to corrupted or invalid header data.
-   * <p>
-   * Names with invalid first characters are mapped to "col_". Example:
-   * $foo maps to col_foo. If the only character is non-alphabetic, treat
-   * the column as anonymous and create a generic name: column_4, etc.
-   * <p>
-   * This mapping could create a column that exceeds the maximum length
-   * of 1024. Since that is not really a hard limit, we just live with the
-   * extra few characters.
-   *
-   * @param header the original header
-   * @return the rewritten header, valid for SQL
-   */
-
-  private String rewriteHeader(String header) {
-    final StringBuilder buf = new StringBuilder();
-
-    // If starts with non-alphabetic, can't map the character to
-    // underscore, so just tack on a prefix.
-
-    char ch = header.charAt(0);
-    if (Character.isAlphabetic(ch)) {
-      buf.append(ch);
-    } else if (Character.isDigit(ch)) {
-      buf.append(COLUMN_PREFIX);
-      buf.append(ch);
-
-      // For the strange case of only one character, format
-      // the same as an empty header.
-
-    } else if (header.length() == 1) {
-      return ANONYMOUS_COLUMN_PREFIX + (headers.size() + 1);
-    } else {
-      buf.append(COLUMN_PREFIX);
-    }
-
-    // Convert all remaining invalid characters to underscores
-
-    for (int i = 1; i < header.length(); i++) {
-      ch = header.charAt(i);
-      if (Character.isAlphabetic(ch)  ||
-          Character.isDigit(ch)  ||  ch == '_') {
-        buf.append(ch);
-      } else {
-        buf.append("_");
-      }
-    }
-    return buf.toString();
-  }
-
-  @Override
-  public void append(byte data) {
-
-    // Ensure the data fits. Note that, if the name is Unicode, the actual
-    // number of characters might be less than the limit even though the
-    // byte count exceeds the limit. Fixing this, in general, would require
-    // a buffer four times larger, so we leave that as a later improvement
-    // if ever needed.
-
-    try {
-      currentField.put(data);
-    } catch (BufferOverflowException e) {
-      throw UserException.dataReadError()
-        .message("Column exceeds maximum length of %d", MAX_HEADER_LEN)
-        .build(logger);
-    }
-  }
-
-  @Override
-  public void finishRecord() {
-    if (headers.isEmpty()) {
-      throw UserException.dataReadError()
-        .message("The file must define at least one header.")
-        .build(logger);
-    }
-
-    // Force headers to be unique.
-
-    final Set<String> idents = new HashSet<String>();
-    for (int i = 0; i < headers.size(); i++) {
-      String header = headers.get(i);
-      String key = header.toLowerCase();
-
-      // Is the header a duplicate?
-
-      if (idents.contains(key)) {
-
-        // Make header unique by appending a suffix.
-        // This loop must end because we have a finite
-        // number of headers.
-        // The original column is assumed to be "1", so
-        // the first duplicate is "2", and so on.
-        // Note that this will map columns of the form:
-        // "col,col,col_2,col_2_2" to
-        // "col", "col_2", "col_2_2", "col_2_2_2".
-        // No mapping scheme is perfect...
-
-        for (int l = 2;; l++) {
-          final String rewritten = header + "_" + l;
-          key = rewritten.toLowerCase();
-          if (! idents.contains(key)) {
-            headers.set(i, rewritten);
-            break;
-          }
-        }
-      }
-      idents.add(key);
-    }
-  }
-
-  @Override
-  public long getRecordCount() { return 1; }
-
-  @Override
-  public void startBatch() { }
-
-  @Override
-  public void finishBatch() { }
-
-  @Override
-  public boolean rowHasData() {
-    return ! headers.isEmpty();
-  }
-
-  public String[] getHeaders() {
-
-    // Just return the headers: any needed checks were done in
-    // finishRecord()
-
-    final String array[] = new String[headers.size()];
-    return headers.toArray(array);
-  }
-
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
deleted file mode 100644
index c8a681e..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.easy.text.compliant;
-
-import io.netty.buffer.DrillBuf;
-import io.netty.util.internal.PlatformDependent;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.vector.RepeatedVarCharVector;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-
-/**
- * Class is responsible for generating record batches for text file inputs. We generate
- * a record batch with a single vector of type repeated varchar vector. Each record is a single
- * value within the vector containing all the fields in the record as individual array elements.
- */
-public class RepeatedVarCharOutput extends TextOutput {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatedVarCharOutput.class);
-
-  static final String COL_NAME = "columns";
-  static final SchemaPath COLUMNS = SchemaPath.getSimplePath("columns");
-  public static final int MAXIMUM_NUMBER_COLUMNS = 64 * 1024;
-
-  // output vector
-  private final RepeatedVarCharVector vector;
-
-  // mutator for the output vector
-  private final RepeatedVarCharVector.Mutator mutator;
-
-  // boolean array indicating which fields are selected (if star query entire array is set to true)
-  private final boolean[] collectedFields;
-
-  // pointer to keep track of the offsets per record
-  private long repeatedOffset;
-
-  // pointer to keep track of the original offsets per record
-  private long repeatedOffsetOriginal;
-
-  // pointer to end of the offset buffer
-  private long repeatedOffsetMax;
-
-  // pointer to the start of the actual data buffer
-  private long characterDataOriginal;
-
-  // pointer to the current location of the data buffer
-  private long characterData;
-
-  // pointer to the end of the data buffer
-  private long characterDataMax;
-
-  // current pointer into the buffer that keeps track of the length of individual fields
-  private long charLengthOffset;
-
-  // pointer to the start of the length buffer
-  private long charLengthOffsetOriginal;
-
-  // pointer to the end of length buffer
-  private long charLengthOffsetMax;
-
-  // pointer to the beginning of the record
-  private long recordStart;
-
-  // total number of records processed (across batches)
-  private long recordCount;
-
-  // number of records processed in this current batch
-  private int batchIndex;
-
-  // current index of the field being processed within the record
-  private int fieldIndex = -1;
-
-  /* boolean to indicate if we are currently appending data to the output vector
-   * Its set to false when we have hit out of memory or we are not interested in
-   * the particular field
-   */
-  private boolean collect;
-
-  // are we currently appending to a field
-  private boolean fieldOpen;
-
-  // maximum number of fields/columns
-  private final int maxField;
-
-  /**
-   * We initialize and add the repeated varchar vector to the record batch in this
-   * constructor. Perform some sanity checks if the selected columns are valid or not.
-   * @param outputMutator  Used to create/modify schema in the record batch
-   * @param columns  List of columns selected in the query
-   * @param isStarQuery  boolean to indicate if all fields are selected or not
-   * @throws SchemaChangeException
-   */
-  public RepeatedVarCharOutput(OutputMutator outputMutator, Collection<SchemaPath> columns, boolean isStarQuery) throws SchemaChangeException {
-    super();
-
-    MaterializedField field = MaterializedField.create(COL_NAME, Types.repeated(TypeProtos.MinorType.VARCHAR));
-    this.vector = outputMutator.addField(field, RepeatedVarCharVector.class);
-
-    this.mutator = vector.getMutator();
-
-
-    { // setup fields
-      List<Integer> columnIds = new ArrayList<>();
-      if (!isStarQuery) {
-        String pathStr;
-        for (SchemaPath path : columns) {
-          assert path.getRootSegment().isNamed() : "root segment should be named";
-          pathStr = path.getRootSegment().getPath();
-          Preconditions.checkArgument(COL_NAME.equalsIgnoreCase(pathStr) || (SchemaPath.DYNAMIC_STAR.equals(pathStr) && path.getRootSegment().getChild() == null),
-              String.format("Selected column '%s' must have name 'columns' or must be plain '*'", pathStr));
-
-          if (path.getRootSegment().getChild() != null) {
-            Preconditions.checkArgument(path.getRootSegment().getChild().isArray(),
-              String.format("Selected column '%s' must be an array index", pathStr));
-            int index = path.getRootSegment().getChild().getArraySegment().getIndex();
-            columnIds.add(index);
-          }
-        }
-        Collections.sort(columnIds);
-
-      }
-
-      boolean[] fields = new boolean[MAXIMUM_NUMBER_COLUMNS];
-
-      int maxField = fields.length;
-
-      if(isStarQuery){
-        Arrays.fill(fields, true);
-      }else{
-        for(Integer i : columnIds){
-          maxField = 0;
-          maxField = Math.max(maxField, i);
-          fields[i] = true;
-        }
-      }
-      this.collectedFields = fields;
-      this.maxField = maxField;
-    }
-
-
-  }
-
-  /**
-   * Start a new record batch. Resets all the offsets and pointers that
-   * store buffer addresses
-   */
-  @Override
-  public void startBatch() {
-    this.recordStart = characterDataOriginal;
-    this.fieldOpen = false;
-    this.batchIndex = 0;
-    this.fieldIndex = -1;
-    this.collect = true;
-
-    loadRepeatedOffsetAddress();
-    loadVarCharOffsetAddress();
-    loadVarCharDataAddress();
-  }
-
-  private void loadRepeatedOffsetAddress(){
-    DrillBuf buf = vector.getOffsetVector().getBuffer();
-    checkBuf(buf);
-    this.repeatedOffset = buf.memoryAddress() + 4;
-    this.repeatedOffsetOriginal = buf.memoryAddress() + 4;
-    this.repeatedOffsetMax = buf.memoryAddress() + buf.capacity();
-  }
-
-  private void loadVarCharDataAddress(){
-    DrillBuf buf = vector.getDataVector().getBuffer();
-    checkBuf(buf);
-    this.characterData = buf.memoryAddress();
-    this.characterDataOriginal = buf.memoryAddress();
-    this.characterDataMax = buf.memoryAddress() + buf.capacity();
-  }
-
-  private void loadVarCharOffsetAddress(){
-    DrillBuf buf = vector.getDataVector().getOffsetVector().getBuffer();
-    checkBuf(buf);
-    this.charLengthOffset = buf.memoryAddress() + 4;
-    this.charLengthOffsetOriginal = buf.memoryAddress() + 4; // add four as offsets conceptually start at 1. (first item is 0..1)
-    this.charLengthOffsetMax = buf.memoryAddress() + buf.capacity();
-  }
-
-  private void expandVarCharOffsets(){
-    vector.getDataVector().getOffsetVector().reAlloc();
-    long diff = charLengthOffset - charLengthOffsetOriginal;
-    loadVarCharOffsetAddress();
-    charLengthOffset += diff;
-  }
-
-  private void expandVarCharData(){
-    vector.getDataVector().reAlloc();
-    long diff = characterData - characterDataOriginal;
-    loadVarCharDataAddress();
-    characterData += diff;
-  }
-
-  private void expandRepeatedOffsets(){
-    vector.getOffsetVector().reAlloc();
-    long diff = repeatedOffset - repeatedOffsetOriginal;
-    loadRepeatedOffsetAddress();
-    repeatedOffset += diff;
-  }
-
-  /**
-   * Helper method to check if the buffer we are accessing
-   * has a minimum reference count and has not been deallocated
-   * @param b  working drill buffer
-   */
-  private void checkBuf(DrillBuf b){
-    if(b.refCnt() < 1){
-      throw new IllegalStateException("Cannot access a dereferenced buffer.");
-    }
-  }
-
-  @Override
-  public void startField(int index) {
-    fieldIndex = index;
-    collect = collectedFields[index];
-    fieldOpen = true;
-  }
-
-  @Override
-  public boolean endField() {
-    fieldOpen = false;
-
-    if(charLengthOffset >= charLengthOffsetMax){
-      expandVarCharOffsets();
-    }
-
-    int newOffset = (int) (characterData - characterDataOriginal);
-    PlatformDependent.putInt(charLengthOffset, newOffset);
-    charLengthOffset += 4;
-    return fieldIndex < maxField;
-  }
-
-  @Override
-  public boolean endEmptyField() {
-    return endField();
-  }
-
-  @Override
-  public void append(byte data) {
-    if(!collect){
-      return;
-    }
-
-    if(characterData >= characterDataMax){
-      expandVarCharData();
-    }
-
-    PlatformDependent.putByte(characterData, data);
-    characterData++;
-
-  }
-
-  @Override
-  public long getRecordCount() {
-    return recordCount;
-  }
-
-  @Override
-  public boolean rowHasData() {
-    return this.recordStart < characterData;
-  }
-
-  @Override
-  public void finishRecord() {
-    this.recordStart = characterData;
-
-    if(fieldOpen){
-      endField();
-    }
-
-    if(repeatedOffset >= repeatedOffsetMax){
-      expandRepeatedOffsets();
-    }
-
-    int newOffset = ((int) (charLengthOffset - charLengthOffsetOriginal))/4;
-    PlatformDependent.putInt(repeatedOffset, newOffset);
-    repeatedOffset += 4;
-
-    // if there were no defined fields, skip.
-    if(fieldIndex > -1){
-      batchIndex++;
-      recordCount++;
-    }
-
-
-  }
-
-  /**
-   * This method is a helper method added for DRILL-951
-   * TextRecordReader to call this method to get field names out
-   * @return array of field data strings
-   */
-  public String [] getTextOutput () throws ExecutionSetupException {
-    if (recordCount == 0 || fieldIndex == -1) {
-      return null;
-    }
-
-    if (this.recordStart != characterData) {
-      throw new ExecutionSetupException("record text was requested before finishing record");
-    }
-
-    //Currently only first line header is supported. Return only first record.
-    int retSize = fieldIndex+1;
-    String [] out = new String [retSize];
-
-    RepeatedVarCharVector.Accessor a = this.vector.getAccessor();
-    for (int i=0; i<retSize; i++){
-      out[i] = a.getSingleObject(0,i).toString();
-    }
-    return out;
-  }
-
-  @Override
-  public void finishBatch() { }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java
deleted file mode 100644
index 6cf554d..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.easy.text.compliant;
-
-import io.netty.buffer.DrillBuf;
-import io.netty.util.internal.PlatformDependent;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-
-import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck;
-
-/**
- * Class that fronts an InputStream to provide a byte consumption interface.
- * Also manages only reading lines to and from each split.
- */
-final class TextInput {
-
-  private final byte[] lineSeparator;
-  private final byte normalizedLineSeparator;
-  private final TextParsingSettings settings;
-
-  private long lineCount;
-  private long charCount;
-
-  /**
-   * The starting position in the file.
-   */
-  private final long startPos;
-  private final long endPos;
-
-  private int bufferMark;
-  private long streamMark;
-
-  private long streamPos;
-
-  private final Seekable seekable;
-  private final FSDataInputStream inputFS;
-  private final InputStream input;
-
-  private final DrillBuf buffer;
-  private final ByteBuffer underlyingBuffer;
-  private final long bStart;
-  private final long bStartMinus1;
-
-  private final boolean bufferReadable;
-
-  /**
-   * Whether there was a possible partial line separator on the previous
-   * read so we dropped it and it should be appended to next read.
-   */
-  private int remByte = -1;
-
-  /**
-   * The current position in the buffer.
-   */
-  public int bufferPtr;
-
-  /**
-   * The quantity of valid data in the buffer.
-   */
-  public int length = -1;
-
-  private boolean endFound = false;
-
-  /**
-   * Creates a new instance with the mandatory characters for handling newlines transparently.
-   * lineSeparator the sequence of characters that represent a newline, as defined in {@link Format#getLineSeparator()}
-   * normalizedLineSeparator the normalized newline character (as defined in {@link Format#getNormalizedNewline()}) that is used to replace any lineSeparator sequence found in the input.
-   */
-  public TextInput(TextParsingSettings settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
-    this.lineSeparator = settings.getNewLineDelimiter();
-    byte normalizedLineSeparator = settings.getNormalizedNewLine();
-    Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable.");
-    boolean isCompressed = input instanceof CompressionInputStream;
-    Preconditions.checkArgument(!isCompressed || startPos == 0, "Cannot use split on compressed stream.");
-
-    // splits aren't allowed with compressed data.  The split length will be the compressed size which means we'll normally end prematurely.
-    if(isCompressed && endPos > 0){
-      endPos = Long.MAX_VALUE;
-    }
-
-    this.input = input;
-    this.seekable = (Seekable) input;
-    this.settings = settings;
-
-    if(input instanceof FSDataInputStream){
-      this.inputFS = (FSDataInputStream) input;
-      this.bufferReadable = inputFS.getWrappedStream() instanceof ByteBufferReadable;
-    }else{
-      this.inputFS = null;
-      this.bufferReadable = false;
-    }
-
-    this.startPos = startPos;
-    this.endPos = endPos;
-
-    this.normalizedLineSeparator = normalizedLineSeparator;
-
-    this.buffer = readBuffer;
-    this.bStart = buffer.memoryAddress();
-    this.bStartMinus1 = bStart -1;
-    this.underlyingBuffer = buffer.nioBuffer(0, buffer.capacity());
-  }
-
-  /**
-   * Positions the input for the start of reading.  If the input is a non-zero split or
-   * skipFirstLine is enabled, the input moves to the next complete line.
-   * @throws IOException
-   */
-  final void start() throws IOException {
-    lineCount = 0;
-    if(startPos > 0){
-      seekable.seek(startPos);
-    }
-
-    updateBuffer();
-    if (length > 0) {
-      if(startPos > 0 || settings.isSkipFirstLine()){
-
-        // move to next full record.
-        skipLines(1);
-      }
-    }
-  }
-
-
-  /**
-   * Helper method to get the most recent characters consumed since the last record started.
-   * May return an incomplete string since we don't support stream rewind.  Currently returns a single space.
-   * @return String of last few bytes.
-   * @throws IOException
-   */
-  public String getStringSinceMarkForError() throws IOException {
-    return " ";
-  }
-
-  long getPos(){
-    return streamPos + bufferPtr;
-  }
-
-  public void mark(){
-    streamMark = streamPos;
-    bufferMark = bufferPtr;
-  }
-
-  /**
-   * Read some more bytes from the stream.  Uses the zero-copy interface if available; otherwise, does a byte copy.
-   * @throws IOException
-   */
-  private void read() throws IOException {
-    if(bufferReadable){
-
-      if(remByte != -1){
-        for (int i = 0; i <= remByte; i++) {
-          underlyingBuffer.put(lineSeparator[i]);
-        }
-        remByte = -1;
-      }
-      length = inputFS.read(underlyingBuffer);
-
-    }else{
-
-      byte[] b = new byte[underlyingBuffer.capacity()];
-      if(remByte != -1){
-        int remBytesNum = remByte + 1;
-        System.arraycopy(lineSeparator, 0, b, 0, remBytesNum);
-        length = input.read(b, remBytesNum, b.length - remBytesNum);
-        remByte = -1;
-      }else{
-        length = input.read(b);
-      }
-      underlyingBuffer.put(b);
-    }
-  }
-
-
-  /**
-   * Read more data into the buffer.  Will also manage split end conditions.
-   * @throws IOException
-   */
-  private void updateBuffer() throws IOException {
-    streamPos = seekable.getPos();
-    underlyingBuffer.clear();
-
-    if(endFound){
-      length = -1;
-      return;
-    }
-
-    read();
-
-    // check our data read allowance.
-    if(streamPos + length >= this.endPos){
-      updateLengthBasedOnConstraint();
-    }
-
-    charCount += bufferPtr;
-    bufferPtr = 1;
-
-    buffer.writerIndex(underlyingBuffer.limit());
-    buffer.readerIndex(underlyingBuffer.position());
-
-  }
-
-  /**
-   * Checks to see if we can go over the end of our bytes constraint on the data.  If so,
-   * adjusts so that we can only read to the last character of the first line that crosses
-   * the split boundary.
-   */
-  private void updateLengthBasedOnConstraint() {
-    final long max = bStart + length;
-    for(long m = bStart + (endPos - streamPos); m < max; m++) {
-      for (int i = 0; i < lineSeparator.length; i++) {
-        long mPlus = m + i;
-        if (mPlus < max) {
-          // we found a line separator and don't need to consult the next byte.
-          if (lineSeparator[i] == PlatformDependent.getByte(mPlus) && i == lineSeparator.length - 1) {
-            length = (int) (mPlus - bStart) + 1;
-            endFound = true;
-            return;
-          }
-        } else {
-          // the last N characters of the read were remnant bytes. We'll hold off on dealing with these bytes until the next read.
-          remByte = i;
-          length = length - i;
-          return;
-        }
-      }
-    }
-  }
-
-  /**
-   * Get next byte from stream.  Also maintains the current line count.  Will throw a StreamFinishedPseudoException
-   * when the stream has run out of bytes.
-   * @return next byte from stream.
-   * @throws IOException
-   */
-  public final byte nextChar() throws IOException {
-    byte byteChar = nextCharNoNewLineCheck();
-    int bufferPtrTemp = bufferPtr - 1;
-    if (byteChar == lineSeparator[0]) {
-       for (int i = 1; i < lineSeparator.length; i++, bufferPtrTemp++) {
-         if (lineSeparator[i] != buffer.getByte(bufferPtrTemp)) {
-           return byteChar;
-         }
-       }
-
-        lineCount++;
-        byteChar = normalizedLineSeparator;
-
-        // we don't need to update buffer position if line separator is one byte long
-        if (lineSeparator.length > 1) {
-          bufferPtr += (lineSeparator.length - 1);
-          if (bufferPtr >= length) {
-            if (length != -1) {
-              updateBuffer();
-            } else {
-              throw StreamFinishedPseudoException.INSTANCE;
-            }
-          }
-        }
-      }
-
-    return byteChar;
-  }
-
-  /**
-   * Get next byte from stream.  Does not maintain any line count.  Will throw a StreamFinishedPseudoException
-   * when the stream has run out of bytes.
-   * @return next byte from stream.
-   * @throws IOException
-   */
-  public final byte nextCharNoNewLineCheck() throws IOException {
-
-    if (length == -1) {
-      throw StreamFinishedPseudoException.INSTANCE;
-    }
-
-    rangeCheck(buffer, bufferPtr - 1, bufferPtr);
-
-    byte byteChar = PlatformDependent.getByte(bStartMinus1 + bufferPtr);
-
-    if (bufferPtr >= length) {
-      if (length != -1) {
-        updateBuffer();
-        bufferPtr--;
-      } else {
-        throw StreamFinishedPseudoException.INSTANCE;
-      }
-    }
-
-    bufferPtr++;
-
-    return byteChar;
-  }
-
-  /**
-   * Number of lines read since the start of this split.
-   * @return the number of lines read since the start of this split.
-   */
-  public final long lineCount() {
-    return lineCount;
-  }
-
-  /**
-   * Skip forward the number of line delimiters.  If you are in the middle of a line,
-   * a value of 1 will skip to the start of the next record.
-   * @param lines Number of lines to skip.
-   * @throws IOException
-   */
-  public final void skipLines(int lines) throws IOException {
-    if (lines < 1) {
-      return;
-    }
-    long expectedLineCount = this.lineCount + lines;
-
-    try {
-      do {
-        nextChar();
-      } while (lineCount < expectedLineCount);
-      if (lineCount < lines) {
-        throw new IllegalArgumentException("Unable to skip " + lines + " lines from line " + (expectedLineCount - lines) + ". End of input reached");
-      }
-    } catch (EOFException ex) {
-      throw new IllegalArgumentException("Unable to skip " + lines + " lines from line " + (expectedLineCount - lines) + ". End of input reached");
-    }
-  }
-
-  public final long charCount() {
-    return charCount + bufferPtr;
-  }
-
-  public long getLineCount() {
-    return lineCount;
-  }
-
-  public void close() throws IOException{
-    input.close();
-  }
-}
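For reference, the removed V2 TextInput was driven roughly as follows. This is a sketch only, assuming an already-opened Seekable stream, a DrillBuf read buffer, and V2 parsing settings (the variable names are illustrative):

    TextInput in = new TextInput(settings, stream, readBuffer, startPos, endPos);
    in.start();              // seeks to startPos and, for a non-zero split, skips to the next full line
    byte b = in.nextChar();  // returns the normalized newline for any line-separator sequence
    long lines = in.lineCount();
    in.close();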
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextOutput.java
deleted file mode 100644
index 1042994..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextOutput.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.easy.text.compliant;
-
-/* Base class for producing output record batches while dealing with
- * Text files.
- */
-abstract class TextOutput {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextOutput.class);
-
-  /**
-   * Start processing a new field within a record.
-   * @param index  index within the record
-   */
-  public abstract void startField(int index);
-
-  /**
-   * End processing a field within a record.
-   * @return  true if engine should continue processing record.  false if rest of record can be skipped.
-   */
-  public abstract boolean endField();
-
-  /**
-   * Shortcut that lets the output know that we are ending a field with no data.
-   * @return true if engine should continue processing record.  false if rest of record can be skipped.
-   */
-  public abstract boolean endEmptyField();
-
-  /**
-   * Add the provided data but drop any whitespace.
-   * @param data
-   */
-  public void appendIgnoringWhitespace(byte data){
-    if(TextReader.isWhite(data)){
-      // noop
-    }else{
-      append(data);
-    }
-  }
-
-  /**
-   * This function appends the byte to the output character data buffer
-   * @param data  current byte read
-   */
-  public abstract void append(byte data);
-
-  /**
-   * Completes the processing of a given record. Also completes the processing of the
-   * last field being read.
-   */
-  public abstract void finishRecord();
-
-  /**
-   *  Return the total number of records (across batches) processed
-   */
-  public abstract long getRecordCount();
-
-  /**
-   * Informs output to setup for new record batch.
-   */
-  public abstract void startBatch();
-
-  /**
-   * Does any final cleanup that is required for closing a batch.  Example might include closing the last field.
-   */
-  public abstract void finishBatch();
-
-  /**
-   * Helper method to check if the current record has any non-empty fields
-   */
-  public abstract boolean rowHasData();
-}
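The contract above implies a fixed call sequence from the parser. A minimal sketch, assuming output is any concrete TextOutput subclass:

    output.startBatch();
    output.startField(0);        // first field of the record
    output.append((byte) 'a');   // raw bytes of the field value
    output.endField();
    output.finishRecord();
    output.finishBatch();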
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingContext.java
deleted file mode 100644
index 684c453..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingContext.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.easy.text.compliant;
-
-import java.io.IOException;
-
-import com.univocity.parsers.common.ParsingContext;
-
-class TextParsingContext implements ParsingContext {
-
-  private final TextInput input;
-  private final TextOutput output;
-  protected boolean stopped = false;
-
-  private int[] extractedIndexes = null;
-
-  public TextParsingContext(TextInput input, TextOutput output) {
-    this.input = input;
-    this.output = output;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void stop() {
-    stopped = true;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public boolean isStopped() {
-    return stopped;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public long currentLine() {
-    return input.lineCount();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public long currentChar() {
-    return input.charCount();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public int currentColumn() {
-    return -1;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public String[] headers() {
-    return new String[]{};
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public int[] extractedFieldIndexes() {
-    return extractedIndexes;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public long currentRecord() {
-    return output.getRecordCount();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public String currentParsedContent() {
-    try {
-      return input.getStringSinceMarkForError();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void skipLines(int lines) {
-  }
-
-  @Override
-  public boolean columnsReordered() {
-    return false;
-  }
-}
-
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java
deleted file mode 100644
index 36dd4f9..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.easy.text.compliant;
-
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-
-public class TextParsingSettings {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextParsingSettings.class);
-
-  public static final TextParsingSettings DEFAULT = new TextParsingSettings();
-
-  private String emptyValue = null;
-  private boolean parseUnescapedQuotes = true;
-  private byte quote = b('"');
-  private byte quoteEscape = b('"');
-  private byte delimiter = b(',');
-  private byte comment = b('#');
-
-  private long maxCharsPerColumn = Character.MAX_VALUE;
-  private byte normalizedNewLine = b('\n');
-  private byte[] newLineDelimiter = {normalizedNewLine};
-  private boolean ignoreLeadingWhitespaces = false;
-  private boolean ignoreTrailingWhitespaces = false;
-  private String lineSeparatorString = "\n";
-  private boolean skipFirstLine = false;
-
-  private boolean headerExtractionEnabled = false;
-  private boolean useRepeatedVarChar = true;
-  private int numberOfRecordsToRead = -1;
-
-  public void set(TextFormatConfig config){
-    this.quote = bSafe(config.getQuote(), "quote");
-    this.quoteEscape = bSafe(config.getEscape(), "escape");
-    this.newLineDelimiter = config.getLineDelimiter().getBytes(Charsets.UTF_8);
-    this.delimiter = bSafe(config.getFieldDelimiter(), "fieldDelimiter");
-    this.comment = bSafe(config.getComment(), "comment");
-    this.skipFirstLine = config.isSkipFirstLine();
-    this.headerExtractionEnabled = config.isHeaderExtractionEnabled();
-    if (this.headerExtractionEnabled) {
-      // When a header is present, TextRecordReader uses a set of VarChar vectors instead of RepeatedVarChar
-      this.useRepeatedVarChar = false;
-    }
-  }
-
-  public byte getComment(){
-    return comment;
-  }
-
-  public boolean isSkipFirstLine() {
-    return skipFirstLine;
-  }
-
-  public void setSkipFirstLine(boolean skipFirstLine) {
-    this.skipFirstLine = skipFirstLine;
-  }
-
-  public boolean isUseRepeatedVarChar() {
-    return useRepeatedVarChar;
-  }
-
-  public void setUseRepeatedVarChar(boolean useRepeatedVarChar) {
-    this.useRepeatedVarChar = useRepeatedVarChar;
-  }
-
-
-  private static byte bSafe(char c, String name){
-    if(c > Byte.MAX_VALUE) {
-      throw new IllegalArgumentException(String.format("Failure validating configuration option %s.  Expected a "
-          + "character between 0 and 127 but value was actually %d.", name, (int) c));
-    }
-    return (byte) c;
-  }
-
-  private static byte b(char c){
-    return (byte) c;
-  }
-
-  public byte[] getNewLineDelimiter() {
-    return newLineDelimiter;
-  }
-
-  /**
-   * Returns the character used for escaping values where the field delimiter is part of the value. Defaults to '"'
-   * @return the quote character
-   */
-  public byte getQuote() {
-    return quote;
-  }
-
-  /**
-   * Defines the character used for escaping values where the field delimiter is part of the value. Defaults to '"'
-   * @param quote the quote character
-   */
-  public void setQuote(byte quote) {
-    this.quote = quote;
-  }
-
-  public String getLineSeparatorString(){
-    return lineSeparatorString;
-  }
-
-
-  /**
-   * Identifies whether or not a given character is used for escaping values where the field delimiter is part of the value
-   * @param ch the character to be verified
-   * @return true if the given character is the character used for escaping values, false otherwise
-   */
-  public boolean isQuote(byte ch) {
-    return this.quote == ch;
-  }
-
-  /**
-   * Returns the character used for escaping quotes inside an already quoted value. Defaults to '"'
-   * @return the quote escape character
-   */
-  public byte getQuoteEscape() {
-    return quoteEscape;
-  }
-
-  /**
-   * Defines the character used for escaping quotes inside an already quoted value. Defaults to '"'
-   * @param quoteEscape the quote escape character
-   */
-  public void setQuoteEscape(byte quoteEscape) {
-    this.quoteEscape = quoteEscape;
-  }
-
-  /**
-   * Identifies whether or not a given character is used for escaping quotes inside an already quoted value.
-   * @param ch the character to be verified
-   * @return true if the given character is the quote escape character, false otherwise
-   */
-  public boolean isQuoteEscape(byte ch) {
-    return this.quoteEscape == ch;
-  }
-
-  /**
-   * Returns the field delimiter character. Defaults to ','
-   * @return the field delimiter character
-   */
-  public byte getDelimiter() {
-    return delimiter;
-  }
-
-  /**
-   * Defines the field delimiter character. Defaults to ','
-   * @param delimiter the field delimiter character
-   */
-  public void setDelimiter(byte delimiter) {
-    this.delimiter = delimiter;
-  }
-
-  /**
-   * Identifies whether or not a given character represents a field delimiter
-   * @param ch the character to be verified
-   * @return true if the given character is the field delimiter character, false otherwise
-   */
-  public boolean isDelimiter(byte ch) {
-    return this.delimiter == ch;
-  }
-
-  /**
-   * Returns the String representation of an empty value (defaults to null)
-   *
-   * <p>When reading, if the parser does not read any character from the input, and the input is within quotes, the empty value is used instead of an empty string
-   *
-   * @return the String representation of an empty value
-   */
-  public String getEmptyValue() {
-    return emptyValue;
-  }
-
-  /**
-   * Sets the String representation of an empty value (defaults to null)
-   *
-   * <p>When reading, if the parser does not read any character from the input, and the input is within quotes, the empty value is used instead of an empty string
-   *
-   * @param emptyValue the String representation of an empty value
-   */
-  public void setEmptyValue(String emptyValue) {
-    this.emptyValue = emptyValue;
-  }
-
-
-  /**
-   * Indicates whether the CSV parser should accept unescaped quotes inside quoted values and parse them normally. Defaults to {@code true}.
-   * @return a flag indicating whether or not the CSV parser should accept unescaped quotes inside quoted values.
-   */
-  public boolean isParseUnescapedQuotes() {
-    return parseUnescapedQuotes;
-  }
-
-  /**
-   * Configures how to handle unescaped quotes inside quoted values. If set to {@code true}, the parser will parse the quote normally as part of the value.
-   * If set to {@code false}, a {@link com.univocity.parsers.common.TextParsingException} will be thrown. Defaults to {@code true}.
-   * @param parseUnescapedQuotes indicates whether or not the CSV parser should accept unescaped quotes inside quoted values.
-   */
-  public void setParseUnescapedQuotes(boolean parseUnescapedQuotes) {
-    this.parseUnescapedQuotes = parseUnescapedQuotes;
-  }
-
-  /**
-   * Indicates whether or not the first valid record parsed from the input should be considered as the row containing the names of each column
-   * @return true if the first valid record parsed from the input should be considered as the row containing the names of each column, false otherwise
-   */
-  public boolean isHeaderExtractionEnabled() {
-    return headerExtractionEnabled;
-  }
-
-  /**
-   * Defines whether or not the first valid record parsed from the input should be considered as the row containing the names of each column
-   * @param headerExtractionEnabled a flag indicating whether the first valid record parsed from the input should be considered as the row containing the names of each column
-   */
-  public void setHeaderExtractionEnabled(boolean headerExtractionEnabled) {
-    this.headerExtractionEnabled = headerExtractionEnabled;
-  }
-
-  /**
-   * The number of valid records to be parsed before the process is stopped. A negative value indicates there's no limit (defaults to -1).
-   * @return the number of records to read before stopping the parsing process.
-   */
-  public int getNumberOfRecordsToRead() {
-    return numberOfRecordsToRead;
-  }
-
-  /**
-   * Defines the number of valid records to be parsed before the process is stopped. A negative value indicates there's no limit (defaults to -1).
-   * @param numberOfRecordsToRead the number of records to read before stopping the parsing process.
-   */
-  public void setNumberOfRecordsToRead(int numberOfRecordsToRead) {
-    this.numberOfRecordsToRead = numberOfRecordsToRead;
-  }
-
-  public long getMaxCharsPerColumn() {
-    return maxCharsPerColumn;
-  }
-
-  public void setMaxCharsPerColumn(long maxCharsPerColumn) {
-    this.maxCharsPerColumn = maxCharsPerColumn;
-  }
-
-  public void setComment(byte comment) {
-    this.comment = comment;
-  }
-
-  public byte getNormalizedNewLine() {
-    return normalizedNewLine;
-  }
-
-  public void setNormalizedNewLine(byte normalizedNewLine) {
-    this.normalizedNewLine = normalizedNewLine;
-  }
-
-  public boolean isIgnoreLeadingWhitespaces() {
-    return ignoreLeadingWhitespaces;
-  }
-
-  public void setIgnoreLeadingWhitespaces(boolean ignoreLeadingWhitespaces) {
-    this.ignoreLeadingWhitespaces = ignoreLeadingWhitespaces;
-  }
-
-  public boolean isIgnoreTrailingWhitespaces() {
-    return ignoreTrailingWhitespaces;
-  }
-
-  public void setIgnoreTrailingWhitespaces(boolean ignoreTrailingWhitespaces) {
-    this.ignoreTrailingWhitespaces = ignoreTrailingWhitespaces;
-  }
-
-
-
-
-
-}
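A minimal sketch of configuring the removed V2 settings class, using only the setters declared above; the delimiter and escape values are illustrative:

    TextParsingSettings settings = new TextParsingSettings();
    settings.setDelimiter((byte) '|');           // pipe-delimited input
    settings.setQuote((byte) '"');
    settings.setQuoteEscape((byte) '\\');
    settings.setSkipFirstLine(true);             // skip a header row without extracting it
    settings.setHeaderExtractionEnabled(false);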
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
deleted file mode 100644
index 7a9ed46..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
+++ /dev/null
@@ -1,511 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.easy.text.compliant;
-
-import io.netty.buffer.DrillBuf;
-
-import java.io.IOException;
-
-import org.apache.drill.common.exceptions.UserException;
-
-import com.univocity.parsers.common.TextParsingException;
-
-/*******************************************************************************
- * Portions Copyright 2014 uniVocity Software Pty Ltd
- ******************************************************************************/
-
-/**
- * A byte-based Text parser implementation. Builds heavily upon the uniVocity parsers. Customized for UTF8 parsing and
- * DrillBuf support.
- */
-final class TextReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextReader.class);
-
-  private static final byte NULL_BYTE = (byte) '\0';
-
-  private final TextParsingContext context;
-
-  private final long recordsToRead;
-  private final TextParsingSettings settings;
-
-  private final TextInput input;
-  private final TextOutput output;
-  private final DrillBuf workBuf;
-
-  private byte ch;
-
-  // index of the field within this record
-  private int fieldIndex;
-
-  /** Behavior settings **/
-  private final boolean ignoreTrailingWhitespace;
-  private final boolean ignoreLeadingWhitespace;
-  private final boolean parseUnescapedQuotes;
-
-  /** Key Characters **/
-  private final byte comment;
-  private final byte delimiter;
-  private final byte quote;
-  private final byte quoteEscape;
-  private final byte newLine;
-
-  /**
-   * The CsvParser supports all settings provided by {@link TextParsingSettings}, and requires this configuration to be
-   * properly initialized.
-   * @param settings  the parser configuration
-   * @param input  input stream
-   * @param output  interface to produce output record batch
-   * @param workBuf  working buffer to handle whitespaces
-   */
-  public TextReader(TextParsingSettings settings, TextInput input, TextOutput output, DrillBuf workBuf) {
-    this.context = new TextParsingContext(input, output);
-    this.workBuf = workBuf;
-    this.settings = settings;
-
-    this.recordsToRead = settings.getNumberOfRecordsToRead() == -1 ? Long.MAX_VALUE : settings.getNumberOfRecordsToRead();
-
-    this.ignoreTrailingWhitespace = settings.isIgnoreTrailingWhitespaces();
-    this.ignoreLeadingWhitespace = settings.isIgnoreLeadingWhitespaces();
-    this.parseUnescapedQuotes = settings.isParseUnescapedQuotes();
-    this.delimiter = settings.getDelimiter();
-    this.quote = settings.getQuote();
-    this.quoteEscape = settings.getQuoteEscape();
-    this.newLine = settings.getNormalizedNewLine();
-    this.comment = settings.getComment();
-
-    this.input = input;
-    this.output = output;
-
-  }
-
-  public TextOutput getOutput(){
-    return output;
-  }
-
-  /* Check if the given byte is a white space. As per the univocity text reader
-   * any ASCII value <= ' ' is considered a white space. However, since byte in Java is signed,
-   * we have an additional check to make sure it's not negative
-   */
-  static final boolean isWhite(byte b){
-    return b <= ' ' && b > -1;
-  }
-
-  // Inform the output interface to indicate we are starting a new record batch
-  public void resetForNextBatch(){
-    output.startBatch();
-  }
-
-  public long getPos(){
-    return input.getPos();
-  }
-
-  /**
-   * Parses an entire record, delegating parsing of the individual
-   * fields to the parseField() function.
-   * We mark the start of the record so that, if any failure is encountered (e.g. OOM),
-   * the input stream can be reset to the marked position.
-   * @return  true if parsing this record was successful; false otherwise
-   * @throws IOException
-   */
-  private boolean parseRecord() throws IOException {
-    final byte newLine = this.newLine;
-    final TextInput input = this.input;
-
-    input.mark();
-
-    fieldIndex = 0;
-    if (isWhite(ch) && ignoreLeadingWhitespace) {
-      skipWhitespace();
-    }
-
-    int fieldsWritten = 0;
-    try{
-      boolean earlyTerm = false;
-      while (ch != newLine) {
-        earlyTerm = !parseField();
-        fieldsWritten++;
-        if (ch != newLine) {
-          ch = input.nextChar();
-          if (ch == newLine) {
-            output.startField(fieldsWritten++);
-            output.endEmptyField();
-            break;
-          }
-        }
-        if(earlyTerm){
-          if(ch != newLine){
-            input.skipLines(1);
-          }
-          break;
-        }
-      }
-    }catch(StreamFinishedPseudoException e){
-      // if we've written part of a field or all of a field, we should send this row.
-      if(fieldsWritten == 0 && !output.rowHasData()){
-        throw e;
-      }
-    }
-
-    output.finishRecord();
-    return true;
-  }
-
-  /**
-   * Parses an individual field, ignoring any white space encountered
-   * by not appending it to the output vector.
-   * @throws IOException
-   */
-  private void parseValueIgnore() throws IOException {
-    final byte newLine = this.newLine;
-    final byte delimiter = this.delimiter;
-    final TextOutput output = this.output;
-    final TextInput input = this.input;
-
-    byte ch = this.ch;
-    while (ch != delimiter && ch != newLine) {
-      output.appendIgnoringWhitespace(ch);
-//      fieldSize++;
-      ch = input.nextChar();
-    }
-    this.ch = ch;
-  }
-
-  /**
-   * Parses an individual field and appends all characters up to the delimiter (or newline)
-   * to the output, including white space.
-   * @throws IOException
-   */
-  private void parseValueAll() throws IOException {
-    final byte newLine = this.newLine;
-    final byte delimiter = this.delimiter;
-    final TextOutput output = this.output;
-    final TextInput input = this.input;
-
-    byte ch = this.ch;
-    while (ch != delimiter && ch != newLine) {
-      output.append(ch);
-      ch = input.nextChar();
-    }
-    this.ch = ch;
-  }
-
-  /**
-   * Function simply delegates the parsing of a single field to the actual implementation based on parsing config
-   * @throws IOException
-   */
-  private void parseValue() throws IOException {
-    if (ignoreTrailingWhitespace) {
-      parseValueIgnore();
-    }else{
-      parseValueAll();
-    }
-  }
-
-  /**
-   * Recursive function invoked when a quote is encountered. Function also
-   * handles the case when there are non-white space characters in the field
-   * after the quoted value.
-   * @param prev  previous byte read
-   * @throws IOException
-   */
-  private void parseQuotedValue(byte prev) throws IOException {
-    final byte newLine = this.newLine;
-    final byte delimiter = this.delimiter;
-    final TextOutput output = this.output;
-    final TextInput input = this.input;
-    final byte quote = this.quote;
-
-    ch = input.nextCharNoNewLineCheck();
-
-    while (!(prev == quote && (ch == delimiter || ch == newLine || isWhite(ch)))) {
-      if (ch != quote) {
-        if (prev == quote) { // unescaped quote detected
-          if (parseUnescapedQuotes) {
-            output.append(quote);
-            output.append(ch);
-            parseQuotedValue(ch);
-            break;
-          } else {
-            throw new TextParsingException(
-                context,
-                "Unescaped quote character '"
-                    + quote
-                    + "' inside quoted value of CSV field. To allow unescaped quotes, set 'parseUnescapedQuotes' to 'true' in the CSV parser settings. Cannot parse CSV input.");
-          }
-        }
-        output.append(ch);
-        prev = ch;
-      } else if (prev == quoteEscape) {
-        output.append(quote);
-        prev = NULL_BYTE;
-      } else {
-        prev = ch;
-      }
-      ch = input.nextCharNoNewLineCheck();
-    }
-
-    // Handles whitespaces after quoted value:
-    // Whitespaces are ignored (i.e., ch <= ' ') if they are not used as delimiters (i.e., ch != ' ')
-    // For example, in tab-separated files (TSV files), '\t' is used as delimiter and should not be ignored
-    // Content after whitespaces may be parsed if 'parseUnescapedQuotes' is enabled.
-    if (ch != newLine && ch <= ' ' && ch != delimiter) {
-      final DrillBuf workBuf = this.workBuf;
-      workBuf.resetWriterIndex();
-      do {
-        // saves whitespaces after value
-        workBuf.writeByte(ch);
-        ch = input.nextChar();
-        // found a new line, go to next record.
-        if (ch == newLine) {
-          return;
-        }
-      } while (ch <= ' ' && ch != delimiter);
-
-      // there's more stuff after the quoted value, not only empty spaces.
-      if (!(ch == delimiter || ch == newLine) && parseUnescapedQuotes) {
-
-        output.append(quote);
-        for(int i =0; i < workBuf.writerIndex(); i++){
-          output.append(workBuf.getByte(i));
-        }
-        // the next character is not the escape character, put it there
-        if (ch != quoteEscape) {
-          output.append(ch);
-        }
-        // sets this character as the previous character (may be escaping)
-        // calls recursively to keep parsing potentially quoted content
-        parseQuotedValue(ch);
-      }
-    }
-
-    if (!(ch == delimiter || ch == newLine)) {
-      throw new TextParsingException(context, "Unexpected character '" + ch
-          + "' following quoted value of CSV field. Expecting '" + delimiter + "'. Cannot parse CSV input.");
-    }
-  }
-
-  /**
-   * Captures the entirety of parsing a single field and based on the input delegates to the appropriate function
-   * @return
-   * @throws IOException
-   */
-  private final boolean parseField() throws IOException {
-
-    output.startField(fieldIndex++);
-
-    if (isWhite(ch) && ignoreLeadingWhitespace) {
-      skipWhitespace();
-    }
-
-    if (ch == delimiter) {
-      return output.endEmptyField();
-    } else {
-      if (ch == quote) {
-        parseQuotedValue(NULL_BYTE);
-      } else {
-        parseValue();
-      }
-
-      return output.endField();
-    }
-
-  }
-
-  /**
-   * Helper function to skip white space at the current position in the input stream.
-   * @throws IOException
-   */
-  private void skipWhitespace() throws IOException {
-    final byte delimiter = this.delimiter;
-    final byte newLine = this.newLine;
-    final TextInput input = this.input;
-
-    while (isWhite(ch) && ch != delimiter && ch != newLine) {
-      ch = input.nextChar();
-    }
-  }
-
-  /**
-   * Starting point for the reader. Sets up the input interface.
-   * @throws IOException
-   */
-  public final void start() throws IOException {
-    context.stopped = false;
-    input.start();
-  }
-
-
-  /**
-   * Parses the next record from the input. Skips the line if it is a comment;
-   * this is required when the file contains headers.
-   * @throws IOException
-   */
-  public final boolean parseNext() throws IOException {
-    try {
-      while (!context.stopped) {
-        ch = input.nextChar();
-        if (ch == comment) {
-          input.skipLines(1);
-          continue;
-        }
-        break;
-      }
-      final long initialLineNumber = input.lineCount();
-      boolean success = parseRecord();
-      if (initialLineNumber + 1 < input.lineCount()) {
-        throw new TextParsingException(context, "Cannot use newline character within quoted string");
-      }
-
-      if (success) {
-        if (recordsToRead > 0 && context.currentRecord() >= recordsToRead) {
-          context.stop();
-        }
-        return true;
-      } else {
-        return false;
-      }
-
-    } catch (UserException ex) {
-      stopParsing();
-      throw ex;
-    } catch (StreamFinishedPseudoException ex) {
-      stopParsing();
-      return false;
-    } catch (Exception ex) {
-      try {
-        throw handleException(ex);
-      } finally {
-        stopParsing();
-      }
-    }
-  }
-
-  private void stopParsing(){
-
-  }
-
-  private String displayLineSeparators(String str, boolean addNewLine) {
-    if (addNewLine) {
-      if (str.contains("\r\n")) {
-        str = str.replaceAll("\\r\\n", "[\\\\r\\\\n]\r\n\t");
-      } else if (str.contains("\n")) {
-        str = str.replaceAll("\\n", "[\\\\n]\n\t");
-      } else {
-        str = str.replaceAll("\\r", "[\\\\r]\r\t");
-      }
-    } else {
-      str = str.replaceAll("\\n", "\\\\n");
-      str = str.replaceAll("\\r", "\\\\r");
-    }
-    return str;
-  }
-
-  /**
-   * Helper method to handle exceptions caught while processing text files and generate better error messages associated with
-   * the exception.
-   * @param ex  Exception raised
-   * @return
-   * @throws IOException
-   */
-  private TextParsingException handleException(Exception ex) throws IOException {
-
-    if (ex instanceof TextParsingException) {
-      throw (TextParsingException) ex;
-    }
-
-    if (ex instanceof ArrayIndexOutOfBoundsException) {
-      ex = UserException
-          .dataReadError(ex)
-          .message(
-              "Drill failed to read your text file.  Drill supports up to %d columns in a text file.  Your file appears to have more than that.",
-              RepeatedVarCharOutput.MAXIMUM_NUMBER_COLUMNS)
-          .build(logger);
-    }
-
-    String message = null;
-    String tmp = input.getStringSinceMarkForError();
-    char[] chars = tmp.toCharArray();
-    if (chars != null) {
-      int length = chars.length;
-      if (length > settings.getMaxCharsPerColumn()) {
-        message = "Length of parsed input (" + length
-            + ") exceeds the maximum number of characters defined in your parser settings ("
-            + settings.getMaxCharsPerColumn() + "). ";
-      }
-
-      if (tmp.contains("\n") || tmp.contains("\r")) {
-        tmp = displayLineSeparators(tmp, true);
-        String lineSeparator = displayLineSeparators(settings.getLineSeparatorString(), false);
-        message += "\nIdentified line separator characters in the parsed content. This may be the cause of the error. The line separator in your parser settings is set to '"
-            + lineSeparator + "'. Parsed content:\n\t" + tmp;
-      }
-
-      int nullCharacterCount = 0;
-      // ensuring the StringBuilder won't grow over Integer.MAX_VALUE to avoid OutOfMemoryError
-      int maxLength = length > Integer.MAX_VALUE / 2 ? Integer.MAX_VALUE / 2 - 1 : length;
-      StringBuilder s = new StringBuilder(maxLength);
-      for (int i = 0; i < maxLength; i++) {
-        if (chars[i] == '\0') {
-          s.append('\\');
-          s.append('0');
-          nullCharacterCount++;
-        } else {
-          s.append(chars[i]);
-        }
-      }
-      tmp = s.toString();
-
-      if (nullCharacterCount > 0) {
-        message += "\nIdentified "
-            + nullCharacterCount
-            + " null characters ('\0') on parsed content. This may indicate the data is corrupt or its encoding is invalid. Parsed content:\n\t"
-            + tmp;
-      }
-
-    }
-
-    throw new TextParsingException(context, message, ex);
-  }
-
-  /**
-   * Finishes the processing of a batch and indicates to the output
-   * interface to wrap up the batch.
-   */
-  public void finishBatch(){
-    output.finishBatch();
-//    System.out.println(String.format("line %d, cnt %d", input.getLineCount(), output.getRecordCount()));
-  }
-
-  /**
-   * Invoked once there are no more records and we are done with the
-   * current record reader to clean up state.
-   * @throws IOException
-   */
-  public void close() throws IOException{
-    input.close();
-  }
-
-  @Override
-  public String toString() {
-    return "TextReader[Line=" + context.currentLine()
-        + ", Column=" + context.currentChar()
-        + ", Record=" + context.currentRecord()
-        + ", Byte pos=" + getPos()
-        + "]";
-  }
-}
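Put together, the removed V2 reader was driven by a loop of this shape; a sketch only, with construction of the settings, input, output, and work buffer elided:

    TextReader reader = new TextReader(settings, textInput, textOutput, workBuf);
    reader.start();
    reader.resetForNextBatch();
    while (reader.parseNext()) {
      // each successful call adds one parsed record to the output batch
    }
    reader.finishBatch();
    reader.close();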
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java
deleted file mode 100644
index 0ed3155..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Original version of the "compliant" text reader. This is version 2 of
- * the text reader. This version is retained for temporary backward
- * compatibility as we productize the newer version 3 based on the
- * row set framework.
- * <p>
- * TODO: Remove the files in this package and move the files from the
- * "v3" sub-package here once the version 3 implementation stabilizes.
- */
-package org.apache.drill.exec.store.easy.text.compliant;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java
deleted file mode 100644
index 70c43b7..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.easy.text.compliant.v3;
-
-class StreamFinishedPseudoException extends RuntimeException {
-
-  public static final StreamFinishedPseudoException INSTANCE = new StreamFinishedPseudoException();
-
-  private StreamFinishedPseudoException() {
-    super("", null, false, true);
-
-  }
-
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/BaseFieldOutput.java
similarity index 98%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/BaseFieldOutput.java
index 5dd4284..fc17186 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/BaseFieldOutput.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.easy.text.compliant.v3;
+package org.apache.drill.exec.store.easy.text.reader;
 
 import org.apache.drill.exec.physical.rowSet.RowSetLoader;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
similarity index 98%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
index 19b3dbd..877d963 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.easy.text.compliant.v3;
+package org.apache.drill.exec.store.easy.text.reader;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -49,7 +49,7 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
   private static final int WHITE_SPACE_BUFFER = 64 * 1024;
 
   // settings to be used while parsing
-  private final TextParsingSettingsV3 settings;
+  private final TextParsingSettings settings;
   // Chunk of the file to be read by this reader
   private FileSplit split;
   // text reader implementation
@@ -62,7 +62,7 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
 
   private RowSetLoader writer;
 
-  public CompliantTextBatchReader(TextParsingSettingsV3 settings) {
+  public CompliantTextBatchReader(TextParsingSettings settings) {
     this.settings = settings;
 
     // Validate. Otherwise, these problems show up later as a data
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java
similarity index 97%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java
index e291b3f..048c982 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.easy.text.compliant.v3;
+package org.apache.drill.exec.store.easy.text.reader;
 
 import org.apache.drill.exec.physical.rowSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/HeaderBuilder.java
similarity index 99%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/HeaderBuilder.java
index 62eafc8..2fb0ffc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/HeaderBuilder.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.easy.text.compliant.v3;
+package org.apache.drill.exec.store.easy.text.reader;
 
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/RepeatedVarCharOutput.java
similarity index 98%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/RepeatedVarCharOutput.java
index 4c1a0b4..b8551e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/RepeatedVarCharOutput.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.easy.text.compliant.v3;
+package org.apache.drill.exec.store.easy.text.reader;
 
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.physical.rowSet.RowSetLoader;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/StreamFinishedPseudoException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/StreamFinishedPseudoException.java
similarity index 83%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/StreamFinishedPseudoException.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/StreamFinishedPseudoException.java
index f2a32b9..0ac2cc6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/StreamFinishedPseudoException.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/StreamFinishedPseudoException.java
@@ -15,15 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.easy.text.compliant;
+package org.apache.drill.exec.store.easy.text.reader;
 
+@SuppressWarnings("serial")
 class StreamFinishedPseudoException extends RuntimeException {
-
-  public static final StreamFinishedPseudoException INSTANCE = new StreamFinishedPseudoException();
+  public static final StreamFinishedPseudoException INSTANCE =
+        new StreamFinishedPseudoException();
 
   private StreamFinishedPseudoException() {
     super("", null, false, true);
-
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextInput.java
similarity index 97%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextInput.java
index 28ddd07..3e05e58 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextInput.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.easy.text.compliant.v3;
+package org.apache.drill.exec.store.easy.text.reader;
 
 import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck;
 
@@ -41,7 +41,7 @@ final class TextInput {
 
   private final byte[] lineSeparator;
   private final byte normalizedLineSeparator;
-  private final TextParsingSettingsV3 settings;
+  private final TextParsingSettings settings;
 
   private long lineCount;
   private long charCount;
@@ -91,7 +91,7 @@ final class TextInput {
    * {@link Format#getNormalizedNewline()}) that is used to replace any
    * lineSeparator sequence found in the input.
    */
-  public TextInput(TextParsingSettingsV3 settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
+  public TextInput(TextParsingSettings settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
     this.lineSeparator = settings.getNewLineDelimiter();
     byte normalizedLineSeparator = settings.getNormalizedNewLine();
     Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable.");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextOutput.java
similarity index 97%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextOutput.java
index 48c1849..71d4731 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextOutput.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.easy.text.compliant.v3;
+package org.apache.drill.exec.store.easy.text.reader;
 
 
 /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingContext.java
similarity index 97%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingContext.java
index 86cad4c..3f6dbeb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingContext.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.easy.text.compliant.v3;
+package org.apache.drill.exec.store.easy.text.reader;
 
 import java.io.IOException;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java
similarity index 97%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java
index 12bbf42..a91a8ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.easy.text.compliant.v3;
+package org.apache.drill.exec.store.easy.text.reader;
 
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.server.options.OptionManager;
@@ -24,8 +24,7 @@ import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
 import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
 import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 
-// TODO: Remove the "V3" suffix once the V2 version is retired.
-public class TextParsingSettingsV3 {
+public class TextParsingSettings {
 
   private final String emptyValue = null;
   private final boolean parseUnescapedQuotes = true;
@@ -65,7 +64,7 @@ public class TextParsingSettingsV3 {
    * <tt>`csv`</tt> config has no headers. But, if the user has a ".csv"
    * file with headers, the user can just customize the table properties.
    */
-  public TextParsingSettingsV3(TextFormatConfig config,
+  public TextParsingSettings(TextFormatConfig config,
       EasySubScan scan, OptionManager options) {
     TupleMetadata providedSchema = scan.getSchema();
     boolean extractHeaders = config.isHeaderExtractionEnabled();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java
similarity index 95%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java
index 78adda0..0ce856e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.easy.text.compliant.v3;
+package org.apache.drill.exec.store.easy.text.reader;
 
 import java.io.IOException;
 
@@ -42,7 +42,7 @@ public final class TextReader {
 
   private final TextParsingContext context;
 
-  private final TextParsingSettingsV3 settings;
+  private final TextParsingSettings settings;
 
   private final TextInput input;
   private final TextOutput output;
@@ -70,14 +70,14 @@ public final class TextReader {
   private final byte newLine;
 
   /**
-   * The CsvParser supports all settings provided by {@link TextParsingSettingsV3},
+   * The CsvParser supports all settings provided by {@link TextParsingSettings},
    * and requires this configuration to be properly initialized.
    * @param settings  the parser configuration
    * @param input  input stream
    * @param output  interface to produce output record batch
    * @param workBuf  working buffer to handle whitespace
    */
-  public TextReader(TextParsingSettingsV3 settings, TextInput input, TextOutput output, DrillBuf workBuf) {
+  public TextReader(TextParsingSettings settings, TextInput input, TextOutput output, DrillBuf workBuf) {
     this.context = new TextParsingContext(input, output);
     this.workBuf = workBuf;
     this.settings = settings;
@@ -135,10 +135,8 @@ public final class TextReader {
     output.startRecord();
     int fieldsWritten = 0;
     try {
-      @SuppressWarnings("unused")
-      boolean earlyTerm = false;
       while (ch != newLine) {
-        earlyTerm = ! parseField();
+        parseField();
         fieldsWritten++;
         if (ch != newLine) {
           ch = input.nextChar();
@@ -148,15 +146,6 @@ public final class TextReader {
             break;
           }
         }
-
-        // Disabling early termination. See DRILL-5914
-
-//        if (earlyTerm) {
-//          if (ch != newLine) {
-//            input.skipLines(1);
-//          }
-//          break;
-//        }
       }
       output.finishRecord();
     } catch (StreamFinishedPseudoException e) {
@@ -268,11 +257,9 @@ public final class TextReader {
         if (prev == quoteEscape) {
           output.append(prev);
         }
-        if (prev == quote) { // unescaped quote detected
+        else if (prev == quote) { // unescaped quote detected
           if (parseUnescapedQuotes) {
-            output.append(quote);
-            output.append(ch);
-            parseQuotedValue(ch);
+            output.append(prev);
             break;
           } else {
             throw new TextParsingException(
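
The hunk above tightens quote handling inside quoted values: an escaped quote is emitted once, and an unescaped quote is now appended and ends the quoted-value parse instead of being re-quoted and re-parsed. For readers unfamiliar with the convention, the sketch below illustrates the default doubled-quote escape (the quote escape defaults to the quote character itself). It is illustrative only and is not Drill's TextReader:

    // Illustration only: the doubled-quote escape convention used when the
    // quote escape character equals the quote character. Not Drill's TextReader.
    public class QuoteEscapeDemo {

      // Unescapes the body of a quoted CSV field, where "" stands for one literal ".
      static String unescapeQuotedBody(String body) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < body.length(); i++) {
          char c = body.charAt(i);
          if (c == '"' && i + 1 < body.length() && body.charAt(i + 1) == '"') {
            out.append('"'); // escaped quote: emit one quote, skip its twin
            i++;
          } else {
            out.append(c);
          }
        }
        return out.toString();
      }

      public static void main(String[] args) {
        // The field "foo""bar" carries the body foo""bar and reads back as foo"bar.
        System.out.println(unescapeQuotedBody("foo\"\"bar")); // prints foo"bar
      }
    }
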
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/package-info.java
similarity index 93%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/package-info.java
index aced5ad..970ef85 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/package-info.java
@@ -19,4 +19,4 @@
  * Version 3 of the text reader. Hosts the "compliant" text reader on
  * the row set framework.
  */
-package org.apache.drill.exec.store.easy.text.compliant.v3;
+package org.apache.drill.exec.store.easy.text.reader;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/writer/TextRecordWriter.java
similarity index 94%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/writer/TextRecordWriter.java
index 83a00bd..a114fd4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/writer/TextRecordWriter.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.text;
+package org.apache.drill.exec.store.easy.text.writer;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -24,18 +24,17 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.store.StringOutputRecordWriter;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
-
-public class DrillTextRecordWriter extends StringOutputRecordWriter {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordWriter.class);
+public class TextRecordWriter extends StringOutputRecordWriter {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextRecordWriter.class);
 
   private final StorageStrategy storageStrategy;
 
@@ -47,7 +46,6 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
   private String fieldDelimiter;
   private String extension;
 
-  private static String eol = System.getProperty("line.separator");
   private int index;
   private PrintStream stream = null;
   private FileSystem fs = null;
@@ -58,7 +56,7 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
 
   private Configuration fsConf;
 
-  public DrillTextRecordWriter(BufferAllocator allocator, StorageStrategy storageStrategy, Configuration fsConf) {
+  public TextRecordWriter(BufferAllocator allocator, StorageStrategy storageStrategy, Configuration fsConf) {
     super(allocator);
     this.storageStrategy = storageStrategy == null ? StorageStrategy.DEFAULT : storageStrategy;
     this.fsConf = new Configuration(fsConf);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
deleted file mode 100644
index 0db17fb..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.text;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Predicate;
-import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.vector.RepeatedVarCharVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-
-public class DrillTextRecordReader extends AbstractRecordReader {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordReader.class);
-
-  private static final String COL_NAME = "columns";
-
-  private org.apache.hadoop.mapred.RecordReader<LongWritable, Text> reader;
-  private final List<ValueVector> vectors = Lists.newArrayList();
-  private byte delimiter;
-  private FieldReference ref = new FieldReference(COL_NAME);
-  private RepeatedVarCharVector vector;
-  private List<Integer> columnIds = Lists.newArrayList();
-  private LongWritable key;
-  private Text value;
-  private int numCols = 0;
-  private FileSplit split;
-  private long totalRecordsRead;
-
-  public DrillTextRecordReader(FileSplit split, Configuration fsConf, FragmentContext context,
-      char delimiter, List<SchemaPath> columns) {
-    this.delimiter = (byte) delimiter;
-    this.split = split;
-    setColumns(columns);
-
-    if (!isStarQuery()) {
-      String pathStr;
-      for (SchemaPath path : columns) {
-        assert path.getRootSegment().isNamed();
-        pathStr = path.getRootSegment().getPath();
-        Preconditions.checkArgument(COL_NAME.equals(pathStr) || (SchemaPath.DYNAMIC_STAR.equals(pathStr) && path.getRootSegment().getChild() == null),
-            "Selected column(s) must have name 'columns' or must be plain '*'");
-
-        if (path.getRootSegment().getChild() != null) {
-          Preconditions.checkArgument(path.getRootSegment().getChild().isArray(), "Selected column must be an array index");
-          int index = path.getRootSegment().getChild().getArraySegment().getIndex();
-          columnIds.add(index);
-        }
-      }
-      Collections.sort(columnIds);
-      numCols = columnIds.size();
-    }
-
-    TextInputFormat inputFormat = new TextInputFormat();
-    JobConf job = new JobConf(fsConf);
-    job.setInt("io.file.buffer.size", context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BUFFER_SIZE));
-    job.setInputFormat(inputFormat.getClass());
-    try {
-      reader = inputFormat.getRecordReader(split, job, Reporter.NULL);
-      key = reader.createKey();
-      value = reader.createValue();
-      totalRecordsRead = 0;
-    } catch (Exception e) {
-      handleAndRaise("Failure in creating record reader", e);
-    }
-  }
-
-  @Override
-  protected List<SchemaPath> getDefaultColumnsToRead() {
-    return DEFAULT_TEXT_COLS_TO_READ;
-  }
-
-  @Override
-  public boolean isStarQuery() {
-    return super.isStarQuery() || Iterables.tryFind(getColumns(), new Predicate<SchemaPath>() {
-      private final SchemaPath COLUMNS = SchemaPath.getSimplePath("columns");
-      @Override
-      public boolean apply(@Nullable SchemaPath path) {
-        return path.equals(COLUMNS);
-      }
-    }).isPresent();
-  }
-
-  @Override
-  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
-    MaterializedField field = MaterializedField.create(ref.getAsNamePart().getName(), Types.repeated(TypeProtos.MinorType.VARCHAR));
-    try {
-      vector = output.addField(field, RepeatedVarCharVector.class);
-    } catch (Exception e) {
-      handleAndRaise("Failure in setting up reader", e);
-    }
-  }
-
-  protected void handleAndRaise(String s, Exception e) {
-    String message = "Error in text record reader.\nMessage: " + s +
-      "\nSplit information:\n\tPath: " + split.getPath() +
-      "\n\tStart: " + split.getStart() +
-      "\n\tLength: " + split.getLength();
-    throw new DrillRuntimeException(message, e);
-  }
-
-  @Override
-  public int next() {
-//    logger.debug("vector value capacity {}", vector.getValueCapacity());
-//    logger.debug("vector byte capacity {}", vector.getByteCapacity());
-    int batchSize = 0;
-    try {
-      int recordCount = 0;
-      final RepeatedVarCharVector.Mutator mutator = vector.getMutator();
-      while (recordCount < Character.MAX_VALUE && batchSize < 200*1000 && reader.next(key, value)) {
-        int start;
-        int end = -1;
-
-        // index of the scanned field
-        int p = 0;
-        int i = 0;
-        mutator.startNewValue(recordCount);
-        // Process each field in this line
-        while (end < value.getLength() - 1) {
-          if(numCols > 0 && p >= numCols) {
-            break;
-          }
-          start = end;
-          if (delimiter == '\n') {
-            end = value.getLength();
-          } else {
-            end = find(value, delimiter, start + 1);
-            if (end == -1) {
-              end = value.getLength();
-            }
-          }
-          if (numCols > 0 && i++ < columnIds.get(p)) {
-            mutator.addSafe(recordCount, value.getBytes(), start + 1, 0);
-            continue;
-          }
-          p++;
-          mutator.addSafe(recordCount, value.getBytes(), start + 1, end - start - 1);
-          batchSize += end - start;
-        }
-        recordCount++;
-        totalRecordsRead++;
-      }
-      for (final ValueVector v : vectors) {
-        v.getMutator().setValueCount(recordCount);
-      }
-      mutator.setValueCount(recordCount);
-      // logger.debug("text scan batch size {}", batchSize);
-      return recordCount;
-    } catch(Exception e) {
-      close();
-      handleAndRaise("Failure while parsing text. Parser was at record: " + (totalRecordsRead + 1), e);
-    }
-
-    // this is never reached
-    return 0;
-  }
-
-  /**
-   * Returns the index within the text of the first occurrence of delimiter, starting the search at the specified index.
-   *
-   * @param  text  the text being searched
-   * @param  delimiter the delimiter
-   * @param  start the index to start searching
-   * @return      the first occurrence of delimiter, starting the search at the specified index
-   */
-  public int find(Text text, byte delimiter, int start) {
-    int len = text.getLength();
-    int p = start;
-    byte[] bytes = text.getBytes();
-    boolean inQuotes = false;
-    while (p < len) {
-      if ('\"' == bytes[p]) {
-        inQuotes = !inQuotes;
-      }
-      if (!inQuotes && bytes[p] == delimiter) {
-        return p;
-      }
-      p++;
-    }
-    return -1;
-  }
-
-  @Override
-  public void close() {
-    try {
-      if (reader != null) {
-        reader.close();
-        reader = null;
-      }
-    } catch (IOException e) {
-      logger.warn("Exception closing reader: {}", e);
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "DrillTextRecordReader[File=" + split.getPath()
-        + ", Record=" + (totalRecordsRead + 1)
-        + ", Start=" + split.getStart()
-        + ", Length=" + split.getLength()
-        + "]";
-  }
-}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
index 7bc65fc..c5c283d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
@@ -170,7 +170,7 @@ public class TestSelectWithOption extends BaseTestQuery {
     // It seems that a parameter can not be called "escape"
     testWithResult(format("select columns from table(%s(`escape` => '$', type => 'TeXT', fieldDelimiter => '|', quote => '@'))", quoteTableName),
         listOf("b", "0"),
-        listOf("b$@c", "1")); // shouldn't $ be removed here?
+        listOf("b@c", "1"));
   }
 
   @Test
@@ -230,9 +230,14 @@ public class TestSelectWithOption extends BaseTestQuery {
     String jsonTableName = genCSVTable("testVariationsJSON",
         "{\"columns\": [\"f\",\"g\"]}");
     // the extension is actually csv
-    testWithResult(format("select columns from %s", jsonTableName),
-        listOf("{\"columns\": [\"f\"", "g\"]}\n")
-        );
+    // Don't try to read the CSV file, however, as it does not
+    // contain proper quotes for CSV.
+    // File contents:
+    // {"columns": ["f","g"]}
+    // CSV would require:
+    // "{""columns"": [""f"",""g""]}"
+    // A bug in older versions appeared to have the perverse
+    // effect of allowing the above to kinda-sorta work.
     String[] jsonQueries = {
         format("select columns from table(%s(type => 'JSON'))", jsonTableName),
         // we can use named format plugin configurations too!
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestImplicitFileColumns.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestImplicitFileColumns.java
index 07ffba0..403aab1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestImplicitFileColumns.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestImplicitFileColumns.java
@@ -17,21 +17,21 @@
  */
 package org.apache.drill.exec.store;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-import org.apache.drill.shaded.guava.com.google.common.io.Files;
-import org.apache.drill.test.BaseTestQuery;
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.util.JsonStringArrayList;
 import org.apache.drill.exec.util.Text;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.drill.shaded.guava.com.google.common.io.Files;
+import org.apache.drill.test.BaseTestQuery;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.File;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-
 public class TestImplicitFileColumns extends BaseTestQuery {
   public static final String CSV = "csv";
   public static final String MAIN = "main";
@@ -199,9 +199,9 @@ public class TestImplicitFileColumns extends BaseTestQuery {
   @Test
   public void testStarColumnCsv() throws Exception {
     final BatchSchema expectedSchema = new SchemaBuilder()
+        .addArray("columns", TypeProtos.MinorType.VARCHAR)
         .addNullable("dir0", TypeProtos.MinorType.VARCHAR)
         .addNullable("dir1", TypeProtos.MinorType.VARCHAR)
-        .addArray("columns", TypeProtos.MinorType.VARCHAR)
         .build();
 
     testBuilder()
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
index 3aa5584..2071546 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
@@ -80,14 +80,6 @@ public class BaseCsvTest extends ClusterTest {
     buildFile(new File(nestedDir, NESTED_FILE), secondFile);
   }
 
-  protected void enableV3(boolean enable) {
-    client.alterSession(ExecConstants.ENABLE_V3_TEXT_READER_KEY, enable);
-  }
-
-  protected void resetV3() {
-    client.resetSession(ExecConstants.ENABLE_V3_TEXT_READER_KEY);
-  }
-
   protected void enableMultiScan() {
 
     // Special test-only feature to force even small scans
@@ -153,12 +145,10 @@ public class BaseCsvTest extends ClusterTest {
   }
 
   protected void enableSchemaSupport() {
-    enableV3(true);
     enableSchema(true);
   }
 
   protected void resetSchemaSupport() {
-    resetV3();
     resetSchema();
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestCsvHeader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvHeader.java
similarity index 99%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestCsvHeader.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvHeader.java
index fdef252..d190a63 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestCsvHeader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvHeader.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.text;
+package org.apache.drill.exec.store.easy.text.compliant;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.test.BaseTestQuery;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java
index 5a52664..dea2c9e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java
@@ -33,9 +33,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-// CSV reader now hosted on the row set framework
 @Category(RowSetTests.class)
-public class TestCsvIgnoreHeaders  extends BaseCsvTest{
+public class TestCsvIgnoreHeaders extends BaseCsvTest {
 
   private static String withHeaders[] = {
       "a,b,c",
@@ -59,17 +58,6 @@ public class TestCsvIgnoreHeaders  extends BaseCsvTest{
   public void testColumns() throws IOException {
     String fileName = "simple.csv";
     buildFile(fileName, withHeaders);
-    try {
-      enableV3(false);
-      doTestColumns(fileName);
-      enableV3(true);
-      doTestColumns(fileName);
-    } finally {
-      resetV3();
-    }
-  }
-
-  private void doTestColumns(String fileName) throws IOException {
     String sql = "SELECT columns FROM `dfs.data`.`%s`";
     RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet();
 
@@ -86,17 +74,6 @@ public class TestCsvIgnoreHeaders  extends BaseCsvTest{
 
   @Test
   public void testRaggedRows() throws IOException {
-    try {
-      enableV3(false);
-      doTestRaggedRows();
-      enableV3(true);
-      doTestRaggedRows();
-    } finally {
-      resetV3();
-    }
-  }
-
-  private void doTestRaggedRows() throws IOException {
     String fileName = "ragged.csv";
     TestCsvWithHeaders.buildFile(new File(testDir, fileName), raggedRows);
     String sql = "SELECT columns FROM `dfs.data`.`%s`";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java
index a540694..7c40128 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.PrintWriter;
 
+import org.apache.drill.TestSelectWithOption;
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
@@ -47,6 +48,9 @@ import org.junit.experimental.categories.Category;
  * is provided, the text format plugin will create columns
  * using that schema rather than using the "columns" array
  * column.
+ *
+ * @see {@link TestSelectWithOption} for similar tests using table
+ * properties within SQL
  */
 
 @Category(RowSetTests.class)
@@ -448,4 +452,67 @@ public class TestCsvTableProperties extends BaseCsvTest {
       resetSchemaSupport();
     }
   }
+
+  private static String messyQuotesData[] = {
+      "first\"field\"here,another \"field",
+      "end quote\",another\"",
+      "many\"\"\"\",more\"\"",
+      "\"not\"end\",\"\"wtf\" \"",
+      "\"newline\nhere\",\"and here\"\"\n\""
+    };
+
+  /**
+   * The legacy "V2" text reader had special handling for quotes
+   * that appear inside fields. Example:<pre><tt>
+   * first"field"here,another "field</tt></pre>
+   * <p>
+   * Since behavior in this case is ill-defined, the reader
+   * apparently treated quotes as normal characters unless the
+   * field started with a quote. There is an option in the UniVocity
+   * code to set this behavior, but it is not exposed in Drill.
+   * So, this test verifies the non-customizable messy quote handling
+   * logic.
+   * <p>
+   * If a field starts with a quote, quoting rules kick in, including
+   * the quote escape, which is, by default, itself a quote. So
+   * <br><code>"foo""bar"</code><br>
+   * is read as
+   * <br><code>foo"bar</code><br>
+   * But, for fields not starting with a quote, the quote escape
+   * is ignored, so:
+   * <br><code>foo""bar</code><br>
+   * is read as
+   * <br><code>foo""bar</code><br>
+   * This seems more like a bug than a feature, but it does appear to be
+   * how the "new" text reader always worked, so the behavior is preserved.
+   * <p>
+   * Also, it seems that the text reader supported embedded newlines, even
+   * though such behavior <i><b>will not work</b></i> if the embedded
+   * newline occurs near a split. In this case, the reader will scan
+   * forward to find a record delimiter (a newline by default), will
+   * find the embedded newline, and will read a partial first record.
+   * Again, this appears to be legacy behavior, and so is preserved,
+   * even if broken.
+   * <p>
+   * The key thing is that if the CSV is well-formed (no messy quotes,
+   * properly quoted fields with proper escapes, no embedded newlines)
+   * then things will work OK.
+   */
+
+  @Test
+  public void testMessyQuotes() throws Exception {
+    String tablePath = buildTable("messyQuotes", messyQuotesData);
+    RowSet actual = client.queryBuilder().sql(SELECT_ALL, tablePath).rowSet();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("columns", MinorType.VARCHAR)
+        .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addSingleCol(strArray("first\"field\"here", "another \"field"))
+        .addSingleCol(strArray("end quote\"", "another\""))
+        .addSingleCol(strArray("many\"\"\"\"", "more\"\""))
+        .addSingleCol(strArray("not\"end", "\"wtf\" "))
+        .addSingleCol(strArray("newline\nhere", "and here\"\n"))
+        .build();
+    RowSetUtilities.verify(expected, actual);
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
index 7abbf3d..0b0c181 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
@@ -41,47 +41,27 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 /**
- * Sanity test of CSV files with headers. Tests both the original
- * "compliant" version and the V3 version based on the row set
- * framework.
+ * Sanity test of CSV files with headers.
  * <p>
- * The CSV reader is a "canary in the coal mine" for many scan features.
- * It turns out that there are several bugs in "V2" (AKA "new text reader")
- * that are fixed in "V3" (the one based on the row set framework), and one
- * that is not yet fixed.
+ * Open issues:
  *
  * <ul>
- * <li>Ragged rows will crash the V2 text reader when headers are used.
- * No V2 test exists as a result. Fixed in V3.</li>
- * <li>DRILL-7083: in V2, if files are nested to 2 levels, but we ask
- * for dir2 (the non-existent third level), the type of dir2 will be
- * nullable INT. In V3, the type is Nullable VARCHAR (just like for the
- * existing partition levels.)</li>
  * <li>DRILL-7080: A query like SELECT *, dir0 produces the result schema
  * of (dir0, a, b, ...) in V2 and (a, b, ... dir0, dir00) in V3. This
  * seems to be a bug in the Project operator.</li>
  * </ul>
  *
- * The V3 tests all demonstrate that the row set scan framework
+ * The tests all demonstrate that the row set scan framework
  * delivers a first empty batch from each scan. I (Paul) had understood
 * that we had a "fast schema" path as the result of the "empty batch"
- * project. However, the V2 reader does not provide the schema-only
+ * project. However, the V2 reader did not provide the schema-only
  * first batch. So, not sure if doing so is a feature, or a bug because
  * things changed. Easy enough to change if we choose to. If so, the
  * tests here would remove the test for that schema-only batch.
- * <p>
- * Tests are run for both V2 and V3. When the results are the same,
- * the test occurs once, wrapped in a "driver" to select V2 or V3 mode.
- * When behavior differs, there are separate tests for V2 and V3.
- * <p>
- * The V2 tests are temporary. Once we accept that V3 is stable, we
- * can remove V2 (and the "old text reader.") The behavior in V3 is
- * more correct, no reason to keep the old, broken behavior.
  *
  * @see {@link TestHeaderBuilder}
  */
 
-// CSV reader now hosted on the row set framework
 @Category(RowSetTests.class)
 public class TestCsvWithHeaders extends BaseCsvTest {
 
@@ -125,17 +105,6 @@ public class TestCsvWithHeaders extends BaseCsvTest {
   @Test
   public void testEmptyFile() throws IOException {
     buildFile(EMPTY_FILE, new String[] {});
-    try {
-      enableV3(false);
-      doTestEmptyFile();
-      enableV3(true);
-      doTestEmptyFile();
-    } finally {
-      resetV3();
-    }
-  }
-
-  private void doTestEmptyFile() throws IOException {
     RowSet rowSet = client.queryBuilder().sql(makeStatement(EMPTY_FILE)).rowSet();
     assertNull(rowSet);
   }
@@ -150,17 +119,6 @@ public class TestCsvWithHeaders extends BaseCsvTest {
   public void testEmptyCsvHeaders() throws IOException {
     buildFile(EMPTY_HEADERS_FILE, emptyHeaders);
     try {
-      enableV3(false);
-      doTestEmptyCsvHeaders();
-      enableV3(true);
-      doTestEmptyCsvHeaders();
-    } finally {
-      resetV3();
-    }
-  }
-
-  private void doTestEmptyCsvHeaders() throws IOException {
-    try {
       client.queryBuilder().sql(makeStatement(EMPTY_HEADERS_FILE)).run();
       fail();
     } catch (Exception e) {
@@ -170,17 +128,6 @@ public class TestCsvWithHeaders extends BaseCsvTest {
 
   @Test
   public void testValidCsvHeaders() throws IOException {
-    try {
-      enableV3(false);
-      doTestValidCsvHeaders();
-      enableV3(true);
-      doTestValidCsvHeaders();
-    } finally {
-      resetV3();
-    }
-  }
-
-  private void doTestValidCsvHeaders() throws IOException {
     RowSet actual = client.queryBuilder().sql(makeStatement(TEST_FILE_NAME)).rowSet();
 
     TupleMetadata expectedSchema = new SchemaBuilder()
@@ -196,17 +143,6 @@ public class TestCsvWithHeaders extends BaseCsvTest {
 
   @Test
   public void testInvalidCsvHeaders() throws IOException {
-    try {
-      enableV3(false);
-      doTestInvalidCsvHeaders();
-      enableV3(true);
-      doTestInvalidCsvHeaders();
-    } finally {
-      resetV3();
-    }
-  }
-
-  private void doTestInvalidCsvHeaders() throws IOException {
     String fileName = "case3.csv";
     buildFile(fileName, invalidHeaders);
     RowSet actual = client.queryBuilder().sql(makeStatement(fileName)).rowSet();
@@ -225,20 +161,9 @@ public class TestCsvWithHeaders extends BaseCsvTest {
     RowSetUtilities.verify(expected, actual);
   }
 
+  // Test fix for DRILL-5590
   @Test
   public void testCsvHeadersCaseInsensitive() throws IOException {
-    try {
-      enableV3(false);
-      doTestCsvHeadersCaseInsensitive();
-      enableV3(true);
-      doTestCsvHeadersCaseInsensitive();
-    } finally {
-      resetV3();
-    }
-  }
-
-  // Test fix for DRILL-5590
-  private void doTestCsvHeadersCaseInsensitive() throws IOException {
     String sql = "SELECT A, b, C FROM `dfs.data`.`%s`";
     RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
 
@@ -258,23 +183,12 @@ public class TestCsvWithHeaders extends BaseCsvTest {
     return "SELECT * FROM `dfs.data`.`" + fileName + "`";
   }
 
-  @Test
-  public void testWildcard() throws IOException {
-    try {
-      enableV3(false);
-      doTestWildcard();
-      enableV3(true);
-      doTestWildcard();
-    } finally {
-      resetV3();
-    }
-  }
-
   /**
    * Verify that the wildcard expands columns to the header names, including
    * case
    */
-  private void doTestWildcard() throws IOException {
+  @Test
+  public void testWildcard() throws IOException {
     String sql = "SELECT * FROM `dfs.data`.`%s`";
     RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
 
@@ -292,85 +206,24 @@ public class TestCsvWithHeaders extends BaseCsvTest {
 
   /**
    * Verify that implicit columns are recognized and populated. Sanity test
-   * of just one implicit column. V2 uses nullable VARCHAR for file
-   * metadata columns.
-   */
-
-  @Test
-  public void testImplicitColsExplicitSelectV2() throws IOException {
-    try {
-      enableV3(false);
-      String sql = "SELECT A, filename FROM `dfs.data`.`%s`";
-      RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
-
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .add("A", MinorType.VARCHAR)
-          .addNullable("filename", MinorType.VARCHAR)
-          .buildSchema();
-
-      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-          .addRow("10", TEST_FILE_NAME)
-          .build();
-      RowSetUtilities.verify(expected, actual);
-    } finally {
-      resetV3();
-    }
-  }
-
-  /**
-   * Verify that implicit columns are recognized and populated. Sanity test
    * of just one implicit column. V3 uses non-nullable VARCHAR for file
    * metadata columns.
    */
 
   @Test
-  public void testImplicitColsExplicitSelectV3() throws IOException {
-    try {
-      enableV3(true);
-      String sql = "SELECT A, filename FROM `dfs.data`.`%s`";
-      RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
-
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .add("A", MinorType.VARCHAR)
-          .add("filename", MinorType.VARCHAR)
-          .buildSchema();
-
-      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-          .addRow("10", TEST_FILE_NAME)
-          .build();
-      RowSetUtilities.verify(expected, actual);
-    } finally {
-      resetV3();
-    }
-  }
+  public void testImplicitColsExplicitSelect() throws IOException {
+    String sql = "SELECT A, filename FROM `dfs.data`.`%s`";
+    RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
 
-  /**
-   * Verify that implicit columns are recognized and populated. Sanity test
-   * of just one implicit column. V2 uses nullable VARCHAR for file
-   * metadata columns.
-   */
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("A", MinorType.VARCHAR)
+        .add("filename", MinorType.VARCHAR)
+        .buildSchema();
 
-  @Test
-  public void testImplicitColWildcardV2() throws IOException {
-    try {
-      enableV3(false);
-      String sql = "SELECT *, filename FROM `dfs.data`.`%s`";
-      RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
-
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .add("a", MinorType.VARCHAR)
-          .add("b", MinorType.VARCHAR)
-          .add("c", MinorType.VARCHAR)
-          .addNullable("filename", MinorType.VARCHAR)
-          .buildSchema();
-
-      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-          .addRow("10", "foo", "bar", TEST_FILE_NAME)
-          .build();
-      RowSetUtilities.verify(expected, actual);
-    } finally {
-      resetV3();
-    }
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("10", TEST_FILE_NAME)
+        .build();
+    RowSetUtilities.verify(expected, actual);
   }
 
   /**
@@ -380,41 +233,25 @@ public class TestCsvWithHeaders extends BaseCsvTest {
    */
 
   @Test
-  public void testImplicitColWildcardV3() throws IOException {
-    try {
-      enableV3(true);
-      String sql = "SELECT *, filename FROM `dfs.data`.`%s`";
-      RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
-
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .add("a", MinorType.VARCHAR)
-          .add("b", MinorType.VARCHAR)
-          .add("c", MinorType.VARCHAR)
-          .add("filename", MinorType.VARCHAR)
-          .buildSchema();
-
-      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-          .addRow("10", "foo", "bar", TEST_FILE_NAME)
-          .build();
-      RowSetUtilities.verify(expected, actual);
-    } finally {
-      resetV3();
-    }
+  public void testImplicitColWildcard() throws IOException {
+    String sql = "SELECT *, filename FROM `dfs.data`.`%s`";
+    RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .add("b", MinorType.VARCHAR)
+        .add("c", MinorType.VARCHAR)
+        .add("filename", MinorType.VARCHAR)
+        .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("10", "foo", "bar", TEST_FILE_NAME)
+        .build();
+    RowSetUtilities.verify(expected, actual);
   }
 
   @Test
   public void testColsWithWildcard() throws IOException {
-    try {
-      enableV3(false);
-      doTestColsWithWildcard();
-      enableV3(true);
-      doTestColsWithWildcard();
-    } finally {
-      resetV3();
-    }
-  }
-
-  private void doTestColsWithWildcard() throws IOException {
     String sql = "SELECT *, a as d FROM `dfs.data`.`%s`";
     RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
 
@@ -432,73 +269,30 @@ public class TestCsvWithHeaders extends BaseCsvTest {
   }
 
   /**
-   * V2 does not allow explicit use of dir0, dir1, etc. columns for a non-partitioned
-   * file. Treated as undefined nullable int columns.
-   */
-
-  @Test
-  public void testPartitionColsExplicitV2() throws IOException {
-    try {
-      enableV3(false);
-      String sql = "SELECT a, dir0, dir5 FROM `dfs.data`.`%s`";
-      RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
-
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .add("a", MinorType.VARCHAR)
-          .addNullable("dir0", MinorType.INT)
-          .addNullable("dir5", MinorType.INT)
-          .buildSchema();
-
-      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-          .addRow("10", null, null)
-          .build();
-      RowSetUtilities.verify(expected, actual);
-    } finally {
-      resetV3();
-    }
-  }
-
-  /**
    * V3 allows the use of partition columns, even for a non-partitioned file.
    * The columns are null of type Nullable VARCHAR. This area of Drill
    * is a bit murky: it seems reasonable to support partition columns consistently
    * rather than conditionally based on the structure of the input.
    */
   @Test
-  public void testPartitionColsExplicitV3() throws IOException {
-    try {
-      enableV3(true);
-      String sql = "SELECT a, dir0, dir5 FROM `dfs.data`.`%s`";
-      RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
-
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .add("a", MinorType.VARCHAR)
-          .addNullable("dir0", MinorType.VARCHAR)
-          .addNullable("dir5", MinorType.VARCHAR)
-          .buildSchema();
-
-      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-          .addRow("10", null, null)
-          .build();
-      RowSetUtilities.verify(expected, actual);
-    } finally {
-      resetV3();
-    }
+  public void testPartitionColsExplicit() throws IOException {
+    String sql = "SELECT a, dir0, dir5 FROM `dfs.data`.`%s`";
+    RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .addNullable("dir0", MinorType.VARCHAR)
+        .addNullable("dir5", MinorType.VARCHAR)
+        .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("10", null, null)
+        .build();
+    RowSetUtilities.verify(expected, actual);
   }
 
   @Test
   public void testDupColumn() throws IOException {
-    try {
-      enableV3(false);
-      doTestDupColumn();
-      enableV3(true);
-      doTestDupColumn();
-    } finally {
-      resetV3();
-    }
-  }
-
-  private void doTestDupColumn() throws IOException {
     String sql = "SELECT a, b, a FROM `dfs.data`.`%s`";
     RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
 
@@ -514,91 +308,31 @@ public class TestCsvWithHeaders extends BaseCsvTest {
     RowSetUtilities.verify(expected, actual);
   }
 
-  // This test cannot be run for V2. The data gets corrupted and we get
-  // internal errors.
-
   /**
    * Test that ragged rows result in the "missing" columns being filled
    * in with the moral equivalent of a null column for CSV: a blank string.
    */
   @Test
-  public void testRaggedRowsV3() throws IOException {
-    try {
-      enableV3(true);
-      String fileName = "case4.csv";
-      buildFile(fileName, raggedRows);
-      RowSet actual = client.queryBuilder().sql(makeStatement(fileName)).rowSet();
-
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .add("a", MinorType.VARCHAR)
-          .add("b", MinorType.VARCHAR)
-          .add("c", MinorType.VARCHAR)
-          .buildSchema();
-      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-          .addRow("10", "dino", "")
-          .addRow("20", "foo", "bar")
-          .addRow("30", "", "")
-          .build();
-      RowSetUtilities.verify(expected, actual);
-    } finally {
-      resetV3();
-    }
-  }
+  public void testRaggedRows() throws IOException {
+    String fileName = "case4.csv";
+    buildFile(fileName, raggedRows);
+    RowSet actual = client.queryBuilder().sql(makeStatement(fileName)).rowSet();
 
-  /**
-   * Test partition expansion. Because the two files are read in the
-   * same scan operator, the schema is consistent. See
-   * {@link TestPartitionRace} for the multi-threaded race where all
-   * hell breaks loose.
-   * <p>
-   * V2, since Drill 1.12, puts partition columns ahead of data columns.
-   */
-  @Test
-  public void testPartitionExpansionV2() throws IOException {
-    try {
-      enableV3(false);
-
-      String sql = "SELECT * FROM `dfs.data`.`%s`";
-      Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
-
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .addNullable("dir0", MinorType.VARCHAR)
-          .add("a", MinorType.VARCHAR)
-          .add("b", MinorType.VARCHAR)
-          .add("c", MinorType.VARCHAR)
-          .buildSchema();
-
-      // Read the two batches.
-
-      for (int i = 0; i < 2; i++) {
-        assertTrue(iter.hasNext());
-        RowSet rowSet = iter.next();
-
-        // Figure out which record this is and test accordingly.
-
-        RowSetReader reader = rowSet.reader();
-        assertTrue(reader.next());
-        String col2 = reader.scalar(1).getString();
-        if (col2.equals("10")) {
-          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-              .addRow(null, "10", "foo", "bar")
-              .build();
-          RowSetUtilities.verify(expected, rowSet);
-        } else {
-          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-              .addRow(NESTED_DIR, "20", "fred", "wilma")
-              .build();
-          RowSetUtilities.verify(expected, rowSet);
-        }
-      }
-      assertFalse(iter.hasNext());
-    } finally {
-      resetV3();
-    }
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .add("b", MinorType.VARCHAR)
+        .add("c", MinorType.VARCHAR)
+        .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("10", "dino", "")
+        .addRow("20", "foo", "bar")
+        .addRow("30", "", "")
+        .build();
+    RowSetUtilities.verify(expected, actual);
   }
 
   /**
-   * Test partition expansion in V3.
+   * Test partition expansion.
    * <p>
    * This test is tricky because it will return two data batches
    * (preceded by an empty schema batch.) File read order is random
@@ -609,105 +343,48 @@ public class TestCsvWithHeaders extends BaseCsvTest {
    * files are nested to another level.)
    */
   @Test
-  public void testPartitionExpansionV3() throws IOException {
-    try {
-      enableV3(true);
+  public void testPartitionExpansion() throws IOException {
+    String sql = "SELECT * FROM `dfs.data`.`%s`";
+    Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
 
-      String sql = "SELECT * FROM `dfs.data`.`%s`";
-      Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .add("b", MinorType.VARCHAR)
+        .add("c", MinorType.VARCHAR)
+        .addNullable("dir0", MinorType.VARCHAR)
+        .buildSchema();
 
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .add("a", MinorType.VARCHAR)
-          .add("b", MinorType.VARCHAR)
-          .add("c", MinorType.VARCHAR)
-          .addNullable("dir0", MinorType.VARCHAR)
-          .buildSchema();
+    // First batch is empty; just carries the schema.
 
-      // First batch is empty; just carries the schema.
+    assertTrue(iter.hasNext());
+    RowSet rowSet = iter.next();
+    assertEquals(0, rowSet.rowCount());
+    rowSet.clear();
 
-      assertTrue(iter.hasNext());
-      RowSet rowSet = iter.next();
-      assertEquals(0, rowSet.rowCount());
-      rowSet.clear();
-
-      // Read the other two batches.
-
-      for (int i = 0; i < 2; i++) {
-        assertTrue(iter.hasNext());
-        rowSet = iter.next();
-
-        // Figure out which record this is and test accordingly.
-
-        RowSetReader reader = rowSet.reader();
-        assertTrue(reader.next());
-        String col1 = reader.scalar(0).getString();
-        if (col1.equals("10")) {
-          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-              .addRow("10", "foo", "bar", null)
-              .build();
-          RowSetUtilities.verify(expected, rowSet);
-        } else {
-          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-              .addRow("20", "fred", "wilma", NESTED_DIR)
-              .build();
-          RowSetUtilities.verify(expected, rowSet);
-        }
-      }
-      assertFalse(iter.hasNext());
-    } finally {
-      resetV3();
-    }
-  }
+    // Read the other two batches.
 
-  /**
-   * Test the use of partition columns with the wildcard. This works for file
-   * metadata columns, but confuses the project operator when used for
-   * partition columns. DRILL-7080.
-   */
-  @Test
-  public void testWilcardAndPartitionsMultiFilesV2() throws IOException {
-    try {
-      enableV3(false);
-
-      String sql = "SELECT *, dir0, dir1 FROM `dfs.data`.`%s`";
-      Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
-
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .addNullable("dir0", MinorType.VARCHAR)
-          .add("a", MinorType.VARCHAR)
-          .add("b", MinorType.VARCHAR)
-          .add("c", MinorType.VARCHAR)
-          .addNullable("dir00", MinorType.VARCHAR)
-          .addNullable("dir1", MinorType.INT)
-          .buildSchema();
-
-      // Read the two batches.
-
-      for (int i = 0; i < 2; i++) {
-        assertTrue(iter.hasNext());
-        RowSet rowSet = iter.next();
-
-        // Figure out which record this is and test accordingly.
-
-        RowSetReader reader = rowSet.reader();
-        assertTrue(reader.next());
-        String aCol = reader.scalar("a").getString();
-        if (aCol.equals("10")) {
-          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-              .addRow(null, "10", "foo", "bar", null, null)
-              .build();
-          RowSetUtilities.verify(expected, rowSet);
-        } else {
-          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-              .addRow(NESTED_DIR, "20", "fred", "wilma", NESTED_DIR, null)
-              .build();
-          RowSetUtilities.verify(expected, rowSet);
-        }
+    for (int i = 0; i < 2; i++) {
+      assertTrue(iter.hasNext());
+      rowSet = iter.next();
+
+      // Figure out which record this is and test accordingly.
+
+      RowSetReader reader = rowSet.reader();
+      assertTrue(reader.next());
+      String col1 = reader.scalar(0).getString();
+      if (col1.equals("10")) {
+        RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+            .addRow("10", "foo", "bar", null)
+            .build();
+        RowSetUtilities.verify(expected, rowSet);
+      } else {
+        RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+            .addRow("20", "fred", "wilma", NESTED_DIR)
+            .build();
+        RowSetUtilities.verify(expected, rowSet);
       }
-      assertFalse(iter.hasNext());
-    } finally {
-      resetV3();
     }
+    assertFalse(iter.hasNext());
   }
 
   /**
@@ -718,167 +395,103 @@ public class TestCsvWithHeaders extends BaseCsvTest {
    * partition column moves after data columns.
    */
   @Test
-  public void testWilcardAndPartitionsMultiFilesV3() throws IOException {
-    try {
-      enableV3(true);
+  public void testWilcardAndPartitionsMultiFiles() throws IOException {
+    String sql = "SELECT *, dir0, dir1 FROM `dfs.data`.`%s`";
+    Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .add("b", MinorType.VARCHAR)
+        .add("c", MinorType.VARCHAR)
+        .addNullable("dir0", MinorType.VARCHAR)
+        .addNullable("dir1", MinorType.VARCHAR)
+        .addNullable("dir00", MinorType.VARCHAR)
+        .addNullable("dir10", MinorType.VARCHAR)
+        .buildSchema();
 
-      String sql = "SELECT *, dir0, dir1 FROM `dfs.data`.`%s`";
-      Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+    // First batch is empty; just carries the schema.
 
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .add("a", MinorType.VARCHAR)
-          .add("b", MinorType.VARCHAR)
-          .add("c", MinorType.VARCHAR)
-          .addNullable("dir0", MinorType.VARCHAR)
-          .addNullable("dir1", MinorType.VARCHAR)
-          .addNullable("dir00", MinorType.VARCHAR)
-          .addNullable("dir10", MinorType.VARCHAR)
-          .buildSchema();
+    assertTrue(iter.hasNext());
+    RowSet rowSet = iter.next();
+    RowSetUtilities.verify(new RowSetBuilder(client.allocator(), expectedSchema).build(),
+        rowSet);
 
-      // First batch is empty; just carries the schema.
+    // Read the two batches.
 
+    for (int i = 0; i < 2; i++) {
       assertTrue(iter.hasNext());
-      RowSet rowSet = iter.next();
-      RowSetUtilities.verify(new RowSetBuilder(client.allocator(), expectedSchema).build(),
-          rowSet);
-
-      // Read the two batches.
-
-      for (int i = 0; i < 2; i++) {
-        assertTrue(iter.hasNext());
-        rowSet = iter.next();
-
-        // Figure out which record this is and test accordingly.
-
-        RowSetReader reader = rowSet.reader();
-        assertTrue(reader.next());
-        String aCol = reader.scalar("a").getString();
-        if (aCol.equals("10")) {
-          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-              .addRow("10", "foo", "bar", null, null, null, null)
-              .build();
-          RowSetUtilities.verify(expected, rowSet);
-        } else {
-          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-              .addRow("20", "fred", "wilma", NESTED_DIR, null, NESTED_DIR, null)
-              .build();
-          RowSetUtilities.verify(expected, rowSet);
-        }
-      }
-      assertFalse(iter.hasNext());
-    } finally {
-      resetV3();
-    }
-  }
-
-  /**
-   * Test using partition columns with partitioned files in V2. Since the
-   * file is nested to one level, dir0 is a nullable VARCHAR, but dir1 is
-   * a nullable INT. Since both files are read in a single scan operator,
-   * the schema is consistent.
-   */
-  @Test
-  public void doTestExplicitPartitionsMultiFilesV2() throws IOException {
-    try {
-      enableV3(false);
-
-      String sql = "SELECT a, b, c, dir0, dir1 FROM `dfs.data`.`%s`";
-      Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
-
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .add("a", MinorType.VARCHAR)
-          .add("b", MinorType.VARCHAR)
-          .add("c", MinorType.VARCHAR)
-          .addNullable("dir0", MinorType.VARCHAR)
-          .addNullable("dir1", MinorType.INT)
-          .buildSchema();
-
-      // Read the two batches.
-
-      for (int i = 0; i < 2; i++) {
-        assertTrue(iter.hasNext());
-        RowSet rowSet = iter.next();
-
-        // Figure out which record this is and test accordingly.
-
-        RowSetReader reader = rowSet.reader();
-        assertTrue(reader.next());
-        String aCol = reader.scalar("a").getString();
-        if (aCol.equals("10")) {
-          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-              .addRow("10", "foo", "bar", null, null)
-              .build();
-          RowSetUtilities.verify(expected, rowSet);
-        } else {
-          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-              .addRow("20", "fred", "wilma", NESTED_DIR, null)
-              .build();
-          RowSetUtilities.verify(expected, rowSet);
-        }
+      rowSet = iter.next();
+
+      // Figure out which record this is and test accordingly.
+
+      RowSetReader reader = rowSet.reader();
+      assertTrue(reader.next());
+      String aCol = reader.scalar("a").getString();
+      if (aCol.equals("10")) {
+        RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+            .addRow("10", "foo", "bar", null, null, null, null)
+            .build();
+        RowSetUtilities.verify(expected, rowSet);
+      } else {
+        RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+            .addRow("20", "fred", "wilma", NESTED_DIR, null, NESTED_DIR, null)
+            .build();
+        RowSetUtilities.verify(expected, rowSet);
       }
-      assertFalse(iter.hasNext());
-    } finally {
-      resetV3();
     }
+    assertFalse(iter.hasNext());
   }
 
-  /**
+  /**
    * Test using partition columns with partitioned files in V3. Although the
    * file is nested to one level, both dir0 and dir1 are nullable VARCHAR.
    * See {@link TestPartitionRace} to show that the types and schemas
    * are consistent even when used across multiple scans.
    */
   @Test
-  public void doTestExplicitPartitionsMultiFilesV3() throws IOException {
-    try {
-      enableV3(true);
+  public void doTestExplicitPartitionsMultiFiles() throws IOException {
+    String sql = "SELECT a, b, c, dir0, dir1 FROM `dfs.data`.`%s`";
+    Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .add("b", MinorType.VARCHAR)
+        .add("c", MinorType.VARCHAR)
+        .addNullable("dir0", MinorType.VARCHAR)
+        .addNullable("dir1", MinorType.VARCHAR)
+        .buildSchema();
 
-      String sql = "SELECT a, b, c, dir0, dir1 FROM `dfs.data`.`%s`";
-      Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+    // First batch is empty; just carries the schema.
 
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .add("a", MinorType.VARCHAR)
-          .add("b", MinorType.VARCHAR)
-          .add("c", MinorType.VARCHAR)
-          .addNullable("dir0", MinorType.VARCHAR)
-          .addNullable("dir1", MinorType.VARCHAR)
-          .buildSchema();
+    assertTrue(iter.hasNext());
+    RowSet rowSet = iter.next();
+    RowSetUtilities.verify(new RowSetBuilder(client.allocator(), expectedSchema).build(),
+        rowSet);
 
-      // First batch is empty; just carries the schema.
+    // Read the two batches.
 
+    for (int i = 0; i < 2; i++) {
       assertTrue(iter.hasNext());
-      RowSet rowSet = iter.next();
-      RowSetUtilities.verify(new RowSetBuilder(client.allocator(), expectedSchema).build(),
-          rowSet);
-
-      // Read the two batches.
-
-      for (int i = 0; i < 2; i++) {
-        assertTrue(iter.hasNext());
-        rowSet = iter.next();
-
-        // Figure out which record this is and test accordingly.
-
-        RowSetReader reader = rowSet.reader();
-        assertTrue(reader.next());
-        String aCol = reader.scalar("a").getString();
-        if (aCol.equals("10")) {
-          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-              .addRow("10", "foo", "bar", null, null)
-              .build();
-          RowSetUtilities.verify(expected, rowSet);
-        } else {
-          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-              .addRow("20", "fred", "wilma", NESTED_DIR, null)
-              .build();
-          RowSetUtilities.verify(expected, rowSet);
-        }
+      rowSet = iter.next();
+
+      // Figure out which record this is and test accordingly.
+
+      RowSetReader reader = rowSet.reader();
+      assertTrue(reader.next());
+      String aCol = reader.scalar("a").getString();
+      if (aCol.equals("10")) {
+        RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+            .addRow("10", "foo", "bar", null, null)
+            .build();
+        RowSetUtilities.verify(expected, rowSet);
+      } else {
+        RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+            .addRow("20", "fred", "wilma", NESTED_DIR, null)
+            .build();
+        RowSetUtilities.verify(expected, rowSet);
       }
-      assertFalse(iter.hasNext());
-    }
-    finally {
-      resetV3();
     }
+    assertFalse(iter.hasNext());
   }
 
   /**
@@ -886,38 +499,31 @@ public class TestCsvWithHeaders extends BaseCsvTest {
    * column when using column headers.
    */
   @Test
-  public void testColumnsColV3() throws IOException {
-    try {
-      enableV3(true);
-
-      String sql = "SELECT author, columns FROM `dfs.data`.`%s`";
-      RowSet actual = client.queryBuilder().sql(sql, COLUMNS_FILE_NAME).rowSet();
-
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .add("author", MinorType.VARCHAR)
-          .add("columns", MinorType.VARCHAR)
-          .buildSchema();
-
-      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-          .addRow("fred", "Rocks Today,Dino Wrangling")
-          .addRow("barney", "Bowlarama")
-          .build();
-      RowSetUtilities.verify(expected, actual);
-    } finally {
-      resetV3();
-    }
+  public void testColumnsCol() throws IOException {
+    String sql = "SELECT author, columns FROM `dfs.data`.`%s`";
+    RowSet actual = client.queryBuilder().sql(sql, COLUMNS_FILE_NAME).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("author", MinorType.VARCHAR)
+        .add("columns", MinorType.VARCHAR)
+        .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("fred", "Rocks Today,Dino Wrangling")
+        .addRow("barney", "Bowlarama")
+        .build();
+    RowSetUtilities.verify(expected, actual);
   }
 
   /**
    * The column name `columns` is treated as a plain old
    * column when using column headers. If used with an index,
    * validation will fail because the VarChar column is not an array
+   * @throws Exception if the query fails for a reason other than the expected validation error
    */
   @Test
-  public void testColumnsIndexV3() throws IOException {
+  public void testColumnsIndex() throws Exception {
     try {
-      enableV3(true);
-
       String sql = "SELECT author, columns[0] FROM `dfs.data`.`%s`";
       client.queryBuilder().sql(sql, COLUMNS_FILE_NAME).run();
     } catch (UserRemoteException e) {
@@ -927,43 +533,32 @@ public class TestCsvWithHeaders extends BaseCsvTest {
       assertTrue(e.getMessage().contains("Plugin config name: csv"));
       assertTrue(e.getMessage().contains("Extract headers: true"));
       assertTrue(e.getMessage().contains("Skip first line: false"));
-    } catch (Exception e) {
-      fail();
-    } finally {
-      resetV3();
     }
   }
 
   @Test
-  public void testColumnsMissingV3() throws IOException {
-    try {
-      enableV3(true);
-
-      String sql = "SELECT a, columns FROM `dfs.data`.`%s`";
-      RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
-
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .add("a", MinorType.VARCHAR)
-          .add("columns", MinorType.VARCHAR)
-          .buildSchema();
-      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-          .addRow("10", "")
-          .build();
-      RowSetUtilities.verify(expected, actual);
-    } finally {
-      resetV3();
-    }
+  public void testColumnsMissing() throws IOException {
+    String sql = "SELECT a, columns FROM `dfs.data`.`%s`";
+    RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .add("columns", MinorType.VARCHAR)
+        .buildSchema();
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("10", "")
+        .build();
+    RowSetUtilities.verify(expected, actual);
   }
 
   /**
    * If columns[x] is used, then this can't possibly match a valid
    * text reader column, so raise an error instead.
+   * @throws Exception if the query fails for a reason other than the expected validation error
    */
   @Test
-  public void testColumnsIndexMissingV3() throws IOException {
+  public void testColumnsIndexMissing() throws Exception {
     try {
-      enableV3(true);
-
       String sql = "SELECT a, columns[0] FROM `dfs.data`.`%s`";
       client.queryBuilder().sql(sql, TEST_FILE_NAME).run();
     } catch (UserRemoteException e) {
@@ -975,35 +570,26 @@ public class TestCsvWithHeaders extends BaseCsvTest {
       assertTrue(e.getMessage().contains("Plugin config name: csv"));
       assertTrue(e.getMessage().contains("Extract headers: true"));
       assertTrue(e.getMessage().contains("Skip first line: false"));
-    } catch (Exception e) {
-      fail();
-    } finally {
-      resetV3();
     }
   }
 
   @Test
   public void testHugeColumn() throws IOException {
     String fileName = buildBigColFile(true);
-    try {
-      enableV3(true);
-      String sql = "SELECT * FROM `dfs.data`.`%s`";
-      RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet();
-      assertEquals(10, actual.rowCount());
-      RowSetReader reader = actual.reader();
-      while (reader.next()) {
-        int i = reader.logicalIndex();
-        assertEquals(Integer.toString(i + 1), reader.scalar(0).getString());
-        String big = reader.scalar(1).getString();
-        assertEquals(BIG_COL_SIZE, big.length());
-        for (int j = 0; j < BIG_COL_SIZE; j++) {
-          assertEquals((char) ((j + i) % 26 + 'A'), big.charAt(j));
-        }
-        assertEquals(Integer.toString((i + 1) * 10), reader.scalar(2).getString());
+    String sql = "SELECT * FROM `dfs.data`.`%s`";
+    RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet();
+    assertEquals(10, actual.rowCount());
+    RowSetReader reader = actual.reader();
+    while (reader.next()) {
+      int i = reader.logicalIndex();
+      assertEquals(Integer.toString(i + 1), reader.scalar(0).getString());
+      String big = reader.scalar(1).getString();
+      assertEquals(BIG_COL_SIZE, big.length());
+      for (int j = 0; j < BIG_COL_SIZE; j++) {
+        assertEquals((char) ((j + i) % 26 + 'A'), big.charAt(j));
       }
-      actual.clear();
-    } finally {
-      resetV3();
+      assertEquals(Integer.toString((i + 1) * 10), reader.scalar(2).getString());
     }
+    actual.clear();
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
index 955eb3d..9a76c1f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
@@ -34,14 +34,12 @@ import org.apache.drill.test.rowSet.DirectRowSet;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.RowSetComparison;
-import org.apache.drill.test.rowSet.RowSetReader;
 import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.joda.time.Instant;
 import org.joda.time.LocalDate;
 import org.joda.time.LocalTime;
 import org.joda.time.Period;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -74,11 +72,6 @@ public class TestCsvWithSchema extends BaseCsvTest {
     "4,betty,2019-05-04,NA"
   };
 
-  private static final String multi2FullContents[] = {
-    "id,name,date",
-    "3,barney,2001-01-16,NA"
-  };
-
   private static final String reordered2Contents[] = {
     "name,id,date",
     "barney,3,2001-01-16"
@@ -308,93 +301,6 @@ public class TestCsvWithSchema extends BaseCsvTest {
   }
 
   /**
-   * Test the schema we get in V2 when the table read order is random.
-   * Worst-case: the two files have different column counts and
-   * column orders.
-   * <p>
-   * Though the results are random, we iterate 10 times which, in most runs,
-   * shows the random variation in schemas:
-   * <ul>
-   * <li>Sometimes the first batch has three columns, sometimes four.</li>
-   * <li>Sometimes the column `id` is in position 0, sometimes in position 1
-   * (correlated with the above).</li>
-   * <li>Due to the fact that sometimes the first file (with four columns)
-   * is returned first, sometimes the second file (with three columns) is
-   * returned first.</li>
-   * </ul>
-   */
-  @Test
-  public void testSchemaRaceV2() throws Exception {
-    try {
-      enableV3(false);
-      enableSchema(false);
-      enableMultiScan();
-      String tablePath = buildTable("schemaRaceV2", multi1Contents, reordered2Contents);
-      boolean sawFile1First = false;
-      boolean sawFile2First = false;
-      boolean sawFullSchema = false;
-      boolean sawPartialSchema = false;
-      boolean sawIdAsCol0 = false;
-      boolean sawIdAsCol1 = false;
-      String sql = "SELECT * FROM " + tablePath;
-      for (int i = 0; i < 10; i++) {
-        Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql).rowSetIterator();
-        int batchCount = 0;
-        while(iter.hasNext()) {
-          batchCount++;
-          RowSet result = iter.next();
-          TupleMetadata resultSchema = result.schema();
-          if (resultSchema.size() == 4) {
-            sawFullSchema = true;
-          } else {
-            assertEquals(3, resultSchema.size());
-            sawPartialSchema = true;
-          }
-          if (resultSchema.index("id") == 0) {
-            sawIdAsCol0 = true;
-          } else {
-            assertEquals(1, resultSchema.index("id"));
-            sawIdAsCol1 = true;
-          }
-          if (batchCount == 1) {
-            RowSetReader reader = result.reader();
-            assertTrue(reader.next());
-            String id = reader.scalar("id").getString();
-            if (id.equals("1")) {
-              sawFile1First = true;
-            } else {
-              assertEquals("3", id);
-              sawFile2First = true;
-            }
-          }
-          result.clear();
-        }
-      }
-
-      // Outcome is random (which is the key problem). Don't assert on these
-      // because doing so can lead to a flakey test.
-
-      if (!sawFile1First || ! sawFile2First || !sawFullSchema || !sawPartialSchema || !sawIdAsCol0 || !sawIdAsCol1) {
-        System.out.println("Some variations did not occur");
-        System.out.println(String.format("File 1 first: %s", sawFile1First));
-        System.out.println(String.format("File 1 second: %s", sawFile2First));
-        System.out.println(String.format("Full schema: %s", sawFullSchema));
-        System.out.println(String.format("Partial schema: %s", sawPartialSchema));
-        System.out.println(String.format("`id` as col 0: %s", sawIdAsCol0));
-        System.out.println(String.format("`id` as col 1: %s", sawIdAsCol1));
-      }
-      // Sanity checks
-      assertTrue(sawFullSchema);
-      assertTrue(sawFile1First || sawFile2First);
-      assertTrue(sawIdAsCol0 || sawIdAsCol1);
-    } finally {
-      resetV3();
-      resetSchema();
-      resetMultiScan();
-    }
-  }
-
-  /**
    * Show that, without schema, the hard schema change for the "missing"
    * gender column causes an error in the sort operator when presented with
    * one batch in which gender is VARCHAR, another in which it is
@@ -406,89 +312,23 @@ public class TestCsvWithSchema extends BaseCsvTest {
     try {
       enableSchema(false);
       enableMultiScan();
-      enableV3(false);
       String tablePath = buildTable("wildcardSortV2", multi1Contents, reordered2Contents);
-      doTestWildcardSortFailure(tablePath);
-      enableV3(true);
-      doTestWildcardSortFailure(tablePath);
-    } finally {
-      resetV3();
-      resetSchema();
-      resetMultiScan();
-    }
-  }
-
-  private void doTestWildcardSortFailure(String tablePath) throws Exception {
-    String sql = "SELECT * FROM " + tablePath + " ORDER BY id";
-    boolean sawError = false;
-    for (int i = 0; i < 10; i++) {
-      try {
-        // When this fails it will print a nasty stack trace.
-        RowSet result = client.queryBuilder().sql(sql).rowSet();
-        assertEquals(4, result.rowCount());
-        result.clear();
-      } catch (RpcException e) {
-        assertTrue(e.getCause() instanceof UserRemoteException);
-        sawError = true;
-        break;
-      }
-    }
-    assertTrue(sawError);
-  }
-
-  /**
-   * Test an explicit projection with a sort. Using the sort 1) will blow up
-   * if the internal schema is inconsistent, and 2) allows easier verification
-   * of the merged table results.
-   * <p>
-   * Fails with <code><pre>
-   * #: id, name, gender
-   * 0: "1", "barney", ""
-   * 1: "2", "", ""
-   * 2: "3", "
-   * ", ""
-   * 3: "4", "
-   * " java.lang.NegativeArraySizeException: null
-   *   at io.netty.buffer.DrillBuf.unsafeGetMemory(DrillBuf.java:852) ~[classes/:4.0.48.Final]
-   * </pre></code>
-   */
-  @Test
-  @Ignore("Vectors get corrupted somehow")
-  public void testV2ExplicitSortFailure() throws Exception {
-    try {
-      enableSchema(false);
-      enableMultiScan();
-      enableV3(false);
-      // V2 fails on ragged columns, use consistent columns
-      String tablePath = buildTable("explicitSortV2", multi1Contents, multi2FullContents);
-      String sql = "SELECT id, name, gender FROM " + tablePath + " ORDER BY id";
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .add("id", MinorType.VARCHAR)
-          .add("name", MinorType.VARCHAR)
-          .add("gender", MinorType.VARCHAR)
-          .buildSchema();
-      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-          .addRow("1", "wilma", "female")
-          .addRow("2", "fred", "male")
-          .addRow("3", "barney", "NA")
-          .addRow("4", "betty", "NA")
-          .build();
+      String sql = "SELECT * FROM " + tablePath + " ORDER BY id";
       boolean sawError = false;
       for (int i = 0; i < 10; i++) {
         try {
+          // When this fails it will print a nasty stack trace.
           RowSet result = client.queryBuilder().sql(sql).rowSet();
-          result.print();
-          new RowSetComparison(expected).verifyAndClear(result);
+          assertEquals(4, result.rowCount());
+          result.clear();
         } catch (RpcException e) {
           assertTrue(e.getCause() instanceof UserRemoteException);
           sawError = true;
           break;
         }
       }
-      expected.clear();
       assertTrue(sawError);
     } finally {
-      resetV3();
       resetSchema();
       resetMultiScan();
     }
@@ -504,13 +344,11 @@ public class TestCsvWithSchema extends BaseCsvTest {
    * orders (file1 first sometimes, file2 other times.)
    */
   @Test
-  public void testV3ExplicitSort() throws Exception {
+  public void testExplicitSort() throws Exception {
     try {
       enableSchema(false);
       enableMultiScan();
-      enableV3(true);
-      // V3 handles ragged columns
-      String tablePath = buildTable("v3ExplictSort", raggedMulti1Contents, reordered2Contents);
+      String tablePath = buildTable("explictSort1", raggedMulti1Contents, reordered2Contents);
       String sql = "SELECT id, name, gender FROM " + tablePath + " ORDER BY id";
       TupleMetadata expectedSchema = new SchemaBuilder()
           .add("id", MinorType.VARCHAR)
@@ -529,7 +367,6 @@ public class TestCsvWithSchema extends BaseCsvTest {
       }
       expected.clear();
     } finally {
-      resetV3();
       resetSchema();
       resetMultiScan();
     }
@@ -547,7 +384,7 @@ public class TestCsvWithSchema extends BaseCsvTest {
       enableSchemaSupport();
       enableMultiScan();
       // V3 handles ragged columns
-      String tablePath = buildTable("v3ExplictSort", raggedMulti1Contents, reordered2Contents);
+      String tablePath = buildTable("explictSort2", raggedMulti1Contents, reordered2Contents);
       run(SCHEMA_SQL, tablePath);
       String sql = "SELECT id, name, gender FROM " + tablePath + " ORDER BY id";
       TupleMetadata expectedSchema = new SchemaBuilder()
@@ -641,8 +478,8 @@ public class TestCsvWithSchema extends BaseCsvTest {
    * variation in inputs.
    */
   @Test
-  public void testWildcardV3LenientSchema() throws Exception {
-    String tableName = "wildcardLenientV3";
+  public void testWildcardLenientSchema() throws Exception {
+    String tableName = "wildcardLenient";
     String tablePath = buildTable(tableName, multi1Contents,
         reordered2Contents, nameOnlyContents);
 
@@ -677,8 +514,8 @@ public class TestCsvWithSchema extends BaseCsvTest {
    * projected.
    */
   @Test
-  public void testWildcardV3StrictSchema() throws Exception {
-    String tableName = "wildcardStrictV3";
+  public void testWildcardStrictSchema() throws Exception {
+    String tableName = "wildcardStrict";
     String tablePath = buildTable(tableName, multi1Contents,
         reordered2Contents, nameOnlyContents);
 
@@ -717,7 +554,7 @@ public class TestCsvWithSchema extends BaseCsvTest {
    */
   @Test
   public void testMultiFragmentStrictSchema() throws Exception {
-    String tableName = "wildcardStrict2V3";
+    String tableName = "wildcardStrict2";
     String tablePath = buildTable(tableName, multi1Contents,
         reordered2Contents, nameOnlyContents);
 
@@ -1114,7 +951,6 @@ public class TestCsvWithSchema extends BaseCsvTest {
           .build();
       RowSetUtilities.verify(expected, actual);
     } finally {
-      resetV3();
       resetSchema();
     }
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
index 1340d90..8918b39 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
@@ -21,7 +21,6 @@ import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
@@ -42,7 +41,12 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-// CSV reader now hosted on the row set framework
+/**
+ * Test behavior of the text (CSV) reader for files without headers
+ * and without an external schema. Data is represented with the
+ * `columns` array column.
+ */
+
 @Category(RowSetTests.class)
 public class TestCsvWithoutHeaders extends BaseCsvTest {
 
@@ -83,23 +87,12 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
     buildFile(new File(nestedDir, NESTED_FILE), secondSet);
   }
 
-  @Test
-  public void testWildcard() throws IOException {
-    try {
-      enableV3(false);
-      doTestWildcard();
-      enableV3(true);
-      doTestWildcard();
-    } finally {
-      resetV3();
-    }
-  }
-
   /**
    * Verify that the wildcard expands to the `columns` array
    */
 
-  private void doTestWildcard() throws IOException {
+  @Test
+  public void testWildcard() throws IOException {
     String sql = "SELECT * FROM `dfs.data`.`%s`";
     RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
 
@@ -116,17 +109,6 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
 
   @Test
   public void testColumns() throws IOException {
-    try {
-      enableV3(false);
-      doTestColumns();
-      enableV3(true);
-      doTestColumns();
-    } finally {
-      resetV3();
-    }
-  }
-
-  private void doTestColumns() throws IOException {
     String sql = "SELECT columns FROM `dfs.data`.`%s`";
     RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
 
@@ -142,106 +124,41 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
   }
 
   @Test
-  public void doTestWildcardAndMetadataV2() throws IOException {
-    try {
-      enableV3(false);
-      String sql = "SELECT *, filename FROM `dfs.data`.`%s`";
-      RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
-
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .addArray("columns", MinorType.VARCHAR)
-          .addNullable("filename", MinorType.VARCHAR)
-          .buildSchema();
-
-      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-          .addRow(strArray("10", "foo", "bar"), TEST_FILE_NAME)
-          .addRow(strArray("20", "fred", "wilma"), TEST_FILE_NAME)
-          .build();
-      RowSetUtilities.verify(expected, actual);
-    } finally {
-      resetV3();
-    }
-  }
+  public void doTestWildcardAndMetadata() throws IOException {
+    String sql = "SELECT *, filename FROM `dfs.data`.`%s`";
+    RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
 
-  @Test
-  public void doTestWildcardAndMetadataV3() throws IOException {
-    try {
-      enableV3(true);
-      String sql = "SELECT *, filename FROM `dfs.data`.`%s`";
-      RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
-
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .addArray("columns", MinorType.VARCHAR)
-          .add("filename", MinorType.VARCHAR)
-          .buildSchema();
-
-      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-          .addRow(strArray("10", "foo", "bar"), TEST_FILE_NAME)
-          .addRow(strArray("20", "fred", "wilma"), TEST_FILE_NAME)
-          .build();
-      RowSetUtilities.verify(expected, actual);
-    } finally {
-      resetV3();
-    }
-  }
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("columns", MinorType.VARCHAR)
+        .add("filename", MinorType.VARCHAR)
+        .buildSchema();
 
-  @Test
-  public void testColumnsAndMetadataV2() throws IOException {
-    try {
-      enableV3(false);
-      String sql = "SELECT columns, filename FROM `dfs.data`.`%s`";
-      RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
-
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .addArray("columns", MinorType.VARCHAR)
-          .addNullable("filename", MinorType.VARCHAR)
-          .buildSchema();
-
-      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-          .addRow(strArray("10", "foo", "bar"), TEST_FILE_NAME)
-          .addRow(strArray("20", "fred", "wilma"), TEST_FILE_NAME)
-          .build();
-      RowSetUtilities.verify(expected, actual);
-    } finally {
-      resetV3();
-    }
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow(strArray("10", "foo", "bar"), TEST_FILE_NAME)
+        .addRow(strArray("20", "fred", "wilma"), TEST_FILE_NAME)
+        .build();
+    RowSetUtilities.verify(expected, actual);
   }
 
   @Test
-  public void testColumnsAndMetadataV3() throws IOException {
-    try {
-      enableV3(true);
-      String sql = "SELECT columns, filename FROM `dfs.data`.`%s`";
-      RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
-
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .addArray("columns", MinorType.VARCHAR)
-          .add("filename", MinorType.VARCHAR)
-          .buildSchema();
-
-      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-          .addRow(strArray("10", "foo", "bar"), TEST_FILE_NAME)
-          .addRow(strArray("20", "fred", "wilma"), TEST_FILE_NAME)
-          .build();
-      RowSetUtilities.verify(expected, actual);
-    } finally {
-      resetV3();
-    }
+  public void testColumnsAndMetadata() throws IOException {
+    String sql = "SELECT columns, filename FROM `dfs.data`.`%s`";
+    RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("columns", MinorType.VARCHAR)
+        .add("filename", MinorType.VARCHAR)
+        .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow(strArray("10", "foo", "bar"), TEST_FILE_NAME)
+        .addRow(strArray("20", "fred", "wilma"), TEST_FILE_NAME)
+        .build();
+    RowSetUtilities.verify(expected, actual);
   }
 
   @Test
   public void testSpecificColumns() throws IOException {
-    try {
-      enableV3(false);
-      doTestSpecificColumns();
-      enableV3(true);
-      doTestSpecificColumns();
-    } finally {
-      resetV3();
-    }
-  }
-
-  private void doTestSpecificColumns() throws IOException {
     String sql = "SELECT columns[0], columns[2] FROM `dfs.data`.`%s`";
     RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
 
@@ -261,17 +178,6 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
   public void testRaggedRows() throws IOException {
     String fileName = "ragged.csv";
     buildFile(fileName, raggedRows);
-    try {
-      enableV3(false);
-      doTestRaggedRows(fileName);
-      enableV3(true);
-      doTestRaggedRows(fileName);
-    } finally {
-      resetV3();
-    }
-  }
-
-  private void doTestRaggedRows(String fileName) throws IOException {
     String sql = "SELECT columns FROM `dfs.data`.`%s`";
     RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet();
 
@@ -287,57 +193,6 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
   }
 
   /**
-   * Test partition expansion. Because the two files are read in the
-   * same scan operator, the schema is consistent.
-   * <p>
-   * V2, since Drill 1.12, puts partition columns ahead of data columns.
-   */
-  @Test
-  public void testPartitionExpansionV2() throws IOException {
-    try {
-      enableV3(false);
-
-      String sql = "SELECT * FROM `dfs.data`.`%s`";
-      Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
-
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .addNullable("dir0", MinorType.VARCHAR)
-          .addArray("columns", MinorType.VARCHAR)
-          .buildSchema();
-
-      // Read the two batches.
-
-      for (int i = 0; i < 2; i++) {
-        assertTrue(iter.hasNext());
-        RowSet rowSet = iter.next();
-
-        // Figure out which record this is and test accordingly.
-
-        RowSetReader reader = rowSet.reader();
-        assertTrue(reader.next());
-        ArrayReader ar = reader.array(1);
-        assertTrue(ar.next());
-        String col1 = ar.scalar().getString();
-        if (col1.equals("10")) {
-          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-              .addRow(null, strArray("10", "foo", "bar"))
-              .addRow(null, strArray("20", "fred", "wilma"))
-              .build();
-          RowSetUtilities.verify(expected, rowSet);
-        } else {
-          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-              .addRow(NESTED_DIR, strArray("30", "barney", "betty"))
-              .build();
-          RowSetUtilities.verify(expected, rowSet);
-        }
-      }
-      assertFalse(iter.hasNext());
-    } finally {
-      resetV3();
-    }
-  }
-
-  /**
    * Test partition expansion in V3.
    * <p>
    * V3, as in V2 before Drill 1.12, puts partition columns after
@@ -346,54 +201,48 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
    */
   @Test
   public void testPartitionExpansionV3() throws IOException {
-    try {
-      enableV3(true);
+    String sql = "SELECT * FROM `dfs.data`.`%s`";
+    Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("columns", MinorType.VARCHAR)
+        .addNullable("dir0", MinorType.VARCHAR)
+        .buildSchema();
 
-      String sql = "SELECT * FROM `dfs.data`.`%s`";
-      Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+    // First batch is empty; just carries the schema.
 
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .addArray("columns", MinorType.VARCHAR)
-          .addNullable("dir0", MinorType.VARCHAR)
-          .buildSchema();
+    assertTrue(iter.hasNext());
+    RowSet rowSet = iter.next();
+    assertEquals(0, rowSet.rowCount());
+    rowSet.clear();
 
-      // First batch is empty; just carries the schema.
+    // Read the other two batches.
 
+    for (int i = 0; i < 2; i++) {
       assertTrue(iter.hasNext());
-      RowSet rowSet = iter.next();
-      assertEquals(0, rowSet.rowCount());
-      rowSet.clear();
-
-      // Read the other two batches.
-
-      for (int i = 0; i < 2; i++) {
-        assertTrue(iter.hasNext());
-        rowSet = iter.next();
-
-        // Figure out which record this is and test accordingly.
-
-        RowSetReader reader = rowSet.reader();
-        assertTrue(reader.next());
-        ArrayReader ar = reader.array(0);
-        assertTrue(ar.next());
-        String col1 = ar.scalar().getString();
-        if (col1.equals("10")) {
-          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-              .addRow(strArray("10", "foo", "bar"), null)
-              .addRow(strArray("20", "fred", "wilma"), null)
-              .build();
-          RowSetUtilities.verify(expected, rowSet);
-        } else {
-          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-              .addRow(strArray("30", "barney", "betty"), NESTED_DIR)
-              .build();
-          RowSetUtilities.verify(expected, rowSet);
-        }
+      rowSet = iter.next();
+
+      // Figure out which record this is and test accordingly.
+
+      RowSetReader reader = rowSet.reader();
+      assertTrue(reader.next());
+      ArrayReader ar = reader.array(0);
+      assertTrue(ar.next());
+      String col1 = ar.scalar().getString();
+      if (col1.equals("10")) {
+        RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+            .addRow(strArray("10", "foo", "bar"), null)
+            .addRow(strArray("20", "fred", "wilma"), null)
+            .build();
+        RowSetUtilities.verify(expected, rowSet);
+      } else {
+        RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+            .addRow(strArray("30", "barney", "betty"), NESTED_DIR)
+            .build();
+        RowSetUtilities.verify(expected, rowSet);
       }
-      assertFalse(iter.hasNext());
-    } finally {
-      resetV3();
     }
+    assertFalse(iter.hasNext());
   }
 
   /**
@@ -401,22 +250,18 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
    * implicitly suggest that `columns` is a map.
    * <p>
    * V2 message: DATA_READ ERROR: Selected column 'columns' must be an array index
+   * @throws Exception if the query fails for a reason other than the expected validation error
    */
 
   @Test
-  public void testColumnsAsMap() throws IOException {
+  public void testColumnsAsMap() throws Exception {
     String sql = "SELECT `%s`.columns.foo FROM `dfs.data`.`%s`";
     try {
-      enableV3(true);
       client.queryBuilder().sql(sql, TEST_FILE_NAME, TEST_FILE_NAME).run();
     } catch (UserRemoteException e) {
       assertTrue(e.getMessage().contains(
           "VALIDATION ERROR: Column `columns` has map elements, but must be an array"));
       assertTrue(e.getMessage().contains("Plugin config name: csv"));
-    } catch (Exception e) {
-      fail();
-    } finally {
-      resetV3();
     }
   }
   /**
@@ -424,51 +269,42 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
    * it must be below the maximum.
    * <p>
    * V2 message: INTERNAL_ERROR ERROR: 70000
+   * @throws Exception if the query fails for a reason other than the expected validation error
    */
 
   @Test
-  public void testColumnsIndexOverflow() throws IOException {
+  public void testColumnsIndexOverflow() throws Exception {
     String sql = "SELECT columns[70000] FROM `dfs.data`.`%s`";
     try {
-      enableV3(true);
       client.queryBuilder().sql(sql, TEST_FILE_NAME, TEST_FILE_NAME).run();
     } catch (UserRemoteException e) {
       assertTrue(e.getMessage().contains(
           "VALIDATION ERROR: `columns`[70000] index out of bounds, max supported size is 65536"));
       assertTrue(e.getMessage().contains("Plugin config name: csv"));
-    } catch (Exception e) {
-      fail();
-    } finally {
-      resetV3();
     }
   }
 
   @Test
   public void testHugeColumn() throws IOException {
     String fileName = buildBigColFile(false);
-    try {
-      enableV3(true);
-      String sql = "SELECT * FROM `dfs.data`.`%s`";
-      RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet();
-      assertEquals(10, actual.rowCount());
-      RowSetReader reader = actual.reader();
-      ArrayReader arrayReader = reader.array(0);
-      while (reader.next()) {
-        int i = reader.logicalIndex();
-        arrayReader.next();
-        assertEquals(Integer.toString(i + 1), arrayReader.scalar().getString());
-        arrayReader.next();
-        String big = arrayReader.scalar().getString();
-        assertEquals(BIG_COL_SIZE, big.length());
-        for (int j = 0; j < BIG_COL_SIZE; j++) {
-          assertEquals((char) ((j + i) % 26 + 'A'), big.charAt(j));
-        }
-        arrayReader.next();
-        assertEquals(Integer.toString((i + 1) * 10), arrayReader.scalar().getString());
+    String sql = "SELECT * FROM `dfs.data`.`%s`";
+    RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet();
+    assertEquals(10, actual.rowCount());
+    RowSetReader reader = actual.reader();
+    ArrayReader arrayReader = reader.array(0);
+    while (reader.next()) {
+      int i = reader.logicalIndex();
+      arrayReader.next();
+      assertEquals(Integer.toString(i + 1), arrayReader.scalar().getString());
+      arrayReader.next();
+      String big = arrayReader.scalar().getString();
+      assertEquals(BIG_COL_SIZE, big.length());
+      for (int j = 0; j < BIG_COL_SIZE; j++) {
+        assertEquals((char) ((j + i) % 26 + 'A'), big.charAt(j));
       }
-      actual.clear();
-    } finally {
-      resetV3();
+      arrayReader.next();
+      assertEquals(Integer.toString((i + 1) * 10), arrayReader.scalar().getString());
     }
+    actual.clear();
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java
index 67429fb..42b687b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java
@@ -21,7 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.store.easy.text.compliant.v3.HeaderBuilder;
+import org.apache.drill.exec.store.easy.text.reader.HeaderBuilder;
 import org.apache.drill.test.DrillTest;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java
index 9302a9a..b1a2c94 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java
@@ -26,7 +26,6 @@ import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.test.rowSet.DirectRowSet;
@@ -49,14 +48,8 @@ import org.junit.Test;
  * persist.
  * <p>
  * The solution is to figure out the max partition depth in the
- * EasySubScan rather than in each scan operator.
- * <p>
- * The tests here test both the "V2" (AKA "new text reader") which has
- * many issues, and the "V3" (row-set-based version) that has fixes.
- * <p>
- * See DRILL-7082 for the multi-scan race (fixed in V3), and
- * DRILL-7083 for the problem with partition columns returning nullable INT
- * (also fixed in V3.)
+ * EasySubScan rather than in each scan operator, as the current "V3"
+ * text reader now does. The tests here verify this behavior.
  */
 
 public class TestPartitionRace extends BaseCsvTest {
@@ -77,61 +70,11 @@ public class TestPartitionRace extends BaseCsvTest {
 
   /**
    * Oddly, when run in a single fragment, the files occur in a
-   * stable order, the partition always appars, and it appears in
+   * stable order, the partition always appears, and it appears in
    * the first column position.
    */
   @Test
-  public void testSingleScanV2() throws IOException {
-    String sql = "SELECT * FROM `dfs.data`.`%s`";
-
-    try {
-      enableV3(false);
-
-      // Loop to run the query 10 times, or until we see the race
-
-      boolean sawMissingPartition = false;
-      boolean sawPartitionFirst = false;
-      boolean sawPartitionLast = false;
-
-      // Read the two batches.
-
-      Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
-      for (int j = 0; j < 2; j++) {
-        assertTrue(iter.hasNext());
-        RowSet rowSet = iter.next();
-
-        // Check location of partition column
-
-        int posn = rowSet.schema().index("dir0");
-        if (posn == -1) {
-          sawMissingPartition = true;
-        } else if (posn == 0) {
-          sawPartitionFirst = true;
-        } else {
-          sawPartitionLast = true;
-        }
-        rowSet.clear();
-      }
-      assertFalse(iter.hasNext());
-
-      // When run in a single fragment, the partition column appears
-      // all the time, and is in the first column position.
-
-      assertFalse(sawMissingPartition);
-      assertTrue(sawPartitionFirst);
-      assertFalse(sawPartitionLast);
-    } finally {
-      resetV3();
-      client.resetSession(ExecConstants.MIN_READER_WIDTH_KEY);
-    }
-  }
-
-  /**
-   * V3 provides the same schema for the single- and multi-scan
-   * cases.
-   */
-  @Test
-  public void testSingleScanV3() throws IOException {
+  public void testSingleScan() throws IOException {
     String sql = "SELECT * FROM `dfs.data`.`%s`";
 
     TupleMetadata expectedSchema = new SchemaBuilder()
@@ -141,142 +84,40 @@ public class TestPartitionRace extends BaseCsvTest {
         .addNullable("dir0", MinorType.VARCHAR)
         .buildSchema();
 
-    try {
-      enableV3(true);
-
-      // Loop to run the query 10 times to verify no race
-
-      // First batch is empty; just carries the schema.
-
-      Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
-      assertTrue(iter.hasNext());
-      RowSet rowSet = iter.next();
-      assertEquals(0, rowSet.rowCount());
-      rowSet.clear();
+    // Loop to run the query 10 times to verify no race
 
-      // Read the two batches.
+    // First batch is empty; just carries the schema.
 
-      for (int j = 0; j < 2; j++) {
-        assertTrue(iter.hasNext());
-        rowSet = iter.next();
+    Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+    assertTrue(iter.hasNext());
+    RowSet rowSet = iter.next();
+    assertEquals(0, rowSet.rowCount());
+    rowSet.clear();
 
-        // Figure out which record this is and test accordingly.
+    // Read the two batches.
 
-        RowSetReader reader = rowSet.reader();
-        assertTrue(reader.next());
-        String col1 = reader.scalar("a").getString();
-        if (col1.equals("10")) {
-          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-              .addRow("10", "foo", "bar", null)
-              .build();
-          RowSetUtilities.verify(expected, rowSet);
-        } else {
-          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-              .addRow("20", "fred", "wilma", NESTED_DIR)
-              .build();
-          RowSetUtilities.verify(expected, rowSet);
-        }
-      }
-      assertFalse(iter.hasNext());
-    } finally {
-      resetV3();
-    }
-  }
-
-  /**
-   * When forced to run in two fragments, the fun really starts. The
-   * partition column (usually) appears in the last column position instead
-   * of the first. The partition may or may not occur in the first row
-   * depending on which file is read first. The result is that the
-   * other columns will jump around. If we tried to create an expected
-   * result set, we'd be frustrated because the schema randomly changes.
-   * <p>
-   * Just to be clear: this behavior is a bug, not a feature. But, it is
-   * an established baseline for the "V2" reader.
-   * <p>
-   * This is really a test (demonstration) of the wrong behavior. This test
-   * is pretty unreliable. In particular, the position of the partition column
-   * seems to randomly shift from first to last position across runs.
-   */
-  @Test
-  public void testRaceV2() throws IOException {
-    String sql = "SELECT * FROM `dfs.data`.`%s`";
-
-    try {
-      enableV3(false);
-      enableMultiScan();
-
-      // Loop to run the query 10 times, or until we see the race
-
-      boolean sawRootFirst = false;
-      boolean sawNestedFirst = false;
-      boolean sawMissingPartition = false;
-      boolean sawPartitionFirst = false;
-      boolean sawPartitionLast = false;
-      for (int i = 0; i < 10; i++) {
-
-        // Read the two batches.
-
-        Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
-        for (int j = 0; j < 2; j++) {
-          assertTrue(iter.hasNext());
-          RowSet rowSet = iter.next();
-
-          // Check location of partition column
-
-          int posn = rowSet.schema().index("dir0");
-          if (posn == -1) {
-            sawMissingPartition = true;
-          } else if (posn == 0) {
-            sawPartitionFirst = true;
-          } else {
-            sawPartitionLast = true;
-          }
-
-          // Figure out which record this is and test accordingly.
-
-          RowSetReader reader = rowSet.reader();
-          assertTrue(reader.next());
-          String col1 = reader.scalar("a").getString();
-          if (col1.equals("10")) {
-            if (i == 0) {
-              sawRootFirst = true;
-            }
-          } else {
-            if (i == 0) {
-              sawNestedFirst = true;
-            }
-          }
-          rowSet.clear();
-        }
-        assertFalse(iter.hasNext());
-        if (sawMissingPartition &&
-            sawPartitionFirst &&
-            sawPartitionLast &&
-            sawRootFirst &&
-            sawNestedFirst) {
-          // The following should appear most of the time.
-          System.out.println("All variations occurred");
-          return;
-        }
+    for (int j = 0; j < 2; j++) {
+      assertTrue(iter.hasNext());
+      rowSet = iter.next();
+
+      // Figure out which record this is and test accordingly.
+
+      RowSetReader reader = rowSet.reader();
+      assertTrue(reader.next());
+      String col1 = reader.scalar("a").getString();
+      if (col1.equals("10")) {
+        RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+            .addRow("10", "foo", "bar", null)
+            .build();
+        RowSetUtilities.verify(expected, rowSet);
+      } else {
+        RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+            .addRow("20", "fred", "wilma", NESTED_DIR)
+            .build();
+        RowSetUtilities.verify(expected, rowSet);
       }
-
-      // If you see this, maybe something got fixed. Or, maybe the
-      // min parallelization hack above stopped working.
-      // Or, you were just unlucky and can try the test again.
-      // We print messages, rather than using assertTrue, to avoid
-      // introducing a flaky test.
-
-      System.out.println("Some variations did not occur");
-      System.out.println(String.format("Missing partition: %s", sawMissingPartition));
-      System.out.println(String.format("Partition first: %s", sawPartitionFirst));
-      System.out.println(String.format("Partition last: %s", sawPartitionLast));
-      System.out.println(String.format("Outer first: %s", sawRootFirst));
-      System.out.println(String.format("Nested first: %s", sawNestedFirst));
-    } finally {
-      resetV3();
-      resetMultiScan();
     }
+    assertFalse(iter.hasNext());
   }
 
   /**
@@ -286,7 +127,7 @@ public class TestPartitionRace extends BaseCsvTest {
    * "jump around" when files are shifted to a new partition depth.
    */
   @Test
-  public void testNoRaceV3() throws IOException {
+  public void testNoRace() throws IOException {
     String sql = "SELECT * FROM `dfs.data`.`%s`";
 
     TupleMetadata expectedSchema = new SchemaBuilder()
@@ -297,7 +138,6 @@ public class TestPartitionRace extends BaseCsvTest {
         .buildSchema();
 
     try {
-      enableV3(true);
       enableMultiScan();
 
       // Loop to run the query 10 times or until we see both files
@@ -363,7 +203,6 @@ public class TestPartitionRace extends BaseCsvTest {
       System.out.println(String.format("Outer first: %s", sawRootFirst));
       System.out.println(String.format("Nested first: %s", sawNestedFirst));
     } finally {
-      resetV3();
       resetMultiScan();
     }
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestTextColumn.java
similarity index 91%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestTextColumn.java
index 2b96924..1712a22 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestTextColumn.java
@@ -15,16 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.text;
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import static org.junit.Assert.assertEquals;
 
 import java.util.List;
 
-import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.test.BaseTestQuery;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-
 public class TestTextColumn extends BaseTestQuery {
   @Test
   public void testCsvColumnSelection() throws Exception {
@@ -51,9 +51,9 @@ public class TestTextColumn extends BaseTestQuery {
         + "columns[3] as col4 from cp.`store/text/data/letters.csv`");
 
     final TestResultSet expectedResultSet = new TestResultSet();
-    expectedResultSet.addRow("a, b,", "c", "d,, \\n e", "f\\\"g");
-    expectedResultSet.addRow("d, e,", "f", "g,, \\n h", "i\\\"j");
-    expectedResultSet.addRow("g, h,", "i", "j,, \\n k", "l\\\"m");
+    expectedResultSet.addRow("a, b,", "c", "d,, \\n e", "f\"g");
+    expectedResultSet.addRow("d, e,", "f", "g,, \\n h", "i\"j");
+    expectedResultSet.addRow("g, h,", "i", "j,, \\n k", "l\"m");
 
     TestResultSet actualResultSet = new TestResultSet(actualResults);
     assertEquals(expectedResultSet, actualResultSet);
@@ -73,5 +73,4 @@ public class TestTextColumn extends BaseTestQuery {
         .sqlBaselineQuery("select COLUMNS[0], CoLuMnS[1] from cp.`store/text/data/letters.csv`")
         .go();
   }
-
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestTextReader.java
similarity index 91%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestTextReader.java
index 43ad0d8..3798206 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestTextReader.java
@@ -15,21 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.text;
+package org.apache.drill.exec.store.easy.text.compliant;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.test.BaseTestQuery;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-public class TestNewTextReader extends BaseTestQuery {
+public class TestTextReader extends BaseTestQuery {
 
   @Test
   public void fieldDelimiterWithinQuotes() throws Exception {
@@ -60,10 +60,7 @@ public class TestNewTextReader extends BaseTestQuery {
       test("select max(columns[1]) as %s from cp.`textinput/input1.csv` where %s is not null", COL_NAME, COL_NAME);
       fail("Query should have failed");
     } catch(UserRemoteException ex) {
-      assertEquals(ErrorType.DATA_READ, ex.getErrorType());
-      // Change to the following if V3 is enabled
-      // assertEquals(ErrorType.VALIDATION, ex.getErrorType());
-      assertTrue("Error message should contain " + COL_NAME, ex.getMessage().contains(COL_NAME));
+      assertEquals(ErrorType.VALIDATION, ex.getErrorType());
     }
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TextRecordReaderTest.java
similarity index 97%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TextRecordReaderTest.java
index fb8b50b..8e37c30 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TextRecordReaderTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.text;
+package org.apache.drill.exec.store.easy.text.compliant;
 
 import static org.junit.Assert.assertEquals;
 
@@ -29,10 +29,9 @@ import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.util.VectorUtil;
-import org.junit.Test;
-
 import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 import org.apache.drill.shaded.guava.com.google.common.io.Files;
+import org.junit.Test;
 
 public class TextRecordReaderTest extends PopUnitTestBase {
 
@@ -63,5 +62,4 @@ public class TextRecordReaderTest extends PopUnitTestBase {
       assertEquals(5, count);
     }
   }
-
 }
diff --git a/exec/java-exec/src/test/resources/store/text/data/letters.csv b/exec/java-exec/src/test/resources/store/text/data/letters.csv
index 4d724e8..1ef564d 100644
--- a/exec/java-exec/src/test/resources/store/text/data/letters.csv
+++ b/exec/java-exec/src/test/resources/store/text/data/letters.csv
@@ -1,3 +1,6 @@
-"a, b,","c","d,, \n e","f\"g"
-"d, e,","f","g,, \n h","i\"j"
-"g, h,","i","j,, \n k","l\"m"
\ No newline at end of file
+# CSV uses quote as the quote escape by default.
+# Earlier versions accidentally used back-slash.
+# The back-slash, and \n, don't do anything; they are regular chars.
+"a, b,","c","d,, \n e","f""g"
+"d, e,","f","g,, \n h","i""j"
+"g, h,","i","j,, \n k","l""m"
\ No newline at end of file