Posted to commits@drill.apache.org by ar...@apache.org on 2019/05/24 17:35:31 UTC

[drill] branch master updated: DRILL-7181: Improve V3 text reader (row set) error messages

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f67e95  DRILL-7181: Improve V3 text reader (row set) error messages
9f67e95 is described below

commit 9f67e95a91824a617785764f1bfbf1faa768526d
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Wed May 15 19:35:19 2019 -0700

    DRILL-7181: Improve V3 text reader (row set) error messages
    
    Adds an error context to the User Error mechanism. The context allows
    information to be passed through intermediate layers and applied when
    errors are raised in lower-level code, without requiring that the
    low-level code know the details of the error context information.
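
    For illustration, a reader can attach its own context through the
    schema negotiator, and low-level code then applies whatever context
    it was given. A minimal sketch using the classes added in this
    commit (the "Reader:" label and its value are hypothetical):

        // In the reader's open() call: wrap the parent context
        // with reader-specific detail.
        negotiator.setErrorContext(
            new ChildErrorContext(negotiator.parentErrorContext()) {
              @Override
              public void addContext(UserException.Builder builder) {
                super.addContext(builder);
                builder.addContext("Reader:", "example reader"); // hypothetical
              }
            });

        // In low-level code, far from the reader: apply the opaque
        // context when building the error.
        throw UserException.validationError()
            .message("Invalid value")
            .addContext(errorContext) // the CustomErrorContext passed down
            .build(logger);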
    
    Modifies the scan framework and the V3 text plugin to use this
    error context mechanism to improve error messages.
    
    Refines how the `columns` column can be used with the text reader.
    If headers are used, then `columns` is just another column name:
    `SELECT columns` simply projects a regular column called "columns".
    An error is raised, however, if the array syntax `columns[x]` is
    used while headers are enabled.
    
    Adds another builder abstraction (on ScanLevelProjection) where the
    constructor argument list had grown too long.
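
    For example, ScanLevelProjection now exposes a builder; the scan
    orchestrator constructs it roughly as follows (argument values are
    placeholders):

        ScanLevelProjection scanProj = ScanLevelProjection.builder()
            .projection(projectionList)  // List<SchemaPath>
            .parsers(parsers)            // List<ScanProjectionParser>
            .outputSchema(outputSchema)  // may be null or empty
            .context(errorContext)       // CustomErrorContext
            .build();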
    
    Adds the Drill file system and the file split to the file schema
    negotiator to simplify reader construction.
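
    A file reader now pulls both from the negotiator in open() rather
    than receiving them in its constructor; a sketch of the pattern the
    V3 text reader now follows:

        @Override
        public boolean open(FileSchemaNegotiator negotiator) {
          DrillFileSystem dfs = negotiator.fileSystem();
          FileSplit split = negotiator.split();
          // ... open the file via dfs and seek to split.getStart() ...
          return true;
        }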
    
    Adds unit tests that fully define the `columns` column behavior.
---
 .../drill/common/exceptions/ChildErrorContext.java |  41 ++++++++
 .../common/exceptions/CustomErrorContext.java      |  44 ++++++++
 .../drill/common/exceptions/EmptyErrorContext.java |  26 +++++
 .../drill/common/exceptions/UserException.java     |   7 ++
 .../impl/scan/columns/ColumnsArrayParser.java      |  40 ++++++--
 .../physical/impl/scan/file/FileScanFramework.java |  69 +++++++++++--
 .../impl/scan/framework/ManagedScanFramework.java  |   3 +
 .../impl/scan/framework/SchemaNegotiator.java      |  16 +++
 .../impl/scan/framework/SchemaNegotiatorImpl.java  |  16 +++
 .../impl/scan/framework/ShimBatchReader.java       |   3 +-
 .../scan/project/ExplicitSchemaProjection.java     |  21 ++--
 .../scan/project/ReaderSchemaOrchestrator.java     |   8 ++
 .../impl/scan/project/ScanLevelProjection.java     | 106 ++++++++++++++++---
 .../impl/scan/project/ScanSchemaOrchestrator.java  |  42 ++++++--
 .../exec/physical/rowSet/ResultSetLoader.java      |   8 ++
 .../exec/physical/rowSet/impl/ColumnBuilder.java   |  33 ++----
 .../exec/physical/rowSet/impl/OptionBuilder.java   |  16 ++-
 .../physical/rowSet/impl/ResultSetLoaderImpl.java  |  13 ++-
 .../rowSet/impl/SchemaTransformerImpl.java         |   6 +-
 .../exec/store/easy/text/TextFormatPlugin.java     |  25 ++++-
 .../compliant/v3/CompliantTextBatchReader.java     |  18 ++--
 .../exec/physical/impl/scan/TestColumnsArray.java  |  41 +++++++-
 .../impl/scan/TestColumnsArrayFramework.java       |   9 +-
 .../physical/impl/scan/TestColumnsArrayParser.java |  40 ++++----
 .../impl/scan/TestFileMetadataColumnParser.java    |  35 +++----
 .../impl/scan/TestFileMetadataProjection.java      |  12 +--
 .../physical/impl/scan/TestFileScanFramework.java  |   5 +-
 .../scan/project/TestReaderLevelProjection.java    |  28 ++---
 .../impl/scan/project/TestScanLevelProjection.java |  22 ++--
 .../impl/scan/project/TestSchemaSmoothing.java     |  30 +++---
 .../exec/physical/rowSet/impl/RowSetTestUtils.java |   2 +-
 .../impl/TestResultSetLoaderTypeConversion.java    |   3 +-
 .../easy/text/compliant/TestCsvWithHeaders.java    | 113 ++++++++++++++++++++-
 .../easy/text/compliant/TestCsvWithoutHeaders.java |  60 +++++++++--
 .../drill/exec/record/metadata/ProjectionType.java |  17 ++++
 35 files changed, 783 insertions(+), 195 deletions(-)

diff --git a/common/src/main/java/org/apache/drill/common/exceptions/ChildErrorContext.java b/common/src/main/java/org/apache/drill/common/exceptions/ChildErrorContext.java
new file mode 100644
index 0000000..eb9bcc7
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/exceptions/ChildErrorContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.exceptions;
+
+/**
+ * Represents an additional level of error context detail that
+ * adds to that provided by some outer context. Done via composition
+ * rather than subclassing to allow many child contexts to work with
+ * many parent contexts.
+ */
+
+public class ChildErrorContext implements CustomErrorContext {
+
+  private final CustomErrorContext parent;
+
+  public ChildErrorContext(CustomErrorContext parent) {
+    this.parent = parent;
+  }
+
+  @Override
+  public void addContext(UserException.Builder builder) {
+    if (parent != null) {
+      parent.addContext(builder);
+    }
+  }
+}
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/CustomErrorContext.java b/common/src/main/java/org/apache/drill/common/exceptions/CustomErrorContext.java
new file mode 100644
index 0000000..b9ad5e6
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/exceptions/CustomErrorContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.exceptions;
+
+/**
+ * Generic mechanism to pass error context throughout the row set
+ * mechanism and scan framework. The idea is to encapsulate error
+ * context information in an object that can be passed around, rather
+ * than having to pass the error information directly. In many cases, the
+ * same mechanisms are called from multiple contexts, making it hard to
+ * generalize the information that would be required. By hiding that
+ * information in an error context, the caller decides what to add to
+ * the error, while the intermediate classes just pass along this opaque
+ * context.
+ * <p>
+ * In some cases, such as file scans within a scan operator, there can be
+ * multiple levels of context. A format plugin, say, can describe the
+ * plugin and any interesting options. Then, a file scan can create a child
+ * context that adds things like file name, split offset, etc.
+ * <p>
+ * Note: this interface lives in the same package as
+ * <tt>UserException</tt>, and <tt>UserException.Builder</tt> now
+ * provides an <tt>addContext()</tt> overload that accepts an error
+ * context directly, making the error context easy to use.
+ */
+
+public interface CustomErrorContext {
+  void addContext(UserException.Builder builder);
+}
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/EmptyErrorContext.java b/common/src/main/java/org/apache/drill/common/exceptions/EmptyErrorContext.java
new file mode 100644
index 0000000..9589714
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/exceptions/EmptyErrorContext.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.exceptions;
+
+import org.apache.drill.common.exceptions.UserException.Builder;
+
+public class EmptyErrorContext implements CustomErrorContext {
+
+  @Override
+  public void addContext(Builder builder) { }
+}
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index 2aeb5cd..eccdb8e 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -531,6 +531,13 @@ public class UserException extends DrillRuntimeException {
       return this;
     }
 
+    public Builder addContext(CustomErrorContext context) {
+      if (context != null) {
+        context.addContext(this);
+      }
+      return this;
+    }
+
     /**
      * pushes a string value to the top of the context
      *
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
index 383de52..1737271 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
@@ -18,10 +18,10 @@
 package org.apache.drill.exec.physical.impl.scan.columns;
 
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedColumn;
 import org.apache.drill.exec.physical.impl.scan.project.ColumnProjection;
 import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
 import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
-import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedColumn;
 import org.apache.drill.exec.physical.rowSet.project.RequestedColumnImpl;
 import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
 import org.apache.drill.exec.store.easy.text.compliant.v3.TextReader;
@@ -92,7 +92,24 @@ public class ColumnsArrayParser implements ScanProjectionParser {
 
   @Override
   public boolean parse(RequestedColumn inCol) {
-    if (requireColumnsArray && inCol.isWildcard()) {
+    if (! requireColumnsArray) {
+
+      // If we do not require the columns array, then we presume that
+      // the reader does not provide arrays, so any use of the columns[x]
+      // column is likely an error. We rely on the plugin's own error
+      // context to fill in information that would explain the issue
+      // in the context of that plugin.
+
+      if (inCol.isArray()) {
+        throw UserException
+            .validationError()
+            .message("Unexpected `columns`[x]; columns array not enabled")
+            .addContext(builder.context())
+            .build(logger);
+      }
+      return false;
+    }
+    if (inCol.isWildcard()) {
       createColumnsCol(
           new RequestedColumnImpl(builder.rootProjection(), ColumnsArrayManager.COLUMNS_COL));
       return true;
@@ -107,7 +124,8 @@ public class ColumnsArrayParser implements ScanProjectionParser {
     if (inCol.isTuple()) {
       throw UserException
         .validationError()
-        .message("{} has map elements, but cannot be a map", inCol.name())
+        .message("Column `%s` has map elements, but must be an array", inCol.name())
+        .addContext(builder.context())
         .build(logger);
     }
 
@@ -116,11 +134,12 @@ public class ColumnsArrayParser implements ScanProjectionParser {
       if (maxIndex > TextReader.MAXIMUM_NUMBER_COLUMNS) {
         throw UserException
           .validationError()
-          .message(String.format(
-              "`columns`[%d] index out of bounds, max supported size is %d",
-              maxIndex, TextReader.MAXIMUM_NUMBER_COLUMNS))
-          .addContext("Column", inCol.name())
-          .addContext("Maximum index", TextReader.MAXIMUM_NUMBER_COLUMNS)
+          .message("`columns`[%d] index out of bounds, max supported size is %d",
+              maxIndex, TextReader.MAXIMUM_NUMBER_COLUMNS)
+          .addContext("Column:", inCol.name())
+          .addContext("Maximum index:", TextReader.MAXIMUM_NUMBER_COLUMNS)
+          .addContext("Actual index:", maxIndex)
+          .addContext(builder.context())
           .build(logger);
       }
     }
@@ -149,14 +168,17 @@ public class ColumnsArrayParser implements ScanProjectionParser {
       if (columnsArrayCol != null) {
         throw UserException
           .validationError()
-          .message("Cannot select columns[] and other table columns. Column alias incorrectly used in the WHERE clause?")
+          .message("Cannot select columns[] and other table columns. "+
+              "Column alias incorrectly used in the WHERE clause?")
           .addContext("Column name", col.name())
+          .addContext(builder.context())
           .build(logger);
       }
       if (requireColumnsArray) {
         throw UserException
           .validationError()
           .message("Only `columns` column is allowed. Found: " + col.name())
+          .addContext(builder.context())
           .build(logger);
       }
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
index e1b3374..f69aa40 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
@@ -22,7 +22,10 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.exceptions.UserException.Builder;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager.FileMetadataOptions;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
@@ -73,6 +76,20 @@ public class FileScanFramework extends ManagedScanFramework {
    */
 
   public interface FileSchemaNegotiator extends SchemaNegotiator {
+
+    /**
+     * Gives the Drill file system for this operator.
+     */
+
+    DrillFileSystem fileSystem();
+
+    /**
+     * Describes the file split (path and block offset) for this scan.
+     *
+     * @return Hadoop file split object with the file path, block
+     * offset, and length.
+     */
+    FileSplit split();
   }
 
   /**
@@ -84,8 +101,39 @@ public class FileScanFramework extends ManagedScanFramework {
   public static class FileSchemaNegotiatorImpl extends SchemaNegotiatorImpl
       implements FileSchemaNegotiator {
 
-    public FileSchemaNegotiatorImpl(ManagedScanFramework framework) {
+    private final FileSplit split;
+
+    public FileSchemaNegotiatorImpl(FileScanFramework framework) {
       super(framework);
+      this.split = framework.currentSplit;
+      context = new FileRowSetContext(parentErrorContext(), split);
+    }
+
+    @Override
+    public DrillFileSystem fileSystem() {
+      return ((FileScanFramework) framework).dfs;
+    }
+
+    @Override
+    public FileSplit split() { return split; }
+  }
+
+  public static class FileRowSetContext extends ChildErrorContext {
+
+    private final FileSplit split;
+
+    public FileRowSetContext(CustomErrorContext parent, FileSplit split) {
+      super(parent);
+      this.split = split;
+    }
+
+    @Override
+    public void addContext(Builder builder) {
+      super.addContext(builder);
+      builder.addContext("File:", Path.getPathWithoutSchemeAndAuthority(split.getPath()).toString());
+      if (split.getStart() != 0) {
+        builder.addContext("Offset:", split.getStart());
+      }
     }
   }
 
@@ -113,9 +161,17 @@ public class FileScanFramework extends ManagedScanFramework {
     }
   }
 
+  /**
+   * Iterates over the splits for the present scan. For each, creates a
+   * new reader. The file framework passes the file split (and the Drill
+   * file system) in via the schema negotiator at open time. This protocol
+   * makes clear that the constructor for the reader should do nothing;
+   * work should be done in the open() call.
+   */
+
   public abstract static class FileReaderFactory implements ReaderFactory {
 
-    protected FileScanFramework fileFramework;
+    private FileScanFramework fileFramework;
 
     @Override
     public void bind(ManagedScanFramework baseFramework) {
@@ -124,16 +180,13 @@ public class FileScanFramework extends ManagedScanFramework {
 
     @Override
     public ManagedReader<? extends SchemaNegotiator> next() {
-      FileSplit split = fileFramework.nextSplit();
-      if (split == null) {
+      if (fileFramework.nextSplit() == null) {
         return null;
       }
-      return newReader(split);
+      return newReader();
     }
 
-    protected DrillFileSystem fileSystem() { return fileFramework.dfs; }
-
-    public abstract ManagedReader<? extends FileSchemaNegotiator> newReader(FileSplit split);
+    public abstract ManagedReader<? extends FileSchemaNegotiator> newReader();
   }
 
   private FileMetadataManager metadataManager;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
index 394e2d9..52203da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.scan.framework;
 
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
 import org.apache.drill.exec.physical.impl.scan.ScanOperatorEvents;
@@ -170,6 +171,8 @@ public class ManagedScanFramework implements ScanOperatorEvents {
     return scanOrchestrator;
   }
 
+  public CustomErrorContext errorContext() { return builder.errorContext(); }
+
   protected void configure() { }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
index dead9cb..bc303ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
@@ -17,8 +17,10 @@
  */
 package org.apache.drill.exec.physical.impl.scan.framework;
 
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 
 /**
@@ -59,6 +61,12 @@ public interface SchemaNegotiator {
   OperatorContext context();
 
   /**
+   * Specify an advanced error context which allows the reader to
+   * fill in custom context values.
+   */
+  void setErrorContext(CustomErrorContext context);
+
+  /**
    * Specify the table schema if this is an early-schema reader. Need
   * not be called for late-schema readers. The schema provided here,
    * if any, is a base schema: the reader is free to discover additional
@@ -110,4 +118,12 @@ public interface SchemaNegotiator {
    */
 
   boolean isProjectionEmpty();
+
+  /**
+   * The context to use as a parent when creating a custom context.
+   * <p>
+   * (Obtain the error context for this reader from the
+   * {@link ResultSetLoader}.)
+   */
+  CustomErrorContext parentErrorContext();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
index a8e02cd..ab70734 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.scan.framework;
 
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -56,6 +57,7 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator {
 
   protected final ManagedScanFramework framework;
   private NegotiatorListener listener;
+  protected CustomErrorContext context;
   protected TupleMetadata tableSchema;
   protected boolean isSchemaComplete;
   protected int batchSize = ValueVector.MAX_ROW_COUNT;
@@ -74,6 +76,20 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator {
   }
 
   @Override
+  public CustomErrorContext parentErrorContext() {
+    return framework.errorContext();
+  }
+
+  public CustomErrorContext errorContext() {
+    return context;
+  }
+
+  @Override
+  public void setErrorContext(CustomErrorContext context) {
+    this.context = context;
+  }
+
+  @Override
   public void setTableSchema(TupleMetadata schema, boolean isComplete) {
     tableSchema = schema;
     this.isSchemaComplete = schema != null && isComplete;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java
index d3407d3..37a23a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java
@@ -197,7 +197,8 @@ public class ShimBatchReader implements RowBatchReader, NegotiatorListener {
   public ResultSetLoader build(SchemaNegotiatorImpl schemaNegotiator) {
     this.schemaNegotiator = schemaNegotiator;
     readerOrchestrator.setBatchSize(schemaNegotiator.batchSize);
-    tableLoader = readerOrchestrator.makeTableLoader(schemaNegotiator.tableSchema);
+    tableLoader = readerOrchestrator.makeTableLoader(schemaNegotiator.errorContext(),
+        schemaNegotiator.tableSchema);
     return tableLoader;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
index 16f0035..ce39411 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
@@ -44,16 +44,18 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
 public class ExplicitSchemaProjection extends ReaderLevelProjection {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExplicitSchemaProjection.class);
 
+  private final ScanLevelProjection scanProj;
+
   public ExplicitSchemaProjection(ScanLevelProjection scanProj,
       TupleMetadata readerSchema,
       ResolvedTuple rootTuple,
       List<ReaderProjectionResolver> resolvers) {
     super(resolvers);
-    resolveRootTuple(scanProj, rootTuple, readerSchema);
+    this.scanProj = scanProj;
+    resolveRootTuple(rootTuple, readerSchema);
   }
 
-  private void resolveRootTuple(ScanLevelProjection scanProj,
-      ResolvedTuple rootTuple,
+  private void resolveRootTuple(ResolvedTuple rootTuple,
       TupleMetadata readerSchema) {
     for (ColumnProjection col : scanProj.columns()) {
       if (col instanceof UnresolvedColumn) {
@@ -117,8 +119,10 @@ public class ExplicitSchemaProjection extends ReaderLevelProjection {
       throw UserException
         .validationError()
         .message("Project list implies a map column, but actual column is not a map")
-        .addContext("Projected column", requestedCol.fullName())
-        .addContext("Actual type", column.type().name())
+        .addContext("Projected column:", requestedCol.fullName())
+        .addContext("Table column:", column.name())
+        .addContext("Type:", column.type().name())
+        .addContext(scanProj.context())
         .build(logger);
     }
 
@@ -172,8 +176,11 @@ public class ExplicitSchemaProjection extends ReaderLevelProjection {
       throw UserException
         .validationError()
         .message("Project list implies an array, but actual column is not an array")
-        .addContext("Projected column", requestedCol.fullName())
-        .addContext("Actual cardinality", column.mode().name())
+        .addContext("Projected column:", requestedCol.fullName())
+        .addContext("Table column:", column.name())
+        .addContext("Type:", column.type().name())
+        .addContext("Actual cardinality:", column.mode().name())
+        .addContext(scanProj.context())
         .build(logger);
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
index 5a2c651..49a39f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.scan.project;
 
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder.NullBuilderBuilder;
 import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
@@ -25,6 +26,7 @@ import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 
 /**
  * Orchestrates projection tasks for a single reader within the set that the
@@ -60,12 +62,18 @@ public class ReaderSchemaOrchestrator implements VectorSource {
     }
   }
 
+  @VisibleForTesting
   public ResultSetLoader makeTableLoader(TupleMetadata readerSchema) {
+    return makeTableLoader(scanOrchestrator.scanProj.context(), readerSchema);
+  }
+
+  public ResultSetLoader makeTableLoader(CustomErrorContext errorContext, TupleMetadata readerSchema) {
     OptionBuilder options = new OptionBuilder();
     options.setRowCountLimit(Math.min(readerBatchSize, scanOrchestrator.options.scanBatchRecordLimit));
     options.setVectorCache(scanOrchestrator.vectorCache);
     options.setBatchSizeLimit(scanOrchestrator.options.scanBatchByteLimit);
     options.setSchemaTransform(scanOrchestrator.options.schemaTransformer);
+    options.setContext(errorContext);
 
     // Set up a selection list if available and is a subset of
     // table columns. (Only needed for non-wildcard queries.)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
index aed590f..7718119 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.scan.project;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedColumn;
 import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedSchemaColumn;
@@ -31,6 +32,7 @@ import org.apache.drill.exec.physical.rowSet.project.RequestedTupleImpl;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.ProjectionType;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 
 /**
  * Parses and analyzes the projection list passed to the scanner. The
@@ -224,8 +226,67 @@ public class ScanLevelProjection {
     void build();
   }
 
+  public static class Builder {
+    private List<SchemaPath> projectionList;
+    private List<ScanProjectionParser> parsers = new ArrayList<>();
+    private TupleMetadata outputSchema;
+    /**
+     * Context used with error messages.
+     */
+    protected CustomErrorContext errorContext;
+
+    /**
+     * Specify the set of columns in the SELECT list. Since the column list
+     * comes from the query planner, assumes that the planner has checked
+     * the list for syntax and uniqueness.
+     *
+     * @param projectionList list of columns in the SELECT list, in SELECT list order
+     * @return this builder
+     */
+    public Builder projection(List<SchemaPath> projectionList) {
+      this.projectionList = projectionList;
+      return this;
+    }
+
+    public Builder parsers(List<ScanProjectionParser> parsers) {
+      this.parsers.addAll(parsers);
+      return this;
+    }
+
+    public Builder outputSchema(TupleMetadata outputSchema) {
+      this.outputSchema = outputSchema;
+      return this;
+    }
+
+    public Builder context(CustomErrorContext context) {
+      this.errorContext = context;
+      return this;
+    }
+
+    public ScanLevelProjection build() {
+      return new ScanLevelProjection(this);
+    }
+
+    public TupleMetadata outputSchema() {
+      return outputSchema == null || outputSchema.size() == 0
+          ? null : outputSchema;
+    }
+
+    public List<SchemaPath> projectionList() {
+      if (projectionList == null) {
+        projectionList = new ArrayList<>();
+        projectionList.add(SchemaPath.STAR_COLUMN);
+      }
+      return projectionList;
+    }
+  }
+
   // Input
 
+  /**
+   * Context used with error messages.
+   */
+  protected final CustomErrorContext errorContext;
   protected final List<SchemaPath> projectionList;
   protected final TupleMetadata outputSchema;
 
@@ -257,27 +318,42 @@ public class ScanLevelProjection {
   protected RequestedTuple readerProjection;
   protected ScanProjectionType projectionType = ScanProjectionType.EMPTY;
 
+  private ScanLevelProjection(Builder builder) {
+    this.projectionList = builder.projectionList();
+    this.parsers = builder.parsers;
+    this.outputSchema = builder.outputSchema();
+    this.errorContext = builder.errorContext;
+    doParse();
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
   /**
-   * Specify the set of columns in the SELECT list. Since the column list
-   * comes from the query planner, assumes that the planner has checked
-   * the list for syntax and uniqueness.
-   *
-   * @param queryCols list of columns in the SELECT list in SELECT list order
-   * @return this builder
+   * Builder shortcut, primarily for tests.
    */
-  public ScanLevelProjection(List<SchemaPath> projectionList,
+  @VisibleForTesting
+  public static ScanLevelProjection build(List<SchemaPath> projectionList,
       List<ScanProjectionParser> parsers) {
-    this(projectionList, parsers, null);
+    return new Builder()
+        .projection(projectionList)
+        .parsers(parsers)
+        .build();
   }
 
-  public ScanLevelProjection(List<SchemaPath> projectionList,
+  /**
+   * Builder shortcut, primarily for tests.
+   */
+  @VisibleForTesting
+  public static ScanLevelProjection build(List<SchemaPath> projectionList,
       List<ScanProjectionParser> parsers,
       TupleMetadata outputSchema) {
-    this.projectionList = projectionList;
-    this.parsers = parsers;
-    this.outputSchema = outputSchema == null || outputSchema.size() == 0
-        ? null : outputSchema;
-    doParse();
+    return new Builder()
+        .projection(projectionList)
+        .parsers(parsers)
+        .outputSchema(outputSchema)
+        .build();
   }
 
   private void doParse() {
@@ -484,6 +560,8 @@ public class ScanLevelProjection {
     }
   }
 
+  public CustomErrorContext context() { return errorContext; }
+
   /**
    * Return the set of columns from the SELECT list
    * @return the SELECT list columns, in SELECT list order
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
index 23b4b1b..a315a3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
@@ -22,11 +22,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
 import org.apache.drill.exec.physical.impl.scan.project.ReaderLevelProjection.ReaderProjectionResolver;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
 import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
 import org.apache.drill.exec.physical.rowSet.impl.SchemaTransformer;
 import org.apache.drill.exec.physical.rowSet.impl.SchemaTransformerImpl;
@@ -170,6 +171,11 @@ public class ScanSchemaOrchestrator {
     private Map<String, String> conversionProps;
 
     /**
+     * Context for error messages.
+     */
+    private CustomErrorContext context;
+
+    /**
      * Specify an optional metadata manager. Metadata is a set of constant
      * columns with per-reader values. For file-based sources, this is usually
      * the implicit and partition columns; but it could be other items for other
@@ -258,6 +264,14 @@ public class ScanSchemaOrchestrator {
       }
       conversionProps.put(key, value);
     }
+
+    public void setContext(CustomErrorContext context) {
+      this.context = context;
+    }
+
+    public CustomErrorContext errorContext() {
+      return context;
+    }
   }
 
   public static class ScanSchemaOptions {
@@ -286,6 +300,11 @@ public class ScanSchemaOrchestrator {
     public final boolean allowRequiredNullColumns;
     public final SchemaTransformer schemaTransformer;
 
+    /**
+     * Context for error messages.
+     */
+    public final CustomErrorContext context;
+
     protected ScanSchemaOptions(ScanOrchestratorBuilder builder) {
       nullType = builder.nullType;
       scanBatchRecordLimit = builder.scanBatchRecordLimit;
@@ -294,13 +313,15 @@ public class ScanSchemaOrchestrator {
       schemaResolvers = builder.schemaResolvers;
       projection = builder.projection;
       useSchemaSmoothing = builder.useSchemaSmoothing;
+      context = builder.context;
       boolean allowRequiredNulls = builder.allowRequiredNullColumns;
       if (builder.schemaTransformer != null) {
         // Use client-provided conversions
         schemaTransformer = builder.schemaTransformer;
       } else if (builder.outputSchema != null) {
         // Use only implicit conversions
-        schemaTransformer = new SchemaTransformerImpl(builder.outputSchema, builder.conversionProps);
+        schemaTransformer = new SchemaTransformerImpl(
+            builder.outputSchema, builder.conversionProps);
         if (builder.outputSchema.getBooleanProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP)) {
           allowRequiredNulls = true;
         }
@@ -333,14 +354,14 @@ public class ScanSchemaOrchestrator {
    * vectors rather than vector instances, this cache can be deprecated.
    */
 
-  ResultVectorCacheImpl vectorCache;
-  ScanLevelProjection scanProj;
+  protected final ResultVectorCacheImpl vectorCache;
+  protected final ScanLevelProjection scanProj;
   private ReaderSchemaOrchestrator currentReader;
-  SchemaSmoother schemaSmoother;
+  protected final SchemaSmoother schemaSmoother;
 
   // Output
 
-  VectorContainer outputContainer;
+  protected VectorContainer outputContainer;
 
   public ScanSchemaOrchestrator(BufferAllocator allocator, ScanOrchestratorBuilder builder) {
     this.allocator = allocator;
@@ -376,9 +397,16 @@ public class ScanSchemaOrchestrator {
     if (options.schemaTransformer != null) {
       outputSchema = options.schemaTransformer.outputSchema();
     }
-    scanProj = new ScanLevelProjection(options.projection, options.parsers, outputSchema);
+    scanProj = ScanLevelProjection.builder()
+        .projection(options.projection)
+        .parsers(options.parsers)
+        .outputSchema(outputSchema)
+        .context(builder.errorContext())
+        .build();
     if (scanProj.projectAll() && options.useSchemaSmoothing) {
       schemaSmoother = new SchemaSmoother(scanProj, options.schemaResolvers);
+    } else {
+      schemaSmoother = null;
     }
 
     // Build the output container.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java
index ec10610..fa42d98 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java
@@ -17,9 +17,11 @@
  */
 package org.apache.drill.exec.physical.rowSet;
 
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 
 /**
  * Builds a result set (series of zero or more row sets) based on a defined
@@ -42,6 +44,12 @@ public interface ResultSetLoader {
   public static final int DEFAULT_ROW_COUNT = BaseValueVector.INITIAL_VALUE_ALLOCATION;
 
   /**
+   * Context for error messages.
+   */
+
+  CustomErrorContext context();
+
+  /**
    * Current schema version. The version increments by one each time
    * a column is added.
    * @return the current schema version
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java
index adf2362..88da7f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.rowSet.impl;
 
 import java.util.ArrayList;
 
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -120,9 +121,11 @@ public class ColumnBuilder {
   }
 
   private final SchemaTransformer schemaTransformer;
+  private final CustomErrorContext context;
 
-  public ColumnBuilder(SchemaTransformer schemaTransformer) {
+  public ColumnBuilder(SchemaTransformer schemaTransformer, CustomErrorContext context) {
     this.schemaTransformer = schemaTransformer;
+    this.context = context;
   }
 
   /**
@@ -252,27 +255,13 @@ public class ColumnBuilder {
 
   private void incompatibleProjection(ProjectionType projType,
       ColumnMetadata columnSchema) {
-    StringBuilder buf = new StringBuilder()
-      .append("Projection of type ");
-    switch (projType) {
-    case ARRAY:
-      buf.append("array (a[n])");
-      break;
-    case TUPLE:
-      buf.append("tuple (a.x)");
-      break;
-    case TUPLE_ARRAY:
-      buf.append("tuple array (a[n].x");
-      break;
-    default:
-      throw new IllegalStateException("Unexpected projection type: " + projType);
-    }
-    buf.append(" is not compatible with column `")
-      .append(columnSchema.name())
-      .append("` of type ")
-      .append(Types.getSqlTypeName(columnSchema.majorType()));
-    throw UserException.validationError()
-      .message(buf.toString())
+    throw UserException
+      .validationError()
+      .message("Incompatible projection type and data type for column `%s`", columnSchema.name())
+      .addContext("Column:", columnSchema.name())
+      .addContext("Type:", Types.getSqlTypeName(columnSchema.majorType()))
+      .addContext("Projection type:", projType.label())
+      .addContext(context)
       .build(logger);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java
index 0a6ac35..0079f50 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java
@@ -19,11 +19,13 @@ package org.apache.drill.exec.physical.rowSet.impl;
 
 import java.util.Collection;
 
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
 import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetOptions;
 import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
@@ -43,6 +45,11 @@ public class OptionBuilder {
   protected long maxBatchSize;
   protected SchemaTransformer schemaTransformer;
 
+  /**
+   * Error message context
+   */
+  protected CustomErrorContext errorContext;
+
   public OptionBuilder() {
     // Start with the default option values.
     ResultSetOptions options = new ResultSetOptions();
@@ -143,8 +150,13 @@ public class OptionBuilder {
     return this;
   }
 
-  // TODO: No setter for vector length yet: is hard-coded
-  // at present in the value vector.
+  /**
+   * Provides context for error messages.
+   */
+  public OptionBuilder setContext(CustomErrorContext context) {
+    this.errorContext = context;
+    return this;
+  }
 
   public ResultSetOptions build() {
     Preconditions.checkArgument(projection == null || projectionSet == null);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java
index 1ad9304..ab2bc48 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.rowSet.impl;
 
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
@@ -53,6 +54,11 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
     protected final long maxBatchSize;
     protected final SchemaTransformer schemaTransformer;
 
+    /**
+     * Context for error messages.
+     */
+    protected final CustomErrorContext errorContext;
+
     public ResultSetOptions() {
       vectorSizeLimit = ValueVector.MAX_BUFFER_SIZE;
       rowCountLimit = DEFAULT_ROW_COUNT;
@@ -61,6 +67,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
       schema = null;
       maxBatchSize = -1;
       schemaTransformer = null;
+      errorContext = null;
     }
 
     public ResultSetOptions(OptionBuilder builder) {
@@ -70,6 +77,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
       schema = builder.schema;
       maxBatchSize = builder.maxBatchSize;
       schemaTransformer = builder.schemaTransformer;
+      errorContext = builder.errorContext;
 
       // If projection, build the projection map.
       // The caller might have already built the map. If so,
@@ -287,7 +295,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
     if (schemaTransformer == null) {
       schemaTransformer = new DefaultSchemaTransformer(null);
     }
-    columnBuilder = new ColumnBuilder(schemaTransformer);
+    columnBuilder = new ColumnBuilder(schemaTransformer, options.errorContext);
 
     // Set the projections
 
@@ -845,4 +853,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
 
   @Override
   public ColumnBuilder columnBuilder() { return columnBuilder; }
+
+  @Override
+  public CustomErrorContext context() { return options.errorContext; }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SchemaTransformerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SchemaTransformerImpl.java
index 1812fa3..4df02e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SchemaTransformerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SchemaTransformerImpl.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.rowSet.impl;
 import java.util.Map;
 
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionType;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.ProjectionType;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -155,8 +156,9 @@ public class SchemaTransformerImpl implements SchemaTransformer {
         if (defn.conversionClass == null) {
           throw UserException.validationError()
             .message("Runtime type conversion not available")
-            .addContext("Input type", inputSchema.typeString())
-            .addContext("Output type", outputCol.typeString())
+            .addContext("Column:", outputCol.name())
+            .addContext("Input type:", inputSchema.typeString())
+            .addContext("Output type:", outputCol.typeString())
             .build(logger);
         }
         factory = StandardConversions.factory(defn.conversionClass, properties);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 0d6f262..f764c38 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -22,7 +22,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
@@ -35,12 +37,12 @@ import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.MetadataProviderManager;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
-import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
 import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsScanBuilder;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
@@ -63,6 +65,7 @@ import org.apache.drill.exec.store.schedule.CompleteFileWork;
 import org.apache.drill.exec.store.text.DrillTextRecordReader;
 import org.apache.drill.exec.store.text.DrillTextRecordWriter;
 import org.apache.drill.exec.vector.accessor.convert.AbstractConvertFromString;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -73,7 +76,6 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonInclude.Include;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 
 public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> {
   private final static String PLUGIN_NAME = "text";
@@ -190,11 +192,10 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     }
 
     @Override
-    public ManagedReader<? extends FileSchemaNegotiator> newReader(
-        FileSplit split) {
+    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
       TextParsingSettingsV3 settings = new TextParsingSettingsV3();
       settings.set(plugin.getConfig());
-      return new CompliantTextBatchReader(split, fileSystem(), settings);
+      return new CompliantTextBatchReader(settings);
     }
   }
 
@@ -216,6 +217,20 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
       ColumnsScanBuilder builder = new ColumnsScanBuilder();
       builder.setReaderFactory(new ColumnsReaderFactory(textPlugin));
 
+      // Provide custom error context
+      builder.setContext(
+          new CustomErrorContext() {
+            @Override
+            public void addContext(UserException.Builder builder) {
+              builder.addContext("Format plugin:", PLUGIN_NAME);
+              builder.addContext("Plugin config name:", textPlugin.getName());
+              builder.addContext("Extract headers:",
+                  Boolean.toString(textPlugin.getConfig().isHeaderExtractionEnabled()));
+              builder.addContext("Skip headers:",
+                  Boolean.toString(textPlugin.getConfig().isSkipFirstLine()));
+            }
+          });
+
       // If this format has no headers, or wants to skip them,
       // then we must use the columns column to hold the data.
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java
index 323f793..54143ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java
@@ -51,21 +51,19 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
   // settings to be used while parsing
   private final TextParsingSettingsV3 settings;
   // Chunk of the file to be read by this reader
-  private final FileSplit split;
+  private FileSplit split;
   // text reader implementation
   private TextReader reader;
   // input buffer
   private DrillBuf readBuffer;
   // working buffer to handle whitespaces
   private DrillBuf whitespaceBuffer;
-  private final DrillFileSystem dfs;
+  private DrillFileSystem dfs;
 
   private RowSetLoader writer;
 
-  public CompliantTextBatchReader(FileSplit split, DrillFileSystem dfs, TextParsingSettingsV3 settings) {
-    this.split = split;
+  public CompliantTextBatchReader(TextParsingSettingsV3 settings) {
     this.settings = settings;
-    this.dfs = dfs;
 
     // Validate. Otherwise, these problems show up later as a data
     // read error which is very confusing.
@@ -82,13 +80,15 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
    * Performs the initial setup required for the record reader.
    * Initializes the input stream, handling of the output record batch
    * and the actual reader to be used.
-   * @param context  operator context from which buffer's will be allocated and managed
+   * @param schemaNegotiator  negotiator providing the operator context from which buffers are allocated and managed
    * @param outputMutator  Used to create the schema in the output record batch
    */
 
   @Override
   public boolean open(ColumnsSchemaNegotiator schemaNegotiator) {
     final OperatorContext context = schemaNegotiator.context();
+    dfs = schemaNegotiator.fileSystem();
+    split = schemaNegotiator.split();
 
     // Note: DO NOT use managed buffers here. They remain in existence
     // until the fragment is shut down. The buffers here are large.
@@ -99,10 +99,6 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
 
     readBuffer = context.getAllocator().buffer(READ_BUFFER);
     whitespaceBuffer = context.getAllocator().buffer(WHITE_SPACE_BUFFER);
-
-    // TODO: Set this based on size of record rather than
-    // absolute count.
-
     schemaNegotiator.setBatchSize(MAX_RECORDS_PER_BATCH);
 
     // setup Output, Input, and Reader
@@ -173,7 +169,7 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
    * @return field name strings
    */
 
-  private String [] extractHeader() throws IOException {
+  private String[] extractHeader() throws IOException {
     assert settings.isHeaderExtractionEnabled();
 
     // don't skip header in case skipFirstLine is set true
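
Taken together, the reader changes invert setup: construction takes only the
parsing settings, while open() pulls the file system and split from the
schema negotiator. A sketch of the resulting lifecycle (normally driven by
the scan framework rather than user code; settings and schemaNegotiator are
assumed to be in scope):

    // Construction needs only the parsing settings.
    CompliantTextBatchReader reader = new CompliantTextBatchReader(settings);
    // open() obtains the rest from the negotiator, per the diff above.
    reader.open(schemaNegotiator); // uses fileSystem() and split()
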
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
index 8caece4..7c9dd87 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
@@ -35,13 +35,13 @@ import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 /**
  * Test the "columns" array mechanism integrated with the scan schema
@@ -251,11 +251,11 @@ public class TestColumnsArray extends SubOperatorTest {
     mock.scanner.close();
   }
 
-  private ScanSchemaOrchestrator buildScan(List<SchemaPath> cols) {
+  private ScanSchemaOrchestrator buildScan(boolean requireColumns, List<SchemaPath> cols) {
 
     // Set up the columns array manager
 
-    ColumnsArrayManager colsManager = new ColumnsArrayManager(false);
+    ColumnsArrayManager colsManager = new ColumnsArrayManager(requireColumns);
 
     // Configure the schema orchestrator
 
@@ -273,7 +273,7 @@ public class TestColumnsArray extends SubOperatorTest {
 
   @Test
   public void testMissingColumnsColumn() {
-    ScanSchemaOrchestrator scanner = buildScan(
+    ScanSchemaOrchestrator scanner = buildScan(true,
         RowSetTestUtils.projectList(ColumnsArrayManager.COLUMNS_COL));
 
     TupleMetadata tableSchema = new SchemaBuilder()
@@ -294,7 +294,7 @@ public class TestColumnsArray extends SubOperatorTest {
 
   @Test
   public void testNotRepeated() {
-    ScanSchemaOrchestrator scanner = buildScan(
+    ScanSchemaOrchestrator scanner = buildScan(true,
         RowSetTestUtils.projectList(ColumnsArrayManager.COLUMNS_COL));
 
     TupleMetadata tableSchema = new SchemaBuilder()
@@ -312,4 +312,35 @@ public class TestColumnsArray extends SubOperatorTest {
 
     scanner.close();
   }
+
+  /**
+   * Verify that, if the columns column is not required, `columns`
+   * is treated like any other column.
+   */
+  @Test
+  public void testRegularCol() {
+    ScanSchemaOrchestrator scanner = buildScan(false,
+        RowSetTestUtils.projectList(ColumnsArrayManager.COLUMNS_COL));
+
+    TupleMetadata tableSchema = new SchemaBuilder()
+        .add(ColumnsArrayManager.COLUMNS_COL, MinorType.VARCHAR)
+        .buildSchema();
+
+    ReaderSchemaOrchestrator reader = scanner.startReader();
+    ResultSetLoader rsLoader = reader.makeTableLoader(tableSchema);
+    reader.defineSchema();
+
+    reader.startBatch();
+    rsLoader.writer()
+      .addRow("fred");
+    reader.endBatch();
+
+    SingleRowSet expected = fixture.rowSetBuilder(tableSchema)
+      .addRow("fred")
+      .build();
+
+    RowSetUtilities.verify(expected,
+        fixture.wrap(scanner.output()));
+    scanner.close();
+  }
 }
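
The boolean threaded through buildScan() mirrors the plugin logic: with
headers extracted, `columns` is optional and ordinary; without headers, the
columns array is required. Side by side, the two configurations these tests
exercise:

    // Headers extracted: `columns` is treated as a regular column.
    ScanSchemaOrchestrator regular = buildScan(false,
        RowSetTestUtils.projectList(ColumnsArrayManager.COLUMNS_COL));

    // No headers: the `columns` array column is required.
    ScanSchemaOrchestrator required = buildScan(true,
        RowSetTestUtils.projectList(ColumnsArrayManager.COLUMNS_COL));
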
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java
index 2095f04..00f169f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java
@@ -46,13 +46,12 @@ import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 
 /**
@@ -72,11 +71,9 @@ public class TestColumnsArrayFramework extends SubOperatorTest {
     }
 
     @Override
-    public ManagedReader<? extends FileSchemaNegotiator> newReader(
-        FileSplit split) {
+    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
       DummyColumnsReader reader = readerIter.next();
       assert reader != null;
-      assert split.getPath().equals(reader.filePath());
       return reader;
     }
   }
@@ -199,6 +196,7 @@ public class TestColumnsArrayFramework extends SubOperatorTest {
     ColumnsScanFixtureBuilder builder = new ColumnsScanFixtureBuilder();
     builder.setProjection(RowSetTestUtils.projectList(ColumnsArrayManager.COLUMNS_COL));
     builder.addReader(reader);
+    builder.builder.requireColumnsArray(true);
     ScanFixture scanFixture = builder.build();
     ScanOperatorExec scan = scanFixture.scanOp;
 
@@ -234,6 +232,7 @@ public class TestColumnsArrayFramework extends SubOperatorTest {
         SchemaPath.parseFromString(ColumnsArrayManager.COLUMNS_COL + "[1]"),
         SchemaPath.parseFromString(ColumnsArrayManager.COLUMNS_COL + "[3]")));
     builder.addReader(reader);
+    builder.builder.requireColumnsArray(true);
     ScanFixture scanFixture = builder.build();
     ScanOperatorExec scan = scanFixture.scanOp;
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java
index 4a49033..4429853 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java
@@ -34,11 +34,11 @@ import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager.FileMetadataOptions;
 import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
 import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 @Category(RowSetTests.class)
 public class TestColumnsArrayParser extends SubOperatorTest {
@@ -51,9 +51,9 @@ public class TestColumnsArrayParser extends SubOperatorTest {
 
   @Test
   public void testColumnsArray() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList(ColumnsArrayManager.COLUMNS_COL),
-        ScanTestUtils.parsers(new ColumnsArrayParser(false)));
+        ScanTestUtils.parsers(new ColumnsArrayParser(true)));
 
     assertFalse(scanProj.projectAll());
     assertEquals(1, scanProj.requestedCols().size());
@@ -68,7 +68,7 @@ public class TestColumnsArrayParser extends SubOperatorTest {
 
   @Test
   public void testRequiredColumnsArray() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList(ColumnsArrayManager.COLUMNS_COL),
         ScanTestUtils.parsers(new ColumnsArrayParser(true)));
 
@@ -85,7 +85,7 @@ public class TestColumnsArrayParser extends SubOperatorTest {
 
   @Test
   public void testRequiredWildcard() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectAll(),
         ScanTestUtils.parsers(new ColumnsArrayParser(true)));
 
@@ -105,9 +105,9 @@ public class TestColumnsArrayParser extends SubOperatorTest {
 
     // Sic: case variation of standard name
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("Columns"),
-        ScanTestUtils.parsers(new ColumnsArrayParser(false)));
+        ScanTestUtils.parsers(new ColumnsArrayParser(true)));
 
     assertFalse(scanProj.projectAll());
     assertEquals(1, scanProj.requestedCols().size());
@@ -123,11 +123,11 @@ public class TestColumnsArrayParser extends SubOperatorTest {
   @Test
   public void testColumnsElements() {
 
-   ScanLevelProjection scanProj = new ScanLevelProjection(
+   ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList(
             ColumnsArrayManager.COLUMNS_COL + "[3]",
             ColumnsArrayManager.COLUMNS_COL + "[1]"),
-        ScanTestUtils.parsers(new ColumnsArrayParser(false)));
+        ScanTestUtils.parsers(new ColumnsArrayParser(true)));
 
     assertFalse(scanProj.projectAll());
     assertEquals(2, scanProj.requestedCols().size());
@@ -158,9 +158,9 @@ public class TestColumnsArrayParser extends SubOperatorTest {
   @Test
   public void testErrorColumnsArrayAndColumn() {
     try {
-      new ScanLevelProjection(
+      ScanLevelProjection.build(
           RowSetTestUtils.projectList(ColumnsArrayManager.COLUMNS_COL, "a"),
-          ScanTestUtils.parsers(new ColumnsArrayParser(false)));
+          ScanTestUtils.parsers(new ColumnsArrayParser(true)));
       fail();
     } catch (UserException e) {
       // Expected
@@ -174,9 +174,9 @@ public class TestColumnsArrayParser extends SubOperatorTest {
   @Test
   public void testErrorColumnAndColumnsArray() {
     try {
-      new ScanLevelProjection(
+      ScanLevelProjection.build(
           RowSetTestUtils.projectList("a", ColumnsArrayManager.COLUMNS_COL),
-          ScanTestUtils.parsers(new ColumnsArrayParser(false)));
+          ScanTestUtils.parsers(new ColumnsArrayParser(true)));
       fail();
     } catch (UserException e) {
       // Expected
@@ -190,7 +190,7 @@ public class TestColumnsArrayParser extends SubOperatorTest {
   @Test
   public void testErrorTwoColumnsArray() {
     try {
-      new ScanLevelProjection(
+      ScanLevelProjection.build(
           RowSetTestUtils.projectList(ColumnsArrayManager.COLUMNS_COL, ColumnsArrayManager.COLUMNS_COL),
           ScanTestUtils.parsers(new ColumnsArrayParser(false)));
       fail();
@@ -202,7 +202,7 @@ public class TestColumnsArrayParser extends SubOperatorTest {
   @Test
   public void testErrorRequiredAndExtra() {
     try {
-      new ScanLevelProjection(
+      ScanLevelProjection.build(
           RowSetTestUtils.projectList("a"),
           ScanTestUtils.parsers(new ColumnsArrayParser(true)));
       fail();
@@ -214,7 +214,7 @@ public class TestColumnsArrayParser extends SubOperatorTest {
   @Test
   public void testColumnsIndexTooLarge() {
     try {
-      new ScanLevelProjection(
+      ScanLevelProjection.build(
           RowSetTestUtils.projectCols(SchemaPath.parseFromString("columns[70000]")),
           ScanTestUtils.parsers(new ColumnsArrayParser(true)));
       fail();
@@ -243,11 +243,11 @@ public class TestColumnsArrayParser extends SubOperatorTest {
         fixture.getOptionManager(),
         standardOptions(filePath));
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL,
             ColumnsArrayManager.COLUMNS_COL,
             ScanTestUtils.SUFFIX_COL),
-        ScanTestUtils.parsers(new ColumnsArrayParser(false),
+        ScanTestUtils.parsers(new ColumnsArrayParser(true),
             metadataManager.projectionParser()));
 
     assertFalse(scanProj.projectAll());
@@ -277,7 +277,7 @@ public class TestColumnsArrayParser extends SubOperatorTest {
 
   @Test
   public void testWildcardAndColumns() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList(
             SchemaPath.DYNAMIC_STAR,
             ColumnsArrayManager.COLUMNS_COL),
@@ -297,7 +297,7 @@ public class TestColumnsArrayParser extends SubOperatorTest {
   @Test
   public void testColumnsAsMap() {
     try {
-        new ScanLevelProjection(
+        ScanLevelProjection.build(
           RowSetTestUtils.projectList("columns.x"),
           ScanTestUtils.parsers(new ColumnsArrayParser(true)));
         fail();
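
Every test in this file (and in those below) swaps the ScanLevelProjection
constructor for a static build() shorthand. From the call sites in this
patch, the factory takes the projection list and the parser list, with an
optional provided schema; outputSchema below stands for a TupleMetadata
built elsewhere:

    // Two-argument form used throughout these tests.
    ScanLevelProjection scanProj = ScanLevelProjection.build(
        RowSetTestUtils.projectList("a", "b", "c"),
        ScanTestUtils.parsers());

    // Three-argument form with a provided output schema (see
    // TestScanLevelProjection below).
    ScanLevelProjection withSchema = ScanLevelProjection.build(
        RowSetTestUtils.projectAll(),
        ScanTestUtils.parsers(),
        outputSchema);
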
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java
index f2b6c3b..7075c90 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java
@@ -20,24 +20,25 @@ package org.apache.drill.exec.physical.impl.scan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+
 import java.util.List;
 
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
-import org.apache.drill.exec.physical.impl.scan.file.PartitionColumn;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager.FileMetadataOptions;
-import org.apache.drill.exec.physical.impl.scan.project.ColumnProjection;
-import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
+import org.apache.drill.exec.physical.impl.scan.file.PartitionColumn;
 import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedColumn;
 import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedWildcardColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ColumnProjection;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
 import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 @Category(RowSetTests.class)
 public class TestFileMetadataColumnParser extends SubOperatorTest {
@@ -63,7 +64,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
 
     // Simulate SELECT a, b, c ...
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("a", "b", "c"),
         Lists.newArrayList(metadataManager.projectionParser()));
 
@@ -87,7 +88,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
 
     // Simulate SELECT a, fqn, filEPath, filename, suffix ...
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("a",
             ScanTestUtils.FULLY_QUALIFIED_NAME_COL,
             "filEPath", // Sic, to test case sensitivity
@@ -133,7 +134,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
     // is preferred over "natural" name.
     String dir1 = "DIR1";
     String dir2 = ScanTestUtils.partitionColName(2);
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList(dir2, dir1, dir0, "a"),
         Lists.newArrayList(metadataManager.projectionParser()));
 
@@ -159,7 +160,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
         fixture.getOptionManager(),
         standardOptions(filePath));
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectAll(),
         Lists.newArrayList(metadataManager.projectionParser()));
 
@@ -185,7 +186,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
         fixture.getOptionManager(),
         options);
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectAll(),
         Lists.newArrayList(metadataManager.projectionParser()));
 
@@ -215,7 +216,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
         fixture.getOptionManager(),
         options);
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList(
             SchemaPath.DYNAMIC_STAR,
             ScanTestUtils.FILE_NAME_COL,
@@ -246,7 +247,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
         fixture.getOptionManager(),
         options);
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList(
             ScanTestUtils.FILE_NAME_COL,
             SchemaPath.DYNAMIC_STAR,
@@ -278,7 +279,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
         fixture.getOptionManager(),
         standardOptions(filePath));
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR,
             ScanTestUtils.partitionColName(8)),
         Lists.newArrayList(metadataManager.projectionParser()));
@@ -299,7 +300,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
         fixture.getOptionManager(),
         options);
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR,
             ScanTestUtils.partitionColName(8)),
         Lists.newArrayList(metadataManager.projectionParser()));
@@ -325,7 +326,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
         fixture.getOptionManager(),
         options);
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR,
             ScanTestUtils.partitionColName(8)),
         Lists.newArrayList(metadataManager.projectionParser()));
@@ -358,7 +359,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
         fixture.getOptionManager(),
         options);
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR,
             ScanTestUtils.partitionColName(1)),
         Lists.newArrayList(metadataManager.projectionParser()));
@@ -382,7 +383,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
         fixture.getOptionManager(),
         options);
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR,
             ScanTestUtils.partitionColName(1)),
         Lists.newArrayList(metadataManager.projectionParser()));
@@ -409,7 +410,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
         fixture.getOptionManager(),
         standardOptions(filePath));
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList(
             ScanTestUtils.FILE_NAME_COL + ".a",
             ScanTestUtils.FILE_PATH_COL + "[0]",
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java
index 6ef69cf..53fd9c3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java
@@ -32,9 +32,10 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadata;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
+import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager.FileMetadataOptions;
 import org.apache.drill.exec.physical.impl.scan.file.MetadataColumn;
 import org.apache.drill.exec.physical.impl.scan.file.PartitionColumn;
-import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager.FileMetadataOptions;
+import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedColumn;
 import org.apache.drill.exec.physical.impl.scan.project.ColumnProjection;
 import org.apache.drill.exec.physical.impl.scan.project.ExplicitSchemaProjection;
 import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
@@ -42,15 +43,14 @@ import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder.NullBu
 import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
 import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
 import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
-import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedColumn;
 import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 @Category(RowSetTests.class)
 public class TestFileMetadataProjection extends SubOperatorTest {
@@ -179,7 +179,7 @@ public class TestFileMetadataProjection extends SubOperatorTest {
         fixture.getOptionManager(),
         standardOptions(filePath));
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList(
             ScanTestUtils.FILE_NAME_COL,
             "a",
@@ -264,7 +264,7 @@ public class TestFileMetadataProjection extends SubOperatorTest {
         fixture.getOptionManager(),
         standardOptions(filePath));
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList(
           "a",
           ScanTestUtils.FULLY_QUALIFIED_NAME_COL,
@@ -314,7 +314,7 @@ public class TestFileMetadataProjection extends SubOperatorTest {
         fixture.getOptionManager(),
         standardOptions(filePath));
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("dir11"),
         ScanTestUtils.parsers(metadataManager.projectionParser()));
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
index 29ee78c..96ce354 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
@@ -50,7 +50,6 @@ import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -115,11 +114,9 @@ public class TestFileScanFramework extends SubOperatorTest {
     }
 
     @Override
-    public ManagedReader<? extends FileSchemaNegotiator> newReader(
-        FileSplit split) {
+    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
       MockFileReader reader = readerIter.next();
       assert reader != null;
-      assert split.getPath().equals(reader.filePath());
       return reader;
     }
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestReaderLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestReaderLevelProjection.java
index 0ad2bc4..b760b34 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestReaderLevelProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestReaderLevelProjection.java
@@ -30,10 +30,10 @@ import java.util.List;
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
 import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedColumn;
 import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder.NullBuilderBuilder;
 import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
-import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
 import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -59,7 +59,7 @@ public class TestReaderLevelProjection extends SubOperatorTest {
 
   @Test
   public void testWildcard() {
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectAll(),
         ScanTestUtils.parsers());
     assertEquals(1, scanProj.columns().size());
@@ -100,7 +100,7 @@ public class TestReaderLevelProjection extends SubOperatorTest {
 
     // Simulate SELECT c, b, a ...
 
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("c", "b", "a"),
         ScanTestUtils.parsers());
     assertEquals(3, scanProj.columns().size());
@@ -144,7 +144,7 @@ public class TestReaderLevelProjection extends SubOperatorTest {
 
     // Simulate SELECT c, v, b, w ...
 
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("c", "v", "b", "w"),
         ScanTestUtils.parsers());
     assertEquals(4, scanProj.columns().size());
@@ -195,7 +195,7 @@ public class TestReaderLevelProjection extends SubOperatorTest {
 
     // Simulate SELECT c, a ...
 
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("c", "a"),
         ScanTestUtils.parsers());
     assertEquals(2, scanProj.columns().size());
@@ -236,7 +236,7 @@ public class TestReaderLevelProjection extends SubOperatorTest {
 
     // Simulate SELECT c, a ...
 
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("b"),
         ScanTestUtils.parsers());
     assertEquals(1, scanProj.columns().size());
@@ -273,7 +273,7 @@ public class TestReaderLevelProjection extends SubOperatorTest {
 
     // Simulate SELECT a, b.c.d ...
 
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("a", "b.c.d"),
         ScanTestUtils.parsers());
     assertEquals(2, scanProj.columns().size());
@@ -343,7 +343,7 @@ public class TestReaderLevelProjection extends SubOperatorTest {
 
     // Simulate SELECT a.c, a.d, a.e.f ...
 
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("x", "a.c", "a.d", "a.e.f", "y"),
         ScanTestUtils.parsers());
     assertEquals(3, scanProj.columns().size());
@@ -444,7 +444,7 @@ public class TestReaderLevelProjection extends SubOperatorTest {
 
     // Simulate SELECT a.b, a.c ...
 
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("a.b", "a.c"),
         ScanTestUtils.parsers());
     assertEquals(1, scanProj.columns().size());
@@ -489,7 +489,7 @@ public class TestReaderLevelProjection extends SubOperatorTest {
 
     // Simulate SELECT a.b ...
 
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("a.b"),
         ScanTestUtils.parsers());
 
@@ -522,7 +522,7 @@ public class TestReaderLevelProjection extends SubOperatorTest {
 
     // Simulate SELECT a[0] ...
 
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("a[0]"),
         ScanTestUtils.parsers());
 
@@ -558,7 +558,7 @@ public class TestReaderLevelProjection extends SubOperatorTest {
 
     // Simulate SELECT a[0] ...
 
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("a[0]"),
         ScanTestUtils.parsers());
 
@@ -594,7 +594,7 @@ public class TestReaderLevelProjection extends SubOperatorTest {
 
     // Simulate SELECT * ...
 
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectAll(),
         ScanTestUtils.parsers(),
         outputSchema);
@@ -654,7 +654,7 @@ public class TestReaderLevelProjection extends SubOperatorTest {
 
     // Simulate SELECT * ...
 
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectAll(),
         ScanTestUtils.parsers(),
         outputSchema);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
index 4dd7a1d..a803bfc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
@@ -63,7 +63,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
     // Simulate SELECT a, b, c ...
     // Build the projection plan and verify
 
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("a", "b", "c"),
         ScanTestUtils.parsers());
 
@@ -106,7 +106,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
 
   @Test
   public void testMap() {
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("a.x", "b.x", "a.y", "b.y", "c"),
         ScanTestUtils.parsers());
 
@@ -155,7 +155,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
 
   @Test
   public void testArray() {
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("a[1]", "a[3]"),
         ScanTestUtils.parsers());
 
@@ -199,7 +199,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
 
   @Test
   public void testWildcard() {
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectAll(),
         ScanTestUtils.parsers());
 
@@ -238,7 +238,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
 
   @Test
   public void testEmptyProjection() {
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList(),
         ScanTestUtils.parsers());
 
@@ -264,7 +264,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
 
   @Test
   public void testWildcardAndColumns() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
           RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, "a"),
           ScanTestUtils.parsers());
 
@@ -293,7 +293,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
 
   @Test
   public void testColumnAndWildcard() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
           RowSetTestUtils.projectList("a", SchemaPath.DYNAMIC_STAR),
           ScanTestUtils.parsers());
 
@@ -313,7 +313,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
   @Test
   public void testErrorTwoWildcards() {
     try {
-      new ScanLevelProjection(
+      ScanLevelProjection.build(
           RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, SchemaPath.DYNAMIC_STAR),
           ScanTestUtils.parsers());
       fail();
@@ -328,7 +328,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
 
     // Simulate SELECT a
 
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("a"),
         ScanTestUtils.parsers(),
         outputSchema);
@@ -347,7 +347,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
         .add("b", MinorType.BIGINT)
         .buildSchema();
 
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectAll(),
         ScanTestUtils.parsers(),
         outputSchema);
@@ -378,7 +378,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
         .buildSchema();
     outputSchema.setProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP, Boolean.TRUE.toString());
 
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectAll(),
         ScanTestUtils.parsers(),
         outputSchema);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
index 2887dc5..22d7431 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
@@ -39,13 +39,13 @@ import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.test.rowSet.RowSetComparison;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 /**
  * Tests schema smoothing at the schema projection level.
@@ -117,7 +117,7 @@ public class TestSchemaSmoothing extends SubOperatorTest {
 
     // Set up the scan level projection
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL, "a", "b"),
         ScanTestUtils.parsers(metadataManager.projectionParser()));
 
@@ -203,7 +203,7 @@ public class TestSchemaSmoothing extends SubOperatorTest {
 
   @Test
   public void testSmoothingProjection() {
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectAll(),
         ScanTestUtils.parsers());
 
@@ -305,7 +305,7 @@ public class TestSchemaSmoothing extends SubOperatorTest {
 
   @Test
   public void testSmaller() {
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectAll(),
         ScanTestUtils.parsers());
 
@@ -343,7 +343,7 @@ public class TestSchemaSmoothing extends SubOperatorTest {
 
   @Test
   public void testDisjoint() {
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectAll(),
         ScanTestUtils.parsers());
 
@@ -380,7 +380,7 @@ public class TestSchemaSmoothing extends SubOperatorTest {
 
   @Test
   public void testDifferentTypes() {
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectAll(),
         ScanTestUtils.parsers());
 
@@ -414,7 +414,7 @@ public class TestSchemaSmoothing extends SubOperatorTest {
 
   @Test
   public void testSameSchemas() {
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectAll(),
         ScanTestUtils.parsers());
 
@@ -449,7 +449,7 @@ public class TestSchemaSmoothing extends SubOperatorTest {
 
   @Test
   public void testDifferentCase() {
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectAll(),
         ScanTestUtils.parsers());
 
@@ -484,7 +484,7 @@ public class TestSchemaSmoothing extends SubOperatorTest {
 
   @Test
   public void testRequired() {
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectAll(),
         ScanTestUtils.parsers());
 
@@ -516,7 +516,7 @@ public class TestSchemaSmoothing extends SubOperatorTest {
 
   @Test
   public void testMissingNullableColumns() {
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectAll(),
         ScanTestUtils.parsers());
 
@@ -549,7 +549,7 @@ public class TestSchemaSmoothing extends SubOperatorTest {
 
   @Test
   public void testReordering() {
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectAll(),
         ScanTestUtils.parsers());
 
@@ -594,7 +594,7 @@ public class TestSchemaSmoothing extends SubOperatorTest {
 
     // Set up the scan level projection
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         ScanTestUtils.projectAllWithMetadata(2),
         ScanTestUtils.parsers(metadataManager.projectionParser()));
 
@@ -641,7 +641,7 @@ public class TestSchemaSmoothing extends SubOperatorTest {
 
     // Set up the scan level projection
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         ScanTestUtils.projectAllWithMetadata(2),
         ScanTestUtils.parsers(metadataManager.projectionParser()));
 
@@ -688,7 +688,7 @@ public class TestSchemaSmoothing extends SubOperatorTest {
 
     // Set up the scan level projection
 
-    ScanLevelProjection scanProj = new ScanLevelProjection(
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
         ScanTestUtils.projectAllWithMetadata(2),
         ScanTestUtils.parsers(metadataManager.projectionParser()));
 
@@ -722,7 +722,7 @@ public class TestSchemaSmoothing extends SubOperatorTest {
 
   @Test
   public void testSmoothableSchemaBatches() {
-    final ScanLevelProjection scanProj = new ScanLevelProjection(
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectAll(),
         ScanTestUtils.parsers());
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/RowSetTestUtils.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/RowSetTestUtils.java
index b46ac55..805c864 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/RowSetTestUtils.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/RowSetTestUtils.java
@@ -55,7 +55,7 @@ public class RowSetTestUtils {
 
   public static List<SchemaPath> projectAll() {
     return Lists.newArrayList(
-        new SchemaPath[] {SchemaPath.getSimplePath(SchemaPath.DYNAMIC_STAR)});
+        new SchemaPath[] {SchemaPath.STAR_COLUMN});
   }
 
   @SafeVarargs
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderTypeConversion.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderTypeConversion.java
index b9bb531..76e8f96 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderTypeConversion.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderTypeConversion.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.rowSet.impl;
 
 import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
 import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
@@ -29,8 +30,8 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSet;
-import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.apache.drill.test.rowSet.test.TestColumnConverter;
 import org.apache.drill.test.rowSet.test.TestColumnConverter.ConverterFactory;
 import org.junit.Test;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
index 655d04d..784c4be 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -84,7 +85,7 @@ import org.junit.experimental.categories.Category;
 @Category(RowSetTests.class)
 public class TestCsvWithHeaders extends BaseCsvTest {
 
-  private static final String TEST_FILE_NAME = "case2.csv";
+  private static final String TEST_FILE_NAME = "basic.csv";
 
   private static String invalidHeaders[] = {
       "$,,9b,c,c,c_2",
@@ -103,11 +104,20 @@ public class TestCsvWithHeaders extends BaseCsvTest {
       "30"
   };
 
+  public static final String COLUMNS_FILE_NAME = "columns.csv";
+
+  private static String columnsCol[] = {
+      "author,columns",
+      "fred,\"Rocks Today,Dino Wrangling\"",
+      "barney,Bowlarama"
+  };
+
   @BeforeClass
   public static void setup() throws Exception {
     BaseCsvTest.setup(false,  true);
     buildFile(TEST_FILE_NAME, validHeaders);
     buildNestedTable();
+    buildFile(COLUMNS_FILE_NAME, columnsCol);
   }
 
   private static final String EMPTY_FILE = "empty.csv";
@@ -870,4 +880,107 @@
       resetV3();
     }
   }
+
+  /**
+   * The column name `columns` is treated as a plain old
+   * column when using column headers.
+   */
+  @Test
+  public void testColumnsColV3() throws IOException {
+    try {
+      enableV3(true);
+
+      String sql = "SELECT author, columns FROM `dfs.data`.`%s`";
+      RowSet actual = client.queryBuilder().sql(sql, COLUMNS_FILE_NAME).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("author", MinorType.VARCHAR)
+          .add("columns", MinorType.VARCHAR)
+          .buildSchema();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow("fred", "Rocks Today,Dino Wrangling")
+          .addRow("barney", "Bowlarama")
+          .build();
+      RowSetUtilities.verify(expected, actual);
+    } finally {
+      resetV3();
+    }
+  }
+
+  /**
+   * The column name `columns` is treated as a plain old
+   * column when using column headers. If used with an index,
+   * validation will fail because the VarChar column is not an array.
+   */
+  @Test
+  public void testColumnsIndexV3() throws IOException {
+    try {
+      enableV3(true);
+
+      String sql = "SELECT author, columns[0] FROM `dfs.data`.`%s`";
+      client.queryBuilder().sql(sql, COLUMNS_FILE_NAME).run();
+      fail();
+    } catch (UserRemoteException e) {
+      assertTrue(e.getMessage().contains(
+          "VALIDATION ERROR: Unexpected `columns`[x]; columns array not enabled"));
+      assertTrue(e.getMessage().contains("Format plugin: text"));
+      assertTrue(e.getMessage().contains("Plugin config name: csv"));
+      assertTrue(e.getMessage().contains("Extract headers: true"));
+      assertTrue(e.getMessage().contains("Skip headers: false"));
+    } catch (Exception e) {
+      fail();
+    } finally {
+      resetV3();
+    }
+  }
+
+  @Test
+  public void testColumnsMissingV3() throws IOException {
+    try {
+      enableV3(true);
+
+      String sql = "SELECT a, columns FROM `dfs.data`.`%s`";
+      RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("a", MinorType.VARCHAR)
+          .add("columns", MinorType.VARCHAR)
+          .buildSchema();
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow("10", "")
+          .build();
+      RowSetUtilities.verify(expected, actual);
+    } finally {
+      resetV3();
+    }
+  }
+
+  /**
+   * If columns[x] is used, it cannot possibly match a valid
+   * text reader column, so an error is raised instead.
+   */
+  @Test
+  public void testColumnsIndexMissingV3() throws IOException {
+    try {
+      enableV3(true);
+
+      String sql = "SELECT a, columns[0] FROM `dfs.data`.`%s`";
+      client.queryBuilder().sql(sql, TEST_FILE_NAME).run();
+      fail();
+    } catch (UserRemoteException e) {
+      // Note: this error is caught before reading any tables,
+      // so no table information is available.
+      assertTrue(e.getMessage().contains(
+          "VALIDATION ERROR: Unexpected `columns`[x]; columns array not enabled"));
+      assertTrue(e.getMessage().contains("Format plugin: text"));
+      assertTrue(e.getMessage().contains("Plugin config name: csv"));
+      assertTrue(e.getMessage().contains("Extract headers: true"));
+      assertTrue(e.getMessage().contains("Skip headers: false"));
+    } catch (Exception e) {
+      fail();
+    } finally {
+      resetV3();
+    }
+  }
 }
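
Reconstructed from the asserted fragments, the enriched validation error
reads roughly as follows (exact layout depends on how UserException formats
its context list):

    VALIDATION ERROR: Unexpected `columns`[x]; columns array not enabled

    Format plugin: text
    Plugin config name: csv
    Extract headers: true
    Skip headers: false
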
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
index 6051875..ec6810d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
@@ -17,11 +17,18 @@
  */
 package org.apache.drill.exec.store.easy.text.compliant;
 
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -31,12 +38,6 @@ import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.RowSetReader;
 import org.apache.drill.test.rowSet.RowSetUtilities;
-
-import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -394,4 +395,54 @@
       resetV3();
     }
   }
+
+  /**
+   * When the `columns` array is allowed, the projection list cannot
+   * implicitly suggest that `columns` is a map.
+   * <p>
+   * V2 message: DATA_READ ERROR: Selected column 'columns' must be an array index
+   */
+
+  @Test
+  public void testColumnsAsMap() throws IOException {
+    String sql = "SELECT `%s`.columns.foo FROM `dfs.data`.`%s`";
+    try {
+      enableV3(true);
+      client.queryBuilder().sql(sql, TEST_FILE_NAME, TEST_FILE_NAME).run();
+      fail();
+    } catch (UserRemoteException e) {
+      assertTrue(e.getMessage().contains(
+          "VALIDATION ERROR: Column `columns` has map elements, but must be an array"));
+      assertTrue(e.getMessage().contains("Plugin config name: csv"));
+    } catch (Exception e) {
+      fail();
+    } finally {
+      resetV3();
+    }
+  }
+
+  /**
+   * When the `columns` array is allowed, and an index is projected,
+   * it must be below the maximum.
+   * <p>
+   * V2 message: INTERNAL_ERROR ERROR: 70000
+   */
+
+  @Test
+  public void testColumnsIndexOverflow() throws IOException {
+    String sql = "SELECT columns[70000] FROM `dfs.data`.`%s`";
+    try {
+      enableV3(true);
+      client.queryBuilder().sql(sql, TEST_FILE_NAME).run();
+      fail();
+    } catch (UserRemoteException e) {
+      assertTrue(e.getMessage().contains(
+          "VALIDATION ERROR: `columns`[70000] index out of bounds, max supported size is 65536"));
+      assertTrue(e.getMessage().contains("Plugin config name: csv"));
+    } catch (Exception e) {
+      fail();
+    } finally {
+      resetV3();
+    }
+  }
 }
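
The matching V3 message for the out-of-bounds case, again pieced together
from the assertions (layout approximate):

    VALIDATION ERROR: `columns`[70000] index out of bounds, max supported size is 65536

    Plugin config name: csv
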
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ProjectionType.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ProjectionType.java
index f2333f4..7b523ad 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ProjectionType.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ProjectionType.java
@@ -82,4 +82,21 @@ public enum ProjectionType {
       throw new IllegalStateException(toString());
     }
   }
+
+  public String label() {
+    switch (this) {
+    case SCALAR:
+      return "scalar (a)";
+    case ARRAY:
+      return "array (a[n])";
+    case TUPLE:
+      return "tuple (a.x)";
+    case TUPLE_ARRAY:
+      return "tuple array (a[n].x)";
+    case WILDCARD:
+      return "wildcard (*)";
+    default:
+      return name();
+    }
+  }
 }
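
The new label() method gives each projection shape a human-readable form for
messages like those tested above. A hedged usage sketch follows; colName and
logger are placeholders and this is not the committed call site:

    import org.apache.drill.common.exceptions.UserException;
    import org.apache.drill.exec.record.metadata.ProjectionType;
    import org.slf4j.Logger;

    class ProjectionErrorSketch {
      // Reports a projection-shape conflict using ProjectionType.label().
      static UserException typeConflict(String colName,
          ProjectionType requested, Logger logger) {
        return UserException.validationError()
            .message("Column `%s` was projected as %s, which this reader "
                + "cannot provide", colName, requested.label())
            .build(logger);
      }
    }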