You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/03/24 14:52:26 UTC

[drill] branch master updated: DRILL-8086: Convert the CSV (AKA "compliant text") reader to EVF V2 (#2485)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new fb016f4  DRILL-8086: Convert the CSV (AKA "compliant text") reader to EVF V2 (#2485)
fb016f4 is described below

commit fb016f4f07c5ace99f6b5e8dba236627538b4812
Author: Paul Rogers <pa...@users.noreply.github.com>
AuthorDate: Thu Mar 24 07:52:19 2022 -0700

    DRILL-8086: Convert the CSV (AKA "compliant text") reader to EVF V2 (#2485)
    
    * DRILL-8086: Convert the CSV (AKA "compliant text") reader to EVF V2
    
    Also includes:
    
    * DRILL-8159: Convert the HTTPD reader to use EVF V2
    
    * Build fix
    
    * Changes from review comments
    
    * Fix test issue
---
 .../exec/store/httpd/HttpdLogBatchReader.java      |  50 ++--
 .../exec/store/httpd/HttpdLogFormatPlugin.java     |  35 +--
 .../apache/drill/exec/store/httpd/HttpdParser.java |   4 +-
 .../drill/exec/store/http/TestHttpPlugin.java      |  68 ++---
 .../exec/store/easy/json/JsonRecordWriter.java     |   8 +-
 .../exec/store/easy/text/TextFormatConfig.java     | 130 +++++++++
 .../exec/store/easy/text/TextFormatPlugin.java     | 172 ++----------
 .../easy/text/reader/CompliantTextBatchReader.java | 301 +++++++++++----------
 .../easy/text/reader/ConstrainedFieldOutput.java   |   7 +-
 .../store/easy/text/reader/FieldVarCharOutput.java |  14 +
 .../easy/text/reader/TextParsingSettings.java      |   3 +-
 .../exec/store/easy/text/reader/TextReader.java    |   7 +-
 .../exec/physical/impl/writer/TestTextWriter.java  |   2 +-
 .../drill/exec/server/rest/TestRestJson.java       |   2 +-
 .../drill/exec/store/DropboxFileSystemTest.java    |   2 +-
 .../drill/exec/store/TestPluginRegistry.java       |   2 +-
 .../store/dfs/TestFormatPluginOptionExtractor.java |   2 +-
 .../store/easy/text/compliant/BaseCsvTest.java     |  13 +-
 .../easy/text/compliant/TestCsvWithHeaders.java    |  40 ++-
 .../easy/text/compliant/TestCsvWithSchema.java     |  90 +++++-
 .../store/easy/text/compliant/TestTextReader.java  |   6 +-
 .../drill/exec/store/mock/TestMockRowReader.java   |  13 +-
 .../drill/exec/util/StoragePluginTestUtils.java    |   2 +-
 23 files changed, 555 insertions(+), 418 deletions(-)

diff --git a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogBatchReader.java b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogBatchReader.java
index 275132a..664fd7c 100644
--- a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogBatchReader.java
+++ b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogBatchReader.java
@@ -22,8 +22,9 @@ import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
@@ -31,7 +32,7 @@ import org.apache.drill.exec.record.metadata.MetadataUtils;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,36 +41,29 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 
-public class HttpdLogBatchReader implements ManagedReader<FileSchemaNegotiator> {
+public class HttpdLogBatchReader implements ManagedReader {
 
   private static final Logger logger = LoggerFactory.getLogger(HttpdLogBatchReader.class);
   public static final String RAW_LINE_COL_NAME = "_raw";
   public static final String MATCHED_COL_NAME = "_matched";
   private final HttpdLogFormatConfig formatConfig;
-  private final int maxRecords;
-  private final EasySubScan scan;
-  private HttpdParser parser;
-  private FileSplit split;
+  private final HttpdParser parser;
+  private final FileDescrip file;
   private InputStream fsStream;
-  private RowSetLoader rowWriter;
+  private final RowSetLoader rowWriter;
   private BufferedReader reader;
   private int lineNumber;
-  private CustomErrorContext errorContext;
-  private ScalarWriter rawLineWriter;
-  private ScalarWriter matchedWriter;
+  private final CustomErrorContext errorContext;
+  private final ScalarWriter rawLineWriter;
+  private final ScalarWriter matchedWriter;
   private int errorCount;
 
-
-  public HttpdLogBatchReader(HttpdLogFormatConfig formatConfig, int maxRecords, EasySubScan scan) {
+  public HttpdLogBatchReader(HttpdLogFormatConfig formatConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
     this.formatConfig = formatConfig;
-    this.maxRecords = maxRecords;
-    this.scan = scan;
-  }
 
-  @Override
-  public boolean open(FileSchemaNegotiator negotiator) {
     // Open the input stream to the log file
-    openFile(negotiator);
+    file = negotiator.file();
+    openFile();
     errorContext = negotiator.parentErrorContext();
     try {
       parser = new HttpdParser(
@@ -92,7 +86,6 @@ public class HttpdLogBatchReader implements ManagedReader<FileSchemaNegotiator>
     parser.addFieldsToParser(rowWriter);
     rawLineWriter = addImplicitColumn(RAW_LINE_COL_NAME, MinorType.VARCHAR);
     matchedWriter = addImplicitColumn(MATCHED_COL_NAME, MinorType.BIT);
-    return true;
   }
 
   @Override
@@ -108,11 +101,6 @@ public class HttpdLogBatchReader implements ManagedReader<FileSchemaNegotiator>
   private boolean nextLine(RowSetLoader rowWriter) {
     String line;
 
-    // Check if the limit has been reached
-    if (rowWriter.limitReached(maxRecords)) {
-      return false;
-    }
-
     try {
       line = reader.readLine();
       if (line == null) {
@@ -164,19 +152,19 @@ public class HttpdLogBatchReader implements ManagedReader<FileSchemaNegotiator>
     try {
       fsStream.close();
     } catch (IOException e) {
-      logger.warn("Error when closing HTTPD file: {} {}", split.getPath().toString(), e.getMessage());
+      logger.warn("Error when closing HTTPD file: {} {}", file.split().getPath().toString(), e.getMessage());
     }
     fsStream = null;
   }
 
-  private void openFile(FileSchemaNegotiator negotiator) {
-    split = negotiator.split();
+  private void openFile() {
+    Path path = file.split().getPath();
     try {
-      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      fsStream = file.fileSystem().openPossiblyCompressedStream(path);
     } catch (Exception e) {
       throw UserException
         .dataReadError(e)
-        .message("Failed to open open input file: %s", split.getPath().toString())
+        .message("Failed to open open input file: %s", path.toString())
         .addContext(e.getMessage())
         .build(logger);
     }
diff --git a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
index ab6d80a..5c04fa3 100644
--- a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
+++ b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
@@ -21,12 +21,12 @@ package org.apache.drill.exec.store.httpd;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
@@ -41,18 +41,16 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatConfig>
   private static class HttpLogReaderFactory extends FileReaderFactory {
 
     private final HttpdLogFormatConfig config;
-    private final int maxRecords;
     private final EasySubScan scan;
 
-    private HttpLogReaderFactory(HttpdLogFormatConfig config, int maxRecords, EasySubScan scan) {
+    private HttpLogReaderFactory(HttpdLogFormatConfig config, EasySubScan scan) {
       this.config = config;
-      this.maxRecords = maxRecords;
       this.scan = scan;
     }
 
     @Override
-    public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
-      return new HttpdLogBatchReader(config, maxRecords, scan);
+    public ManagedReader newReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
+      return new HttpdLogBatchReader(config, scan, negotiator);
     }
   }
 
@@ -76,24 +74,15 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatConfig>
         .fsConf(fsConf)
         .defaultName(DEFAULT_NAME)
         .readerOperatorType(OPERATOR_TYPE)
-        .scanVersion(ScanFrameworkVersion.EVF_V1)
+        .scanVersion(ScanFrameworkVersion.EVF_V2)
         .supportsLimitPushdown(true)
         .build();
   }
 
   @Override
-  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
-    EasySubScan scan, OptionManager options) {
-    return new HttpdLogBatchReader(formatConfig, scan.getMaxRecords(), scan);
-  }
-
-  @Override
-  protected FileScanFramework.FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
-    FileScanFramework.FileScanBuilder builder = new FileScanFramework.FileScanBuilder();
-    builder.setReaderFactory(new HttpLogReaderFactory(formatConfig, scan.getMaxRecords(), scan));
-
-    initScanBuilder(builder, scan);
+  protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
     builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
-    return builder;
+    builder.readerFactory(new HttpLogReaderFactory(formatConfig, scan));
   }
 }
+
diff --git a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdParser.java b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdParser.java
index c31c5ad..4aec836 100644
--- a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdParser.java
+++ b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdParser.java
@@ -186,9 +186,7 @@ public class HttpdParser {
 
     dummy.addParseTarget(String.class.getMethod("indexOf", String.class), allParserPaths);
 
-    /*
-    If the column is not requested explicitly, remove it from the requested path list.
-     */
+    // If the column is not requested explicitly, remove it from the requested path list.
     if (!isStarQuery() &&
         !isMetadataQuery() &&
         !isOnlyImplicitColumns()) {
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index 1ddbb05..dd1f13d 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -69,12 +69,18 @@ import static org.junit.Assert.fail;
  */
 public class TestHttpPlugin extends ClusterTest {
 
-  private static final int MOCK_SERVER_PORT = 8091;
+  // Use high-numbered ports to avoid colliding with other tools on the
+  // build machine.
+  private static final int MOCK_SERVER_PORT = 44332;
   private static String TEST_JSON_RESPONSE;
   private static String TEST_CSV_RESPONSE;
   private static String TEST_XML_RESPONSE;
   private static String TEST_JSON_RESPONSE_WITH_DATATYPES;
 
+  public static String makeUrl(String url) {
+    return String.format(url, MOCK_SERVER_PORT);
+  }
+
   @BeforeClass
   public static void setup() throws Exception {
     startCluster(ClusterFixture.builder(dirTestWatcher));
@@ -149,7 +155,7 @@ public class TestHttpPlugin extends ClusterTest {
     // The connection acts like a schema.
     // Ignores the message body except for data.
     HttpApiConfig mockSchema = HttpApiConfig.builder()
-      .url("http://localhost:8091/json")
+      .url(makeUrl("http://localhost:%d/json"))
       .method("GET")
       .headers(headers)
       .authType("basic")
@@ -166,7 +172,7 @@ public class TestHttpPlugin extends ClusterTest {
     // This is the preferred approach, the base URL contains as much info as possible;
     // all other parameters are specified in SQL. See README for an example.
     HttpApiConfig mockTable = HttpApiConfig.builder()
-      .url("http://localhost:8091/json")
+      .url(makeUrl("http://localhost:%d/json"))
       .method("GET")
       .headers(headers)
       .authType("basic")
@@ -178,14 +184,14 @@ public class TestHttpPlugin extends ClusterTest {
       .build();
 
     HttpApiConfig mockPostConfig = HttpApiConfig.builder()
-      .url("http://localhost:8091/")
+      .url(makeUrl("http://localhost:%d/"))
       .method("POST")
       .headers(headers)
       .postBody("key1=value1\nkey2=value2")
       .build();
 
     HttpApiConfig mockPostPushdownWithStaticParams = HttpApiConfig.builder()
-      .url("http://localhost:8091/")
+      .url(makeUrl("http://localhost:%d/"))
       .method("POST")
       .headers(headers)
       .requireTail(false)
@@ -195,7 +201,7 @@ public class TestHttpPlugin extends ClusterTest {
       .build();
 
     HttpApiConfig mockPostPushdown = HttpApiConfig.builder()
-      .url("http://localhost:8091/")
+      .url(makeUrl("http://localhost:%d/"))
       .method("POST")
       .headers(headers)
       .requireTail(false)
@@ -204,7 +210,7 @@ public class TestHttpPlugin extends ClusterTest {
       .build();
 
     HttpApiConfig mockJsonNullBodyPost = HttpApiConfig.builder()
-      .url("http://localhost:8091/")
+      .url(makeUrl("http://localhost:%d/"))
       .method("POST")
       .headers(headers)
       .requireTail(false)
@@ -213,7 +219,7 @@ public class TestHttpPlugin extends ClusterTest {
       .build();
 
     HttpApiConfig mockJsonPostConfig = HttpApiConfig.builder()
-      .url("http://localhost:8091/")
+      .url(makeUrl("http://localhost:%d/"))
       .method("POST")
       .headers(headers)
       .requireTail(false)
@@ -230,7 +236,7 @@ public class TestHttpPlugin extends ClusterTest {
       .build();
 
     HttpApiConfig mockJsonConfigWithPaginator = HttpApiConfig.builder()
-      .url("http://localhost:8091/json")
+      .url(makeUrl("http://localhost:%d/json"))
       .method("get")
       .headers(headers)
       .requireTail(false)
@@ -239,14 +245,14 @@ public class TestHttpPlugin extends ClusterTest {
       .build();
 
     HttpApiConfig mockPostConfigWithoutPostBody = HttpApiConfig.builder()
-      .url("http://localhost:8091/")
+      .url(makeUrl("http://localhost:%d/"))
       .method("POST")
       .authType("basic")
       .headers(headers)
       .build();
 
     HttpApiConfig mockCsvConfig = HttpApiConfig.builder()
-      .url("http://localhost:8091/csv")
+      .url(makeUrl("http://localhost:%d/csv"))
       .method("GET")
       .headers(headers)
       .authType("basic")
@@ -257,7 +263,7 @@ public class TestHttpPlugin extends ClusterTest {
       .build();
 
     HttpApiConfig mockCsvConfigWithPaginator = HttpApiConfig.builder()
-      .url("http://localhost:8091/csv")
+      .url(makeUrl("http://localhost:%d/csv"))
       .method("get")
       .paginator(offsetPaginatorForJson)
       .inputType("csv")
@@ -266,7 +272,7 @@ public class TestHttpPlugin extends ClusterTest {
       .build();
 
     HttpApiConfig mockXmlConfig = HttpApiConfig.builder()
-      .url("http://localhost:8091/xml")
+      .url(makeUrl("http://localhost:%d/xml"))
       .method("GET")
       .headers(headers)
       .authType("basic")
@@ -278,7 +284,7 @@ public class TestHttpPlugin extends ClusterTest {
       .build();
 
     HttpApiConfig mockGithubWithParam = HttpApiConfig.builder()
-      .url("http://localhost:8091/orgs/{org}/repos")
+      .url(makeUrl("http://localhost:%d/orgs/{org}/repos"))
       .method("GET")
       .headers(headers)
       .params(Arrays.asList("lat", "lng", "date"))
@@ -287,7 +293,7 @@ public class TestHttpPlugin extends ClusterTest {
       .build();
 
     HttpApiConfig mockGithubWithDuplicateParam = HttpApiConfig.builder()
-      .url("http://localhost:8091/orgs/{org}/repos")
+      .url(makeUrl("http://localhost:%d/orgs/{org}/repos"))
       .method("GET")
       .headers(headers)
       .params(Arrays.asList("org", "lng", "date"))
@@ -296,7 +302,7 @@ public class TestHttpPlugin extends ClusterTest {
       .build();
 
     HttpApiConfig mockGithubWithParamInQuery = HttpApiConfig.builder()
-      .url("http://localhost:8091/orgs/{org}/repos?p1={p1}")
+      .url(makeUrl("http://localhost:%d/orgs/{org}/repos?p1={p1}"))
       .method("GET")
       .headers(headers)
       .params(Arrays.asList("p2", "p3"))
@@ -305,7 +311,7 @@ public class TestHttpPlugin extends ClusterTest {
       .build();
 
     HttpApiConfig mockTableWithJsonOptions = HttpApiConfig.builder()
-      .url("http://localhost:8091/json")
+      .url(makeUrl("http://localhost:%d/json"))
       .method("GET")
       .headers(headers)
       .requireTail(false)
@@ -554,7 +560,7 @@ public class TestHttpPlugin extends ClusterTest {
           .build();
 
       RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-          .addRow("http://localhost:8091/orgs/apache/repos")
+          .addRow(makeUrl("http://localhost:%d/orgs/apache/repos"))
           .build();
 
       RowSetUtilities.verify(expected, results);
@@ -580,7 +586,7 @@ public class TestHttpPlugin extends ClusterTest {
         .build();
 
       RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .addRow("http://localhost:8091/orgs/true/repos")
+        .addRow(makeUrl("http://localhost:%d/orgs/true/repos"))
         .build();
 
       RowSetUtilities.verify(expected, results);
@@ -606,7 +612,7 @@ public class TestHttpPlugin extends ClusterTest {
         .build();
 
       RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .addRow("http://localhost:8091/orgs/1234/repos")
+        .addRow(makeUrl("http://localhost:%d/orgs/1234/repos"))
         .build();
 
       RowSetUtilities.verify(expected, results);
@@ -657,7 +663,7 @@ public class TestHttpPlugin extends ClusterTest {
         .build();
 
       RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .addRow("http://localhost:8091/orgs/apache/repos?org=apache")
+        .addRow(makeUrl("http://localhost:%d/orgs/apache/repos?org=apache"))
         .build();
 
       RowSetUtilities.verify(expected, results);
@@ -683,7 +689,7 @@ public class TestHttpPlugin extends ClusterTest {
         .build();
 
       RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .addRow("http://localhost:8091/orgs/apache/repos?p1=param1&p2=param2")
+        .addRow(makeUrl("http://localhost:%d/orgs/apache/repos?p1=param1&p2=param2"))
         .build();
 
       RowSetUtilities.verify(expected, results);
@@ -845,7 +851,7 @@ public class TestHttpPlugin extends ClusterTest {
         .build();
 
       RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .addRow(200, "OK", "http/1.1", "http://localhost:8091/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02")
+        .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02"))
         .build();
 
       RowSetUtilities.verify(expected, results);
@@ -868,8 +874,8 @@ public class TestHttpPlugin extends ClusterTest {
         .build();
 
       RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .addRow(200, "OK", "http/1.1", "http://localhost:8091/csvcsv?arg1=4")
-        .addRow(200, "OK", "http/1.1", "http://localhost:8091/csvcsv?arg1=4")
+        .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/csvcsv?arg1=4"))
+        .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/csvcsv?arg1=4"))
         .build();
 
       RowSetUtilities.verify(expected, results);
@@ -892,11 +898,11 @@ public class TestHttpPlugin extends ClusterTest {
         .build();
 
       RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .addRow(200, "OK", "http/1.1", "http://localhost:8091/xml?arg1=4")
-        .addRow(200, "OK", "http/1.1", "http://localhost:8091/xml?arg1=4")
-        .addRow(200, "OK", "http/1.1", "http://localhost:8091/xml?arg1=4")
-        .addRow(200, "OK", "http/1.1", "http://localhost:8091/xml?arg1=4")
-        .addRow(200, "OK", "http/1.1", "http://localhost:8091/xml?arg1=4")
+        .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/xml?arg1=4"))
+        .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/xml?arg1=4"))
+        .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/xml?arg1=4"))
+        .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/xml?arg1=4"))
+        .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/xml?arg1=4"))
         .build();
 
       RowSetUtilities.verify(expected, results);
@@ -1227,7 +1233,7 @@ public class TestHttpPlugin extends ClusterTest {
         .build();
 
       RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .addRow(404, "Client Error", "http/1.1", "http://localhost:8091/json")
+        .addRow(404, "Client Error", "http/1.1", makeUrl("http://localhost:%d/json"))
         .build();
 
       RowSetUtilities.verify(expected, results);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
index b73fa76..160ad8f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -36,6 +36,8 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -43,7 +45,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWriter {
 
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonRecordWriter.class);
+  private static final Logger logger = LoggerFactory.getLogger(JsonRecordWriter.class);
   private static final String LINE_FEED = String.format("%n");
 
   private Path cleanUpLocation;
@@ -54,8 +56,8 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
   private boolean useExtendedOutput;
 
   private int index;
-  private FileSystem fs = null;
-  private OutputStream stream = null;
+  private FileSystem fs;
+  private OutputStream stream;
 
   private final JsonFactory factory = new JsonFactory();
   private final StorageStrategy storageStrategy;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatConfig.java
new file mode 100644
index 0000000..01ce072
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatConfig.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+import com.fasterxml.jackson.annotation.JsonAlias;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+
+@JsonTypeName(TextFormatPlugin.PLUGIN_NAME)
+@JsonInclude(Include.NON_DEFAULT)
+public class TextFormatConfig implements FormatPluginConfig {
+
+  public final List<String> extensions;
+  public final String lineDelimiter;
+  public final char fieldDelimiter;
+  public final char quote;
+  public final char escape;
+  public final char comment;
+  public final boolean skipFirstLine;
+  public final boolean extractHeader;
+
+  @JsonCreator
+  public TextFormatConfig(
+      @JsonProperty("extensions") List<String> extensions,
+      @JsonProperty("lineDelimiter") String lineDelimiter,
+      // Drill 1.17 and before used "delimiter" in the
+      // bootstrap storage plugins file, assume many instances
+      // exist in the field.
+      @JsonAlias("delimiter")
+      @JsonProperty("fieldDelimiter") String fieldDelimiter,
+      @JsonProperty("quote") String quote,
+      @JsonProperty("escape") String escape,
+      @JsonProperty("comment") String comment,
+      @JsonProperty("skipFirstLine") Boolean skipFirstLine,
+      @JsonProperty("extractHeader") Boolean extractHeader) {
+    this.extensions = extensions == null ?
+        ImmutableList.of() : ImmutableList.copyOf(extensions);
+    this.lineDelimiter = Strings.isNullOrEmpty(lineDelimiter) ? "\n" : lineDelimiter;
+    this.fieldDelimiter = Strings.isNullOrEmpty(fieldDelimiter) ? ',' : fieldDelimiter.charAt(0);
+    this.quote = Strings.isNullOrEmpty(quote) ? '"' : quote.charAt(0);
+    this.escape = Strings.isNullOrEmpty(escape) ? '"' : escape.charAt(0);
+    this.comment = Strings.isNullOrEmpty(comment) ? '#' : comment.charAt(0);
+    this.skipFirstLine = skipFirstLine == null ? false : skipFirstLine;
+    this.extractHeader = extractHeader == null ? false : extractHeader;
+  }
+
+  public List<String> getExtensions() { return extensions; }
+  public String getLineDelimiter() { return lineDelimiter; }
+  public char getFieldDelimiter() { return fieldDelimiter; }
+  public char getQuote() { return quote; }
+  public char getEscape() { return escape; }
+  public char getComment() { return comment; }
+  public boolean isSkipFirstLine() { return skipFirstLine; }
+  @JsonProperty("extractHeader")
+  public boolean isHeaderExtractionEnabled() { return extractHeader; }
+
+  /**
+   * Used for JSON serialization to handle \u0000 value which
+   * Jackson converts to a null string.
+   */
+  @JsonProperty("fieldDelimiter")
+  public String firldDeliterString() {
+    return Character.toString(fieldDelimiter);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(extensions, lineDelimiter, fieldDelimiter,
+        quote, escape, comment, skipFirstLine, extractHeader);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    TextFormatConfig other = (TextFormatConfig) obj;
+    return Objects.equals(extensions, other.extensions) &&
+           Objects.equals(lineDelimiter, other.lineDelimiter) &&
+           Objects.equals(fieldDelimiter, other.fieldDelimiter) &&
+           Objects.equals(quote, other.quote) &&
+           Objects.equals(escape, other.escape) &&
+           Objects.equals(comment, other.comment) &&
+           Objects.equals(skipFirstLine, other.skipFirstLine) &&
+           Objects.equals(extractHeader, other.extractHeader);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("extensions", extensions)
+      .field("skipFirstLine", skipFirstLine)
+      .field("extractHeader", extractHeader)
+      .escapedField("fieldDelimiter", fieldDelimiter)
+      .escapedField("lineDelimiter", lineDelimiter)
+      .escapedField("quote", quote)
+      .escapedField("escape", escape)
+      .escapedField("comment", comment)
+      .toString();
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index a8b5736..ce8120c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -17,19 +17,9 @@
  */
 package org.apache.drill.exec.store.easy.text;
 
-import com.fasterxml.jackson.annotation.JsonAlias;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.exceptions.ChildErrorContext;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
@@ -39,11 +29,11 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
-import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.metadata.Propertied;
@@ -59,15 +49,12 @@ import org.apache.drill.exec.store.easy.text.reader.CompliantTextBatchReader;
 import org.apache.drill.exec.store.easy.text.reader.TextParsingSettings;
 import org.apache.drill.exec.store.easy.text.writer.TextRecordWriter;
 import org.apache.drill.exec.store.schedule.CompleteFileWork;
-import org.apache.drill.shaded.guava.com.google.common.base.Strings;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 
 /**
  * Text format plugin for CSV and other delimited text formats.
@@ -81,9 +68,8 @@ import java.util.Objects;
  * to allow tight control of the size of produced batches (as well
  * as to support provided schema.)
  */
-public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> {
-
-  private final static String PLUGIN_NAME = "text";
+public class TextFormatPlugin extends EasyFormatPlugin<TextFormatConfig> {
+  final static String PLUGIN_NAME = "text";
 
   public static final int MAXIMUM_NUMBER_COLUMNS = 64 * 1024;
 
@@ -111,121 +97,19 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
 
   public static final String WRITER_OPERATOR_TYPE = "TEXT_WRITER";
 
-  @JsonTypeName(PLUGIN_NAME)
-  @JsonInclude(Include.NON_DEFAULT)
-  public static class TextFormatConfig implements FormatPluginConfig {
-
-    public final List<String> extensions;
-    public final String lineDelimiter;
-    public final char fieldDelimiter;
-    public final char quote;
-    public final char escape;
-    public final char comment;
-    public final boolean skipFirstLine;
-    public final boolean extractHeader;
-
-    @JsonCreator
-    public TextFormatConfig(
-        @JsonProperty("extensions") List<String> extensions,
-        @JsonProperty("lineDelimiter") String lineDelimiter,
-        // Drill 1.17 and before used "delimiter" in the
-        // bootstrap storage plugins file, assume many instances
-        // exist in the field.
-        @JsonAlias("delimiter")
-        @JsonProperty("fieldDelimiter") String fieldDelimiter,
-        @JsonProperty("quote") String quote,
-        @JsonProperty("escape") String escape,
-        @JsonProperty("comment") String comment,
-        @JsonProperty("skipFirstLine") Boolean skipFirstLine,
-        @JsonProperty("extractHeader") Boolean extractHeader) {
-      this.extensions = extensions == null ?
-          ImmutableList.of() : ImmutableList.copyOf(extensions);
-      this.lineDelimiter = lineDelimiter == null ? "\n" : lineDelimiter;
-      this.fieldDelimiter = Strings.isNullOrEmpty(fieldDelimiter) ? ',' : fieldDelimiter.charAt(0);
-      this.quote = Strings.isNullOrEmpty(quote) ? '"' : quote.charAt(0);
-      this.escape = Strings.isNullOrEmpty(escape) ? '"' : escape.charAt(0);
-      this.comment = Strings.isNullOrEmpty(comment) ? '#' : comment.charAt(0);
-      this.skipFirstLine = skipFirstLine != null && skipFirstLine;
-      this.extractHeader = extractHeader != null && extractHeader;
-    }
-
-    public TextFormatConfig() {
-      this(null, null, null, null, null, null, null, null);
-    }
-
-    public List<String> getExtensions() { return extensions; }
-    public String getLineDelimiter() { return lineDelimiter; }
-    public char getFieldDelimiter() { return fieldDelimiter; }
-    public char getQuote() { return quote; }
-    public char getEscape() { return escape; }
-    public char getComment() { return comment; }
-    public boolean isSkipFirstLine() { return skipFirstLine; }
-    @JsonProperty("extractHeader")
-    public boolean isHeaderExtractionEnabled() { return extractHeader; }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(extensions, lineDelimiter, fieldDelimiter,
-          quote, escape, comment, skipFirstLine, extractHeader);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-      if (obj == null || getClass() != obj.getClass()) {
-        return false;
-      }
-      TextFormatConfig other = (TextFormatConfig) obj;
-      return Objects.equals(extensions, other.extensions) &&
-             Objects.equals(lineDelimiter, other.lineDelimiter) &&
-             Objects.equals(fieldDelimiter, other.fieldDelimiter) &&
-             Objects.equals(quote, other.quote) &&
-             Objects.equals(escape, other.escape) &&
-             Objects.equals(comment, other.comment) &&
-             Objects.equals(skipFirstLine, other.skipFirstLine) &&
-             Objects.equals(extractHeader, other.extractHeader);
-    }
-
-    @Override
-    public String toString() {
-      return new PlanStringBuilder(this)
-        .field("extensions", extensions)
-        .field("skipFirstLine", skipFirstLine)
-        .field("extractHeader", extractHeader)
-        .escapedField("fieldDelimiter", fieldDelimiter)
-        .escapedField("lineDelimiter", lineDelimiter)
-        .escapedField("quote", quote)
-        .escapedField("escape", escape)
-        .escapedField("comment", comment)
-        .toString();
-    }
-  }
-
-  /**
-   * Builds the readers for the V3 text scan operator.
-   */
-  private static class ColumnsReaderFactory extends FileReaderFactory {
-
+  private static class TextReaderFactory extends FileReaderFactory {
     private final TextParsingSettings settings;
-    private final int maxRecords;
 
-    public ColumnsReaderFactory(TextParsingSettings settings, int maxRecords) {
+    public TextReaderFactory(TextParsingSettings settings) {
       this.settings = settings;
-      this.maxRecords = maxRecords;
     }
 
     @Override
-    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
-       return new CompliantTextBatchReader(settings, maxRecords);
+    public ManagedReader newReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
+       return new CompliantTextBatchReader(negotiator, settings);
     }
   }
 
-  public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
-     this(name, context, fsConf, storageConfig, new TextFormatConfig());
-  }
-
   public TextFormatPlugin(String name, DrillbitContext context,
       Configuration fsConf, StoragePluginConfig config,
       TextFormatConfig formatPluginConfig) {
@@ -243,7 +127,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
         .fsConf(fsConf)
         .defaultName(PLUGIN_NAME)
         .writerOperatorType(WRITER_OPERATOR_TYPE)
-        .scanVersion(ScanFrameworkVersion.EVF_V1)
+        .scanVersion(ScanFrameworkVersion.EVF_V2)
         .supportsLimitPushdown(true)
         .build();
   }
@@ -267,19 +151,10 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
   }
 
   @Override
-  protected FileScanBuilder frameworkBuilder(
-      OptionManager options, EasySubScan scan) throws ExecutionSetupException {
-    ColumnsScanBuilder builder = new ColumnsScanBuilder();
-    initScanBuilder(builder, scan);
-
+  protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
     TextParsingSettings settings =
         new TextParsingSettings(getConfig(), scan.getSchema());
-    builder.setReaderFactory(new ColumnsReaderFactory(settings, scan.getMaxRecords()));
-
-    // If this format has no headers, or wants to skip them,
-    // then we must use the columns column to hold the data.
-    builder.requireColumnsArray(
-        ! settings.isHeaderExtractionEnabled() && builder.providedSchema() == null);
+    builder.readerFactory(new TextReaderFactory(settings));
 
     // Text files handle nulls in an unusual way. Missing columns
     // are set to required Varchar and filled with blanks. Yes, this
@@ -288,23 +163,18 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     // files have been defined within Drill.
     builder.nullType(Types.required(MinorType.VARCHAR));
 
-    // The text readers use required Varchar columns to represent null columns.
-    builder.allowRequiredNullColumns(true);
-
     // Provide custom error context
     builder.errorContext(
         new ChildErrorContext(builder.errorContext()) {
           @Override
           public void addContext(UserException.Builder builder) {
             super.addContext(builder);
-            builder.addContext("Extract headers:",
+            builder.addContext("Extract headers",
                 Boolean.toString(getConfig().isHeaderExtractionEnabled()));
-            builder.addContext("Skip first line:",
+            builder.addContext("Skip first line",
                 Boolean.toString(getConfig().isSkipFirstLine()));
           }
         });
-
-    return builder;
   }
 
   @Override
@@ -342,14 +212,8 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     for (final CompleteFileWork work : scan.getWorkIterable()) {
       data += work.getTotalBytes();
     }
-
     final double estimatedRowSize = settings.getOptions().getOption(ExecConstants.TEXT_ESTIMATED_ROW_SIZE);
-
-    if (scan.supportsLimitPushdown() && scan.getLimit() > 0) {
-      return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, scan.getLimit(), 1, data);
-    } else {
-      final double estRowCount = data / estimatedRowSize;
-      return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long) estRowCount, 1, data);
-    }
+    final double estRowCount = data / estimatedRowSize;
+    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long) estRowCount, 1, data);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
index 4c0c698..934b306 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
@@ -17,60 +17,66 @@
  */
 package org.apache.drill.exec.store.easy.text.reader;
 
-import com.univocity.parsers.common.TextParsingException;
-import io.netty.buffer.DrillBuf;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework;
-import org.apache.drill.exec.physical.impl.scan.columns.ColumnsSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.convert.StandardConversions;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.schema.ProjectedColumn;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.metadata.TupleSchema;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
 import org.apache.drill.exec.vector.accessor.ValueWriter;
 import org.apache.hadoop.mapred.FileSplit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.io.InputStream;
+import com.univocity.parsers.common.TextParsingException;
+
+import io.netty.buffer.DrillBuf;
 
 /**
  * Text reader, Complies with the RFC 4180 standard for text/csv files.
  */
-public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNegotiator> {
+public class CompliantTextBatchReader implements ManagedReader {
   private static final Logger logger = LoggerFactory.getLogger(CompliantTextBatchReader.class);
 
+  public static final String COLUMNS_COL = "columns";
   private static final int MAX_RECORDS_PER_BATCH = 8096;
   private static final int READ_BUFFER = 1024 * 1024;
   private static final int WHITE_SPACE_BUFFER = 64 * 1024;
 
   // settings to be used while parsing
   private final TextParsingSettings settings;
-  // Chunk of the file to be read by this reader
-  private FileSplit split;
-  // Limit pushed down from the query
-  private final int maxRecords;
+  private final CustomErrorContext errorContext;
   // text reader implementation
-  private TextReader reader;
+  private final TextReader reader;
   // input buffer
-  private DrillBuf readBuffer;
+  private final DrillBuf readBuffer;
   // working buffer to handle whitespace
-  private DrillBuf whitespaceBuffer;
-  private DrillFileSystem dfs;
+  private final DrillBuf whitespaceBuffer;
 
   private RowSetLoader writer;
 
-  public CompliantTextBatchReader(TextParsingSettings settings, int maxRecords) {
+  /**
+   * Create and open the text reader.
+   * @throws EarlyEofException
+   */
+  public CompliantTextBatchReader(FileSchemaNegotiator schemaNegotiator,
+      TextParsingSettings settings) throws EarlyEofException {
     this.settings = settings;
-    this.maxRecords = maxRecords;
+    this.errorContext = schemaNegotiator.parentErrorContext();
 
     // Validate. Otherwise, these problems show up later as a data
     // read error which is very confusing.
@@ -78,23 +84,11 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
       throw UserException
         .validationError()
         .message("The text format line delimiter cannot be blank.")
+        .addContext(errorContext)
         .build(logger);
     }
-  }
 
-  /**
-   * Performs the initial setup required for the record reader.
-   * Initializes the input stream, handling of the output record batch
-   * and the actual reader to be used.
-   *
-   * @param schemaNegotiator Used to create the schema in the output record batch
-   * @return true if opens successfully, false if output is null
-   */
-  @Override
-  public boolean open(ColumnsSchemaNegotiator schemaNegotiator) {
     final OperatorContext context = schemaNegotiator.context();
-    dfs = schemaNegotiator.fileSystem();
-    split = schemaNegotiator.split();
 
     // Note: DO NOT use managed buffers here. They remain in existence
     // until the fragment is shut down. The buffers here are large.
@@ -102,9 +96,8 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
     // holding onto 1 GB of memory in managed buffers.
     // Instead, we allocate the buffers explicitly, and must free
     // them.
-
-    readBuffer = context.getAllocator().buffer(READ_BUFFER);
-    whitespaceBuffer = context.getAllocator().buffer(WHITE_SPACE_BUFFER);
+    this.readBuffer = context.getAllocator().buffer(READ_BUFFER);
+    this.whitespaceBuffer = context.getAllocator().buffer(WHITE_SPACE_BUFFER);
     schemaNegotiator.batchSize(MAX_RECORDS_PER_BATCH);
 
     // setup Output, Input, and Reader
@@ -116,12 +109,14 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
         output = openWithoutHeaders(schemaNegotiator);
       }
       if (output == null) {
-        return false;
+        throw new EarlyEofException();
       }
-      openReader(output);
-      return true;
+      this.reader = openReader(schemaNegotiator, output);
     } catch (final IOException e) {
-      throw UserException.dataReadError(e).addContext("File Path", split.getPath().toString()).build(logger);
+      throw UserException.dataReadError(e)
+        .addContext("File open failed")
+        .addContext(errorContext)
+        .build(logger);
     }
   }
 
@@ -129,17 +124,14 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
    * Extract header and use that to define the reader schema.
    *
    * @param schemaNegotiator used to define the reader schema
-   * provided schema, if any. Used when using a provided schema
-   * with a text file that contains no headers; ignored for
-   * text file with headers
-   * @return text output
    */
-  private TextOutput openWithHeaders(ColumnsSchemaNegotiator schemaNegotiator) throws IOException {
-    final String [] fieldNames = extractHeader();
+  private TextOutput openWithHeaders(FileSchemaNegotiator schemaNegotiator) throws IOException {
+    validateNoColumnsProjection(schemaNegotiator);
+    final String [] fieldNames = extractHeader(schemaNegotiator);
     if (fieldNames == null) {
       return null;
     }
-    if (schemaNegotiator.hasProvidedSchema()) {
+    if (schemaNegotiator.providedSchema() != null) {
       return buildWithSchema(schemaNegotiator, fieldNames);
     } else {
       return buildFromColumnHeaders(schemaNegotiator, fieldNames);
@@ -150,30 +142,25 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
    * File has headers and a provided schema is provided. Convert from VARCHAR
    * input type to the provided output type, but only if the column is projected.
    */
-  private FieldVarCharOutput buildWithSchema(ColumnsSchemaNegotiator schemaNegotiator,
+  private FieldVarCharOutput buildWithSchema(FileSchemaNegotiator schemaNegotiator,
       String[] fieldNames) {
-    TupleMetadata readerSchema = mergeSchemas(schemaNegotiator.providedSchema(), fieldNames);
-    schemaNegotiator.tableSchema(readerSchema, true);
-    writer = schemaNegotiator.build().writer();
-    StandardConversions conversions = conversions(schemaNegotiator.providedSchema());
-    ValueWriter[] colWriters = new ValueWriter[fieldNames.length];
-    for (int i = 0; i < fieldNames.length; i++) {
-      ScalarWriter colWriter = writer.scalar(fieldNames[i]);
-      if (writer.isProjected()) {
-        colWriters[i] = conversions.converterFor(colWriter, MinorType.VARCHAR);
-      } else {
-        colWriters[i] = colWriter;
-      }
-    }
-    return new FieldVarCharOutput(writer, colWriters);
+
+    TupleMetadata readerSchema = buildSchemaFromHeaders(fieldNames);
+
+    // Build converting column writers
+    FixedReceiver.Builder builder = FixedReceiver.builderFor(schemaNegotiator)
+        .schemaIsComplete();
+    builder.conversionBuilder().blankAs(ColumnMetadata.BLANK_AS_NULL);
+    FixedReceiver receiver = builder.build(readerSchema);
+    writer = receiver.rowWriter();
+    return new FieldVarCharOutput(receiver);
   }
 
-  private TupleMetadata mergeSchemas(TupleMetadata providedSchema,
-      String[] fieldNames) {
-    final TupleMetadata readerSchema = new TupleSchema();
-    for (String fieldName : fieldNames) {
-      final ColumnMetadata providedCol = providedSchema.metadata(fieldName);
-      readerSchema.addColumn(providedCol == null ? textColumn(fieldName) : providedCol);
+  private TupleMetadata buildSchemaFromHeaders(String[] fieldNames) {
+    // Build table schema from headers
+    TupleMetadata readerSchema = new TupleSchema();
+    for (String name : fieldNames) {
+      readerSchema.addColumn(textColumn(name));
     }
     return readerSchema;
   }
@@ -186,13 +173,10 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
    * File has column headers. No provided schema. Build schema from the
    * column headers.
    */
-  private FieldVarCharOutput buildFromColumnHeaders(ColumnsSchemaNegotiator schemaNegotiator,
+  private FieldVarCharOutput buildFromColumnHeaders(FileSchemaNegotiator schemaNegotiator,
       String[] fieldNames) {
-    final TupleMetadata schema = new TupleSchema();
-    for (final String colName : fieldNames) {
-      schema.addColumn(textColumn(colName));
-    }
-    schemaNegotiator.tableSchema(schema, true);
+    TupleMetadata readerSchema = buildSchemaFromHeaders(fieldNames);
+    schemaNegotiator.tableSchema(readerSchema, true);
     writer = schemaNegotiator.build().writer();
     ValueWriter[] colWriters = new ValueWriter[fieldNames.length];
     for (int i = 0; i < fieldNames.length; i++) {
@@ -205,53 +189,112 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
    * When no headers, create a single array column "columns".
    */
   private TextOutput openWithoutHeaders(
-      ColumnsSchemaNegotiator schemaNegotiator) {
-    if (schemaNegotiator.hasProvidedSchema()) {
+      FileSchemaNegotiator schemaNegotiator) {
+    // Treat a property-only schema as no schema
+    TupleMetadata providedSchema = schemaNegotiator.providedSchema();
+    if (providedSchema != null && providedSchema.size() > 0) {
       return buildWithSchema(schemaNegotiator);
     } else {
       return buildColumnsArray(schemaNegotiator);
     }
   }
 
-  private FieldVarCharOutput buildWithSchema(ColumnsSchemaNegotiator schemaNegotiator) {
-    TupleMetadata providedSchema = schemaNegotiator.providedSchema();
-    schemaNegotiator.tableSchema(providedSchema, true);
-    writer = schemaNegotiator.build().writer();
-    StandardConversions conversions = conversions(providedSchema);
-    ValueWriter[] colWriters = new ValueWriter[providedSchema.size()];
-    for (int i = 0; i < colWriters.length; i++) {
-      colWriters[i] = conversions.converterFor(
-          writer.scalar(providedSchema.metadata(i).name()), MinorType.VARCHAR);
+  private FieldVarCharOutput buildWithSchema(FileSchemaNegotiator schemaNegotiator) {
+    validateNoColumnsProjection(schemaNegotiator);
+
+    // Build table schema from provided
+    TupleMetadata readerSchema = new TupleSchema();
+    for (ColumnMetadata providedCol : schemaNegotiator.providedSchema()) {
+      readerSchema.addColumn(textColumn(providedCol.name()));
+    }
+
+    // Build converting column writers
+    FixedReceiver.Builder builder = FixedReceiver.builderFor(schemaNegotiator)
+        .schemaIsComplete();
+    builder.conversionBuilder().blankAs(ColumnMetadata.BLANK_AS_NULL);
+    FixedReceiver receiver = builder.build(readerSchema);
+
+    // Convert to format for this reader
+    writer = receiver.rowWriter();
+    return new ConstrainedFieldOutput(receiver);
+  }
+
+  private void validateNoColumnsProjection(FileSchemaNegotiator schemaNegotiator) {
+    // If we do not require the columns array, then we presume that
+    // the reader does not provide arrays, so any use of the columns[x]
+    // column is likely an error. We rely on the plugin's own error
+    // context to fill in information that would explain the issue
+    // in the context of that plugin.
+
+    ProjectedColumn colProj = schemaNegotiator.projectionFor(COLUMNS_COL);
+    if (colProj != null && colProj.isArray()) {
+      throw UserException
+          .validationError()
+          .message("Unexpected `columns`[x]; file has headers or schema")
+          .addContext(errorContext)
+          .build(logger);
     }
-    return new ConstrainedFieldOutput(writer, colWriters);
   }
 
   private TextOutput buildColumnsArray(
-      ColumnsSchemaNegotiator schemaNegotiator) {
-    schemaNegotiator.tableSchema(ColumnsScanFramework.columnsSchema(), true);
+      FileSchemaNegotiator schemaNegotiator) {
+    ProjectedColumn colProj = schemaNegotiator.projectionFor(COLUMNS_COL);
+    validateColumnsProjection(colProj);
+    schemaNegotiator.tableSchema(columnsSchema(), true);
     writer = schemaNegotiator.build().writer();
-    return new RepeatedVarCharOutput(writer, schemaNegotiator.projectedIndexes());
+    return new RepeatedVarCharOutput(writer,
+        colProj == null ? null : colProj.indexes());
+  }
+
+  private void validateColumnsProjection(ProjectedColumn colProj) {
+    if (colProj == null) {
+      return;
+    }
+
+    // The columns column cannot be a map. That is, the following is
+    // not allowed: columns.foo.
+
+    if (colProj.isMap()) {
+      throw UserException
+        .validationError()
+        .message("Column `%s` has map elements, but must be an array", colProj.name())
+        .addContext(errorContext)
+        .build(logger);
+    }
+
+    if (colProj.isArray()) {
+      int maxIndex = colProj.maxIndex();
+      if (maxIndex > TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS) {
+        throw UserException
+          .validationError()
+          .message("`columns`[%d] index out of bounds, max supported size is %d",
+              maxIndex, TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS)
+          .addContext("Column:", colProj.name())
+          .addContext("Maximum index:", TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS)
+          .addContext("Actual index:", maxIndex)
+          .addContext(errorContext)
+          .build(logger);
+      }
+    }
   }
 
-  private void openReader(TextOutput output) throws IOException {
+  private TextReader openReader(FileSchemaNegotiator schemaNegotiator, TextOutput output) throws IOException {
+    FileSplit split = schemaNegotiator.file().split();
     logger.trace("Opening file {}", split.getPath());
-    final InputStream stream = dfs.openPossiblyCompressedStream(split.getPath());
+    final InputStream stream = schemaNegotiator.file().open();
     final TextInput input = new TextInput(settings, stream, readBuffer,
         split.getStart(), split.getStart() + split.getLength());
 
     // setup Reader using Input and Output
-    reader = new TextReader(settings, input, output, whitespaceBuffer);
+    TextReader reader = new TextReader(settings, input, output, whitespaceBuffer);
     reader.start();
+    return reader;
   }
 
-  private StandardConversions conversions(TupleMetadata providedSchema) {
-
-    // CSV maps blank columns to nulls (for nullable non-string columns),
-    // or to the default value (for non-nullable non-string columns.)
-    return StandardConversions.builder()
-      .withSchema(providedSchema)
-      .blankAs(ColumnMetadata.BLANK_AS_NULL)
-      .build();
+  public static TupleMetadata columnsSchema() {
+    return new SchemaBuilder()
+      .addArray(COLUMNS_COL, MinorType.VARCHAR)
+      .buildSchema();
   }
 
   /**
@@ -260,31 +303,31 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
    * TODO: enhance to support more common header patterns
    * @return field name strings
    */
-  private String[] extractHeader() throws IOException {
+  private String[] extractHeader(FileSchemaNegotiator schemaNegotiator) throws IOException {
     assert settings.isHeaderExtractionEnabled();
 
     // don't skip header in case skipFirstLine is set true
     settings.setSkipFirstLine(false);
 
+    FileSplit split = schemaNegotiator.file().split();
+    logger.trace("Opening file {}", split.getPath());
+    final InputStream hStream = schemaNegotiator.file().open();
     final HeaderBuilder hOutput = new HeaderBuilder(split.getPath());
 
-    // setup Input using InputStream
     // we should read file header irrespective of split given given to this reader
-    final InputStream hStream = dfs.openPossiblyCompressedStream(split.getPath());
     final TextInput hInput = new TextInput(settings, hStream, readBuffer, 0, split.getLength());
 
-    // setup Reader using Input and Output
-    this.reader = new TextReader(settings, hInput, hOutput, whitespaceBuffer);
-    reader.start();
+    final String [] fieldNames;
+    try (TextReader reader = new TextReader(settings, hInput, hOutput, whitespaceBuffer)) {
+      reader.start();
 
-    // extract first row only
-    reader.parseNext();
+      // extract first row only
+      reader.parseNext();
 
-    // grab the field names from output
-    final String [] fieldNames = hOutput.getHeaders();
+      // grab the field names from output
+      fieldNames = hOutput.getHeaders();
+    }
 
-    // cleanup and set to skip the first line next time we read input
-    reader.close();
     settings.setSkipFirstLine(true);
 
     readBuffer.clear();
@@ -300,14 +343,9 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
   public boolean next() {
     reader.resetForNextBatch();
 
-    // If the limit is defined and the row count is greater than the limit, stop reading the file.
-    if (maxRecords > 0 && writer.rowCount() > maxRecords) {
-      return false;
-    }
-
     try {
       boolean more = false;
-      while (! writer.isFull()) {
+      while (!writer.isFull()) {
         more = reader.parseNext();
         if (! more) {
           break;
@@ -324,8 +362,9 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
         throw (UserException) e.getCause();
       }
       throw UserException.dataReadError(e)
-          .addContext("Failure while reading file %s. Happened at or shortly before byte position %d.",
-            split.getPath(), reader.getPos())
+          .addContext("Failure while reading file")
+          .addContext("Happened at or shortly before byte position", reader.getPos())
+          .addContext(errorContext)
           .build(logger);
     }
   }
@@ -339,22 +378,8 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
 
     // Release the buffers allocated above. Double-check to handle
     // unexpected multiple calls to close().
-
-    if (readBuffer != null) {
-      readBuffer.release();
-      readBuffer = null;
-    }
-    if (whitespaceBuffer != null) {
-      whitespaceBuffer.release();
-      whitespaceBuffer = null;
-    }
-    try {
-      if (reader != null) {
-        reader.close();
-        reader = null;
-      }
-    } catch (final IOException e) {
-      logger.warn("Exception while closing stream.", e);
-    }
+    readBuffer.release();
+    whitespaceBuffer.release();
+    AutoCloseables.closeSilently(reader);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/ConstrainedFieldOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/ConstrainedFieldOutput.java
index 5078a57..56b6e7c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/ConstrainedFieldOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/ConstrainedFieldOutput.java
@@ -17,8 +17,7 @@
  */
 package org.apache.drill.exec.store.easy.text.reader;
 
-import org.apache.drill.exec.physical.resultSet.RowSetLoader;
-import org.apache.drill.exec.vector.accessor.ValueWriter;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
 
 /**
  * For CSV files without headers, but with a provided schema,
@@ -27,8 +26,8 @@ import org.apache.drill.exec.vector.accessor.ValueWriter;
  */
 public class ConstrainedFieldOutput extends FieldVarCharOutput {
 
-  ConstrainedFieldOutput(RowSetLoader writer, ValueWriter[] colWriters) {
-    super(writer, colWriters);
+  ConstrainedFieldOutput(FixedReceiver receiver) {
+    super(receiver);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java
index f8e44f7..f3aa08f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.easy.text.reader;
 
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
@@ -45,6 +46,19 @@ class FieldVarCharOutput extends BaseFieldOutput {
     this.colWriters = colWriters;
   }
 
+  FieldVarCharOutput(FixedReceiver receiver) {
+    this(receiver.rowWriter(), makeWriters(receiver));
+  }
+
+  private static ValueWriter[] makeWriters(FixedReceiver receiver) {
+    final TupleMetadata schema = receiver.rowWriter().tupleSchema();
+    final ValueWriter[] colWriters = new ValueWriter[schema.size()];
+    for (int i = 0; i < schema.size(); i++) {
+      colWriters[i] = receiver.scalar(i);
+    }
+    return colWriters;
+  }
+
   private static boolean[] makeMask(RowSetLoader writer) {
     final TupleMetadata schema = writer.tupleSchema();
     final boolean[] projectionMask = new boolean[schema.size()];
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java
index 2028d24..bf0e267 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.store.easy.text.reader;
 
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import org.apache.drill.exec.store.easy.text.TextFormatConfig;
 import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 
 public class TextParsingSettings {
@@ -40,7 +40,6 @@ public class TextParsingSettings {
   private final boolean parseUnescapedQuotes;
   private final boolean ignoreLeadingWhitespace;
   private final boolean ignoreTrailingWhitespace;
-
   /**
    * Configure the properties for this one scan based on:
    * <p>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java
index e3a5edd..c67c681 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java
@@ -31,10 +31,10 @@ import java.io.IOException;
  ******************************************************************************/
 
 /**
- * A byte-based Text parser implementation. Builds heavily upon the uniVocity parsers. Customized for UTF8 parsing and
- * DrillBuf support.
+ * A byte-based Text parser implementation. Builds heavily upon the uniVocity
+ * parsers. Customized for UTF8 parsing and DrillBuf support.
  */
-public final class TextReader {
+public final class TextReader implements AutoCloseable {
 
   private static final Logger logger = LoggerFactory.getLogger(TextReader.class);
 
@@ -515,6 +515,7 @@ public final class TextReader {
    * current record reader to clean up state.
    * @throws IOException for input file read errors
    */
+  @Override
   public void close() throws IOException {
     input.close();
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestTextWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestTextWriter.java
index 58bdb73..7cccee6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestTextWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestTextWriter.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.writer;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.easy.text.TextFormatConfig;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
@@ -39,7 +40,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
 import static org.junit.Assert.assertEquals;
 
 public class TestTextWriter extends ClusterTest {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java
index 14a3f43..b4d25a0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java
@@ -30,7 +30,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import org.apache.drill.exec.store.easy.text.TextFormatConfig;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
 import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java
index 317c0c7..b48f29f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java
@@ -28,7 +28,7 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.WorkspaceConfig;
 import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import org.apache.drill.exec.store.easy.text.TextFormatConfig;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.rowSet.RowSetComparison;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java
index 16553e4..ab315c6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java
@@ -42,7 +42,7 @@ import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
 import org.apache.drill.exec.store.StoragePluginRegistry.PluginFilter;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import org.apache.drill.exec.store.easy.text.TextFormatConfig;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.BaseTest;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
index 88a7b94..1a9175b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.scanner.RunTimeScan;
 import org.apache.drill.common.scanner.persistence.ScanResult;
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import org.apache.drill.exec.store.easy.text.TextFormatConfig;
 import org.apache.drill.test.BaseTest;
 import org.junit.Test;
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
index ff45968..6919481 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import org.apache.drill.exec.store.easy.text.TextFormatConfig;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
 
@@ -148,6 +148,17 @@ public class BaseCsvTest extends ClusterTest {
     return fileName;
   }
 
+  protected String buildBiggishFile() throws IOException {
+    String fileName = "biggish.csv";
+    try(PrintWriter out = new PrintWriter(new FileWriter(new File(testDir, fileName)))) {
+      out.println("id");
+      for (int i = 0; i < 100; i++) {
+        out.println(i + 1);
+      }
+    }
+    return fileName;
+  }
+
   protected static final String FILE_N_NAME = "file%d.csv";
 
   protected static String buildTable(String tableName, String[]...fileContents) throws IOException {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
index a0bfbbf..ec90618 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
@@ -115,7 +115,7 @@ public class TestCsvWithHeaders extends BaseCsvTest {
    * <p>
    * Prior research revealed that most DB engines can handle a null
    * empty result set: no schema, no rows. For example:
-   * <br><tt>SELECT * FROM VALUES ();</tt><br>
+   * <br>{@code SELECT * FROM VALUES ()}<br>
    * The implementation tested here follows that pattern.
    *
    * @see TestCsvWithoutHeaders#testEmptyFile()
@@ -127,7 +127,6 @@ public class TestCsvWithHeaders extends BaseCsvTest {
     assertNull(rowSet);
 
     // Try again with COUNT(*)
-
     long count = client.queryBuilder().sql(COUNT_STAR, EMPTY_FILE).singletonLong();
     assertEquals(0, count);
   }
@@ -343,7 +342,7 @@ public class TestCsvWithHeaders extends BaseCsvTest {
   }
 
   /**
-   * V3 allows the use of partition columns, even for a non-partitioned file.
+   * Drill allows the use of partition columns, even for a non-partitioned file.
    * The columns are null of type Nullable VARCHAR. This is area of Drill
    * is a bit murky: it seems reasonable to support partition columns consistently
    * rather than conditionally based on the structure of the input.
@@ -603,8 +602,8 @@ public class TestCsvWithHeaders extends BaseCsvTest {
       client.queryBuilder().sql(sql, COLUMNS_FILE_NAME).run();
     } catch (UserRemoteException e) {
       assertTrue(e.getMessage().contains(
-          "VALIDATION ERROR: Unexpected `columns`[x]; columns array not enabled"));
-      assertTrue(e.getMessage().contains("Format plugin: text"));
+          "VALIDATION ERROR: Unexpected `columns`[x]; file has headers or schema"));
+      assertTrue(e.getMessage().contains("Format plugin type: text"));
       assertTrue(e.getMessage().contains("Plugin config name: csv"));
       assertTrue(e.getMessage().contains("Extract headers: true"));
       assertTrue(e.getMessage().contains("Skip first line: false"));
@@ -639,8 +638,8 @@ public class TestCsvWithHeaders extends BaseCsvTest {
       // Note: this error is caught before reading any tables,
       // so no table information is available.
       assertTrue(e.getMessage().contains(
-          "VALIDATION ERROR: Unexpected `columns`[x]; columns array not enabled"));
-      assertTrue(e.getMessage().contains("Format plugin: text"));
+          "VALIDATION ERROR: Unexpected `columns`[x]; file has headers or schema"));
+      assertTrue(e.getMessage().contains("Format plugin type: text"));
       assertTrue(e.getMessage().contains("Plugin config name: csv"));
       assertTrue(e.getMessage().contains("Extract headers: true"));
       assertTrue(e.getMessage().contains("Skip first line: false"));
@@ -685,6 +684,33 @@ public class TestCsvWithHeaders extends BaseCsvTest {
     RowSetUtilities.verify(expected, actual);
   }
 
+  /**
+   * Test the LIMIT operation. No way to verify here that pushdown worked,
+   * do that by setting a breakpoint.
+   */
+  @Test
+  public void testLimit() throws IOException {
+    String fileName = buildBiggishFile();
+    {
+      String stmt = makeStatement(fileName) + " LIMIT 10";
+      RowSet actual = client.queryBuilder().sql(stmt).rowSet();
+      assertEquals(10, actual.rowCount());
+      RowSetReader reader = actual.reader();
+      reader.next();
+      assertEquals("1", reader.scalar("id").getString());
+      actual.clear();
+    }
+    {
+      String stmt = makeStatement(fileName) + " LIMIT 10 OFFSET 20";
+      RowSet actual = client.queryBuilder().sql(stmt).rowSet();
+      assertEquals(10, actual.rowCount());
+      RowSetReader reader = actual.reader();
+      reader.next();
+      assertEquals("21", reader.scalar("id").getString());
+      actual.clear();
+    }
+  }
+
   private String makeStatement(String fileName) {
     return "SELECT * FROM `dfs.data`.`" + fileName + "`";
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
index 7358d59..bc8e93a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
@@ -101,6 +101,62 @@ public class TestCsvWithSchema extends BaseCsvTest {
   }
 
   /**
+   * An empty file with a schema is valid. Although there is no header line,
+   * we've provided a schema so we know about at least those columns.
+   * In the no-schema case, we return no batches at all. In the with-schema
+   * case, we return an empty batch with schema.
+   * <p>
+   * The analogy is:
+   * <br>{@code SELECT * FROM foo WHERE 1 = 0}<br>
+   * The implementation tested here follows that pattern.
+   * @throws Exception
+   *
+   * @see TestCsvWithoutHeaders#testEmptyFile()
+   * @see TestCsvWithHeaders#testEmptyFile()
+   */
+  @Test
+  public void testEmptyFile() throws Exception {
+    String tablePath = buildTable("empty", new String[] {});
+    try {
+      enableSchemaSupport();
+      String schemaSql = "create schema (a VARCHAR) " +
+          "for table " + tablePath;
+      run(schemaSql);
+      String sql = "SELECT * FROM " + tablePath;
+      RowSet actual = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .addNullable("a", MinorType.VARCHAR)
+          .buildSchema();
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .build();
+      RowSetUtilities.verify(expected, actual);
+    } finally {
+      resetSchemaSupport();
+    }
+  }
+
+  @Test
+  public void testEmptyFileInline() throws Exception {
+    String tablePath = buildTable("empty", new String[] {});
+    try {
+      enableSchemaSupport();
+      String sql = "SELECT * FROM TABLE(" + tablePath +
+          "(schema=>'inline=(`a` VARCHAR)'))";
+      RowSet actual = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .addNullable("a", MinorType.VARCHAR)
+          .buildSchema();
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .build();
+      RowSetUtilities.verify(expected, actual);
+    } finally {
+      resetSchemaSupport();
+    }
+  }
+
+  /**
    * Test the simplest possible case: a table with one file:
    * <ul>
    * <li>Column in projection, table, and schema</li>
@@ -139,6 +195,36 @@ public class TestCsvWithSchema extends BaseCsvTest {
     }
   }
 
+  @Test
+  public void testBasicInlineSchema() throws Exception {
+    String tablePath = buildTable("basic2", basicFileContents);
+
+    try {
+      enableSchemaSupport();
+      String sql =
+          "SELECT `intcol`, `datecol`, `str`, `dub`, `extra`, `missing`\n" +
+          " FROM TABLE(" + tablePath +
+          " (schema=>'inline=(intcol int not null, datecol date not null, " +
+          "     `dub` double not null, `extra` bigint not null default ''20'')'))";
+      RowSet actual = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("intcol", MinorType.INT)      // Has a schema
+          .add("datecol", MinorType.DATE)    // Has a schema
+          .add("str", MinorType.VARCHAR)     // No schema, retains type
+          .add("dub", MinorType.FLOAT8)      // Has a schema
+          .add("extra", MinorType.BIGINT)    // No data, has default value
+          .add("missing", MinorType.VARCHAR) // No data, no schema, default type
+          .buildSchema();
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(10, LocalDate.of(2019, 3, 20), "it works!", 1234.5D, 20L, "")
+          .build();
+      RowSetUtilities.verify(expected, actual);
+    } finally {
+      resetSchemaSupport();
+    }
+  }
+
   /**
    * Show that the projection framework generates a reasonable default value
    * if told to create a required column that does not exist. In this case,
@@ -1024,7 +1110,7 @@ public class TestCsvWithSchema extends BaseCsvTest {
           "col_int integer not null default '10', " +
           "col_bigint bigint not null default '10', " +
           "col_double double not null default '10.5', " +
-          "col_float float not null default '10.5f', " +
+          "col_float float not null default '10.5', " +
           "col_var varchar not null default 'foo', " +
           "col_boolean boolean not null default '1', " +
           "col_interval interval not null default 'P10D', " +
@@ -1053,7 +1139,7 @@ public class TestCsvWithSchema extends BaseCsvTest {
       LocalDate ld = LocalDate.of(2019, 3, 28);
       Instant ts = LocalDateTime.of(ld, lt).toInstant(ZoneOffset.UTC);
       RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-          .addRow(10, 10L, 10.5, 10.5f, "foo", true, new Period(0).plusDays(10),
+          .addRow(10, 10L, 10.5, 10.5D, "foo", true, new Period(0).plusDays(10),
               lt, ld, ts, "1")
           .build();
       RowSetUtilities.verify(expected, actual);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestTextReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestTextReader.java
index 3798206..11a085b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestTextReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestTextReader.java
@@ -41,7 +41,7 @@ public class TestTextReader extends BaseTestQuery {
         .go();
   }
 
-  @Ignore ("Not needed any more. (DRILL-3178)")
+  @Ignore("Not needed any more. (DRILL-3178)")
   @Test
   public void ensureFailureOnNewLineDelimiterWithinQuotes() {
     try {
@@ -53,6 +53,10 @@ public class TestTextReader extends BaseTestQuery {
   }
 
   @Test
+  @Ignore("Query succeeds with EVF V2")
+  // Test should be modified for some other case. Note that this test is a bit
+  // inadequate: there are many places that can throw a validation error, all
+  // should be tested.
   public void ensureColumnNameDisplayedinError() throws Exception {
     final String COL_NAME = "col1";
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockRowReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockRowReader.java
index cdf0e8c..ad207ef 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockRowReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockRowReader.java
@@ -49,13 +49,14 @@ import org.junit.experimental.categories.Category;
 
 /**
  * Tests the mock data source directly by wrapping it in a mock
- * scan operator, without the rest of Drill.
+ * scan operator, without the rest of Drill. A side effect is that this
+ * also tests the scan mechanism itself.
  */
-
 @Category({RowSetTests.class, UnlikelyTest.class})
 public class TestMockRowReader extends SubOperatorTest {
 
-  private static ScanFixture buildScan(MockSubScanPOP config, List<ManagedReader<SchemaNegotiator>> readers) {
+  private static ScanFixture buildScan(MockSubScanPOP config,
+      List<ManagedReader<SchemaNegotiator>> readers) {
     BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder();
     List<SchemaPath> projList = new ArrayList<>();
     projList.add(SchemaPath.STAR_COLUMN);
@@ -67,7 +68,6 @@ public class TestMockRowReader extends SubOperatorTest {
   /**
    * Test the most basic case: required integers and strings.
    */
-
   @Test
   public void testBasics() {
     int rowCount = 10;
@@ -117,7 +117,6 @@ public class TestMockRowReader extends SubOperatorTest {
    * including filling values with nulls at some percentage, 25% by
    * default.
    */
-
   @Test
   public void testOptional() {
     int rowCount = 10;
@@ -165,7 +164,6 @@ public class TestMockRowReader extends SubOperatorTest {
   /**
    * Test a repeated column.
    */
-
   @Test
   public void testColumnRepeat() {
     int rowCount = 10;
@@ -214,7 +212,6 @@ public class TestMockRowReader extends SubOperatorTest {
   /**
    * Verify limit on individual batch size (limiting row count per batch).
    */
-
   @Test
   public void testBatchSize() {
     int rowCount = 20;
@@ -259,7 +256,6 @@ public class TestMockRowReader extends SubOperatorTest {
   /**
    * Test a mock varchar column large enough to cause vector overflow.
    */
-
   @Test
   public void testOverflow() {
     int rowCount = ValueVector.MAX_ROW_COUNT;
@@ -271,7 +267,6 @@ public class TestMockRowReader extends SubOperatorTest {
     MockSubScanPOP config = new MockSubScanPOP("dummy", true, Collections.singletonList(entry));
 
     ManagedReader<SchemaNegotiator> reader = new ExtendedMockBatchReader(entry);
-    @SuppressWarnings("unchecked")
     List<ManagedReader<SchemaNegotiator>> readers = Collections.singletonList(reader);
 
     // Create options and the scan operator
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/StoragePluginTestUtils.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
index 2cd1a0c..9ca931a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
@@ -31,7 +31,7 @@ import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.WorkspaceConfig;
 
 import org.apache.drill.exec.store.easy.sequencefile.SequenceFileFormatConfig;
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import org.apache.drill.exec.store.easy.text.TextFormatConfig;
 
 /**
  * Utility methods to speed up tests.