Posted to commits@drill.apache.org by ag...@apache.org on 2020/02/21 12:03:34 UTC

[drill] 05/10: DRILL-7574: Generalize the projection parser

This is an automated email from the ASF dual-hosted git repository.

agozhiy pushed a commit to branch MERGE-200221-00
in repository https://gitbox.apache.org/repos/asf/drill.git

commit b0ab3a6cf8552c4f4e6b3256d4c70f0fbe698343
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Fri Feb 7 18:11:02 2020 -0800

    DRILL-7574: Generalize the projection parser
    
    Adds support for multi-dimensional arrays, and columns
    projected as both an array and a map.
    
    closes #1974
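    
    A hedged sketch of the projection forms this generalization allows
    (written against the Projections/RequestedColumn API added in this
    commit; the hypothetical ProjectionSketch class is illustrative only,
    SchemaPath.parseFromString is assumed to parse these paths, and the
    asserts show expected behavior rather than committed tests):
    
        import java.util.Arrays;
        import java.util.List;
        import org.apache.drill.common.expression.SchemaPath;
        import org.apache.drill.exec.physical.resultSet.project.Projections;
        import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
        import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;
        
        public class ProjectionSketch {
          public static void main(String[] args) {
            // A projection list with a multi-dimensional array reference
            // and a column referenced both as an array and as a map.
            List<SchemaPath> cols = Arrays.asList(
                SchemaPath.parseFromString("a[0][1]"), // 2-D array reference
                SchemaPath.parseFromString("m[2]"),    // m as an array...
                SchemaPath.parseFromString("m.x"));    // ...and m as a map
            RequestedTuple proj = Projections.parse(cols);
    
            RequestedColumn a = proj.get("a");
            assert a.isArray() && a.arrayDims() == 2; // both dims retained
    
            RequestedColumn m = proj.get("m");
            assert m.isArray() && m.isTuple();        // array and map at once
            assert m.tuple().get("x") != null;        // nested member recorded
          }
        }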
---
 .../impl/scan/columns/ColumnsArrayParser.java      |  17 +-
 .../scan/columns/ResolvedColumnsArrayColumn.java   |   2 +-
 .../scan/columns/UnresolvedColumnsArrayColumn.java |   2 +-
 .../impl/scan/file/FileMetadataColumnsParser.java  |  49 +--
 .../impl/scan/file/FileMetadataManager.java        |  23 +-
 .../scan/project/AbstractUnresolvedColumn.java     |   2 +-
 .../scan/project/ExplicitSchemaProjection.java     |  12 +-
 .../scan/project/ReaderSchemaOrchestrator.java     |   9 -
 .../impl/scan/project/ScanLevelProjection.java     |  67 ++-
 .../impl/scan/project/ScanSchemaOrchestrator.java  |  12 +-
 .../scan/project/projSet/AbstractReadColProj.java  |   4 -
 .../scan/project/projSet/EmptyProjectionSet.java   |   3 +
 .../project/projSet/ExplicitProjectionSet.java     |  40 +-
 .../scan/project/projSet/ProjectedDictColumn.java  |  10 +-
 .../scan/project/projSet/ProjectedMapColumn.java   |   2 +-
 .../scan/project/projSet/ProjectedReadColumn.java  |   9 +-
 .../scan/project/projSet/ProjectionChecker.java    | 141 +++++++
 .../scan/project/projSet/ProjectionSetBuilder.java |  26 +-
 .../scan/project/projSet/ProjectionSetFactory.java |   4 +-
 .../project/projSet/WildcardProjectionSet.java     |   3 +
 .../exec/physical/resultSet/ProjectionSet.java     |  13 +-
 .../physical/resultSet/impl/ContainerState.java    |  12 -
 .../resultSet/impl/ResultSetLoaderImpl.java        |  36 +-
 .../exec/physical/resultSet/impl/TupleState.java   |  18 -
 .../project/BaseRequestedColumn.java}              |  40 +-
 .../resultSet/project/ImpliedTupleRequest.java     |  28 +-
 .../physical/resultSet/project/ProjectionType.java | 187 ---------
 .../physical/resultSet/project/Projections.java    | 159 +++++++
 .../exec/physical/resultSet/project/Qualifier.java | 162 ++++++++
 .../project/QualifierContainer.java}               |  15 +-
 .../resultSet/project/RequestedColumn.java         | 239 +++++++++++
 .../resultSet/project/RequestedColumnImpl.java     | 217 ++--------
 .../physical/resultSet/project/RequestedTuple.java |  54 +--
 .../resultSet/project/RequestedTupleImpl.java      | 277 ++++---------
 .../resultSet/project/RequestedWildcardColumn.java |  61 +++
 .../exec/physical/impl/scan/TestColumnsArray.java  |   1 -
 .../impl/scan/TestColumnsArrayFramework.java       |   3 -
 .../physical/impl/scan/TestColumnsArrayParser.java |  29 +-
 .../impl/scan/TestFileMetadataColumnParser.java    | 100 -----
 .../impl/scan/TestScanOrchestratorEarlySchema.java | 110 +----
 .../impl/scan/project/TestScanLevelProjection.java |  64 +--
 .../impl/scan/project/TestSchemaSmoothing.java     |   1 -
 .../scan/project/projSet/TestProjectionSet.java    |  22 -
 .../physical/resultSet/impl/RowSetTestUtils.java   |   7 +-
 .../resultSet/project/TestProjectedPath.java       | 325 +++++++++++++++
 .../resultSet/project/TestProjectedTuple.java      | 457 ++++++++++++---------
 .../resultSet/project/TestProjectionType.java      | 155 -------
 .../easy/text/compliant/TestCsvWithHeaders.java    |  18 +-
 .../easy/text/compliant/TestCsvWithoutHeaders.java |  10 +-
 49 files changed, 1663 insertions(+), 1594 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
index ffd69e6..4a25426 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
@@ -22,8 +22,8 @@ import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn
 import org.apache.drill.exec.physical.impl.scan.project.ColumnProjection;
 import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
 import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
+import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
 import org.apache.drill.exec.physical.resultSet.project.RequestedColumnImpl;
-import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn;
 import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -115,7 +115,7 @@ public class ColumnsArrayParser implements ScanProjectionParser {
 
   @Override
   public boolean parse(RequestedColumn inCol) {
-    if (! requireColumnsArray && ! allowOtherCols) {
+    if (!requireColumnsArray && !allowOtherCols) {
 
       // If we do not require the columns array, then we presume that
       // the reader does not provide arrays, so any use of the columns[x]
@@ -137,14 +137,14 @@ public class ColumnsArrayParser implements ScanProjectionParser {
           new RequestedColumnImpl(builder.rootProjection(), ColumnsScanFramework.COLUMNS_COL));
       return true;
     }
-    if (! inCol.nameEquals(ColumnsScanFramework.COLUMNS_COL)) {
+    if (!inCol.nameEquals(ColumnsScanFramework.COLUMNS_COL)) {
       return false;
     }
 
     // The columns column cannot be a map. That is, the following is
     // not allowed: columns.foo.
 
-    if (inCol.isTuple() && ! allowOtherCols) {
+    if (inCol.isTuple() && !allowOtherCols) {
       throw UserException
         .validationError()
         .message("Column `%s` has map elements, but must be an array", inCol.name())
@@ -175,11 +175,10 @@ public class ColumnsArrayParser implements ScanProjectionParser {
     // Special `columns` array column. Allow multiple, but
     // project only one.
 
-    if (columnsArrayCol != null) {
-      return;
+    if (columnsArrayCol == null) {
+      columnsArrayCol = new UnresolvedColumnsArrayColumn(inCol);
+      builder.addTableColumn(columnsArrayCol);
     }
-    columnsArrayCol = new UnresolvedColumnsArrayColumn(inCol);
-    builder.addTableColumn(columnsArrayCol);
   }
 
   @Override
@@ -197,7 +196,7 @@ public class ColumnsArrayParser implements ScanProjectionParser {
           .addContext(builder.context())
           .build(logger);
       }
-      if (requireColumnsArray && ! allowOtherCols) {
+      if (requireColumnsArray && !allowOtherCols) {
         throw UserException
           .validationError()
           .message("Only `columns` column is allowed. Found: " + col.name())
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ResolvedColumnsArrayColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ResolvedColumnsArrayColumn.java
index 2c83065..33a4ccb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ResolvedColumnsArrayColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ResolvedColumnsArrayColumn.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.scan.columns;
 
 import org.apache.drill.exec.physical.impl.scan.project.ResolvedTableColumn;
 import org.apache.drill.exec.physical.impl.scan.project.VectorSource;
-import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
 import org.apache.drill.exec.record.MaterializedField;
 
 public class ResolvedColumnsArrayColumn extends ResolvedTableColumn {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/UnresolvedColumnsArrayColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/UnresolvedColumnsArrayColumn.java
index aba6926..9eb378e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/UnresolvedColumnsArrayColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/UnresolvedColumnsArrayColumn.java
@@ -18,7 +18,7 @@
 package org.apache.drill.exec.physical.impl.scan.columns;
 
 import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn;
-import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
 
 public class UnresolvedColumnsArrayColumn extends AbstractUnresolvedColumn {
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
index b3cf55c..02d4da9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
@@ -25,7 +25,9 @@ import java.util.regex.Pattern;
 import org.apache.drill.exec.physical.impl.scan.project.ColumnProjection;
 import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
 import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
-import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Parses the implicit file metadata columns out of a project list,
@@ -33,7 +35,7 @@ import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.Requested
  */
 
 public class FileMetadataColumnsParser implements ScanProjectionParser {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileMetadataColumnsParser.class);
+  private static final Logger logger = LoggerFactory.getLogger(FileMetadataColumnsParser.class);
 
   // Internal
 
@@ -46,8 +48,6 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
 
   private boolean hasImplicitCols;
 
-  private boolean expandPartitionsAtEnd;
-
   public FileMetadataColumnsParser(FileMetadataManager metadataManager) {
     this.metadataManager = metadataManager;
     partitionPattern = Pattern.compile(metadataManager.partitionDesignator + "(\\d+)", Pattern.CASE_INSENSITIVE);
@@ -69,11 +69,6 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
     if (defn != null) {
       return buildMetadataColumn(defn, inCol);
     }
-    if (inCol.isWildcard()) {
-      buildWildcard();
-
-      // Don't consider this a match.
-    }
     return false;
   }
 
@@ -81,15 +76,13 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
 
     // If the projected column is a map or array, then it shadows the
     // partition column. Example: dir0.x, dir0[2].
-
     if (! inCol.isSimple()) {
       logger.warn("Partition column {} is shadowed by a projected {}",
-          inCol.name(), inCol.summary());
+          inCol.name(), inCol.toString());
       return false;
     }
 
     // Partition column
-
     int partitionIndex = Integer.parseInt(m.group(1));
     if (! referencedPartitions.contains(partitionIndex)) {
       builder.addMetadataColumn(
@@ -98,7 +91,6 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
             partitionIndex));
 
       // Remember the partition for later wildcard expansion
-
       referencedPartitions.add(partitionIndex);
       hasImplicitCols = true;
     }
@@ -110,47 +102,24 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
 
     // If the projected column is a map or array, then it shadows the
     // metadata column. Example: filename.x, filename[2].
-
     if (! inCol.isSimple()) {
       logger.warn("File metadata column {} is shadowed by a projected {}",
-          inCol.name(), inCol.summary());
+          inCol.name(), inCol.toString());
       return false;
     }
 
     // File metadata (implicit) column
-
     builder.addMetadataColumn(new FileMetadataColumn(inCol.name(), defn));
     hasImplicitCols = true;
     return true;
   }
 
-  private void buildWildcard() {
-    if (!metadataManager.options().useLegacyWildcardExpansion) {
-      return;
-    }
-    if (metadataManager.options().useLegacyExpansionLocation) {
-
-      // Star column: this is a SELECT * query.
-
-      // Old-style wildcard handling inserts all partition columns in
-      // the scanner, removes them in Project.
-      // Fill in the file metadata columns. Can do here because the
-      // set is constant across all files.
-
-      expandPartitions();
-    } else {
-      expandPartitionsAtEnd = true;
-    }
-  }
-
   @Override
   public void validate() {
 
-    // Expand partitions if using a wildcard appears, if using the
-    // feature to expand partitions for wildcards, and we want the
-    // partitions after data columns.
-
-    if (expandPartitionsAtEnd) {
+    // Expand partitions when the projection includes a wildcard
+    // and when "legacy" partition expansion is enabled.
+    if (builder.projectAll() && metadataManager.options().useLegacyWildcardExpansion) {
       expandPartitions();
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java
index 330a2ab..e859c46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java
@@ -60,14 +60,12 @@ import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTes
  * <p>
  * This is the successor to {@link org.apache.drill.exec.store.ColumnExplorer}.
  */
-
 public class FileMetadataManager implements MetadataManager, ReaderProjectionResolver, VectorSource {
 
   /**
    * Automatically compute partition depth from files. Use only
    * for testing!
    */
-
   public static final int AUTO_PARTITION_DEPTH = -1;
 
   public static class FileMetadataOptions {
@@ -75,8 +73,12 @@ public class FileMetadataManager implements MetadataManager, ReaderProjectionRes
     private Path rootDir;
     private int partitionCount = AUTO_PARTITION_DEPTH;
     private List<Path> files;
+
+    /**
+     * Historically, Drill expands partition columns (dir0, dir1, ...)
+     * when the project list includes a wildcard.
+     */
     protected boolean useLegacyWildcardExpansion = true;
-    protected boolean useLegacyExpansionLocation;
 
     /**
       * Specify the selection root for a directory scan, if any.
@@ -113,21 +115,6 @@ public class FileMetadataManager implements MetadataManager, ReaderProjectionRes
      public void useLegacyWildcardExpansion(boolean flag) {
        useLegacyWildcardExpansion = flag;
      }
-
-     /**
-      * In legacy mode, above, Drill expands partition columns whenever the
-      * wildcard appears. Drill 1.1 - 1.11 put expanded partition columns after
-      * data columns. This is actually a better position as it minimizes changes
-      * the row layout for files at different depths. Drill 1.12 moved them before
-      * data columns: at the location of the wildcard.
-      * <p>
-      * This flag, when set, uses the Drill 1.12 position. Later enhancements
-      * can unset this flag to go back to the future: use the preferred location
-      * after other columns.
-      */
-     public void useLegacyExpansionLocation(boolean flag) {
-       useLegacyExpansionLocation = flag;
-     }
   }
 
   // Input
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/AbstractUnresolvedColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/AbstractUnresolvedColumn.java
index 9e40659..78bc388 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/AbstractUnresolvedColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/AbstractUnresolvedColumn.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.scan.project;
 
-import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 
 /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
index 8975a0b..fc91b38 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
@@ -22,12 +22,14 @@ import java.util.List;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedColumn;
+import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
 import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;
-import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.complex.DictVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Perform a schema projection for the case of an explicit list of
@@ -43,7 +45,7 @@ import org.apache.drill.exec.vector.complex.DictVector;
  */
 
 public class ExplicitSchemaProjection extends ReaderLevelProjection {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExplicitSchemaProjection.class);
+  private static final Logger logger = LoggerFactory.getLogger(ExplicitSchemaProjection.class);
 
   private final ScanLevelProjection scanProj;
 
@@ -148,7 +150,7 @@ public class ExplicitSchemaProjection extends ReaderLevelProjection {
 
     ResolvedMapColumn mapCol = new ResolvedMapColumn(outputTuple,
         column.schema(), sourceIndex);
-    resolveTuple(mapCol.members(), requestedCol.mapProjection(),
+    resolveTuple(mapCol.members(), requestedCol.tuple(),
         column.tupleSchema());
 
     // If the projection is simple, then just project the map column
@@ -193,7 +195,7 @@ public class ExplicitSchemaProjection extends ReaderLevelProjection {
     }
 
     ResolvedDictColumn dictColumn = new ResolvedDictColumn(outputTuple, column.schema(), sourceIndex);
-    resolveDictTuple(dictColumn.members(), requestedCol.mapProjection(), column.tupleSchema());
+    resolveDictTuple(dictColumn.members(), requestedCol.tuple(), column.tupleSchema());
 
     // The same as for Map
     if (dictColumn.members().isSimpleProjection()) {
@@ -301,7 +303,7 @@ public class ExplicitSchemaProjection extends ReaderLevelProjection {
   private ResolvedColumn resolveMapMembers(ResolvedTuple outputTuple, RequestedColumn col) {
     ResolvedMapColumn mapCol = new ResolvedMapColumn(outputTuple, col.name());
     ResolvedTuple members = mapCol.members();
-    for (RequestedColumn child : col.mapProjection().projections()) {
+    for (RequestedColumn child : col.tuple().projections()) {
       if (child.isTuple()) {
         members.add(resolveMapMembers(members, child));
       } else {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
index 1460a85..99356e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
@@ -34,7 +34,6 @@ import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTes
  * scan operator manages. Vectors are reused across readers, but via a vector
  * cache. All other state is distinct between readers.
  */
-
 public class ReaderSchemaOrchestrator implements VectorSource {
 
   private final ScanSchemaOrchestrator scanOrchestrator;
@@ -48,7 +47,6 @@ public class ReaderSchemaOrchestrator implements VectorSource {
    * schema changes in this output batch by absorbing trivial schema changes
    * that occur across readers.
    */
-
   private ResolvedRow rootTuple;
   private VectorContainer tableContainer;
 
@@ -88,7 +86,6 @@ public class ReaderSchemaOrchestrator implements VectorSource {
     options.setSchema(readerSchema);
 
     // Create the table loader
-
     tableLoader = new ResultSetLoaderImpl(scanOrchestrator.allocator, options.build());
     return tableLoader;
   }
@@ -111,22 +108,18 @@ public class ReaderSchemaOrchestrator implements VectorSource {
    * to the output batch. First, build the metadata and/or null columns for the
    * table row count. Then, merge the sources.
    */
-
   public void endBatch() {
 
     // Get the batch results in a container.
-
     tableContainer = tableLoader.harvest();
 
     // If the schema changed, set up the final projection based on
     // the new (or first) schema.
-
     if (prevTableSchemaVersion < tableLoader.schemaVersion()) {
       reviseOutputProjection();
     } else {
 
       // Fill in the null and metadata columns.
-
       populateNonDataColumns();
     }
     rootTuple.setRowCount(tableContainer.getRecordCount());
@@ -147,7 +140,6 @@ public class ReaderSchemaOrchestrator implements VectorSource {
    * only need be done if null columns were created when mapping from a prior
    * schema.
    */
-
   private void reviseOutputProjection() {
 
     // Do the table-schema level projection; the final matching
@@ -223,7 +215,6 @@ public class ReaderSchemaOrchestrator implements VectorSource {
    *
    * @param tableSchema newly arrived schema
    */
-
   private void doExplicitProjection(TupleMetadata tableSchema) {
     rootTuple = newRootTuple();
     new ExplicitSchemaProjection(scanOrchestrator.scanProj,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
index b4be33b..a117e7a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
@@ -26,9 +26,9 @@ import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn
 import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedWildcardColumn;
 import org.apache.drill.exec.physical.impl.scan.project.projSet.ProjectionSetBuilder;
 import org.apache.drill.exec.physical.resultSet.project.ImpliedTupleRequest;
+import org.apache.drill.exec.physical.resultSet.project.Projections;
 import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;
-import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn;
-import org.apache.drill.exec.physical.resultSet.project.RequestedTupleImpl;
+import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
@@ -211,7 +211,6 @@ public class ScanLevelProjection {
    * the columns are table columns. The add-on parser can tag
    * columns as special, such as to hold metadata.
    */
-
   public interface ScanProjectionParser {
     void bind(ScanLevelProjection builder);
     boolean parse(RequestedColumn inCol);
@@ -222,8 +221,9 @@ public class ScanLevelProjection {
 
   public static class Builder {
     private List<SchemaPath> projectionList;
-    private List<ScanProjectionParser> parsers = new ArrayList<>();
+    private final List<ScanProjectionParser> parsers = new ArrayList<>();
     private TupleMetadata outputSchema;
+
     /**
      * Context used with error messages.
      */
@@ -301,16 +301,14 @@ public class ScanLevelProjection {
   * Projection definition for the scan as a whole. Parsed form of the input
    * projection list.
    */
-
   protected RequestedTuple outputProjection;
 
   /**
    * Projection definition passed to each reader. This is the set of
    * columns that the reader is asked to provide.
    */
-
   protected RequestedTuple readerProjection;
-  protected ScanProjectionType projectionType = ScanProjectionType.EMPTY;
+  protected ScanProjectionType projectionType;
 
   private ScanLevelProjection(Builder builder) {
     this.projectionList = builder.projectionList();
@@ -351,23 +349,25 @@ public class ScanLevelProjection {
   }
 
   private void doParse() {
-    outputProjection = RequestedTupleImpl.parse(projectionList);
-
-    for (ScanProjectionParser parser : parsers) {
-      parser.bind(this);
-    }
-
-    // First pass: check if a wildcard exists.
-
-    for (RequestedColumn inCol : outputProjection.projections()) {
-      if (inCol.isWildcard()) {
+    outputProjection = Projections.parse(projectionList);
+    switch (outputProjection.type()) {
+      case ALL:
         includesWildcard = true;
+        projectionType = ScanProjectionType.WILDCARD;
+        break;
+      case NONE:
+        projectionType = ScanProjectionType.EMPTY;
+        break;
+      default:
+        projectionType = ScanProjectionType.EXPLICIT;
         break;
-      }
     }
 
-    // Second pass: process projected columns.
+    for (ScanProjectionParser parser : parsers) {
+      parser.bind(this);
+    }
 
+    // Process projected columns.
     for (RequestedColumn inCol : outputProjection.projections()) {
       if (inCol.isWildcard()) {
         mapWildcard(inCol);
@@ -393,19 +393,18 @@ public class ScanLevelProjection {
     // projection. With a schema, we want the schema columns (which may
     // or may not correspond to reader columns.)
 
-    if (projectionType != ScanProjectionType.EMPTY &&
-        projectionType != ScanProjectionType.EXPLICIT) {
-
+    if (projectionType == ScanProjectionType.EMPTY) {
+      readerProjection = ImpliedTupleRequest.NO_MEMBERS;
+    } else if (projectionType != ScanProjectionType.EXPLICIT) {
       readerProjection = ImpliedTupleRequest.ALL_MEMBERS;
     } else {
-
       List<RequestedColumn> outputProj = new ArrayList<>();
       for (ColumnProjection col : outputCols) {
         if (col instanceof AbstractUnresolvedColumn) {
           outputProj.add(((AbstractUnresolvedColumn) col).element());
         }
       }
-      readerProjection = RequestedTupleImpl.build(outputProj);
+      readerProjection = Projections.build(outputProj);
     }
   }
 
@@ -414,29 +413,25 @@ public class ScanLevelProjection {
    * columns that are needed. The order is important: we want custom
    * columns to follow table columns.
    */
-
   private void mapWildcard(RequestedColumn inCol) {
 
     // Wildcard column: this is a SELECT * query.
-
     assert includesWildcard;
     if (sawWildcard) {
       throw new IllegalArgumentException("Duplicate * entry in project list");
     }
 
     // Expand strict schema columns, if provided
-
+    assert projectionType == ScanProjectionType.WILDCARD;
     boolean expanded = expandOutputSchema();
 
     // Remember the wildcard position, if we need to insert it.
     // Ensures that the main wildcard expansion occurs before add-on
     // columns.
-
     int wildcardPosn = outputCols.size();
 
     // Parsers can consume the wildcard. But, all parsers must
     // have visibility to the wildcard column.
-
     for (ScanProjectionParser parser : parsers) {
       if (parser.parse(inCol)) {
         wildcardPosn = -1;
@@ -444,12 +439,10 @@ public class ScanLevelProjection {
     }
 
     // Set this flag only after the parser checks.
-
     sawWildcard = true;
 
     // If not consumed, put the wildcard column into the projection list as a
     // placeholder to be filled in later with actual table columns.
-
     if (expanded) {
       projectionType =
           outputSchema.booleanProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP)
@@ -457,7 +450,6 @@ public class ScanLevelProjection {
           : ScanProjectionType.SCHEMA_WILDCARD;
     } else if (wildcardPosn != -1) {
       outputCols.add(wildcardPosn, new UnresolvedWildcardColumn(inCol));
-      projectionType = ScanProjectionType.WILDCARD;
     }
   }
 
@@ -476,7 +468,6 @@ public class ScanLevelProjection {
 
       // Skip columns tagged as "special"; those that should not expand
       // automatically.
-
       if (col.booleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD)) {
         continue;
       }
@@ -502,7 +493,6 @@ public class ScanLevelProjection {
    *
    * @param inCol the SELECT column
    */
-
   private void mapColumn(RequestedColumn inCol) {
 
     // Give the extensions first crack at each column.
@@ -525,7 +515,6 @@ public class ScanLevelProjection {
     }
 
     // This is a desired table column.
-
     addTableColumn(inCol);
   }
 
@@ -539,7 +528,6 @@ public class ScanLevelProjection {
 
   public void addTableColumn(ColumnProjection outCol) {
     outputCols.add(outCol);
-    projectionType = ScanProjectionType.EXPLICIT;
   }
 
   public void addMetadataColumn(ColumnProjection outCol) {
@@ -552,17 +540,14 @@ public class ScanLevelProjection {
    * add-on parser is given an opportunity to do its own
    * validation.
    */
-
   private void verify() {
 
     // Let parsers do overall validation.
-
     for (ScanProjectionParser parser : parsers) {
       parser.validate();
     }
 
     // Validate column-by-column.
-
     for (ColumnProjection outCol : outputCols) {
       for (ScanProjectionParser parser : parsers) {
         parser.validateColumn(outCol);
@@ -576,7 +561,6 @@ public class ScanLevelProjection {
    * Return the set of columns from the SELECT list
    * @return the SELECT list columns, in SELECT list order
    */
-
   public List<SchemaPath> requestedCols() { return projectionList; }
 
   /**
@@ -585,7 +569,6 @@ public class ScanLevelProjection {
    * table order (for SELECT * queries).
    * @return the set of output columns in output order
    */
-
   public List<ColumnProjection> columns() { return outputCols; }
 
   public ScanProjectionType projectionType() { return projectionType; }
@@ -594,7 +577,6 @@ public class ScanLevelProjection {
    * Return whether this is a SELECT * query
    * @return true if this is a SELECT * query
    */
-
   public boolean projectAll() { return projectionType.isWildcard(); }
 
   /**
@@ -607,7 +589,6 @@ public class ScanLevelProjection {
    * if at least one column is projected (or the query contained
    * the wildcard)
    */
-
   public boolean isEmptyProjection() { return projectionType == ScanProjectionType.EMPTY; }
 
   public RequestedTuple rootProjection() { return outputProjection; }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
index edc6acf..5626d1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
@@ -165,12 +165,12 @@ public class ScanSchemaOrchestrator {
     private MetadataManager metadataManager;
     private int scanBatchRecordLimit = DEFAULT_BATCH_ROW_COUNT;
     private int scanBatchByteLimit = DEFAULT_BATCH_BYTE_COUNT;
-    private List<ScanProjectionParser> parsers = new ArrayList<>();
-    private List<ReaderProjectionResolver> schemaResolvers = new ArrayList<>();
+    private final List<ScanProjectionParser> parsers = new ArrayList<>();
+    private final List<ReaderProjectionResolver> schemaResolvers = new ArrayList<>();
     private boolean useSchemaSmoothing;
     private boolean allowRequiredNullColumns;
     private List<SchemaPath> projection;
-    private TypeConverter.Builder typeConverterBuilder = TypeConverter.builder();
+    private final TypeConverter.Builder typeConverterBuilder = TypeConverter.builder();
 
     /**
      * Option that enables whether the scan operator starts with an empty
@@ -422,9 +422,9 @@ public class ScanSchemaOrchestrator {
 
     ScanProjectionParser parser = metadataManager.projectionParser();
     if (parser != null) {
-      // Insert in first position so that it is ensured to see
-      // any wildcard that exists
-      options.parsers.add(0, parser);
+      // Insert in last position to expand wildcards at
+      // the end of the tuple.
+      options.parsers.add(parser);
     }
 
     // Parse the projection list.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/AbstractReadColProj.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/AbstractReadColProj.java
index d7bde8f..d3fc2ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/AbstractReadColProj.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/AbstractReadColProj.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.scan.project.projSet;
 
 import org.apache.drill.exec.physical.resultSet.ProjectionSet;
 import org.apache.drill.exec.physical.resultSet.ProjectionSet.ColumnReadProjection;
-import org.apache.drill.exec.physical.resultSet.project.ProjectionType;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
 
@@ -44,7 +43,4 @@ public abstract class AbstractReadColProj implements ColumnReadProjection {
 
   @Override
   public ProjectionSet mapProjection() { return ProjectionSetFactory.projectAll(); }
-
-  @Override
-  public ProjectionType projectionType() { return null; }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/EmptyProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/EmptyProjectionSet.java
index 7fdec52..d02a04c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/EmptyProjectionSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/EmptyProjectionSet.java
@@ -45,4 +45,7 @@ public class EmptyProjectionSet implements ProjectionSet {
 
   @Override
   public boolean isEmpty() { return true; }
+
+  @Override
+  public boolean isProjected(String colName) { return false; }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ExplicitProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ExplicitProjectionSet.java
index daf2f1a..aace7e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ExplicitProjectionSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ExplicitProjectionSet.java
@@ -17,19 +17,14 @@
  */
 package org.apache.drill.exec.physical.impl.scan.project.projSet;
 
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.physical.resultSet.ProjectionSet;
-import org.apache.drill.exec.physical.resultSet.project.ProjectionType;
+import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
 import org.apache.drill.exec.physical.resultSet.project.RequestedColumnImpl;
 import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;
-import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn;
 import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.TupleProjectionType;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
 import org.apache.drill.exec.vector.complex.DictVector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Projection set based on an explicit set of columns provided
@@ -38,7 +33,6 @@ import org.slf4j.LoggerFactory;
  */
 
 public class ExplicitProjectionSet extends AbstractProjectionSet {
-  private static final Logger logger = LoggerFactory.getLogger(ExplicitProjectionSet.class);
 
   private final RequestedTuple requestedProj;
 
@@ -48,6 +42,11 @@ public class ExplicitProjectionSet extends AbstractProjectionSet {
   }
 
   @Override
+  public boolean isProjected(String colName) {
+    return requestedProj.get(colName) != null;
+  }
+
+  @Override
   public ColumnReadProjection readProjection(ColumnMetadata col) {
     RequestedColumn reqCol = requestedProj.get(col.name());
     if (reqCol == null) {
@@ -59,7 +58,7 @@ public class ExplicitProjectionSet extends AbstractProjectionSet {
 
   private ColumnReadProjection getReadProjection(ColumnMetadata col, RequestedColumn reqCol) {
     ColumnMetadata outputSchema = outputSchema(col);
-    validateProjection(reqCol, outputSchema == null ? col : outputSchema);
+    ProjectionChecker.validateProjection(reqCol, outputSchema == null ? col : outputSchema, errorContext);
     if (!col.isMap() && !col.isDict()) {
 
       // Non-map column.
@@ -73,7 +72,7 @@ public class ExplicitProjectionSet extends AbstractProjectionSet {
 
       TypeConverter childConverter = childConverter(outputSchema);
       ProjectionSet mapProjection;
-      if (! reqCol.type().isTuple() || reqCol.mapProjection().type() == TupleProjectionType.ALL) {
+      if (! reqCol.isTuple() || reqCol.tuple().type() == TupleProjectionType.ALL) {
 
         // Projection is simple: "m". This is equivalent to
         // (non-SQL) m.*
@@ -88,7 +87,7 @@ public class ExplicitProjectionSet extends AbstractProjectionSet {
         // projected; that case, while allowed in the RequestedTuple
         // implementation, can never occur in a SELECT list.)
 
-        mapProjection = new ExplicitProjectionSet(reqCol.mapProjection(), childConverter);
+        mapProjection = new ExplicitProjectionSet(reqCol.tuple(), childConverter);
       }
       if (col.isMap()) {
         return new ProjectedMapColumn(col, reqCol, outputSchema, mapProjection);
@@ -98,27 +97,6 @@ public class ExplicitProjectionSet extends AbstractProjectionSet {
     }
   }
 
-  public void validateProjection(RequestedColumn colReq, ColumnMetadata readCol) {
-    if (colReq == null || readCol == null) {
-      return;
-    }
-    ProjectionType type = colReq.type();
-    if (type == null) {
-      return;
-    }
-    ProjectionType neededType = ProjectionType.typeFor(readCol.majorType());
-    if (type.isCompatible(neededType)) {
-      return;
-    }
-    throw UserException.validationError()
-      .message("Column type not compatible with projection specification")
-      .addContext("Column:", readCol.name())
-      .addContext("Projection type:", type.label())
-      .addContext("Column type:", Types.getSqlTypeName(readCol.majorType()))
-      .addContext(errorContext)
-      .build(logger);
-  }
-
   @Override
   public ColumnReadProjection readDictProjection(ColumnMetadata col) {
     // Unlike for a MAP, requestedProj contains a key value, rather than nested field's name:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedDictColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedDictColumn.java
index 9f4eecf..6805a0e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedDictColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedDictColumn.java
@@ -18,8 +18,7 @@
 package org.apache.drill.exec.physical.impl.scan.project.projSet;
 
 import org.apache.drill.exec.physical.resultSet.ProjectionSet;
-import org.apache.drill.exec.physical.resultSet.project.ProjectionType;
-import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;
+import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 
 public class ProjectedDictColumn extends ProjectedReadColumn {
@@ -27,7 +26,7 @@ public class ProjectedDictColumn extends ProjectedReadColumn {
   private final ProjectionSet tupleProjection;
 
   public ProjectedDictColumn(ColumnMetadata readSchema,
-                            RequestedTuple.RequestedColumn requestedCol, ColumnMetadata outputSchema,
+                            RequestedColumn requestedCol, ColumnMetadata outputSchema,
                             ProjectionSet tupleProjection) {
     super(readSchema, requestedCol, outputSchema, null);
     this.tupleProjection = tupleProjection;
@@ -37,9 +36,4 @@ public class ProjectedDictColumn extends ProjectedReadColumn {
   public ProjectionSet mapProjection() {
     return tupleProjection;
   }
-
-  @Override
-  public ProjectionType projectionType() {
-    return super.projectionType().isArray() ? ProjectionType.DICT_ARRAY : ProjectionType.ARRAY;
-  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedMapColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedMapColumn.java
index da38550..f29ca31 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedMapColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedMapColumn.java
@@ -18,7 +18,7 @@
 package org.apache.drill.exec.physical.impl.scan.project.projSet;
 
 import org.apache.drill.exec.physical.resultSet.ProjectionSet;
-import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 
 public class ProjectedMapColumn extends ProjectedReadColumn {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedReadColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedReadColumn.java
index ddc9002..834b657 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedReadColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedReadColumn.java
@@ -18,8 +18,7 @@
 package org.apache.drill.exec.physical.impl.scan.project.projSet;
 
 import org.apache.drill.exec.physical.resultSet.ProjectionSet;
-import org.apache.drill.exec.physical.resultSet.project.ProjectionType;
-import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
 
@@ -30,6 +29,7 @@ import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
  */
 
 public class ProjectedReadColumn extends AbstractReadColProj {
+  @SuppressWarnings("unused")
   private final RequestedColumn requestedCol;
   private final ColumnMetadata outputSchema;
   private final ColumnConversionFactory conversionFactory;
@@ -69,10 +69,5 @@ public class ProjectedReadColumn extends AbstractReadColProj {
   }
 
   @Override
-  public ProjectionType projectionType() {
-    return requestedCol == null ? null : requestedCol.type();
-  }
-
-  @Override
   public ColumnConversionFactory conversionFactory() { return conversionFactory; }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectionChecker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectionChecker.java
new file mode 100644
index 0000000..8757167
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectionChecker.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project.projSet;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
+import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to check if a column is consistent with the projection
+ * requested for a query. Used for scans: the reader offers certain columns
+ * and the scan operator must decide whether to accept them and, if so,
+ * whether the column that has actually appeared is consistent with the
+ * projection schema path provided by the planner. An obvious example: the
+ * projection asks for {@code a[0]} (an array), but the reader offers up
+ * {@code a} as a non-array column.
+ * <p>
+ * Checks are reasonable, but not complete. Particularly in the {@code DICT}
+ * case, projection depends on multiple factors, such as the type of the
+ * key and values. This class does not (yet) handle that complexity.
+ * Instead, the goal is no false negatives for the complex cases, while
+ * catching the simple cases.
+ * <p>
+ * The Project operator or other consuming operator is the final arbitrator
+ * of whether a particular column satisfies a particular projection. This
+ * class tries to catch those errors early to provide better error
+ * messages.
+ */
+public class ProjectionChecker {
+  private static final Logger logger = LoggerFactory.getLogger(ProjectionChecker.class);
+
+  private ProjectionChecker() { }
+
+  /**
+   * Check if the given read column is consistent with the projection requested for
+   * a tuple. This form handles wildcard projection and unprojected columns; cases
+   * where there is no column-level projection information.
+   *
+   * @param tuple the tuple-level projection description
+   * @param readCol metadata for the column which the reader has actually
+   * produced
+   * @return {@code true} if the column is consistent with projection (or if the
+   * column is too complex to check), {@code false} if the column is not
+   * consistent and represents an error case. Also returns {@code true} if
+   * the column is not projected, as any type of column can be ignored
+   */
+  public static boolean isConsistent(RequestedTuple tuple, ColumnMetadata readCol) {
+    if (tuple == null || !tuple.isProjected(readCol.name())) {
+      return true;
+    }
+    // If the column is projected, it may be projected implicitly.
+    // Only check explicit projection.
+    RequestedColumn col = tuple.get(readCol.name());
+    if (col == null) {
+      return true;
+    } else {
+      return isConsistent(col, readCol);
+    }
+  }
+
+  /**
+   * Check if the given read column is consistent with the projection requested for
+   * that column. Does not handle subtleties such as DICT key types, actual types
+   * in a UNION, etc.
+   *
+   * @param colReq the column-level projection description
+   * @param readCol metadata for the column which the reader has actually
+   * produced
+   * @return {@code true} if the column is consistent with projection (or if the
+   * column is too complex to check), {@code false} if the column is not
+   * consistent and represents an error case. Also returns {@code true} if
+   * the column is not projected, as any type of column can be ignored
+   */
+  public static boolean isConsistent(RequestedColumn colReq, ColumnMetadata readCol) {
+    if (colReq == null || readCol == null) {
+      return true;
+    }
+    if (colReq.isTuple() && !(readCol.isMap() || readCol.isDict() || readCol.isVariant())) {
+      return false;
+    }
+    if (colReq.isArray()) {
+      if (colReq.arrayDims() == 1) {
+        return readCol.isArray() || readCol.isDict() || readCol.isVariant();
+      } else {
+        return readCol.type() == MinorType.LIST || readCol.isDict() || readCol.isVariant();
+      }
+    }
+    return true;
+  }
+
+  public static void validateProjection(RequestedColumn colReq, ColumnMetadata readCol) {
+    validateProjection(colReq, readCol, null);
+  }
+
+  /**
+   * Perform the column-level projection check as described in
+   * {@link #isConsistent(RequestedColumn, ColumnMetadata)}, and raise a
+   * {@code UserException} if the column is not consistent with projection.
+   *
+   * @param colReq the column-level projection description
+   * @param readCol metadata for the column which the reader has actually
+   * produced
+   * @param errorContext additional error context to pass along in the
+   * exception
+   * @throws UserException if the read column is not consistent with the
+   * projection description for the column
+   */
+  public static void validateProjection(RequestedColumn colReq, ColumnMetadata readCol,
+      CustomErrorContext errorContext) {
+    if (!isConsistent(colReq, readCol)) {
+      throw UserException.validationError()
+        .message("Column type not compatible with projection specification")
+        .addContext("Column:", readCol.name())
+        .addContext("Projection type:", colReq.toString())
+        .addContext("Column type:", Types.getSqlTypeName(readCol.majorType()))
+        .addContext(errorContext)
+        .build(logger);
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectionSetBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectionSetBuilder.java
index f1a48d6..6f788b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectionSetBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectionSetBuilder.java
@@ -22,9 +22,9 @@ import java.util.Collection;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.resultSet.ProjectionSet;
+import org.apache.drill.exec.physical.resultSet.project.Projections;
 import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;
 import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.TupleProjectionType;
-import org.apache.drill.exec.physical.resultSet.project.RequestedTupleImpl;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 
 public class ProjectionSetBuilder {
@@ -52,7 +52,7 @@ public class ProjectionSetBuilder {
     if (projection == null) {
       parsedProjection = null;
     } else {
-      parsedProjection = RequestedTupleImpl.parse(projection);
+      parsedProjection = Projections.parse(projection);
     }
     return this;
   }
@@ -83,17 +83,17 @@ public class ProjectionSetBuilder {
 
     ProjectionSet projSet;
     switch (projType) {
-    case ALL:
-      projSet = new WildcardProjectionSet(typeConverter);
-      break;
-    case NONE:
-      projSet = ProjectionSetFactory.projectNone();
-      break;
-    case SOME:
-      projSet = new ExplicitProjectionSet(parsedProjection, typeConverter);
-      break;
-    default:
-      throw new IllegalStateException("Unexpected projection type: " + projType.toString());
+      case ALL:
+        projSet = new WildcardProjectionSet(typeConverter);
+        break;
+      case NONE:
+        projSet = ProjectionSetFactory.projectNone();
+        break;
+      case SOME:
+        projSet = new ExplicitProjectionSet(parsedProjection, typeConverter);
+        break;
+      default:
+        throw new IllegalStateException("Unexpected projection type: " + projType.toString());
     }
     projSet.setErrorContext(errorContext);
     return projSet;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectionSetFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectionSetFactory.java
index f5642cb..46e1b94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectionSetFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectionSetFactory.java
@@ -23,8 +23,8 @@ import java.util.Map;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.impl.scan.project.projSet.TypeConverter.CustomTypeTransform;
 import org.apache.drill.exec.physical.resultSet.ProjectionSet;
+import org.apache.drill.exec.physical.resultSet.project.Projections;
 import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;
-import org.apache.drill.exec.physical.resultSet.project.RequestedTupleImpl;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
 import org.apache.drill.exec.vector.accessor.convert.StandardConversions.ConversionDefn;
@@ -70,7 +70,7 @@ public class ProjectionSetFactory {
     if (selection == null) {
       return projectAll();
     }
-    return wrap(RequestedTupleImpl.parse(selection));
+    return wrap(Projections.parse(selection));
   }
 
   public static CustomTypeTransform simpleTransform(ColumnConversionFactory colFactory) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/WildcardProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/WildcardProjectionSet.java
index 1da5a2b..f7d216f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/WildcardProjectionSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/WildcardProjectionSet.java
@@ -31,6 +31,9 @@ public class WildcardProjectionSet extends AbstractProjectionSet {
   }
 
   @Override
+  public boolean isProjected(String colName) { return true; }
+
+  @Override
   public ColumnReadProjection readProjection(ColumnMetadata col) {
     if (isSpecial(col)) {
       return new UnprojectedReadColumn(col);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ProjectionSet.java
index 208defd..81b6a24 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ProjectionSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ProjectionSet.java
@@ -18,10 +18,8 @@
 package org.apache.drill.exec.physical.resultSet;
 
 import org.apache.drill.common.exceptions.CustomErrorContext;
-import org.apache.drill.exec.physical.resultSet.project.ProjectionType;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
-import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 
 /**
  * Provides a dynamic, run-time view of a projection set. Used by
@@ -74,7 +72,6 @@ public interface ProjectionSet {
    * indicates whether a reader column is projected, and if so, the attributes
    * of that projection.
    */
-
   public interface ColumnReadProjection {
 
     /**
@@ -83,23 +80,15 @@ public interface ProjectionSet {
      * from an explicit projection, or columns within a wildcard projection
      * where the column is "special" and is not expanded in the wildcard.
      */
-
     boolean isProjected();
 
     ColumnMetadata readSchema();
     ColumnMetadata providedSchema();
     ColumnConversionFactory conversionFactory();
     ProjectionSet mapProjection();
-
-    /**
-     * The projection type from the parse of the projection list,
-     * if available. Used for testing only. Don't use this in production
-     * code, let this class do the checks itself.
-     */
-    @VisibleForTesting
-    ProjectionType projectionType();
   }
 
+  boolean isProjected(String colName);
   void setErrorContext(CustomErrorContext errorContext);
   ColumnReadProjection readProjection(ColumnMetadata col);
   ColumnReadProjection readDictProjection(ColumnMetadata col);
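
Example: the new name-based isProjected(String) check lets a reader skip
column setup without first building a ColumnMetadata. A minimal sketch of
the intended call pattern (illustrative only, not part of this patch;
uses the ProjectionSetFactory methods shown above):

    import org.apache.drill.exec.physical.impl.scan.project.projSet.ProjectionSetFactory;
    import org.apache.drill.exec.physical.resultSet.ProjectionSet;

    public class IsProjectedSketch {
      public static void main(String[] args) {
        // Wildcard projection: every name reports as projected.
        ProjectionSet projSet = ProjectionSetFactory.projectAll();
        if (projSet.isProjected("amount")) {
          // Build the vector, writer and column state for "amount".
        }
        // Empty projection (e.g. SELECT COUNT(*)): no name is projected.
        ProjectionSet none = ProjectionSetFactory.projectNone();
        assert !none.isProjected("amount");
      }
    }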
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ContainerState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ContainerState.java
index 3881891..fde2844 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ContainerState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ContainerState.java
@@ -40,7 +40,6 @@ import org.apache.drill.exec.record.metadata.ColumnMetadata;
  * <li>A column state which orchestrates the above three items.</li>
  * <ul>
  */
-
 public abstract class ContainerState {
 
   protected final LoaderInternals loader;
@@ -51,7 +50,6 @@ public abstract class ContainerState {
    * Vector cache for this loader.
    * @see {@link OptionBuilder#setVectorCache()}.
    */
-
   protected final ResultVectorCache vectorCache;
 
   public ContainerState(LoaderInternals loader, ResultVectorCache vectorCache, ProjectionSet projectionSet) {
@@ -79,7 +77,6 @@ public abstract class ContainerState {
    *
    * @return <tt>true</tt> if versioned
    */
-
   protected abstract boolean isVersioned();
 
   protected LoaderInternals loader() { return loader; }
@@ -89,19 +86,15 @@ public abstract class ContainerState {
   public ColumnState addColumn(ColumnMetadata columnSchema) {
 
     // Create the vector, writer and column state
-
     ColumnState colState = loader.columnBuilder().buildColumn(this, columnSchema);
 
     // Add the column to this container
-
     addColumn(colState);
 
     // Set initial cardinality
-
     colState.updateCardinality(innerCardinality());
 
     // Allocate vectors if a batch is in progress.
-
     if (loader().writeable()) {
       colState.allocateVectors();
     }
@@ -116,7 +109,6 @@ public abstract class ContainerState {
    * this value is recursively pushed downward to compute the cardinality
    * of lists of maps that contains lists of maps, and so on.
    */
-
   public void updateCardinality() {
     int innerCardinality = innerCardinality();
     assert innerCardinality > 0;
@@ -129,7 +121,6 @@ public abstract class ContainerState {
    * Start a new batch by shifting the overflow buffers back into the main
    * write vectors and updating the writers.
    */
-
   public void startBatch(boolean schemaOnly) {
     for (ColumnState colState : columnStates()) {
       colState.startBatch(schemaOnly);
@@ -143,7 +134,6 @@ public abstract class ContainerState {
    * for some previous row, depending on exactly when and where the overflow
    * occurs.
    */
-
   public void rollover() {
     for (ColumnState colState : columnStates()) {
       colState.rollover();
@@ -155,7 +145,6 @@ public abstract class ContainerState {
    * vector for harvesting to send downstream. Set aside the look-ahead vector
    * and put the full vector buffer back into the active vector.
    */
-
   public void harvestWithLookAhead() {
     for (ColumnState colState : columnStates()) {
       colState.harvestWithLookAhead();
@@ -166,7 +155,6 @@ public abstract class ContainerState {
    * Clean up state (such as backup vectors) associated with the state
    * for each vector.
    */
-
   public void close() {
     for (ColumnState colState : columnStates()) {
       colState.close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
index 6c27706..c0695fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
@@ -39,13 +39,11 @@ import org.slf4j.LoggerFactory;
  *
  * @see {@link ResultSetLoader}
  */
-
 public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
 
   /**
    * Read-only set of options for the result set loader.
    */
-
   public static class ResultSetOptions {
     protected final int vectorSizeLimit;
     protected final int rowCountLimit;
@@ -95,13 +93,11 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
     /**
      * Before the first batch.
      */
-
     START,
 
     /**
      * Writing to a batch normally.
      */
-
     ACTIVE,
 
     /**
@@ -109,14 +105,12 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
      * to write to a temporary "overflow" batch until the
      * end of the current row.
      */
-
     OVERFLOW,
 
     /**
      * Temporary state to avoid batch-size related overflow while
      * an overflow is in progress.
      */
-
     IN_OVERFLOW,
 
     /**
@@ -124,14 +118,12 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
      * when saving a row.
      * No more writes allowed until harvesting the current batch.
      */
-
     FULL_BATCH,
 
     /**
      * Current batch was harvested: data is gone. No lookahead
      * batch exists.
      */
-
     HARVESTED,
 
     /**
@@ -155,13 +147,11 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
      * fine. The correct buffers are restored once a new batch is started
      * and the state moves to ACTIVE.
      */
-
     LOOK_AHEAD,
 
     /**
      * Mutator is closed: no more operations are allowed.
      */
-
     CLOSED
   }
 
@@ -170,26 +160,22 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
   /**
    * Options provided to this loader.
    */
-
   private final ResultSetOptions options;
 
   /**
    * Allocator for vectors created by this loader.
    */
-
   private final BufferAllocator allocator;
 
   /**
    * Builds columns (vector, writer, state).
    */
-
   private final ColumnBuilder columnBuilder;
 
   /**
    * Internal structure used to work with the vectors (real or dummy) used
    * by this loader.
    */
-
   private final RowState rootState;
 
   /**
@@ -200,21 +186,18 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
    * within the writer structure that points to the current position within
    * an array column.
    */
-
   private final WriterIndexImpl writerIndex;
 
   /**
    * The row-level writer for stepping through rows as they are written,
    * and for accessing top-level columns.
    */
-
   private final RowSetLoaderImpl rootWriter;
 
   /**
    * Tracks the state of the row set loader. Handling vector overflow requires
    * careful stepping through a variety of states as the write proceeds.
    */
-
   private State state = State.START;
 
   /**
@@ -223,7 +206,6 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
    * This allows very easy checks for schema changes: save the prior version number
    * and compare it against the current version number.
    */
-
   private int activeSchemaVersion;
 
   /**
@@ -234,21 +216,18 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
    * sees the schema as it existed at a prior version: the harvest schema
    * version.
    */
-
   private int harvestSchemaVersion;
 
   /**
    * Counts the batches harvested (sent downstream) from this loader. Does
    * not include the current, in-flight batch.
    */
-
   private int harvestBatchCount;
 
   /**
    * Counts the rows included in previously-harvested batches. Does not
    * include the number of rows in the current batch.
    */
-
   private int previousRowCount;
 
   /**
@@ -258,7 +237,6 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
    * overflow row is in effect, then this number is undefined (and should be
    * zero.)
    */
-
   private int pendingRowCount;
 
   /**
@@ -266,13 +244,11 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
    * adjusted between batches, perhaps based on the actual observed size of
    * input data.
    */
-
   private int targetRowCount;
 
   /**
    * Total bytes allocated to the current batch.
    */
-
   protected int accumulatedBatchSize;
 
   protected final ProjectionSet projectionSet;
@@ -399,7 +375,6 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
   /**
    * Start a batch to report only schema without data.
    */
-
   public void startEmptyBatch() {
     startBatch(true);
   }
@@ -484,7 +459,6 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
    * Called before writing a new row. Implementation of
    * {@link RowSetLoader#start()}.
    */
-
   protected void startRow() {
     switch (state) {
     case ACTIVE:
@@ -504,13 +478,12 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
    * Finalize the current row. Implementation of
    * {@link RowSetLoader#save()}.
    */
-
   protected void saveRow() {
     switch (state) {
     case ACTIVE:
       rootWriter.endArrayValue();
       rootWriter.saveRow();
-      if (! writerIndex.next()) {
+      if (!writerIndex.next()) {
         state = State.FULL_BATCH;
       }
 
@@ -550,11 +523,10 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
    * @return true if the batch is full (reached vector capacity or the
    * row count limit), false if more rows can be added
    */
-
   protected boolean isFull() {
     switch (state) {
     case ACTIVE:
-      return ! writerIndex.valid();
+      return !writerIndex.valid();
     case OVERFLOW:
     case FULL_BATCH:
       return true;
@@ -579,7 +551,6 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
    * @return the number of rows to be sent downstream for this
    * batch. Does not include the overflow row.
    */
-
   protected int rowCount() {
     switch (state) {
     case ACTIVE:
@@ -621,7 +592,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
 
   @Override
   public boolean isProjectionEmpty() {
-    return ! rootState.hasProjections();
+    return !rootState.hasProjections();
   }
 
   @Override
@@ -809,7 +780,6 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
    * This will occur if the target row count is incorrect for the
    * data size.
    */
-
   private void checkInitialAllocation() {
     if (options.maxBatchSize < 0) {
       logger.debug("Initial vector allocation: {}, no batch limit specified",
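
Example: the states above are driven by a simple per-batch loop. A rough
sketch of one batch, assuming the public ResultSetLoader API (startBatch(),
writer(), harvest()); hypothetical driver code, not part of this patch:

    import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
    import org.apache.drill.exec.physical.resultSet.RowSetLoader;
    import org.apache.drill.exec.record.VectorContainer;

    public class BatchLifecycleSketch {
      // Walks one batch through the states above:
      // START -> ACTIVE -> (FULL_BATCH or OVERFLOW) -> HARVESTED.
      static VectorContainer writeOneBatch(ResultSetLoader rsLoader) {
        RowSetLoader writer = rsLoader.writer();
        rsLoader.startBatch();         // enter ACTIVE
        while (!writer.isFull()) {     // true once FULL_BATCH or OVERFLOW
          writer.start();              // begin a row
          // ... set column values through the column writers ...
          writer.save();               // commit the row
        }
        return rsLoader.harvest();     // enter HARVESTED
      }
    }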
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
index 2e5881b..6138283 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
@@ -90,7 +90,6 @@ import org.apache.drill.exec.vector.complex.RepeatedDictVector;
  * either one list of columns or another, the internal and external maps must
  * differ. The set of child vectors (except for child maps) are shared.
  */
-
 public abstract class TupleState extends ContainerState
   implements AbstractTupleWriter.TupleWriterListener {
 
@@ -125,7 +124,6 @@ public abstract class TupleState extends ContainerState
   * a structure: an ordered, named list of columns.) When looking for newly
    * added columns, they will always be at the end.
    */
-
   public static class MapColumnState extends BaseContainerColumnState {
     protected final MapState mapState;
     protected boolean isVersioned;
@@ -169,7 +167,6 @@ public abstract class TupleState extends ContainerState
      * </ul>
      * @return <tt>true</tt> if this map is versioned as described above
      */
-
     public boolean isVersioned() { return isVersioned; }
 
     @Override
@@ -181,7 +178,6 @@ public abstract class TupleState extends ContainerState
    * vector. The map vector itself is a pseudo-vector that is simply a
    * container for other vectors, and so needs no management itself.
    */
-
   public static class MapVectorState implements VectorState {
 
     private final AbstractMapVector mapVector;
@@ -243,14 +239,12 @@ public abstract class TupleState extends ContainerState
    * Note that by "row" we mean the set of vectors that define the
    * set of rows.
    */
-
   public static class RowState extends TupleState {
 
     /**
      * The row-level writer for stepping through rows as they are written,
      * and for accessing top-level columns.
      */
-
     private final RowSetLoaderImpl writer;
 
     /**
@@ -259,7 +253,6 @@ public abstract class TupleState extends ContainerState
      * consumer of the writers. Also excludes columns if added during
      * an overflow row.
      */
-
     private final VectorContainer outputContainer;
 
     public RowState(ResultSetLoaderImpl rsLoader, ResultVectorCache vectorCache) {
@@ -283,7 +276,6 @@ public abstract class TupleState extends ContainerState
      *
      * @return <tt>true</tt>
      */
-
     @Override
     protected boolean isVersioned() { return true; }
 
@@ -314,7 +306,6 @@ public abstract class TupleState extends ContainerState
    * The map state is associated with a map vector. This vector is built
    * either during harvest time (normal maps) or on the fly (union maps.)
    */
-
   public static abstract class MapState extends TupleState {
 
     public MapState(LoaderInternals events,
@@ -366,7 +357,6 @@ public abstract class TupleState extends ContainerState
      * that maps are materialized regardless of nesting depth within
      * a union.
      */
-
     @Override
     protected boolean isVersioned() {
       return ((MapColumnState) parentColumn).isVersioned();
@@ -400,7 +390,6 @@ public abstract class TupleState extends ContainerState
      * map, then it is the writer itself. If this is a map array,
      * then the tuple is nested inside the array.
      */
-
     @Override
     public AbstractTupleWriter writer() {
       return (AbstractTupleWriter) parentColumn.writer().tuple();
@@ -420,7 +409,6 @@ public abstract class TupleState extends ContainerState
      * map, then it is the writer itself. If this is a map array,
      * then the tuple is nested inside the array.
      */
-
     @Override
     public AbstractTupleWriter writer() {
       return (AbstractTupleWriter) parentColumn.writer().array().tuple();
@@ -433,13 +421,11 @@ public abstract class TupleState extends ContainerState
    * query does not project; the result set loader creates a dummy column
    * and dummy writer, then does not project the column to the output.)
    */
-
   protected final List<ColumnState> columns = new ArrayList<>();
 
   /**
    * Internal writer schema that matches the column list.
    */
-
   protected final TupleMetadata schema = new TupleSchema();
 
   /**
@@ -455,7 +441,6 @@ public abstract class TupleState extends ContainerState
    * not defer columns because of the muddy semantics (and infrequent use)
    * of unions.
    */
-
   protected TupleMetadata outputSchema;
 
   private int prevHarvestIndex = -1;
@@ -479,7 +464,6 @@ public abstract class TupleState extends ContainerState
    * @return ordered list of column states for the columns within
    * this tuple
    */
-
   public List<ColumnState> columns() { return columns; }
 
   public TupleMetadata schema() { return writer().tupleSchema(); }
@@ -518,12 +502,10 @@ public abstract class TupleState extends ContainerState
   protected void updateOutput(int curSchemaVersion) {
 
     // Scan all columns
-
     for (int i = 0; i < columns.size(); i++) {
       final ColumnState colState = columns.get(i);
 
       // Ignore unprojected columns
-
       if (! colState.writer().isProjected()) {
         continue;
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedMapColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/BaseRequestedColumn.java
similarity index 51%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedMapColumn.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/BaseRequestedColumn.java
index da38550..1363631 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedMapColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/BaseRequestedColumn.java
@@ -15,25 +15,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.physical.impl.scan.project.projSet;
+package org.apache.drill.exec.physical.resultSet.project;
 
-import org.apache.drill.exec.physical.resultSet.ProjectionSet;
-import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn;
-import org.apache.drill.exec.record.metadata.ColumnMetadata;
+public abstract class BaseRequestedColumn implements RequestedColumn {
 
-public class ProjectedMapColumn extends ProjectedReadColumn {
+  private final RequestedTuple parent;
+  private final String name;
 
-  private final ProjectionSet mapProjection;
+  public BaseRequestedColumn(RequestedTuple parent, String name) {
+    this.parent = parent;
+    this.name = name;
+  }
+
+  public boolean isRoot() { return parent == null; }
+
+  @Override
+  public String name() { return name; }
+
+  @Override
+  public String fullName() {
+    final StringBuilder buf = new StringBuilder();
+    buildName(buf);
+    return buf.toString();
+  }
 
-  public ProjectedMapColumn(ColumnMetadata readSchema,
-      RequestedColumn requestedCol, ColumnMetadata outputSchema,
-      ProjectionSet mapProjection) {
-    super(readSchema, requestedCol, outputSchema, null);
-    this.mapProjection = mapProjection;
+  protected void buildName(StringBuilder buf) {
+    parent.buildName(buf);
+    buf.append('`')
+       .append(name)
+       .append('`');
   }
 
   @Override
-  public ProjectionSet mapProjection() {
-    return mapProjection;
+  public boolean nameEquals(String target) {
+    return name.equalsIgnoreCase(target);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/ImpliedTupleRequest.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/ImpliedTupleRequest.java
index d44f8bf..3b76d7e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/ImpliedTupleRequest.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/ImpliedTupleRequest.java
@@ -20,14 +20,11 @@ package org.apache.drill.exec.physical.resultSet.project;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.drill.common.expression.PathSegment;
-
 /**
  * Represents a wildcard: SELECT * when used at the root tuple.
  * When used with maps, means selection of all map columns, either
  * implicitly, or because the map itself is selected.
  */
-
 public class ImpliedTupleRequest implements RequestedTuple {
 
   public static final RequestedTuple ALL_MEMBERS =
@@ -43,21 +40,11 @@ public class ImpliedTupleRequest implements RequestedTuple {
   }
 
   @Override
-  public ProjectionType projectionType(String colName) {
-    return allProjected
-      ? ProjectionType.GENERAL
-      : ProjectionType.UNPROJECTED;
-  }
-
-  @Override
   public RequestedTuple mapProjection(String colName) {
     return allProjected ? ALL_MEMBERS : NO_MEMBERS;
   }
 
   @Override
-  public void parseSegment(PathSegment child) { }
-
-  @Override
   public RequestedColumn get(String colName) { return null; }
 
   @Override
@@ -70,4 +57,19 @@ public class ImpliedTupleRequest implements RequestedTuple {
   public TupleProjectionType type() {
     return allProjected ? TupleProjectionType.ALL : TupleProjectionType.NONE;
   }
+
+  @Override
+  public boolean isProjected(String colName) {
+    return allProjected;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder()
+        .append("{");
+    if (allProjected) {
+      buf.append("*");
+    }
+    return buf.append("}").toString();
+  }
 }
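
Example: the two implied cases give fixed answers for any column name.
A small sketch using the Projections factory added later in this patch
(illustrative only):

    import org.apache.drill.exec.physical.resultSet.project.Projections;
    import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;

    public class ImpliedProjectionSketch {
      public static void main(String[] args) {
        RequestedTuple all = Projections.projectAll();    // e.g. SELECT *
        RequestedTuple none = Projections.projectNone();  // e.g. SELECT COUNT(*)
        System.out.println(all.isProjected("anything"));  // true
        System.out.println(none.isProjected("anything")); // false
        System.out.println(all);                          // {*}
        System.out.println(none);                         // {}
      }
    }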
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/ProjectionType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/ProjectionType.java
deleted file mode 100644
index 363f51d..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/ProjectionType.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.resultSet.project;
-
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-
-/**
- * Specifies the type of projection obtained by parsing the
- * projection list. The type is returned from a query of the
- * form "how is this column projected, if at all?"
- * <p>
- * The projection type allows the scan framework to catch
- * inconsistencies, such as projecting an array as a map,
- * and so on.
- */
-
-public enum ProjectionType {
-
-  /**
-   * The column is not projected in the query.
-   */
-
-  UNPROJECTED,
-
-  /**
-   * Projection is a wildcard.
-   */
-  WILDCARD,     // *
-
-  /**
-   * Projection is by simple name. "General" means that
-   * we have no hints about the type of the column from
-   * the projection.
-   */
-
-  GENERAL,      // x
-
-  /**
-   * The column is projected as a scalar. This state
-   * requires metadata beyond the projection list and
-   * is returned only when that metadata is available.
-   */
-
-  SCALAR,       // x (from schema)
-
-  /**
-   * Applies to the parent of an x.y pair in projection: the
-   * existence of a dotted-member tells us that the parent
-   * must be a tuple (e.g. a Map.)
-   */
-
-  TUPLE,        // x.y
-
-  /**
-   * The projection includes an array suffix, so the column
-   * must be an array.
-   */
-
-  ARRAY,        // x[0]
-
-  /**
-   * Combination of array and map hints.
-   */
-
-  TUPLE_ARRAY,  // x[0].y
-
-  DICT, // x[0] or x['key'] (depends on key type)
-
-  DICT_ARRAY; // x[0][42] or x[0]['key'] (depends on key type)
-
-  public boolean isTuple() {
-    return this == ProjectionType.TUPLE || this == ProjectionType.TUPLE_ARRAY;
-  }
-
-  public boolean isArray() {
-    return this == ProjectionType.ARRAY || this == ProjectionType.TUPLE_ARRAY || this == DICT_ARRAY;
-  }
-
-  public boolean isDict() {
-    return this == DICT || this == DICT_ARRAY;
-  }
-
-  /**
-   * We can't tell, just from the project list, if a column must
-   * be scalar. A column of the form "a" could be a scalar, but
-   * that form is also consistent with maps and arrays.
-   */
-  public boolean isMaybeScalar() {
-    return this == GENERAL || this == SCALAR;
-  }
-
-  public static ProjectionType typeFor(MajorType majorType) {
-    boolean repeated = Types.isRepeated(majorType);
-    if (majorType.getMinorType() == MinorType.MAP) {
-      return repeated ? TUPLE_ARRAY : TUPLE;
-    } else if (majorType.getMinorType() == MinorType.DICT) {
-      return repeated ? DICT_ARRAY : DICT;
-    } else if (repeated || majorType.getMinorType() == MinorType.LIST) {
-      return ARRAY;
-    }
-    return SCALAR;
-  }
-
-  /**
-   * Reports if this type (representing an item in a projection list)
-   * is compatible with the projection type representing an actual
-   * column produced by an operator. The check is not symmetric.
-   * <p>
-   * For example, a column of type map array is compatible with a
-   * projection of type map "m.a" (project all a members of the map array),
-   * but a projection type of map array "m[1].a" is not compatible with
-   * a (non-array) map column.
-   *
-   * @param readType projection type, from {@link #typeFor(MajorType)},
-   * for an actual column
-   * @return true if this projection type is compatible with the
-   * column's projection type
-   */
-
-  public boolean isCompatible(ProjectionType readType) {
-    switch (readType) {
-    case UNPROJECTED:
-    case GENERAL:
-    case WILDCARD:
-      return true;
-    default:
-      break;
-    }
-
-    switch (this) {
-    case ARRAY:
-      return readType == ARRAY || readType == TUPLE_ARRAY
-          || readType == DICT // the actual key type should be validated later
-          || readType == DICT_ARRAY;
-    case TUPLE_ARRAY:
-      return readType == TUPLE_ARRAY || readType == DICT_ARRAY;
-    case SCALAR:
-      return readType == SCALAR;
-    case TUPLE:
-      return readType == TUPLE || readType == TUPLE_ARRAY || readType == DICT || readType == DICT_ARRAY;
-    case DICT:
-      return readType == DICT || readType == DICT_ARRAY;
-    case UNPROJECTED:
-    case GENERAL:
-    case WILDCARD:
-      return true;
-    default:
-      throw new IllegalStateException(toString());
-    }
-  }
-
-  public String label() {
-    switch (this) {
-    case SCALAR:
-      return "scalar (a)";
-    case ARRAY:
-      return "array (a[n])";
-    case TUPLE:
-      return "tuple (a.x)";
-    case TUPLE_ARRAY:
-      return "tuple array (a[n].x)";
-    case DICT:
-      return "dict (a['key'])";
-    case WILDCARD:
-      return "wildcard (*)";
-    default:
-      return name();
-    }
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/Projections.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/Projections.java
new file mode 100644
index 0000000..a14a68d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/Projections.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet.project;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.PathSegment.ArraySegment;
+import org.apache.drill.common.expression.PathSegment.NameSegment;
+import org.apache.drill.common.expression.SchemaPath;
+
+/**
+ * Converts a projection list passed to an operator into a scan projection list,
+ * coalescing multiple references to the same column into a single reference.
+ */
+public class Projections {
+
+  private Projections() { }
+
+  public static RequestedTuple projectAll() {
+    return ImpliedTupleRequest.ALL_MEMBERS;
+  }
+
+  public static RequestedTuple projectNone() {
+    return ImpliedTupleRequest.NO_MEMBERS;
+  }
+
+  /**
+   * Parse a projection list. The list should consist of a list of column
+   * names or wildcards. An empty list means
+   * nothing is projected. A null list means everything is projected (that is, a
+   * null list here is equivalent to a wildcard in the SELECT statement.)
+   * <p>
+   * The projection list may include both a wildcard and column names (as in
+   * the case of implicit columns.) This results in a final list that both
+   * says that everything is projected, and provides the list of columns.
+   * <p>
+   * Parsing is used at two different times. First, to parse the list from
+   * the physical operator. This has the case above: an explicit wildcard
+   * and/or additional columns. Then, this class is used again to prepare the
+   * physical projection used when reading. In this case, wildcards should
+   * be removed, implicit columns pulled out, and just the list of read-level
+   * columns should remain.
+   *
+   * @param projList
+   *          the list of projected columns, or null if no projection is to be
+   *          done
+   * @return a projection set that implements the specified projection
+   */
+  public static RequestedTuple parse(Collection<SchemaPath> projList) {
+    if (projList == null) {
+      return projectAll();
+    }
+    if (projList.isEmpty()) {
+      return projectNone();
+    }
+    RequestedTupleImpl tupleProj = new RequestedTupleImpl();
+    for (SchemaPath col : projList) {
+      parseMember(tupleProj, col.getRootSegment());
+    }
+    return tupleProj;
+  }
+
+  private static void parseMember(RequestedTupleImpl tuple, NameSegment nameSeg) {
+    RequestedColumn col = tuple.project(nameSeg.getPath());
+    if (!col.isWildcard()) {
+      RequestedColumnImpl colImpl = (RequestedColumnImpl) col;
+      parseChildSeg(colImpl, colImpl, nameSeg);
+    }
+  }
+
+  private static void parseChildSeg(RequestedColumnImpl column, QualifierContainer parent, PathSegment parentPath) {
+    if (parentPath.isLastPath()) {
+      parseLeaf(parent);
+    } else {
+      PathSegment seg = parentPath.getChild();
+      if (seg.isArray()) {
+        parseArraySeg(column, parent, (ArraySegment) seg);
+      } else {
+        parseMemberSeg(column, parent, (NameSegment) seg);
+      }
+    }
+  }
+
+  /**
+   * Parse a projection of the form {@code a}: that is, just a bare column.
+   */
+  private static void parseLeaf(QualifierContainer parent) {
+    Qualifier qual = parent.qualifier();
+    if (qual == null) {
+      // Nothing to do
+    } else if (qual.isArray()) {
+      qual.projectAllElements();
+    } else if (qual.isTuple()) {
+      qual.projectAllMembers();
+    }
+  }
+
+  private static void parseArraySeg(RequestedColumnImpl column, QualifierContainer parent, ArraySegment arraySeg) {
+    Qualifier prevQualifier = parent.qualifier();
+    Qualifier qualifier = parent.requireQualifier();
+    if (column.refCount() > 1 && (prevQualifier == null || !prevQualifier.isArray())) {
+      qualifier.projectAllElements();
+    } else {
+      qualifier.addIndex(arraySeg.getIndex());
+    }
+    parseChildSeg(column, qualifier, arraySeg);
+  }
+
+  private static void parseMemberSeg(RequestedColumnImpl column, QualifierContainer parent, NameSegment memberSeg) {
+    Qualifier prevQualifier = parent.qualifier();
+    Qualifier qualifier = parent.requireQualifier();
+    if (column.refCount() > 1 && (prevQualifier == null || !prevQualifier.isTuple())) {
+      qualifier.projectAllMembers();
+    } else {
+      RequestedTupleImpl tuple = qualifier.explicitMembers();
+      if (tuple != null) {
+        parseMember(tuple, memberSeg);
+      }
+    }
+  }
+
+  /**
+   * Create a requested tuple projection from a rewritten top-level
+   * projection list. The columns within the list have already been parsed to
+   * pick out arrays, maps and scalars. The list must not include the
+   * wildcard: a wildcard list must be passed in as a null list. An
+   * empty list means project nothing. A null list means project all; else
+   * project only the columns in the list.
+   *
+   * @param projList top-level, parsed columns
+   * @return the tuple projection for the top-level row
+   */
+  public static RequestedTuple build(List<RequestedColumn> projList) {
+    if (projList == null) {
+      return new ImpliedTupleRequest(true);
+    }
+    if (projList.isEmpty()) {
+      return projectNone();
+    }
+    return new RequestedTupleImpl(projList);
+  }
+}
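
Example: the coalescing described above merges a.b and a[2] into one
consolidated column that reports as both a tuple and an array. A sketch,
assuming SchemaPath.parseFromString() accepts this syntax (illustrative
only):

    import java.util.Arrays;

    import org.apache.drill.common.expression.SchemaPath;
    import org.apache.drill.exec.physical.resultSet.project.Projections;
    import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
    import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;

    public class CoalesceSketch {
      public static void main(String[] args) {
        // The same column referenced as a map (a.b) and as an array (a[2]).
        RequestedTuple proj = Projections.parse(Arrays.asList(
            SchemaPath.parseFromString("a.b"),
            SchemaPath.parseFromString("a[2]")));
        RequestedColumn a = proj.get("a");
        System.out.println(a.isTuple());     // true: from the a.b reference
        System.out.println(a.isArray());     // true: from the a[2] reference
        System.out.println(a.hasIndexes());  // false: the repeat reference
                                             // implies all elements, not just [2]
      }
    }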
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/Qualifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/Qualifier.java
new file mode 100644
index 0000000..976f968
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/Qualifier.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet.project;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.TupleProjectionType;
+
+/**
+ * Represents one level of qualifier for a column. Analogous to
+ * a {@code SchemaPath}, but represents the result of coalescing
+ * multiple occurrences of the same column.
+ */
+public class Qualifier implements QualifierContainer {
+  /**
+   * Marker to indicate that a) the item is an
+   * array, and b) that all indexes are to be projected.
+   * Used when seeing both {@code a} and {@code a[x]}.
+   */
+  private static final Set<Integer> ALL_INDEXES = new HashSet<>();
+
+  private Set<Integer> indexes;
+  private RequestedTuple members;
+  private Qualifier child;
+
+  @Override
+  public Qualifier qualifier() { return child; }
+
+  @Override
+  public Qualifier requireQualifier() {
+    if (child == null) {
+      child = new Qualifier();
+    }
+    return child;
+  }
+
+  public boolean isArray() {
+    return indexes != null;
+  }
+
+  public boolean hasIndexes() {
+    return isArray() && indexes != ALL_INDEXES;
+  }
+
+  public boolean hasIndex(int index) {
+    return hasIndexes() && indexes.contains(index);
+  }
+
+  public int maxIndex() {
+    if (!hasIndexes()) {
+      return 0;
+    }
+    int max = 0;
+    for (final Integer index : indexes) {
+      max = Math.max(max, index);
+    }
+    return max;
+  }
+
+  public boolean[] indexArray() {
+    if (!hasIndexes()) {
+      return null;
+    }
+    final int max = maxIndex();
+    final boolean[] map = new boolean[max + 1];
+    for (final Integer index : indexes) {
+      map[index] = true;
+    }
+    return map;
+  }
+
+  public boolean isTuple() {
+    return members != null || (child != null && child.isTuple());
+  }
+
+  public RequestedTuple tuple() {
+    if (members != null) {
+      return members;
+    } else if (child != null) {
+      return child.tuple();
+    } else {
+      return null;
+    }
+  }
+
+  protected void addIndex(int index) {
+    if (indexes == null) {
+      indexes = new HashSet<>();
+    }
+    if (indexes != ALL_INDEXES) {
+      indexes.add(index);
+    }
+  }
+
+  protected void projectAllElements() {
+    indexes = ALL_INDEXES;
+  }
+
+  public int arrayDims() {
+    if (!isArray()) {
+      return 0;
+    } else if (child == null) {
+      return 1;
+    } else {
+      return 1 + child.arrayDims();
+    }
+  }
+
+  public void projectAllMembers() {
+    if (members == null || members.type() != TupleProjectionType.ALL) {
+      members = ImpliedTupleRequest.ALL_MEMBERS;
+    }
+  }
+
+  public RequestedTupleImpl explicitMembers() {
+    if (members == null) {
+      members = new RequestedTupleImpl();
+    }
+    if (members.type() == TupleProjectionType.SOME) {
+      return (RequestedTupleImpl) members;
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    if (isArray()) {
+      buf.append("[");
+      if (indexes == ALL_INDEXES) {
+        buf.append("*");
+      } else {
+        List<String> idxs = indexes.stream().sorted()
+            .map(String::valueOf).collect(Collectors.toList());
+        buf.append(String.join(", ", idxs));
+      }
+      buf.append("]");
+    }
+    if (members != null) {
+      buf.append(members.toString());
+    }
+    return buf.toString();
+  }
+}
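
Example: qualifiers chain one level per array dimension, which is what
enables the multi-dimensional array support in this patch. A sketch of
the observable result for a[0][1] (illustrative only, same parsing
assumption as above):

    import java.util.Collections;

    import org.apache.drill.common.expression.SchemaPath;
    import org.apache.drill.exec.physical.resultSet.project.Projections;
    import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
    import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;

    public class MultiDimSketch {
      public static void main(String[] args) {
        RequestedTuple proj = Projections.parse(Collections.singletonList(
            SchemaPath.parseFromString("a[0][1]")));
        RequestedColumn a = proj.get("a");
        System.out.println(a.isArray());    // true
        System.out.println(a.arrayDims());  // 2: one Qualifier per dimension
        System.out.println(a.hasIndex(0));  // true: index info kept at top level
      }
    }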
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/UnresolvedColumnsArrayColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/QualifierContainer.java
similarity index 63%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/UnresolvedColumnsArrayColumn.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/QualifierContainer.java
index aba6926..ce8d485 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/UnresolvedColumnsArrayColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/QualifierContainer.java
@@ -15,16 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.physical.impl.scan.columns;
+package org.apache.drill.exec.physical.resultSet.project;
 
-import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn;
-import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn;
-
-public class UnresolvedColumnsArrayColumn extends AbstractUnresolvedColumn {
-
-  public UnresolvedColumnsArrayColumn(RequestedColumn inCol) {
-    super(inCol);
-  }
-
-  public boolean[] selectedIndexes() { return inCol.indexes(); }
+public interface QualifierContainer {
+  Qualifier qualifier();
+  Qualifier requireQualifier();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedColumn.java
new file mode 100644
index 0000000..c074551
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedColumn.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet.project;
+
+/**
+ * Plan-time properties of a requested column. Represents
+ * a consolidated view of the set of references to a column.
+ * For example, the project list might contain:
+ * <ul>
+ * <li>{@code SELECT *}</li>
+ * <li>{@code SELECT filename, *, dir0}</li>
+ * <li>{@code SELECT a, b, c}</li>
+ * <li>{@code SELECT columns[4], columns[8]}</li>
+ * <li>{@code SELECT a.b, a.c}</li>
+ * <li>{@code SELECT columns, columns[1]}</li>
+ * <li>{@code SELECT a, a.b}</li>
+ * </ul>
+ *
+ * In each case, the same column is referenced in different
+ * forms which are consolidated into this abstraction.
+ * <p>
+ * The resulting information is a "pattern": a form of reference.
+ * Given the requested column, code can check if some concrete
+ * reader-provided column is consistent with the requested
+ * projection or not. The project
+ * list does not contain sufficient information to definitively pick
+ * a type; it only excludes certain types.
+ * <p>
+ * Even for complex types, we cannot definitively know the type.
+ * For example, the projection {@code a[0]} could either refer to an
+ * array (of any type), <b>or</b> a {@code DICT} with integer keys.
+ * Similarly, a projection of the form {@code a.b} can either refer
+ * to a member of a map, or the {@code "b"} string key of a
+ * {@code DICT} column.
+ *
+ * <h4>Compatibility Rules</h4>
+ *
+ * The pattern given by projection is consistent with certain concrete types
+ * as follows. + means any number of additional qualifiers. Note that the
+ * following list is conceptual, based on observed practice; the actual
+ * implementation may be more restrictive.
+ * <p>
+ * <table>
+ * <tr><th>Type</th><th>Consistent with</th></tr>
+ * <tr><td>Non-repeated MAP</td>
+ *     <td>{@code a}, {@code a.b}</td></tr>
+ * <tr><td>Repeated MAP</td>
+ *     <td>{@code a}, {@code a.b}, {@code a[n].b}</td></tr>
+ * <tr><td>Non-repeated Scalar</td>
+ *     <td>{@code a}</td></tr>
+ * <tr><td>Repeated Scalar</td>
+ *     <td>{@code a}, {@code a[n]}</td></tr>
+ * <tr><td>Non-repeated DICT</td>
+ *     <td>{@code a}, {@code a[n]}, {@code a['key']}</td></tr>
+ * <tr><td>Repeated DICT</td>
+ *     <td>{@code a}, {@code a[n]}, {@code a['key']}, {@code a[n][m]}, {@code a[n]['key']}</td></tr>
+ * <tr><td>Non-repeated LIST</td>
+ *     <td>{@code a}, {@code a[n]}</td></tr>
+ * <tr><td>Repeated LIST</td>
+ *     <td>{@code a}, {@code a[n]}, {@code a[n][n]}</td></tr>
+ * </table>
+ * <p>
+ * MAP, DICT, UNION and LIST are structured types: projection can reach
+ * into the structure to any number of levels. In such a case, when sufficient
+ * schema information is available, the above rules can be applied recursively
+ * to each level of structure. The recursion can be done in the class for a
+ * DICT (since there is only one child type), but must be external for other
+ * complex types. For MAP, the column can report which specific members
+ * are projected.
+ * <p>
+ * The Text reader supports the {@code columns} column, which lets the
+ * user project specific indexes. This class reports which indexes were
+ * actually selected. Index information is available only at the top level,
+ * not for two or more dimensions.
+ */
+public interface RequestedColumn {
+
+  /**
+   * The column name as projected. If the same column appears multiple
+   * times (as in {@code a[1], A[2]}), then the case of the first appearance
+   * is used.
+   *
+   * @return the column name as observed in the project list
+   */
+  String name();
+
+  /**
+   * Returns the fully-qualified column name. If the column is in the
+   * top-level tuple, this is the same as {@code name()}. If the column
+   * is nested in a map, then this name includes the enclosing
+   * columns: {@code a.b.c}.
+   *
+   * @return the full name with enclosing map prefixes, if any
+   */
+  String fullName();
+
+  /**
+   * Case-insensitive comparison of the column name.
+   */
+  boolean nameEquals(String target);
+
+  /**
+   * Several consumers of this mechanism process the "raw" projection list
+   * which can contain a combination of wildcard and other columns. For example:
+   * {@code filename, *, dir0}. The requested tuple preserves the wildcard
+   * within the projection list so that, say, the projection mechanism can insert
+   * the actual data columns between the two implicit columns in the example.
+   * <p>
+   * If a column is a wildcard, then none of the other methods apply, since
+   * this projected column represents any number of actual columns.
+   *
+   * @return if this column is the wildcard placeholder
+   */
+  boolean isWildcard();
+
+  /**
+   * @return true if this column has no qualifiers. Example:
+   * {@code a}.
+   */
+  boolean isSimple();
+
+  /**
+   * Report whether the projection implies a tuple. Example:
+   * {@code a.b}. Note that this method, and others, can only tell
+   * if the projection implies a tuple; the actual column may
+   * be a tuple (MAP), but be projected simply. The map
+   * format also describes a DICT with a VARCHAR key.
+   *
+   * @return true if the column has a map-like projection.
+   */
+  boolean isTuple();
+
+  /**
+   * Return projection information for the column as a tuple. If
+   * projection included references to nested columns (such as
+   * {@code a.b, a.c}), then the tuple projection will list only
+   * the referenced columns. However, if projection is generic
+   * ({@code m}), then we presume all columns of the map are projected
+   * and the returned object assumes all members are projected.
+   *
+   * @return projection information for a (presumed) map column
+   */
+  RequestedTuple tuple();
+
+  /**
+   * Report whether the first qualifier is an array.
+   * Example: {@code a[1]}. The array format also describes
+   * a DICT with an integer key.
+   * @return true if the column must be an array.
+   */
+  boolean isArray();
+
+  /**
+   * If {@code isArray()} returns true, reports the number of dimensions
+   * observed in projection. That is if projection is {@code a[0][1]},
+   * then this method returns 2.
+   * <p>
+   * Note that, as with all projection-level information, this number
+   * reflects only what was in the project list; not what might be
+   * the number of dimensions in the actual input source.
+   *
+   * @return the maximum number of array dimensions observed in the
+   * projection list, or 0 if this column was not observed to be an
+   * array (that is, if {@code isArray()} returns {@code false}).
+   */
+  int arrayDims();
+
+  /**
+   * Reports if the projection list included (only) specific element
+   * indexes. For example: {@code a[2], a[5]}. The user could also project
+   * both indexes and the array: {@code a[0], a}. In this case
+   * {@code isArray()} is {@code true}, but {@code hasIndexes()} is {@code false}.
+   *
+   * @return {@code true} if the column has enumerated indexes, {@code false}
+   * if the column was also projected as a whole, or if this column
+   * was not observed to be an array
+   */
+  boolean hasIndexes();
+
+  /**
+   * Return the maximum index value, if only explicit indexes were given.
+   * Valid if {@code hasIndexes()} returns true.
+   *
+   * @return the maximum array index value known to the projection, or
+   * 0 if {@code isArray()} is {@code false}. Also returns 0 if
+   * {@code hasIndexes()} returns {@code false}, meaning that either
+   * the column was not observed to be an array, or was projected
+   * with both indexes and by itself: {@code a[0], a}.
+   */
+  int maxIndex();
+
+  /**
+   * Return a bitmap of the selected indexes. Only valid if
+   * {@code hasIndexes()} returns {@code true}.
+   *
+   * @return a bitmap of the selected array indexes, or {@code null}
+   * if {@code hasIndexes()} returns {@code false}.
+   */
+  boolean[] indexes();
+
+  /**
+   * Reports whether a specific index was selected. Shortcut for the other
+   * array methods. Used in cases such as the {@code columns} column where
+   * the user can select specific elements (columns) but not others.
+   *
+   * @param index the array index to check
+   * @return {@code true} if the array element was projected, either
+   * explicitly ({@code a[3]}) or implicitly ({@code a}). Returns
+   * {@code false} <i>only</i> if {@code hasIndexes()} returns
+   * {@code true} (the user listed only explicit indexes) and the
+   * requested index is not among those requested ({@code index >=
+   * maxIndex() || !indexes()[index]})
+   */
+  boolean hasIndex(int index);
+
+  /**
+   * The internal qualifier information for the column. Generally not
+   * needed by clients; use the other methods to interpret the
+   * qualifier for you.
+   *
+   * @return detailed column qualifier information, if the column was
+   * seen to be complex in the project list
+   */
+  Qualifier qualifier();
+}
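
Example: for the columns use case called out above, the index bitmap
reports exactly which elements the user selected. A sketch (illustrative
only, same parsing assumption as above):

    import java.util.Arrays;

    import org.apache.drill.common.expression.SchemaPath;
    import org.apache.drill.exec.physical.resultSet.project.Projections;
    import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
    import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;

    public class ColumnsIndexSketch {
      public static void main(String[] args) {
        // SELECT columns[4], columns[8]: only two elements projected.
        RequestedTuple proj = Projections.parse(Arrays.asList(
            SchemaPath.parseFromString("columns[4]"),
            SchemaPath.parseFromString("columns[8]")));
        RequestedColumn cols = proj.get("columns");
        System.out.println(cols.hasIndexes());  // true: explicit indexes only
        System.out.println(cols.maxIndex());    // 8
        boolean[] selected = cols.indexes();    // length 9; [4] and [8] true
        System.out.println(cols.hasIndex(5));   // false: element 5 unprojected
        System.out.println(selected[4] && selected[8]);  // true
      }
    }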
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedColumnImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedColumnImpl.java
index 55a252d..a764843 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedColumnImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedColumnImpl.java
@@ -17,226 +17,99 @@
  */
 package org.apache.drill.exec.physical.resultSet.project;
 
-import java.util.HashSet;
-import java.util.Set;
-
 import org.apache.drill.common.expression.PathSegment.NameSegment;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn;
 
 /**
  * Represents one name element. Like a {@link NameSegment}, except that this
  * version is an aggregate. If the projection list contains `a.b` and `a.c`,
  * then one name segment exists for a, and contains segments for both b and c.
  */
+public class RequestedColumnImpl extends BaseRequestedColumn implements QualifierContainer {
 
-public class RequestedColumnImpl implements RequestedColumn {
-
-  /**
-   * Special marker to indicate that that a) the item is an
-   * array, and b) that all indexes are to be projected.
-   * Used when seeing both a and a[x].
-   */
-
-  private static final Set<Integer> ALL_INDEXES = new HashSet<>();
-
-  private final RequestedTuple parent;
-  private final String name;
-  private RequestedTuple members;
-  private Set<Integer> indexes;
-  private ProjectionType type;
+  private int refCount = 1;
+  private Qualifier qualifier;
 
   public RequestedColumnImpl(RequestedTuple parent, String name) {
-    this.parent = parent;
-    this.name = name;
-    setType();
-  }
-
-  public RequestedColumnImpl(RequestedTuple parent, String name, ProjectionType type) {
-    this.parent = parent;
-    this.name = name;
-    this.type = type;
+    super(parent, name);
   }
 
-  @Override
-  public String name() { return name; }
-  @Override
-  public ProjectionType type() { return type; }
-  @Override
-  public boolean isWildcard() { return type == ProjectionType.WILDCARD; }
-  @Override
-  public boolean isSimple() { return type == ProjectionType.GENERAL; }
+  protected void bumpRefCount() { refCount++; }
 
-  @Override
-  public boolean isArray() { return type.isArray(); }
+  public int refCount() { return refCount; }
 
   @Override
-  public boolean isTuple() { return type.isTuple(); }
+  public Qualifier qualifier() { return qualifier; }
 
   @Override
-  public boolean isDict() {
-    return type.isDict();
-  }
-
-  public RequestedTuple asTuple() {
-    if (members == null) {
-      members = new RequestedTupleImpl(this);
-      setType();
-    }
-    return members;
-  }
-
-  public RequestedTuple projectAllMembers(boolean projectAll) {
-    members = projectAll ? ImpliedTupleRequest.ALL_MEMBERS : ImpliedTupleRequest.NO_MEMBERS;
-    setType();
-    return members;
-  }
-
-  public void addIndex(int index) {
-    if (indexes == null) {
-      indexes = new HashSet<>();
-    }
-    if (indexes != ALL_INDEXES) {
-      indexes.add(index);
+  public Qualifier requireQualifier() {
+    if (qualifier == null) {
+      qualifier = new Qualifier();
     }
-    setType();
-  }
-
-  public void projectAllElements() {
-    indexes = ALL_INDEXES;
-    setType();
+    return qualifier;
   }
 
   @Override
-  public boolean hasIndexes() {
-    return indexes != null && indexes != ALL_INDEXES;
-  }
+  public boolean isWildcard() { return false; }
 
   @Override
-  public boolean hasIndex(int index) {
-    return hasIndexes() ? indexes.contains(index) : false;
-  }
+  public boolean isSimple() { return qualifier == null; }
 
   @Override
-  public int maxIndex() {
-    if (! hasIndexes()) {
-      return 0;
-    }
-    int max = 0;
-    for (final Integer index : indexes) {
-      max = Math.max(max, index);
-    }
-    return max;
+  public boolean isTuple() {
+    return qualifier != null && qualifier.isTuple();
   }
 
   @Override
-  public boolean[] indexes() {
-    if (! hasIndexes()) {
-      return null;
-    }
-    final int max = maxIndex();
-    final boolean map[] = new boolean[max+1];
-    for (final Integer index : indexes) {
-      map[index] = true;
+  public RequestedTuple tuple() {
+    if (!isTuple()) {
+      return ImpliedTupleRequest.ALL_MEMBERS;
     }
-    return map;
+    return qualifier.tuple();
   }
 
   @Override
-  public String fullName() {
-    final StringBuilder buf = new StringBuilder();
-    buildName(buf);
-    return buf.toString();
+  public boolean isArray() {
+    return qualifier != null && qualifier.isArray();
   }
 
-  public boolean isRoot() { return parent == null; }
-
-  private void setType() {
-    if (name.equals(SchemaPath.DYNAMIC_STAR)) {
-      type = ProjectionType.WILDCARD;
-    } else if (indexes != null && members != null) {
-      type = ProjectionType.TUPLE_ARRAY;
-    }
-    else if (indexes != null) {
-      type = ProjectionType.ARRAY;
-    } else if (members != null) {
-      type = ProjectionType.TUPLE;
-    } else {
-      type = ProjectionType.GENERAL;
-    }
-  }
-
-  protected void buildName(StringBuilder buf) {
-    parent.buildName(buf);
-    buf.append('`')
-       .append(name)
-       .append('`');
+  @Override
+  public boolean hasIndexes() {
+    return qualifier != null && qualifier.hasIndexes();
   }
 
   @Override
-  public String summary() {
-    switch (type) {
-    case ARRAY:
-      return "array column";
-    case TUPLE:
-      return "map column";
-    case TUPLE_ARRAY:
-      return "repeated map";
-    case DICT:
-      return "dict column";
-    case DICT_ARRAY:
-      return "repeated dict column";
-    case WILDCARD:
-      return "wildcard";
-    default:
-      return "column";
-    }
+  public boolean hasIndex(int index) {
+    return qualifier != null && qualifier.hasIndex(index);
   }
 
   @Override
-  public boolean nameEquals(String target) {
-    return name.equalsIgnoreCase(target);
+  public int maxIndex() {
+    return qualifier == null ? 0 : qualifier.maxIndex();
   }
 
   @Override
-  public RequestedTuple mapProjection() {
-    switch (type) {
-    case ARRAY:
-    case GENERAL:
-      // Don't know if the target is a tuple or not.
-
-      return ImpliedTupleRequest.ALL_MEMBERS;
-    case TUPLE:
-    case TUPLE_ARRAY:
-      return members == null ? ImpliedTupleRequest.ALL_MEMBERS : members;
-    case UNPROJECTED:
-      return ImpliedTupleRequest.NO_MEMBERS;
-    default:
-      return null;
-    }
+  public boolean[] indexes() {
+    return qualifier == null ? null : qualifier.indexArray();
   }
 
+  /**
+   * Convert the projection to a string of the form:
+   * {@code a[0,1,4]['*']{b, c, d}}.
+   * The information here is insufficient to specify a type;
+   * it only specifies a pattern with which types must be compatible.
+   */
   @Override
   public String toString() {
-    final StringBuilder buf = new StringBuilder();
-    buf
-      .append("[")
-      .append(getClass().getSimpleName())
-      .append(" name=")
-      .append(name())
-      .append(", type=")
-      .append(summary());
-    if (isArray()) {
-      buf
-        .append(", array=")
-        .append(indexes);
-    }
-    if (isTuple()) {
-      buf
-        .append(", tuple=")
-        .append(members);
+    final StringBuilder buf = new StringBuilder()
+        .append(name());
+    if (qualifier != null) {
+      buf.append(qualifier.toString());
     }
-    buf.append("]");
     return buf.toString();
   }
+
+  @Override
+  public int arrayDims() {
+    return qualifier == null ? 0 : qualifier.arrayDims();
+  }
 }
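
For illustration only (not part of this patch): a minimal sketch of how the
qualifier-based accessors above answer index queries, assuming the
Projections.parse() helper added elsewhere in this commit is the entry point
for building a RequestedTuple from a projection list.

    import java.util.Arrays;

    import org.apache.drill.common.expression.SchemaPath;
    import org.apache.drill.exec.physical.resultSet.project.Projections;
    import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
    import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;

    public class IndexedColumnSketch {
      public static void main(String[] args) {
        // Equivalent of: SELECT columns[1], columns[4] ...
        RequestedTuple proj = Projections.parse(Arrays.asList(
            SchemaPath.parseFromString("columns[1]"),
            SchemaPath.parseFromString("columns[4]")));

        // Both references consolidate into one column with one qualifier.
        RequestedColumn col = proj.get("columns");
        System.out.println(col.isArray());    // expect true
        System.out.println(col.arrayDims());  // expect 1
        System.out.println(col.hasIndex(4));  // expect true
        System.out.println(col.hasIndex(2));  // expect false
        System.out.println(col.maxIndex());   // expect 4
      }
    }
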
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTuple.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTuple.java
index d9b3e1a..1279299 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTuple.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTuple.java
@@ -19,13 +19,10 @@ package org.apache.drill.exec.physical.resultSet.project;
 
 import java.util.List;
 
-import org.apache.drill.common.expression.PathSegment;
-
 /**
  * Represents the set of columns projected for a tuple (row or map.)
- * The projected columns might themselves be columns, so returns a
- * projection set for such columns. Represents the set of requested
- * columns and tuples as expressed in the physical plan.
+ * Each column may have structure: a set of referenced names or
+ * array indices.
  * <p>
  * Three variations exist:
  * <ul>
@@ -45,55 +42,22 @@ import org.apache.drill.common.expression.PathSegment;
  * projection set which the code can query to determine if a newly
  * added column is wanted (and so should have a backing vector) or
  * is unwanted (and can just receive a dummy writer.)
+ * <p>
+ * Wildcards will set the projection type to {@code ALL}, and will
+ * be retained in the projection list. Retaining the wildcard
+ * is important because multiple consumers insert columns at the
+ * wildcard position. For example:<br>
+ * {@code SELECT filename, *, filepath FROM ...}
  */
-
 public interface RequestedTuple {
 
-  /**
-   * Plan-time properties of a requested column. Represents
-   * a consolidated view of the set of references to a column.
-   * For example, the project list might contain:<br>
-   * <tt>SELECT columns[4], columns[8]</tt><br>
-   * <tt>SELECT a.b, a.c</tt><br>
-   * <tt>SELECT columns, columns[1]</tt><br>
-   * <tt>SELECT a, a.b</tt><br>
-   * In each case, the same column is referenced in different
-   * forms which are consolidated in to this abstraction.
-   * <p>
-   * Depending on the syntax, we can infer if a column must
-   * be an array or map. This is definitive: though we know that
-   * columns of the form above must be an array or a map,
-   * we cannot know if a simple column reference might refer
-   * to an array or map.
-   */
-
-  public interface RequestedColumn {
-
-    String name();
-    ProjectionType type();
-    boolean isWildcard();
-    boolean isSimple();
-    boolean isArray();
-    boolean isTuple();
-    boolean isDict();
-    String fullName();
-    RequestedTuple mapProjection();
-    boolean nameEquals(String target);
-    int maxIndex();
-    boolean[] indexes();
-    boolean hasIndexes();
-    boolean hasIndex(int index);
-    String summary();
-  }
-
   public enum TupleProjectionType {
     ALL, NONE, SOME
   }
 
   TupleProjectionType type();
-  void parseSegment(PathSegment child);
   RequestedColumn get(String colName);
-  ProjectionType projectionType(String colName);
+  boolean isProjected(String colName);
   RequestedTuple mapProjection(String colName);
   List<RequestedColumn> projections();
   void buildName(StringBuilder buf);
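
A hedged usage sketch of the narrowed interface above, again assuming the
Projections.parse() helper from this commit; the names are illustrative.

    import java.util.Arrays;

    import org.apache.drill.common.expression.SchemaPath;
    import org.apache.drill.exec.physical.resultSet.project.Projections;
    import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;

    public class TupleProjectionSketch {
      public static void main(String[] args) {
        // Equivalent of: SELECT a.x, c ...
        RequestedTuple proj = Projections.parse(Arrays.asList(
            SchemaPath.parseFromString("a.x"),
            SchemaPath.getSimplePath("c")));

        // Top-level projection queries.
        System.out.println(proj.isProjected("a"));  // expect true
        System.out.println(proj.isProjected("d"));  // expect false

        // Map members carry their own projection set.
        RequestedTuple aProj = proj.mapProjection("a");
        System.out.println(aProj.isProjected("x")); // expect true
        System.out.println(aProj.isProjected("y")); // expect false
      }
    }
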
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTupleImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTupleImpl.java
index 3e91466..dc0c8e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTupleImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTupleImpl.java
@@ -17,19 +17,14 @@
  */
 package org.apache.drill.exec.physical.resultSet.project;
 
-import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.PathSegment;
-import org.apache.drill.common.expression.PathSegment.ArraySegment;
-import org.apache.drill.common.expression.PathSegment.NameSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.record.metadata.TupleNameSpace;
 
 /**
- * Represents an explicit projection at some tuple level.
+ * Represents an explicit projection at some tuple level. A tuple is the
+ * top-level row or a map.
  * <p>
  * A column is projected if it is explicitly listed in the selection list.
  * <p>
@@ -66,22 +61,41 @@ import org.apache.drill.exec.record.metadata.TupleNameSpace;
  * <li><tt>ArraySegment</tt> is the other kind of name part and represents
  * an array index such as the "[1]" in `columns`[1].</li>
  * <ul>
- * The parser here consumes only names, this mechanism does not consider
- * array indexes. As a result, there may be multiple projected columns that
- * map to the same projection here: `columns`[1] and `columns`[2] both map to
- * the name `columns`, for example.
+ * The parser considers names and array indexes. Examples:<pre><code>
+ * a
+ * a.b
+ * a[2]
+ * a[2].b
+ * a[1][2][3]
+ * a[1][2][3].b.c
+ * a['foo'][0].b['bar']
+ * </code></pre>
+ *
+ * <h4>Usage</h4>
+ * The projection information is a <i>pattern</i> which supports queries of the
+ * form "is this column projected", and "if projected, is the projection consistent
+ * with such-and-so concrete type?" Clients should not try to work out the
+ * meaning of the pattern: doing so is very complex. Instead, do the following:
+ *
+ * <pre><code>
+ * String colName = ...;
+ * ColumnMetadata colDef = ...;
+ * InputTupleProjection tupleProj = ...
+ * if (tupleProj.isProjected(colName)) {
+ *   if (!tupleProj.isConsistentWith(colDef)) {
+ *     // Raise an error
+ *   }
+ *   // Handle a projected column.
+ * }</code></pre>
  */
-
 public class RequestedTupleImpl implements RequestedTuple {
 
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RequestedTupleImpl.class);
-  private static final Collection<SchemaPath> PROJECT_ALL = Collections.singletonList(SchemaPath.STAR_COLUMN);
-
   private final RequestedColumnImpl parent;
+  protected TupleProjectionType projectionType = TupleProjectionType.SOME;
   private final TupleNameSpace<RequestedColumn> projection = new TupleNameSpace<>();
 
   public RequestedTupleImpl() {
-    parent = null;
+    this.parent = null;
   }
 
   public RequestedTupleImpl(RequestedColumnImpl parent) {
@@ -100,194 +114,26 @@ public class RequestedTupleImpl implements RequestedTuple {
     return projection.get(colName.toLowerCase());
   }
 
-  private RequestedColumnImpl getImpl(String colName) {
+  protected RequestedColumnImpl getImpl(String colName) {
     return (RequestedColumnImpl) get(colName);
   }
 
-  @Override
-  public ProjectionType projectionType(String colName) {
+  protected RequestedColumn project(String colName) {
     RequestedColumn col = get(colName);
-    return col == null ? ProjectionType.UNPROJECTED : col.type();
-  }
-
-  @Override
-  public RequestedTuple mapProjection(String colName) {
-    RequestedColumnImpl col = getImpl(colName);
-    RequestedTuple mapProj = (col == null) ? null : col.mapProjection();
-    if (mapProj != null) {
-      return mapProj;
-    }
-
-    // No explicit information for the map. Members inherit the
-    // same projection as the map itself.
-
     if (col != null) {
-      return col.projectAllMembers(true);
-    }
-    return ImpliedTupleRequest.NO_MEMBERS;
-  }
-
-  /**
-   * Create a requested tuple projection from a rewritten top-level
-   * projection list. The columns within the list have already been parsed to
-   * pick out arrays, maps and scalars. The list must not include the
-   * wildcard: a wildcard list must be passed in as a null list. An
-   * empty list means project nothing. Null list means project all, else
-   * project only the columns in the list.
-   *
-   * @param projList top-level, parsed columns
-   * @return the tuple projection for the top-leel row
-   */
-
-  public static RequestedTuple build(List<RequestedColumn> projList) {
-    if (projList == null) {
-      return new ImpliedTupleRequest(true);
-    }
-    if (projList.isEmpty()) {
-      return ImpliedTupleRequest.NO_MEMBERS;
-    }
-    return new RequestedTupleImpl(projList);
-  }
-
-  /**
-   * Parse a projection list. The list should consist of a list of column names;
-   * or wildcards. An empty list means
-   * nothing is projected. A null list means everything is projected (that is, a
-   * null list here is equivalent to a wildcard in the SELECT statement.)
-   * <p>
-   * The projection list may include both a wildcard and column names (as in
-   * the case of implicit columns.) This results in a final list that both
-   * says that everything is projected, and provides the list of columns.
-   * <p>
-   * Parsing is used at two different times. First, to parse the list from
-   * the physical operator. This has the case above: an explicit wildcard
-   * and/or additional columns. Then, this class is used again to prepare the
-   * physical projection used when reading. In this case, wildcards should
-   * be removed, implicit columns pulled out, and just the list of read-level
-   * columns should remain.
-   *
-   * @param projList
-   *          the list of projected columns, or null if no projection is to be
-   *          done
-   * @return a projection set that implements the specified projection
-   */
-
-  public static RequestedTuple parse(Collection<SchemaPath> projList) {
-    if (projList == null) {
-      projList = PROJECT_ALL;
-    }
-    else if (projList.isEmpty()) {
-      return ImpliedTupleRequest.NO_MEMBERS;
-    }
-    RequestedTupleImpl projSet = new RequestedTupleImpl();
-    for (SchemaPath col : projList) {
-      projSet.parseSegment(col.getRootSegment());
-    }
-    return projSet;
-  }
-
-  @Override
-  public void parseSegment(PathSegment pathSeg) {
-    if (pathSeg.isLastPath()) {
-      parseLeaf((NameSegment) pathSeg);
-    } else if (pathSeg.getChild().isArray()) {
-      parseArray((NameSegment) pathSeg);
-    } else {
-      parseInternal((NameSegment) pathSeg);
-    }
-  }
-
-  private void parseLeaf(NameSegment nameSeg) {
-    String name = nameSeg.getPath();
-    RequestedColumnImpl member = getImpl(name);
-    if (member == null) {
-      projection.add(name, new RequestedColumnImpl(this, name));
-      return;
-    }
-    if (member.isSimple() || member.isWildcard()) {
-      throw UserException
-        .validationError()
-        .message("Duplicate column in project list: %s",
-            member.fullName())
-        .build(logger);
-    }
-    if (member.isArray()) {
-
-      // Saw both a and a[x]. Occurs in project list.
-      // Project all elements.
-
-      member.projectAllElements();
-      return;
-    }
-
-    // Else the column is a known map.
-
-    assert member.isTuple();
-
-    // Allow both a.b (existing) and a (this column)
-    // Since we we know a is a map, and we've projected the
-    // whole map, modify the projection of the column to
-    // project the entire map.
-
-    member.projectAllMembers(true);
-  }
-
-  private void parseInternal(NameSegment nameSeg) {
-    String name = nameSeg.getPath();
-    RequestedColumnImpl member = getImpl(name);
-    RequestedTuple map;
-    if (member == null) {
-      // New member. Since this is internal, this new member
-      // must be a map.
-
-      member = new RequestedColumnImpl(this, name);
-      projection.add(name, member);
-      map = member.asTuple();
-    } else if (member.isTuple()) {
-
-      // Known map. Add to it.
-
-      map = member.asTuple();
+      if (col instanceof RequestedColumnImpl) {
+        ((RequestedColumnImpl) col).bumpRefCount();
+      }
     } else {
-
-      // Member was previously projected by itself. We now
-      // know it is a map. So, project entire map. (Earlier
-      // we saw `a`. Now we see `a`.`b`.)
-
-      map = member.projectAllMembers(true);
-    }
-    map.parseSegment(nameSeg.getChild());
-  }
-
-  private void parseArray(NameSegment nameSeg) {
-    String name = nameSeg.getPath();
-    ArraySegment arraySeg = ((ArraySegment) nameSeg.getChild());
-    int index = arraySeg.getIndex();
-    RequestedColumnImpl member = getImpl(name);
-    if (member == null) {
-      member = new RequestedColumnImpl(this, name);
-      projection.add(name, member);
-    } else if (member.isSimple()) {
-
-      // Saw both a and a[x]. Occurs in project list.
-      // Project all elements.
-
-      member.projectAllElements();
-      return;
-    }
-
-    // Allow duplicate indexes. Example: z[0], z[0]['orange']
-    if (!member.hasIndex(index)) {
-      member.addIndex(index);
-    }
-
-    // Drills SQL parser does not support map arrays: a[0].c
-    // But, the SchemaPath does support them, so no harm in
-    // parsing them here.
-
-    if (! arraySeg.isLastPath()) {
-      parseInternal(nameSeg);
+      if (colName.equals(SchemaPath.DYNAMIC_STAR)) {
+        projectionType = TupleProjectionType.ALL;
+        col = new RequestedWildcardColumn(this, colName);
+      } else {
+        col = new RequestedColumnImpl(this, colName);
+      }
+      projection.add(colName, col);
     }
+    return col;
   }
 
   @Override
@@ -309,14 +155,39 @@ public class RequestedTupleImpl implements RequestedTuple {
    */
   @Override
   public TupleProjectionType type() {
-    if (projection.isEmpty()) {
-      return TupleProjectionType.NONE;
+    return projectionType;
+  }
+
+  @Override
+  public boolean isProjected(String colName) {
+    return projectionType == TupleProjectionType.ALL || get(colName) != null;
+  }
+
+  @Override
+  public RequestedTuple mapProjection(String colName) {
+    switch (projectionType) {
+      case ALL:
+        return ImpliedTupleRequest.ALL_MEMBERS;
+      case NONE:
+        return ImpliedTupleRequest.NO_MEMBERS;
+      default:
+        RequestedColumnImpl colProj = getImpl(colName);
+        return colProj == null ? ImpliedTupleRequest.NO_MEMBERS : colProj.tuple();
     }
-    for (RequestedColumn col : projection) {
-      if (col.isWildcard()) {
-        return TupleProjectionType.ALL;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder()
+        .append("{");
+    boolean first = true;
+    for (RequestedColumn col : projections()) {
+      if (!first) {
+        buf.append(", ");
       }
+      first = false;
+      buf.append(col.toString());
     }
-    return TupleProjectionType.SOME;
+    return buf.append("}").toString();
   }
 }
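
To make the new parsing behavior concrete, a sketch exercising one of the
forms from the class javadoc (a multi-dimensional array of maps). Illustrative
only; it assumes the Projections.parse() helper added in this commit.

    import java.util.Arrays;

    import org.apache.drill.common.expression.SchemaPath;
    import org.apache.drill.exec.physical.resultSet.project.Projections;
    import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
    import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;

    public class GeneralizedParserSketch {
      public static void main(String[] args) {
        RequestedTuple proj = Projections.parse(Arrays.asList(
            SchemaPath.parseFromString("a[1][2][3].b.c")));

        // The column is both a (3-D) array and a map: the combination
        // this commit adds support for.
        RequestedColumn a = proj.get("a");
        System.out.println(a.isArray());    // expect true
        System.out.println(a.arrayDims());  // expect 3
        System.out.println(a.isTuple());    // expect true

        // Members inherit the explicit member list: b projected, z not.
        RequestedTuple members = proj.mapProjection("a");
        System.out.println(members.isProjected("b")); // expect true
        System.out.println(members.isProjected("z")); // expect false
      }
    }
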
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedWildcardColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedWildcardColumn.java
new file mode 100644
index 0000000..179ac56
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedWildcardColumn.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet.project;
+
+public class RequestedWildcardColumn extends BaseRequestedColumn {
+
+  public RequestedWildcardColumn(RequestedTuple parent, String name) {
+    super(parent, name);
+  }
+
+  @Override
+  public boolean isWildcard() { return true; }
+
+  @Override
+  public boolean isSimple() { return true; }
+
+  @Override
+  public boolean isTuple() { return false; }
+
+  @Override
+  public RequestedTuple tuple() { return null; }
+
+  @Override
+  public boolean isArray() { return false; }
+
+  @Override
+  public boolean hasIndexes() { return false; }
+
+  @Override
+  public int maxIndex() { return 0; }
+
+  @Override
+  public boolean[] indexes() { return null; }
+
+  @Override
+  public boolean hasIndex(int index) { return false; }
+
+  @Override
+  public String toString() { return name(); }
+
+  @Override
+  public int arrayDims() { return 0; }
+
+  @Override
+  public Qualifier qualifier() { return null; }
+}
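
A short sketch of the wildcard semantics described earlier: the wildcard
forces the tuple's projection type to ALL, yet remains in the projection
list so consumers can insert columns at its position. Illustrative only,
assuming Projections.parse() from this commit.

    import java.util.Arrays;

    import org.apache.drill.common.expression.SchemaPath;
    import org.apache.drill.exec.physical.resultSet.project.Projections;
    import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;
    import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.TupleProjectionType;

    public class WildcardSketch {
      public static void main(String[] args) {
        // Equivalent of: SELECT filename, *, filepath ...
        RequestedTuple proj = Projections.parse(Arrays.asList(
            SchemaPath.getSimplePath("filename"),
            SchemaPath.STAR_COLUMN,
            SchemaPath.getSimplePath("filepath")));

        System.out.println(proj.type() == TupleProjectionType.ALL); // expect true
        System.out.println(proj.isProjected("anything"));           // expect true
        System.out.println(proj.projections().size());              // expect 3
        System.out.println(proj.projections().get(1).isWildcard()); // expect true
      }
    }
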
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
index 61f1cca..74f20e3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
@@ -50,7 +50,6 @@ import org.junit.experimental.categories.Category;
  * Test the "columns" array mechanism integrated with the scan schema
  * orchestrator including simulating reading data.
  */
-
 @Category(RowSetTests.class)
 public class TestColumnsArray extends SubOperatorTest {
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java
index e6b897d..af81aaf 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java
@@ -55,7 +55,6 @@ import static org.junit.Assert.assertTrue;
 /**
  * Test the columns-array specific behavior in the columns scan framework.
  */
-
 @Category(RowSetTests.class)
 public class TestColumnsArrayFramework extends SubOperatorTest {
 
@@ -145,7 +144,6 @@ public class TestColumnsArrayFramework extends SubOperatorTest {
    * Test including a column other than "columns". Occurs when
    * using implicit columns.
    */
-
   @Test
   public void testNonColumnsProjection() {
 
@@ -178,7 +176,6 @@ public class TestColumnsArrayFramework extends SubOperatorTest {
   /**
    * Test projecting just the `columns` column.
    */
-
   @Test
   public void testColumnsProjection() {
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java
index 6d955be..1adf539 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java
@@ -48,7 +48,6 @@ public class TestColumnsArrayParser extends SubOperatorTest {
    * as an array. No need for early schema. This case is special: it actually
    * creates the one and only table column to match the desired output column.
    */
-
   @Test
   public void testColumnsArray() {
     ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -89,7 +88,7 @@ public class TestColumnsArrayParser extends SubOperatorTest {
         RowSetTestUtils.projectAll(),
         ScanTestUtils.parsers(new ColumnsArrayParser(true)));
 
-    assertFalse(scanProj.projectAll());
+    assertTrue(scanProj.projectAll());
     assertEquals(1, scanProj.requestedCols().size());
 
     assertEquals(1, scanProj.columns().size());
@@ -154,7 +153,6 @@ public class TestColumnsArrayParser extends SubOperatorTest {
    * <p>
    * TODO: This should only be true for text readers, make this an option.
    */
-
   @Test
   public void testErrorColumnsArrayAndColumn() {
     try {
@@ -170,7 +168,6 @@ public class TestColumnsArrayParser extends SubOperatorTest {
   /**
    * Exclude a column and `columns` (reversed order of previous test).
    */
-
   @Test
   public void testErrorColumnAndColumnsArray() {
     try {
@@ -184,19 +181,17 @@ public class TestColumnsArrayParser extends SubOperatorTest {
   }
 
   /**
-   * Can't request `columns` twice.
+   * Requesting `columns` twice: the second request is ignored.
    */
-
   @Test
-  public void testErrorTwoColumnsArray() {
-    try {
-      ScanLevelProjection.build(
-          RowSetTestUtils.projectList(ColumnsScanFramework.COLUMNS_COL, ColumnsScanFramework.COLUMNS_COL),
-          ScanTestUtils.parsers(new ColumnsArrayParser(false)));
-      fail();
-    } catch (UserException e) {
-      // Expected
-    }
+  public void testTwoColumnsArray() {
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
+        RowSetTestUtils.projectList(ColumnsScanFramework.COLUMNS_COL, ColumnsScanFramework.COLUMNS_COL),
+        ScanTestUtils.parsers(new ColumnsArrayParser(false)));
+    assertFalse(scanProj.projectAll());
+    assertEquals(2, scanProj.requestedCols().size());
+    assertEquals(1, scanProj.columns().size());
+    assertEquals(ColumnsScanFramework.COLUMNS_COL, scanProj.columns().get(0).name());
   }
 
   @Test
@@ -235,7 +230,6 @@ public class TestColumnsArrayParser extends SubOperatorTest {
    * The `columns` column is special: can't be used with other column names.
    * Make sure that the rule <i>does not</i> apply to implicit columns.
    */
-
   @Test
   public void testMetadataColumnsWithColumnsArray() {
     Path filePath = new Path("hdfs:///w/x/y/z.csv");
@@ -274,7 +268,6 @@ public class TestColumnsArrayParser extends SubOperatorTest {
    * includes both the wildcard and the `columns` array.
    * We can ignore one of them.
    */
-
   @Test
   public void testWildcardAndColumns() {
     ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -283,7 +276,7 @@ public class TestColumnsArrayParser extends SubOperatorTest {
             ColumnsScanFramework.COLUMNS_COL),
         ScanTestUtils.parsers(new ColumnsArrayParser(true)));
 
-    assertFalse(scanProj.projectAll());
+    assertTrue(scanProj.projectAll());
     assertEquals(2, scanProj.requestedCols().size());
 
     assertEquals(1, scanProj.columns().size());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java
index e115ad9..dfb4f08 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java
@@ -78,7 +78,6 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
    * Test including file metadata (AKA "implicit columns") in the project
    * list.
    */
-
   @Test
   public void testFileMetadataColumnSelection() {
     Path filePath = new Path("hdfs:///w/x/y/z.csv");
@@ -121,7 +120,6 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
   /**
    * Verify that partition columns, in any case, work.
    */
-
   @Test
   public void testPartitionColumnSelection() {
     Path filePath = new Path("hdfs:///w/x/y/z.csv");
@@ -152,7 +150,6 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
   /**
    * Test wildcard expansion.
    */
-
   @Test
   public void testRevisedWildcard() {
     Path filePath = new Path("hdfs:///w/x/y/z.csv");
@@ -170,48 +167,14 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
   }
 
   /**
-   * Legacy (prior version) wildcard expansion always expands partition
-   * columns.
-   */
-
-  @Test
-  public void testLegacyWildcard() {
-    Path filePath = new Path("hdfs:///w/x/y/z.csv");
-    FileMetadataOptions options = standardOptions(filePath);
-    options.useLegacyWildcardExpansion(true);
-    options.useLegacyExpansionLocation(true);
-    // Max partition depth is 3, though this "scan" sees only 2
-    options.setPartitionDepth(3);
-    FileMetadataManager metadataManager = new FileMetadataManager(
-        fixture.getOptionManager(),
-        options);
-
-    ScanLevelProjection scanProj = ScanLevelProjection.build(
-        RowSetTestUtils.projectAll(),
-        Lists.newArrayList(metadataManager.projectionParser()));
-
-    List<ColumnProjection> cols = scanProj.columns();
-    assertEquals(4, cols.size());
-    assertTrue(scanProj.columns().get(0) instanceof UnresolvedWildcardColumn);
-    assertTrue(scanProj.columns().get(1) instanceof PartitionColumn);
-    assertEquals(0, ((PartitionColumn) cols.get(1)).partition());
-    assertTrue(scanProj.columns().get(2) instanceof PartitionColumn);
-    assertEquals(1, ((PartitionColumn) cols.get(2)).partition());
-    assertTrue(scanProj.columns().get(3) instanceof PartitionColumn);
-    assertEquals(2, ((PartitionColumn) cols.get(3)).partition());
-  }
-
-  /**
    * Combine wildcard and file metadata columns. The wildcard expands
    * table columns but not metadata columns.
    */
-
   @Test
   public void testLegacyWildcardAndFileMetadata() {
     Path filePath = new Path("hdfs:///w/x/y/z.csv");
     FileMetadataOptions options = standardOptions(filePath);
     options.useLegacyWildcardExpansion(true);
-    options.useLegacyExpansionLocation(false);
     FileMetadataManager metadataManager = new FileMetadataManager(
         fixture.getOptionManager(),
         options);
@@ -236,13 +199,11 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
    * As above, but include implicit columns before and after the
    * wildcard.
    */
-
   @Test
   public void testLegacyWildcardAndFileMetadataMixed() {
     Path filePath = new Path("hdfs:///w/x/y/z.csv");
     FileMetadataOptions options = standardOptions(filePath);
     options.useLegacyWildcardExpansion(true);
-    options.useLegacyExpansionLocation(false);
     FileMetadataManager metadataManager = new FileMetadataManager(
         fixture.getOptionManager(),
         options);
@@ -271,7 +232,6 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
    * Tests proposed functionality: included only requested partition
    * columns.
    */
-
   @Test
   public void testRevisedWildcardAndPartition() {
     Path filePath = new Path("hdfs:///w/x/y/z.csv");
@@ -291,37 +251,10 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
   }
 
   @Test
-  public void testLegacyWildcardAndPartition() {
-    Path filePath = new Path("hdfs:///w/x/y/z.csv");
-    FileMetadataOptions options = standardOptions(filePath);
-    options.useLegacyWildcardExpansion(true);
-    options.useLegacyExpansionLocation(true);
-    FileMetadataManager metadataManager = new FileMetadataManager(
-        fixture.getOptionManager(),
-        options);
-
-    ScanLevelProjection scanProj = ScanLevelProjection.build(
-        RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR,
-            ScanTestUtils.partitionColName(8)),
-        Lists.newArrayList(metadataManager.projectionParser()));
-
-      List<ColumnProjection> cols = scanProj.columns();
-      assertEquals(4, cols.size());
-      assertTrue(scanProj.columns().get(0) instanceof UnresolvedWildcardColumn);
-      assertTrue(scanProj.columns().get(1) instanceof PartitionColumn);
-      assertEquals(0, ((PartitionColumn) cols.get(1)).partition());
-      assertTrue(scanProj.columns().get(2) instanceof PartitionColumn);
-      assertEquals(1, ((PartitionColumn) cols.get(2)).partition());
-      assertTrue(scanProj.columns().get(3) instanceof PartitionColumn);
-      assertEquals(8, ((PartitionColumn) cols.get(3)).partition());
-  }
-
-  @Test
   public void testPreferredPartitionExpansion() {
     Path filePath = new Path("hdfs:///w/x/y/z.csv");
     FileMetadataOptions options = standardOptions(filePath);
     options.useLegacyWildcardExpansion(true);
-    options.useLegacyExpansionLocation(false);
     FileMetadataManager metadataManager = new FileMetadataManager(
         fixture.getOptionManager(),
         options);
@@ -342,43 +275,11 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
       assertEquals(1, ((PartitionColumn) cols.get(3)).partition());
   }
 
-  /**
-   * Test a case like:<br>
-   * <code>SELECT *, dir1 FROM ...</code><br>
-   * The projection list includes "dir1". The wildcard will
-   * fill in "dir0".
-   */
-
-  @Test
-  public void testLegacyWildcardAndPartitionWithOverlap() {
-    Path filePath = new Path("hdfs:///w/x/y/z.csv");
-    FileMetadataOptions options = standardOptions(filePath);
-    options.useLegacyWildcardExpansion(true);
-    options.useLegacyExpansionLocation(true);
-    FileMetadataManager metadataManager = new FileMetadataManager(
-        fixture.getOptionManager(),
-        options);
-
-    ScanLevelProjection scanProj = ScanLevelProjection.build(
-        RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR,
-            ScanTestUtils.partitionColName(1)),
-        Lists.newArrayList(metadataManager.projectionParser()));
-
-      List<ColumnProjection> cols = scanProj.columns();
-      assertEquals(3, cols.size());
-      assertTrue(scanProj.columns().get(0) instanceof UnresolvedWildcardColumn);
-      assertTrue(scanProj.columns().get(1) instanceof PartitionColumn);
-      assertEquals(0, ((PartitionColumn) cols.get(1)).partition());
-      assertTrue(scanProj.columns().get(2) instanceof PartitionColumn);
-      assertEquals(1, ((PartitionColumn) cols.get(2)).partition());
-  }
-
   @Test
   public void testPreferedWildcardExpansionWithOverlap() {
     Path filePath = new Path("hdfs:///w/x/y/z.csv");
     FileMetadataOptions options = standardOptions(filePath);
     options.useLegacyWildcardExpansion(true);
-    options.useLegacyExpansionLocation(false);
     FileMetadataManager metadataManager = new FileMetadataManager(
         fixture.getOptionManager(),
         options);
@@ -402,7 +303,6 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
    * to be maps or arrays, are not interpreted as metadata. That is,
    * the projected table map or array "shadows" the metadata column.
    */
-
   @Test
   public void testShadowed() {
     Path filePath = new Path("hdfs:///w/x/y/z.csv");
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java
index 8fd14b7..c3a2243 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java
@@ -53,41 +53,34 @@ import org.junit.experimental.categories.Category;
  * The tests here focus on the scan orchestrator itself; the tests assume
  * that tests for lower-level components have already passed.
  */
-
 @Category(RowSetTests.class)
 public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
   /**
    * Test SELECT * from an early-schema table of (a, b)
    */
-
   @Test
   public void testEarlySchemaWildcard() {
     ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT * ...
-
     builder.setProjection(RowSetTestUtils.projectAll());
     ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(fixture.allocator(), builder);
 
     // ... FROM table
-
     ReaderSchemaOrchestrator reader = scanner.startReader();
 
     // file schema (a, b)
-
     TupleMetadata tableSchema = new SchemaBuilder()
         .add("a", MinorType.INT)
         .add("b", MinorType.VARCHAR)
         .buildSchema();
 
     // Create the table loader
-
     ResultSetLoader loader = reader.makeTableLoader(tableSchema);
 
     // Simulate a first reader in a scan that can provide an
     // empty batch to define schema.
-
     {
       reader.defineSchema();
       SingleRowSet expected = fixture.rowSetBuilder(tableSchema)
@@ -99,7 +92,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     }
 
     // Create a batch of data.
-
     reader.startBatch();
     loader.writer()
       .addRow(1, "fred")
@@ -108,7 +100,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     reader.endBatch();
 
     // Verify
-
     {
       SingleRowSet expected = fixture.rowSetBuilder(tableSchema)
           .addRow(1, "fred")
@@ -120,7 +111,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     }
 
     // Second batch.
-
     reader.startBatch();
     loader.writer()
       .addRow(3, "barney")
@@ -129,7 +119,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     reader.endBatch();
 
     // Verify
-
     {
       SingleRowSet expected = fixture.rowSetBuilder(tableSchema)
           .addRow(3, "barney")
@@ -142,7 +131,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
     // Explicit reader close. (All other tests are lazy, they
     // use an implicit close.)
-
     scanner.closeReader();
 
     scanner.close();
@@ -151,36 +139,30 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
   /**
    * Test SELECT a, b FROM table(a, b)
    */
-
   @Test
   public void testEarlySchemaSelectAll() {
     ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT a, b ...
-
     builder.setProjection(RowSetTestUtils.projectList("a", "b"));
     ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(fixture.allocator(), builder);
 
     // ... FROM table
-
     ReaderSchemaOrchestrator reader = scanner.startReader();
 
     // file schema (a, b)
-
     TupleMetadata tableSchema = new SchemaBuilder()
         .add("a", MinorType.INT)
         .add("b", MinorType.VARCHAR)
         .buildSchema();
 
     // Create the table loader
-
     ResultSetLoader loader = reader.makeTableLoader(tableSchema);
 
     // Don't bother with an empty batch here or in other tests.
     // Simulates the second reader in a scan.
 
     // Create a batch of data.
-
     reader.startBatch();
     loader.writer()
       .addRow(1, "fred")
@@ -188,7 +170,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     reader.endBatch();
 
     // Verify
-
     SingleRowSet expected = fixture.rowSetBuilder(tableSchema)
         .addRow(1, "fred")
         .addRow(2, "wilma")
@@ -203,29 +184,24 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
   /**
    * Test SELECT b, a FROM table(a, b)
    */
-
   @Test
   public void testEarlySchemaSelectAllReorder() {
     ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT b, a ...
-
     builder.setProjection(RowSetTestUtils.projectList("b", "a"));
     ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(fixture.allocator(), builder);
 
     // ... FROM table
-
     ReaderSchemaOrchestrator reader = scanner.startReader();
 
     // file schema (a, b)
-
     TupleMetadata tableSchema = new SchemaBuilder()
         .add("a", MinorType.INT)
         .add("b", MinorType.VARCHAR)
         .buildSchema();
 
     // Create the table loader
-
     ResultSetLoader loader = reader.makeTableLoader(tableSchema);
 
     TupleMetadata expectedSchema = new SchemaBuilder()
@@ -234,7 +210,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
         .buildSchema();
 
     // Create a batch of data.
-
    reader.startBatch();
    loader.writer()
      .addRow(1, "fred")
@@ -242,7 +217,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
    reader.endBatch();
 
     // Verify
-
    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
       .addRow("fred", 1)
       .addRow("wilma", 2)
@@ -258,29 +232,24 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
    * Test SELECT a, b, c FROM table(a, b)
    * c will be null
    */
-
   @Test
   public void testEarlySchemaSelectExtra() {
     ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT a, b, c ...
-
     builder.setProjection(RowSetTestUtils.projectList("a", "b", "c"));
     ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(fixture.allocator(), builder);
 
     // ... FROM table
-
     ReaderSchemaOrchestrator reader = scanner.startReader();
 
     // file schema (a, b)
-
     TupleMetadata tableSchema = new SchemaBuilder()
         .add("a", MinorType.INT)
         .add("b", MinorType.VARCHAR)
         .buildSchema();
 
     // Create the table loader
-
     ResultSetLoader loader = reader.makeTableLoader(tableSchema);
 
     TupleMetadata expectedSchema = new SchemaBuilder()
@@ -289,8 +258,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
         .addNullable("c", MinorType.INT)
         .buildSchema();
 
-   // Create a batch of data.
-
+    // Create a batch of data.
     reader.startBatch();
     loader.writer()
       .addRow(1, "fred")
@@ -314,13 +282,11 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
    * Test SELECT a, b, c FROM table(a, b)
    * c will be null of type VARCHAR
    */
-
   @Test
   public void testEarlySchemaSelectExtraCustomType() {
     ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // Null columns of type VARCHAR
-
     MajorType nullType = MajorType.newBuilder()
         .setMinorType(MinorType.VARCHAR)
         .setMode(DataMode.OPTIONAL)
@@ -328,23 +294,19 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     builder.setNullType(nullType);
 
     // SELECT a, b, c ...
-
     builder.setProjection(RowSetTestUtils.projectList("a", "b", "c"));
     ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(fixture.allocator(), builder);
 
     // ... FROM table ...
-
     ReaderSchemaOrchestrator reader = scanner.startReader();
 
     // file schema (a, b)
-
     TupleMetadata tableSchema = new SchemaBuilder()
         .add("a", MinorType.INT)
         .add("b", MinorType.VARCHAR)
         .buildSchema();
 
     // Create the table loader
-
     ResultSetLoader loader = reader.makeTableLoader(tableSchema);
 
     TupleMetadata expectedSchema = new SchemaBuilder()
@@ -354,7 +316,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
         .buildSchema();
 
     // Create a batch of data.
-
     reader.startBatch();
     loader.writer()
       .addRow(1, "fred")
@@ -362,7 +323,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     reader.endBatch();
 
     // Verify
-
     SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
       .addRow(1, "fred", null)
       .addRow(2, "wilma", null)
@@ -377,22 +337,18 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
   /**
    * Test SELECT a FROM table(a, b)
    */
-
   @Test
   public void testEarlySchemaSelectSubset() {
     ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT a ...
-
     builder.setProjection(RowSetTestUtils.projectList("a"));
     ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(fixture.allocator(), builder);
 
     // ... FROM table
-
     ReaderSchemaOrchestrator reader = scanner.startReader();
 
     // file schema (a, b)
-
     TupleMetadata tableSchema = new SchemaBuilder()
         .add("a", MinorType.INT)
         .add("b", MinorType.VARCHAR)
@@ -404,7 +360,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
     // Verify that unprojected column is unprojected in the
     // table loader.
-
     assertFalse(loader.writer().column("b").isProjected());
 
     TupleMetadata expectedSchema = new SchemaBuilder()
@@ -412,7 +367,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
         .buildSchema();
 
     // Create a batch of data.
-
     reader.startBatch();
     loader.writer()
       .addRow(1, "fred")
@@ -420,7 +374,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     reader.endBatch();
 
     // Verify
-
     SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
       .addRow(1)
       .addRow(2)
@@ -435,47 +388,39 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
   /**
    * Test SELECT - FROM table(a, b)
    */
-
   @Test
   public void testEarlySchemaSelectNone() {
     ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT ...
     // (Like SELECT COUNT(*) ...
-
     builder.setProjection(RowSetTestUtils.projectList());
     ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(fixture.allocator(), builder);
 
     // ... FROM table
-
     ReaderSchemaOrchestrator reader = scanner.startReader();
 
     // file schema (a, b)
-
     TupleMetadata tableSchema = new SchemaBuilder()
         .add("a", MinorType.INT)
         .add("b", MinorType.VARCHAR)
         .buildSchema();
 
     // Create the table loader
-
     ResultSetLoader loader = reader.makeTableLoader(tableSchema);
 
     // Verify that unprojected column is unprojected in the
     // table loader.
-
     assertTrue(loader.isProjectionEmpty());
     assertFalse(loader.writer().column("a").isProjected());
     assertFalse(loader.writer().column("b").isProjected());
 
     // Verify empty batch.
-
     BatchSchema expectedSchema = new BatchSchemaBuilder()
         .withSchemaBuilder(new SchemaBuilder())
         .build();
 
     // Create a batch of data.
-
     reader.startBatch();
     loader.writer()
       .addRow(1, "fred")
@@ -483,10 +428,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     reader.endBatch();
 
     // Verify
-
     {
       // Two rows, no data.
-
       SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
         .addRow()
         .addRow()
@@ -497,13 +440,11 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     }
 
     // Fast path to fill in empty rows
-
     reader.startBatch();
     loader.skipRows(10);
     reader.endBatch();
 
     // Verify
-
     {
       VectorContainer output = scanner.output();
       assertEquals(10, output.getRecordCount());
@@ -517,37 +458,30 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
    * Test SELECT * from an early-schema table of () (that is,
    * a schema that consists of zero columns.
    */
-
   @Test
   public void testEmptySchema() {
     ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT * ...
-
     builder.setProjection(RowSetTestUtils.projectAll());
     ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(fixture.allocator(), builder);
 
     // ... FROM table
-
     ReaderSchemaOrchestrator reader = scanner.startReader();
 
     // file schema ()
-
     TupleMetadata tableSchema = new SchemaBuilder()
         .buildSchema();
 
     // Create the table loader
-
     reader.makeTableLoader(tableSchema);
 
     // Create a batch of data. Because there are no columns, it does
     // not make sense to ready any rows.
-
     reader.startBatch();
     reader.endBatch();
 
     // Verify
-
     {
       SingleRowSet expected = fixture.rowSetBuilder(tableSchema)
           .build();
@@ -559,32 +493,26 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     scanner.close();
   }
 
-
   /**
    * Test SELECT a from an early-schema table of () (that is,
    * a schema that consists of zero columns.
    */
-
   @Test
   public void testEmptySchemaExtra() {
     ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT * ...
-
     builder.setProjection(RowSetTestUtils.projectList("a"));
     ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(fixture.allocator(), builder);
 
     // ... FROM table
-
     ReaderSchemaOrchestrator reader = scanner.startReader();
 
     // file schema ()
-
     TupleMetadata tableSchema = new SchemaBuilder()
         .buildSchema();
 
     // Create the table loader
-
     reader.makeTableLoader(tableSchema);
 
     TupleMetadata expectedSchema = new SchemaBuilder()
@@ -593,12 +521,10 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
     // Create a batch of data. Because there are no columns, it does
     // not make sense to ready any rows.
-
     reader.startBatch();
     reader.endBatch();
 
     // Verify
-
     SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
         .build();
     RowSetUtilities.verify(expected,
@@ -621,7 +547,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
    * The result in all cases should be
    * <tt>(a : BIGINT, b: VARCHAR)</tt>
    */
-
   @Test
   public void testTypeSmoothingExplicit() {
     ScanOrchestratorBuilder builder = new MockScanBuilder();
@@ -634,14 +559,12 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     SchemaTracker tracker = new SchemaTracker();
 
     // SELECT * ...
-
     builder.setProjection(RowSetTestUtils.projectList("a", "b", "c"));
     ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(fixture.allocator(), builder);
 
     int schemaVersion;
     {
       // ... FROM table1(a, b, c)
-
       ReaderSchemaOrchestrator reader = scanner.startReader();
       reader.makeTableLoader(table1Schema);
       reader.defineSchema();
@@ -657,7 +580,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
       //
       // B is dropped. But, it is nullable, so the vector cache
       // can supply the proper type to ensure continuity.
-
       TupleMetadata table2Schema = new SchemaBuilder()
           .add("A", MinorType.BIGINT)
           .addArray("C", MinorType.INT)
@@ -677,7 +599,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
       //
       // C is dropped. But, it is an array, which uses zero-elements
       // to indicate null, so the vector cache can fill in the type.
-
       TupleMetadata table3Schema = new SchemaBuilder()
           .add("A", MinorType.BIGINT)
           .addNullable("B", MinorType.VARCHAR)
@@ -698,7 +619,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
       // This version carries over a non-nullable BIGINT, but that
       // can't become a null column, so nullable BIGINT is substituted,
       // result in a schema change.
-
       TupleMetadata table2Schema = new SchemaBuilder()
           .addNullable("B", MinorType.VARCHAR)
           .addArray("C", MinorType.INT)
@@ -734,18 +654,15 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
    * As a side effect, makes sure that two identical tables (in this case,
    * separated by a different table) results in no schema change.
    */
-
   @Test
   public void testTypeSmoothing() {
     ScanOrchestratorBuilder builder = new MockScanBuilder();
 
     // SELECT a, b ...
-
     builder.setProjection(RowSetTestUtils.projectList("a", "b"));
     ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(fixture.allocator(), builder);
 
     // file schema (a, b)
-
     TupleMetadata twoColSchema = new SchemaBuilder()
         .add("a", MinorType.INT)
         .addNullable("b", MinorType.VARCHAR, 10)
@@ -755,12 +672,10 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     int schemaVersion;
     {
       // ... FROM table 1
-
       ReaderSchemaOrchestrator reader = scanner.startReader();
       ResultSetLoader loader = reader.makeTableLoader(twoColSchema);
 
       // Projection of (a, b) to (a, b)
-
       reader.startBatch();
       loader.writer()
           .addRow(10, "fred")
@@ -779,17 +694,14 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     }
     {
       // ... FROM table 2
-
       ReaderSchemaOrchestrator reader = scanner.startReader();
 
       // File schema (a)
-
       TupleMetadata oneColSchema = new SchemaBuilder()
           .add("a", MinorType.INT)
           .buildSchema();
 
       // Projection of (a) to (a, b), reusing b from above.
-
       ResultSetLoader loader = reader.makeTableLoader(oneColSchema);
 
       reader.startBatch();
@@ -810,11 +722,9 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     }
     {
       // ... FROM table 3
-
       ReaderSchemaOrchestrator reader = scanner.startReader();
 
       // Projection of (a, b), to (a, b), reusing b yet again
-
       ResultSetLoader loader = reader.makeTableLoader(twoColSchema);
 
       reader.startBatch();
@@ -845,7 +755,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(fixture.allocator(), builder);
 
     // Most general schema: nullable, with precision.
-
     TupleMetadata schema1 = new SchemaBuilder()
         .addNullable("a", MinorType.VARCHAR, 10)
         .buildSchema();
@@ -859,7 +768,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
       ResultSetLoader loader = reader.makeTableLoader(schema1);
 
       // Create a batch
-
       reader.startBatch();
       loader.writer()
         .addRow("fred")
@@ -869,7 +777,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
       schemaVersion = tracker.schemaVersion();
 
       // Verify
-
       SingleRowSet expected = fixture.rowSetBuilder(schema1)
         .addRow("fred")
         .addRow("wilma")
@@ -883,7 +790,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
       // Table 2: required, use nullable
 
       // Required version.
-
       TupleMetadata schema2 = new SchemaBuilder()
           .add("a", MinorType.VARCHAR, 10)
           .buildSchema();
@@ -891,9 +797,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
       ReaderSchemaOrchestrator reader = scanner.startReader();
       ResultSetLoader loader = reader.makeTableLoader(schema2);
 
-
       // Create a batch
-
       reader.startBatch();
       loader.writer()
         .addRow("barney")
@@ -901,7 +805,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
       reader.endBatch();
 
       // Verify, using persistent schema
-
       tracker.trackSchema(scanner.output());
       assertEquals(schemaVersion, tracker.schemaVersion());
       SingleRowSet expected = fixture.rowSetBuilder(schema1)
@@ -917,7 +820,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
       // Table 3: narrower precision, use wider
 
       // Required version with narrower precision.
-
       TupleMetadata schema3 = new SchemaBuilder()
           .add("a", MinorType.VARCHAR, 5)
           .buildSchema();
@@ -926,7 +828,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
       ResultSetLoader loader = reader.makeTableLoader(schema3);
 
       // Create a batch
-
       reader.startBatch();
       loader.writer()
         .addRow("bam-bam")
@@ -934,7 +835,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
       reader.endBatch();
 
       // Verify, using persistent schema
-
       tracker.trackSchema(scanner.output());
       assertEquals(schemaVersion, tracker.schemaVersion());
 
@@ -956,7 +856,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
    * SELECT order, preserving vectors, so no schema change for column
    * reordering.
    */
-
   @Test
   public void testColumnReordering() {
 
@@ -985,11 +884,9 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     int schemaVersion;
     {
       // ... FROM table 1
-
       ReaderSchemaOrchestrator reader = scanner.startReader();
 
       // Projection of (a, b, c) to (a, b, c)
-
       ResultSetLoader loader = reader.makeTableLoader(schema1);
 
       reader.startBatch();
@@ -1012,11 +909,9 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     }
     {
       // ... FROM table 2
-
       ReaderSchemaOrchestrator reader = scanner.startReader();
 
       // Projection of (c, a, b) to (a, b, c)
-
       ResultSetLoader loader = reader.makeTableLoader(schema2);
 
       reader.startBatch();
@@ -1037,11 +932,9 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     }
     {
       // ... FROM table 3
-
       ReaderSchemaOrchestrator reader = scanner.startReader();
 
       // Projection of (a, c, b) to (a, b, c)
-
       ResultSetLoader loader = reader.makeTableLoader(schema3);
 
       reader.startBatch();
@@ -1065,5 +958,4 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
   }
 
   // TODO: Start with early schema, but add columns
-
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
index 1eff25f..cc24cb7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
@@ -22,10 +22,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import org.apache.drill.categories.RowSetTests;
-import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
@@ -35,9 +33,8 @@ import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.Scan
 import org.apache.drill.exec.physical.resultSet.ProjectionSet;
 import org.apache.drill.exec.physical.resultSet.ProjectionSet.ColumnReadProjection;
 import org.apache.drill.exec.physical.resultSet.impl.RowSetTestUtils;
-import org.apache.drill.exec.physical.resultSet.project.ProjectionType;
+import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
 import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;
-import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -49,7 +46,6 @@ import org.junit.experimental.categories.Category;
  * Test the level of projection done at the level of the scan as a whole;
  * before knowledge of table "implicit" columns or the specific table schema.
  */
-
 @Category(RowSetTests.class)
 public class TestScanLevelProjection extends SubOperatorTest {
 
@@ -58,7 +54,6 @@ public class TestScanLevelProjection extends SubOperatorTest {
    * data source has an early schema of (a, c, d). (a, c) are
    * projected, (d) is null.
    */
-
   @Test
   public void testBasics() {
 
@@ -83,18 +78,15 @@ public class TestScanLevelProjection extends SubOperatorTest {
     assertEquals("c", scanProj.columns().get(2).name());
 
     // Verify column type
-
     assertTrue(scanProj.columns().get(0) instanceof UnresolvedColumn);
 
     // Verify tuple projection
-
     RequestedTuple outputProj = scanProj.rootProjection();
     assertEquals(3, outputProj.projections().size());
     assertNotNull(outputProj.get("a"));
     assertTrue(outputProj.get("a").isSimple());
 
     // Make up a reader schema and test the projection set.
-
     TupleMetadata readerSchema = new SchemaBuilder()
         .add("a", MinorType.INT)
         .add("b", MinorType.INT)
@@ -112,13 +104,11 @@ public class TestScanLevelProjection extends SubOperatorTest {
    * a dot, such as "a.b". We may not know the type of "b", but have
    * just learned that "a" must be a map.
    */
-
   @Test
   public void testMap() {
 
     // SELECT a.x, b.x, a.y, b.y, c
     // We infer a and b are maps.
-
     final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("a.x", "b.x", "a.y", "b.y", "c"),
         ScanTestUtils.parsers());
@@ -132,29 +122,25 @@ public class TestScanLevelProjection extends SubOperatorTest {
     assertEquals("c", scanProj.columns().get(2).name());
 
     // Verify column type
-
     assertTrue(scanProj.columns().get(0) instanceof UnresolvedColumn);
 
     // Inferred map structure
-
     final RequestedColumn a = ((UnresolvedColumn) scanProj.columns().get(0)).element();
     assertTrue(a.isTuple());
-    assertEquals(ProjectionType.GENERAL, a.mapProjection().projectionType("x"));
-    assertEquals(ProjectionType.GENERAL, a.mapProjection().projectionType("y"));
-    assertEquals(ProjectionType.UNPROJECTED,  a.mapProjection().projectionType("z"));
+    assertTrue(a.tuple().isProjected("x"));
+    assertTrue(a.tuple().isProjected("y"));
+    assertFalse(a.tuple().isProjected("z"));
 
     final RequestedColumn c = ((UnresolvedColumn) scanProj.columns().get(2)).element();
     assertTrue(c.isSimple());
 
     // Verify tuple projection
-
     RequestedTuple outputProj = scanProj.rootProjection();
     assertEquals(3, outputProj.projections().size());
     assertNotNull(outputProj.get("a"));
     assertTrue(outputProj.get("a").isTuple());
 
     // Make up a reader schema and test the projection set.
-
     TupleMetadata readerSchema = new SchemaBuilder()
         .addMap("a")
           .add("x", MinorType.INT)
@@ -173,20 +159,19 @@ public class TestScanLevelProjection extends SubOperatorTest {
     // an actual reader.
 
     ProjectionSet projSet = scanProj.projectionSet().build();
+    assertTrue(projSet.isProjected("a"));
     ColumnReadProjection aProj = projSet.readProjection(readerSchema.metadata("a"));
     assertTrue(aProj.isProjected());
-    assertEquals(ProjectionType.TUPLE, aProj.projectionType());
     ColumnReadProjection cProj = projSet.readProjection(readerSchema.metadata("c"));
     assertTrue(cProj.isProjected());
-    assertEquals(ProjectionType.GENERAL, cProj.projectionType());
     assertFalse(projSet.readProjection(readerSchema.metadata("d")).isProjected());
+    assertFalse(projSet.isProjected("d"));
   }
 
   /**
    * Similar to maps, if the project list contains "a[1]" then we've learned that
    * a is an array, but we don't know what type.
    */
-
   @Test
   public void testArray() {
     final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -200,11 +185,9 @@ public class TestScanLevelProjection extends SubOperatorTest {
     assertEquals("a", scanProj.columns().get(0).name());
 
     // Verify column type
-
     assertTrue(scanProj.columns().get(0) instanceof UnresolvedColumn);
 
     // Map structure
-
     final RequestedColumn a = ((UnresolvedColumn) scanProj.columns().get(0)).element();
     assertTrue(a.isArray());
     assertFalse(a.hasIndex(0));
@@ -213,14 +196,12 @@ public class TestScanLevelProjection extends SubOperatorTest {
     assertTrue(a.hasIndex(3));
 
     // Verify tuple projection
-
     RequestedTuple outputProj = scanProj.rootProjection();
     assertEquals(1, outputProj.projections().size());
     assertNotNull(outputProj.get("a"));
     assertTrue(outputProj.get("a").isArray());
 
     // Make up a reader schema and test the projection set.
-
     TupleMetadata readerSchema = new SchemaBuilder()
         .addArray("a", MinorType.INT)
         .add("c", MinorType.INT)
@@ -229,7 +210,6 @@ public class TestScanLevelProjection extends SubOperatorTest {
     ProjectionSet projSet = scanProj.projectionSet().build();
     ColumnReadProjection aProj = projSet.readProjection(readerSchema.metadata("a"));
     assertTrue(aProj.isProjected());
-    assertEquals(ProjectionType.ARRAY, aProj.projectionType());
     assertFalse(projSet.readProjection(readerSchema.metadata("c")).isProjected());
   }
 
@@ -237,7 +217,6 @@ public class TestScanLevelProjection extends SubOperatorTest {
    * Simulate a SELECT * query by passing "**" (Drill's internal representation
    * of the wildcard) as a column name.
    */
-
   @Test
   public void testWildcard() {
     final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -253,22 +232,18 @@ public class TestScanLevelProjection extends SubOperatorTest {
     assertEquals(SchemaPath.DYNAMIC_STAR, scanProj.columns().get(0).name());
 
     // Verify bindings
-
     assertEquals(scanProj.columns().get(0).name(), scanProj.requestedCols().get(0).rootName());
 
     // Verify column type
-
     assertTrue(scanProj.columns().get(0) instanceof UnresolvedWildcardColumn);
 
     // Verify tuple projection
-
     RequestedTuple outputProj = scanProj.rootProjection();
     assertEquals(1, outputProj.projections().size());
     assertNotNull(outputProj.get(SchemaPath.DYNAMIC_STAR));
     assertTrue(outputProj.get(SchemaPath.DYNAMIC_STAR).isWildcard());
 
     // Make up a reader schema and test the projection set.
-
     TupleMetadata readerSchema = new SchemaBuilder()
         .add("a", MinorType.INT)
         .add("c", MinorType.INT)
@@ -283,7 +258,6 @@ public class TestScanLevelProjection extends SubOperatorTest {
    * Test an empty projection which occurs in a
    * SELECT COUNT(*) query.
    */
-
   @Test
   public void testEmptyProjection() {
     final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -295,12 +269,10 @@ public class TestScanLevelProjection extends SubOperatorTest {
     assertEquals(0, scanProj.requestedCols().size());
 
     // Verify tuple projection
-
     RequestedTuple outputProj = scanProj.rootProjection();
     assertEquals(0, outputProj.projections().size());
 
     // Make up a reader schema and test the projection set.
-
     TupleMetadata readerSchema = new SchemaBuilder()
         .add("a", MinorType.INT)
         .buildSchema();
@@ -314,7 +286,6 @@ public class TestScanLevelProjection extends SubOperatorTest {
    * operator will fill in the column, the scan framework just ignores
    * the extra column.
    */
-
   @Test
   public void testWildcardAndColumns() {
     ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -327,7 +298,6 @@ public class TestScanLevelProjection extends SubOperatorTest {
     assertEquals(1, scanProj.columns().size());
 
     // Verify tuple projection
-
     RequestedTuple outputProj = scanProj.rootProjection();
     assertEquals(2, outputProj.projections().size());
     assertNotNull(outputProj.get(SchemaPath.DYNAMIC_STAR));
@@ -335,7 +305,6 @@ public class TestScanLevelProjection extends SubOperatorTest {
     assertNotNull(outputProj.get("a"));
 
     // Make up a reader schema and test the projection set.
-
     TupleMetadata readerSchema = new SchemaBuilder()
         .add("a", MinorType.INT)
         .add("c", MinorType.INT)
@@ -349,7 +318,6 @@ public class TestScanLevelProjection extends SubOperatorTest {
   /**
    * Test a column name and a wildcard.
    */
-
   @Test
   public void testColumnAndWildcard() {
     ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -363,22 +331,17 @@ public class TestScanLevelProjection extends SubOperatorTest {
   }
 
   /**
-   * Can't include a wildcard twice.
+   * A wildcard included twice is benign.
    * <p>
    * Note: Drill actually allows this, but the work should be done
    * in the project operator; scan should see at most one wildcard.
    */
-
   @Test
-  public void testErrorTwoWildcards() {
-    try {
-      ScanLevelProjection.build(
-          RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, SchemaPath.DYNAMIC_STAR),
-          ScanTestUtils.parsers());
-      fail();
-    } catch (final UserException e) {
-      // Expected
-    }
+  public void testTwoWildcards() {
+    ScanLevelProjection scanProj = ScanLevelProjection.build(
+        RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, SchemaPath.DYNAMIC_STAR),
+        ScanTestUtils.parsers());
+    assertTrue(scanProj.projectAll());
   }
 
   @Test
@@ -386,7 +349,6 @@ public class TestScanLevelProjection extends SubOperatorTest {
     TupleMetadata outputSchema = new SchemaBuilder().buildSchema();
 
     // Simulate SELECT a
-
     final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("a"),
         ScanTestUtils.parsers(),
@@ -437,7 +399,6 @@ public class TestScanLevelProjection extends SubOperatorTest {
         .buildSchema();
 
     // Mark b as special; not expanded in wildcard.
-
     outputSchema.metadata("b").setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
 
     final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -478,7 +439,6 @@ public class TestScanLevelProjection extends SubOperatorTest {
     assertTrue(scanProj.columns().get(1) instanceof UnresolvedColumn);
 
     // Make up a reader schema and test the projection set.
-
     TupleMetadata readerSchema = new SchemaBuilder()
         .add("a", MinorType.INT)
         .add("b", MinorType.INT)
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
index 1b711eb..b299511 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
@@ -93,7 +93,6 @@ public class TestSchemaSmoothing extends SubOperatorTest {
 
   private FileMetadataOptions standardOptions(List<Path> files) {
     FileMetadataOptions options = new FileMetadataOptions();
-    options.useLegacyWildcardExpansion(false); // Don't expand partition columns for wildcard
     options.setSelectionRoot(new Path("hdfs:///w"));
     options.setFiles(files);
     return options;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TestProjectionSet.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TestProjectionSet.java
index a59065d..caa1d25 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TestProjectionSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TestProjectionSet.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.scan.project.projSet;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -32,7 +31,6 @@ import org.apache.drill.exec.physical.impl.scan.project.projSet.TypeConverter.Cu
 import org.apache.drill.exec.physical.resultSet.ProjectionSet;
 import org.apache.drill.exec.physical.resultSet.ProjectionSet.ColumnReadProjection;
 import org.apache.drill.exec.physical.resultSet.impl.RowSetTestUtils;
-import org.apache.drill.exec.physical.resultSet.project.ProjectionType;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -56,14 +54,12 @@ import org.junit.experimental.categories.Category;
  * of projection should be fully tested here, then just sanity tested
  * in the result set loader.
  */
-
 @Category(RowSetTests.class)
 public class TestProjectionSet extends BaseTest {
 
   /**
    * Empty projection, no schema
    */
-
   @Test
   public void testEmptyProjection() {
     ProjectionSet projSet = ProjectionSetFactory.projectNone();
@@ -85,7 +81,6 @@ public class TestProjectionSet extends BaseTest {
     assertSame(aSchema, aCol.providedSchema());
     assertNull(aCol.conversionFactory());
     assertSame(EmptyProjectionSet.PROJECT_NONE, aCol.mapProjection());
-    assertNull(aCol.projectionType());
 
     ColumnReadProjection mCol = projSet.readProjection(readSchema.metadata("m"));
     assertFalse(mCol.isProjected());
@@ -98,7 +93,6 @@ public class TestProjectionSet extends BaseTest {
   /**
    * Wildcard projection, no schema
    */
-
   @Test
   public void testWildcardProjection() {
     ProjectionSet projSet = ProjectionSetFactory.projectAll();
@@ -114,13 +108,11 @@ public class TestProjectionSet extends BaseTest {
     assertSame(aSchema, aCol.providedSchema());
     assertNull(aCol.conversionFactory());
     assertNull(aCol.mapProjection());
-    assertNull(aCol.projectionType());
   }
 
   /**
    * Wildcard projection, no schema
    */
-
   @Test
   public void testWildcardMapProjection() {
     ProjectionSet projSet = ProjectionSetFactory.projectAll();
@@ -143,7 +135,6 @@ public class TestProjectionSet extends BaseTest {
    * Wildcard projection, with schema. Some columns marked
    * as special; not expanded by wildcard.
    */
-
   @Test
   public void testWildcardAndSchemaProjection() {
     TupleMetadata readSchema = new SchemaBuilder()
@@ -198,7 +189,6 @@ public class TestProjectionSet extends BaseTest {
    * Wildcard projection, with schema. Some columns marked
    * as special; not expanded by wildcard.
    */
-
   @Test
   public void testWildcardAndSchemaMapProjection() {
     TupleMetadata readSchema = new SchemaBuilder()
@@ -265,7 +255,6 @@ public class TestProjectionSet extends BaseTest {
   /**
    * Wildcard and strict schema
    */
-
   @Test
   public void testWildcardAndStrictSchemaProjection() {
     TupleMetadata readSchema = new SchemaBuilder()
@@ -303,7 +292,6 @@ public class TestProjectionSet extends BaseTest {
   /**
    * Wildcard and strict schema
    */
-
   @Test
   public void testWildcardAndStrictMapSchemaProjection() {
     TupleMetadata readSchema = new SchemaBuilder()
@@ -369,7 +357,6 @@ public class TestProjectionSet extends BaseTest {
    * Also, sanity test of the builder for the project all,
    * project none cases.
    */
-
   @Test
   public void testExplicitProjection() {
     TupleMetadata readSchema = new SchemaBuilder()
@@ -388,7 +375,6 @@ public class TestProjectionSet extends BaseTest {
     assertSame(aSchema, aCol.providedSchema());
     assertNull(aCol.conversionFactory());
     assertNull(aCol.mapProjection());
-    assertEquals(ProjectionType.GENERAL, aCol.projectionType());
 
     ColumnReadProjection bCol = projSet.readProjection(readSchema.metadata("b"));
     assertFalse(bCol.isProjected());
@@ -431,13 +417,11 @@ public class TestProjectionSet extends BaseTest {
     assertSame(m1Schema, m1Col.readSchema());
     assertSame(m1Schema, m1Col.providedSchema());
     assertNull(m1Col.conversionFactory());
-    assertEquals(ProjectionType.TUPLE, m1Col.projectionType());
 
     // m1.c is projected
 
     ColumnReadProjection cCol = m1Col.mapProjection().readProjection(m1ReadSchema.metadata("c"));
     assertTrue(cCol.isProjected());
-    assertEquals(ProjectionType.GENERAL, cCol.projectionType());
 
     // but m1.d is not projected
 
@@ -446,13 +430,11 @@ public class TestProjectionSet extends BaseTest {
     // m2 is entirely projected
 
     ColumnReadProjection m2Col = projSet.readProjection(m2Schema);
-    assertEquals(ProjectionType.GENERAL, m2Col.projectionType());
     assertTrue(m2Col.isProjected());
     assertSame(m2Schema, m2Col.readSchema());
     assertSame(m2Schema, m2Col.providedSchema());
     assertNull(m2Col.conversionFactory());
     assertTrue(m2Col.mapProjection() instanceof WildcardProjectionSet);
-    assertEquals(ProjectionType.GENERAL, m2Col.projectionType());
     assertTrue(m2Col.mapProjection().readProjection(m2ReadSchema.metadata("e")).isProjected());
 
     // m3 is not projected at all
@@ -487,7 +469,6 @@ public class TestProjectionSet extends BaseTest {
 
     ColumnReadProjection m1Col = projSet.readProjection(m1Schema);
     assertTrue(m1Col.isProjected());
-    assertEquals(ProjectionType.TUPLE, m1Col.projectionType());
 
     // M1.c is projected
 
@@ -504,7 +485,6 @@ public class TestProjectionSet extends BaseTest {
    * That is, SELECT m is logically equivalent to SELECT m.*
    * and is subject to the strict schema projection rule.
    */
-
   @Test
   public void testImpliedWildcardWithStrictSchema() {
     TupleMetadata readSchema = new SchemaBuilder()
@@ -544,7 +524,6 @@ public class TestProjectionSet extends BaseTest {
    * Wildcard and none already tested above, here we test the
    * builder. With schema.
    */
-
   @Test
   public void testExplicitSchemaProjection() {
     TupleMetadata readSchema = new SchemaBuilder()
@@ -603,7 +582,6 @@ public class TestProjectionSet extends BaseTest {
   /**
    * Wildcard projection, no schema, custom column transform.
    */
-
   @Test
   public void testTransformConversion() {
     ColumnConversionFactory conv = StandardConversions.factory(ConvertStringToInt.class);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/RowSetTestUtils.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/RowSetTestUtils.java
index 5c9f76d..9f4266f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/RowSetTestUtils.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/RowSetTestUtils.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.resultSet.impl;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.drill.common.expression.SchemaPath;
@@ -30,7 +31,7 @@ public class RowSetTestUtils {
   public static List<SchemaPath> projectList(String... names) {
     List<SchemaPath> selected = new ArrayList<>();
     for (String name : names) {
-      if (name.equals(SchemaPath.DYNAMIC_STAR)) {
+      if (name.equals(SchemaPath.DYNAMIC_STAR) || name.equals("*")) {
         selected.add(SchemaPath.STAR_COLUMN);
       } else {
         selected.add(SchemaPath.parseFromString(name));
@@ -58,6 +59,10 @@ public class RowSetTestUtils {
         new SchemaPath[] {SchemaPath.STAR_COLUMN});
   }
 
+  public static List<SchemaPath> projectNone() {
+    return Collections.emptyList();
+  }
+
   @SafeVarargs
   public static List<SchemaPath> concat(List<SchemaPath>... parts) {
     List<SchemaPath> selected = new ArrayList<>();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/project/TestProjectedPath.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/project/TestProjectedPath.java
new file mode 100644
index 0000000..4f7104d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/project/TestProjectedPath.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet.project;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.project.projSet.ProjectionChecker;
+import org.apache.drill.exec.physical.resultSet.impl.RowSetTestUtils;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.junit.Test;
+
+/**
+ * Projection creates a pattern which we match against a particular type
+ * to see if the projection path is consistent with the type. Tests here
+ * verify the consistency checks.
+ */
+public class TestProjectedPath {
+
+  // INT is a proxy for all scalar columns.
+  private static final ColumnMetadata INT_COLUMN = intSchema().metadata("a");
+  private static final ColumnMetadata INT_ARRAY_COLUMN = intArraySchema().metadata("a");
+  private static final ColumnMetadata MAP_COLUMN = mapSchema().metadata("a");
+  private static final ColumnMetadata MAP_ARRAY_COLUMN = mapArraySchema().metadata("a");
+  private static final ColumnMetadata UNION_COLUMN = unionSchema().metadata("a");
+  private static final ColumnMetadata LIST_COLUMN = listSchema().metadata("a");
+  private static final ColumnMetadata DICT_INT_INT_COLUMN = dictSchema(MinorType.INT).metadata("a");
+  private static final ColumnMetadata DICT_ARRAY_INT_INT_COLUMN = dictArraySchema(MinorType.INT).metadata("a");
+  private static final ColumnMetadata DICT_BIGINT_INT_COLUMN = dictSchema(MinorType.BIGINT).metadata("a");
+  private static final ColumnMetadata DICT_ARRAY_BIGINT_INT_COLUMN = dictArraySchema(MinorType.BIGINT).metadata("a");
+  private static final ColumnMetadata DICT_VARCHAR_INT_COLUMN = dictSchema(MinorType.VARCHAR).metadata("a");
+  private static final ColumnMetadata DICT_ARRAY_VARCHAR_INT_COLUMN = dictArraySchema(MinorType.VARCHAR).metadata("a");
+  private static final ColumnMetadata DICT_DOUBLE_INT_COLUMN = dictSchema(MinorType.FLOAT8).metadata("a");
+  private static final ColumnMetadata DICT_ARRAY_DOUBLE_INT_COLUMN = dictArraySchema(MinorType.FLOAT8).metadata("a");
+  private static final ColumnMetadata DICT_ARRAY_INT_INT_ARRAY_COLUMN = dictArrayArraySchema(MinorType.INT).metadata("a");
+  private static final ColumnMetadata DICT_ARRAY_VARCHAR_INT_ARRAY_COLUMN = dictArrayArraySchema(MinorType.VARCHAR).metadata("a");
+
+  private static TupleMetadata intSchema() {
+    return new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .build();
+  }
+
+  private static TupleMetadata intArraySchema() {
+    return new SchemaBuilder()
+        .addArray("a", MinorType.INT)
+        .build();
+  }
+
+  private static TupleMetadata mapSchema() {
+    return new SchemaBuilder()
+        .addMap("a")
+          .add("i", MinorType.INT)
+          .addMap("m")
+            .add("mi", MinorType.INT)
+            .resumeMap()
+          .resumeSchema()
+        .build();
+  }
+
+  private static TupleMetadata mapArraySchema() {
+    return new SchemaBuilder()
+        .addMapArray("a")
+          .add("i", MinorType.INT)
+          .addMap("m")
+            .add("mi", MinorType.INT)
+            .resumeMap()
+          .resumeSchema()
+        .build();
+  }
+
+  private static TupleMetadata dictSchema(MinorType keyType) {
+    return new SchemaBuilder()
+        .addDict("a", keyType)
+          .value(MinorType.INT)
+          .resumeSchema()
+        .build();
+  }
+
+  private static TupleMetadata dictArraySchema(MinorType keyType) {
+    return new SchemaBuilder()
+        .addDictArray("a", keyType)
+          .value(MinorType.INT)
+          .resumeSchema()
+        .build();
+  }
+
+  private static TupleMetadata dictArrayArraySchema(MinorType keyType) {
+    return new SchemaBuilder()
+        .addDictArray("a", keyType)
+          .value(Types.repeated(MinorType.INT))
+          .resumeSchema()
+        .build();
+  }
+
+  private static TupleMetadata unionSchema() {
+    return new SchemaBuilder()
+        .addUnion("a")
+          .addType(MinorType.INT)
+          .resumeSchema()
+        .build();
+  }
+
+  private static TupleMetadata listSchema() {
+    return new SchemaBuilder()
+        .addList("a")
+          .addType(MinorType.INT)
+          .resumeSchema()
+        .build();
+  }
+
+  private void assertConsistent(RequestedTuple projSet, ColumnMetadata col) {
+    assertTrue(ProjectionChecker.isConsistent(projSet, col));
+  }
+
+  private void assertNotConsistent(RequestedTuple projSet, ColumnMetadata col) {
+    assertFalse(ProjectionChecker.isConsistent(projSet, col));
+  }
+
+  private void assertAllConsistent(RequestedTuple projSet) {
+    assertConsistent(projSet, INT_COLUMN);
+    assertConsistent(projSet, INT_ARRAY_COLUMN);
+    assertConsistent(projSet, MAP_COLUMN);
+    assertConsistent(projSet, MAP_ARRAY_COLUMN);
+    assertConsistent(projSet, DICT_INT_INT_COLUMN);
+    assertConsistent(projSet, DICT_ARRAY_INT_INT_COLUMN);
+    assertConsistent(projSet, UNION_COLUMN);
+    assertConsistent(projSet, LIST_COLUMN);
+  }
+
+  @Test
+  public void testSimplePath() {
+    RequestedTuple projSet = Projections.parse(
+        RowSetTestUtils.projectList("a"));
+
+    assertAllConsistent(projSet);
+
+    // No constraints on an unprojected column.
+
+    assertTrue(ProjectionChecker.isConsistent(projSet,
+        MetadataUtils.newScalar("b", Types.required(MinorType.INT))));
+  }
+
+  @Test
+  public void testProjectAll() {
+    RequestedTuple projSet = Projections.parse(
+        RowSetTestUtils.projectAll());
+
+    // No constraints on wildcard projection
+    assertAllConsistent(projSet);
+  }
+
+  @Test
+  public void testProjectNone() {
+    RequestedTuple projSet = Projections.parse(
+        RowSetTestUtils.projectNone());
+
+    // No constraints on empty projection
+    assertAllConsistent(projSet);
+  }
+
+  @Test
+  public void test1DArray() {
+    RequestedTuple projSet = Projections.parse(
+        RowSetTestUtils.projectList("a[0]"));
+
+    assertNotConsistent(projSet, INT_COLUMN);
+    assertConsistent(projSet, INT_ARRAY_COLUMN);
+    assertNotConsistent(projSet, MAP_COLUMN);
+    assertConsistent(projSet, MAP_ARRAY_COLUMN);
+    assertConsistent(projSet, UNION_COLUMN);
+    assertConsistent(projSet, LIST_COLUMN);
+
+    assertConsistent(projSet, DICT_INT_INT_COLUMN);
+
+    // TODO: Enforce specific DICT keys, if needed.
+//    assertDictConsistent(projSet, DICT_INT_INT_COLUMN);
+//    assertDictConsistent(projSet, DICT_ARRAY_INT_INT_COLUMN);
+//    assertDictConsistent(projSet, DICT_ARRAY_INT_INT_ARRAY_COLUMN);
+//    assertDictConsistent(projSet, DICT_BIGINT_INT_COLUMN);
+//    assertDictConsistent(projSet, DICT_ARRAY_BIGINT_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_VARCHAR_INT_COLUMN);
+//    assertDictConsistent(projSet, DICT_ARRAY_VARCHAR_INT_COLUMN);
+//    assertDictConsistent(projSet, DICT_ARRAY_VARCHAR_INT_ARRAY_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_DOUBLE_INT_COLUMN);
+//    assertDictConsistent(projSet, DICT_ARRAY_DOUBLE_INT_COLUMN);
+  }
+
+  @Test
+  public void test2DArray() {
+    RequestedTuple projSet = Projections.parse(
+        RowSetTestUtils.projectList("a[0][1]"));
+
+    assertNotConsistent(projSet, INT_COLUMN);
+    assertNotConsistent(projSet, INT_ARRAY_COLUMN);
+    assertNotConsistent(projSet, MAP_COLUMN);
+    assertNotConsistent(projSet, MAP_ARRAY_COLUMN);
+    assertConsistent(projSet, UNION_COLUMN);
+    assertConsistent(projSet, LIST_COLUMN);
+
+    assertConsistent(projSet, DICT_INT_INT_COLUMN);
+
+    // TODO: Enforce specific DICT keys, if needed.
+//    assertDictNotConsistent(projSet, DICT_INT_INT_COLUMN);
+//    assertDictConsistent(projSet, DICT_ARRAY_INT_INT_COLUMN);
+//    assertDictConsistent(projSet, DICT_ARRAY_INT_INT_ARRAY_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_BIGINT_INT_COLUMN);
+//    assertDictConsistent(projSet, DICT_ARRAY_BIGINT_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_VARCHAR_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_ARRAY_VARCHAR_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_ARRAY_VARCHAR_INT_ARRAY_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_DOUBLE_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_ARRAY_DOUBLE_INT_COLUMN);
+  }
+
+  @Test
+  public void test3DArray() {
+    RequestedTuple projSet = Projections.parse(
+        RowSetTestUtils.projectList("a[0][1][2]"));
+
+    assertNotConsistent(projSet, INT_COLUMN);
+    assertNotConsistent(projSet, INT_ARRAY_COLUMN);
+    assertNotConsistent(projSet, MAP_COLUMN);
+    assertNotConsistent(projSet, MAP_ARRAY_COLUMN);
+    assertConsistent(projSet, UNION_COLUMN);
+    assertConsistent(projSet, LIST_COLUMN);
+
+    assertConsistent(projSet, DICT_INT_INT_COLUMN);
+
+    // TODO: Enforce specific DICT keys, if needed.
+//    assertDictNotConsistent(projSet, DICT_INT_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_ARRAY_INT_INT_COLUMN);
+//    assertDictConsistent(projSet, DICT_ARRAY_INT_INT_ARRAY_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_BIGINT_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_ARRAY_BIGINT_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_VARCHAR_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_ARRAY_VARCHAR_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_ARRAY_VARCHAR_INT_ARRAY_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_DOUBLE_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_ARRAY_DOUBLE_INT_COLUMN);
+  }
+
+  @Test
+  public void testMap() {
+    RequestedTuple projSet = Projections.parse(
+        RowSetTestUtils.projectList("a.b"));
+
+    assertNotConsistent(projSet, INT_COLUMN);
+    assertNotConsistent(projSet, INT_ARRAY_COLUMN);
+    assertConsistent(projSet, MAP_COLUMN);
+    assertConsistent(projSet, MAP_ARRAY_COLUMN);
+
+    // A UNION could contain a map, which would allow the
+    // a.b path to be valid.
+    assertConsistent(projSet, UNION_COLUMN);
+    // A LIST could be a list of MAPs, so a.b could mean
+    // to pick out the b column in all array entries.
+    assertConsistent(projSet, LIST_COLUMN);
+
+    assertConsistent(projSet, DICT_INT_INT_COLUMN);
+
+    // TODO: Enforce specific DICT keys, if needed.
+//    assertDictNotConsistent(projSet, DICT_INT_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_ARRAY_INT_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_ARRAY_INT_INT_ARRAY_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_BIGINT_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_ARRAY_BIGINT_INT_COLUMN);
+//    assertDictConsistent(projSet, DICT_VARCHAR_INT_COLUMN);
+//    assertDictConsistent(projSet, DICT_ARRAY_VARCHAR_INT_COLUMN);
+//    assertDictConsistent(projSet, DICT_ARRAY_VARCHAR_INT_ARRAY_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_DOUBLE_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_ARRAY_DOUBLE_INT_COLUMN);
+  }
+
+  @Test
+  public void testMapArray() {
+    RequestedTuple projSet = Projections.parse(
+        RowSetTestUtils.projectList("a[0].b"));
+
+    assertNotConsistent(projSet, INT_COLUMN);
+    assertNotConsistent(projSet, INT_ARRAY_COLUMN);
+    assertNotConsistent(projSet, MAP_COLUMN);
+    assertConsistent(projSet, MAP_ARRAY_COLUMN);
+
+    // A UNION could contain a repeated map, which would allow the
+    // a[0].b path to be valid.
+    assertConsistent(projSet, UNION_COLUMN);
+    // A LIST could contain MAPs.
+    assertConsistent(projSet, LIST_COLUMN);
+
+    assertConsistent(projSet, DICT_INT_INT_COLUMN);
+
+    // TODO: Enforce specific DICT keys, if needed.
+//    assertDictNotConsistent(projSet, DICT_INT_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_ARRAY_INT_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_ARRAY_INT_INT_ARRAY_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_BIGINT_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_ARRAY_BIGINT_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_VARCHAR_INT_COLUMN);
+//    assertDictConsistent(projSet, DICT_ARRAY_VARCHAR_INT_COLUMN);
+//    assertDictConsistent(projSet, DICT_ARRAY_VARCHAR_INT_ARRAY_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_DOUBLE_INT_COLUMN);
+//    assertDictNotConsistent(projSet, DICT_ARRAY_DOUBLE_INT_COLUMN);
+  }
+}
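
For readers skimming the new file, here is a minimal hand-written sketch
(not part of the patch) of the consistency check these tests exercise. It
uses only APIs that appear elsewhere in this commit (Projections,
ProjectionChecker, RowSetTestUtils, SchemaBuilder):

    // "a[0].b" implies that "a" is an array of maps.
    RequestedTuple proj = Projections.parse(
        RowSetTestUtils.projectList("a[0].b"));
    TupleMetadata schema = new SchemaBuilder()
        .addMapArray("a")
          .add("b", MinorType.INT)
          .resumeSchema()
        .build();
    // A repeated map satisfies the array-of-maps pattern...
    assertTrue(ProjectionChecker.isConsistent(proj, schema.metadata("a")));
    // ...whereas a plain INT column would fail the same check,
    // as testMapArray above verifies.
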
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/project/TestProjectedTuple.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/project/TestProjectedTuple.java
index b4700a6..a9017a4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/project/TestProjectedTuple.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/project/TestProjectedTuple.java
@@ -21,18 +21,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.drill.categories.RowSetTests;
-import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.resultSet.impl.RowSetTestUtils;
-import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn;
 import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.TupleProjectionType;
 import org.apache.drill.test.BaseTest;
 import org.junit.Test;
@@ -48,186 +45,211 @@ import org.junit.experimental.categories.Category;
  * parsing; the only bits not tested here is that which is
  * inherently specific to some use case.
  */
-
 @Category(RowSetTests.class)
 public class TestProjectedTuple extends BaseTest {
 
+  /**
+   * A null projection list means everything is projected
+   */
   @Test
   public void testProjectionAll() {
+    RequestedTuple projSet = Projections.parse(null);
+    assertSame(TupleProjectionType.ALL, projSet.type());
+    assertTrue(projSet.isProjected("foo"));
+    assertTrue(projSet.projections().isEmpty());
+  }
 
-    // Null map means everything is projected
-
-    RequestedTuple projSet = RequestedTupleImpl.parse(null);
-    assertEquals(TupleProjectionType.ALL, projSet.type());
-    // Not defined well; the tuple contains a wildcard
-    // assertEquals(ProjectionType.GENERAL, projSet.projectionType("foo"));
-
-    projSet = ImpliedTupleRequest.ALL_MEMBERS;
-    assertTrue(projSet instanceof ImpliedTupleRequest);
-    assertEquals(ProjectionType.GENERAL, projSet.projectionType("foo"));
+  /**
+   * SELECT * means everything is projected
+   */
+  @Test
+  public void testWildcard() {
+    RequestedTuple projSet = Projections.parse(RowSetTestUtils.projectAll());
+    assertSame(TupleProjectionType.ALL, projSet.type());
+    assertTrue(projSet.isProjected("foo"));
+    assertNull(projSet.get("foo"));
+    assertEquals(1, projSet.projections().size());
   }
 
   /**
    * Test an empty projection which occurs in a
    * SELECT COUNT(*) query.
+   * An empty list means nothing is projected.
    */
-
   @Test
   public void testProjectionNone() {
-
-    // Empty list means nothing is projected
-
-    RequestedTuple projSet = RequestedTupleImpl.parse(Collections.emptyList());
-    assertEquals(TupleProjectionType.NONE, projSet.type());
-    assertTrue(projSet instanceof ImpliedTupleRequest);
-    List<RequestedColumn> cols = projSet.projections();
-    assertEquals(0, cols.size());
-    assertEquals(ProjectionType.UNPROJECTED, projSet.projectionType("foo"));
+    RequestedTuple projSet = Projections.parse(new ArrayList<SchemaPath>());
+    assertSame(TupleProjectionType.NONE, projSet.type());
+    assertFalse(projSet.isProjected("foo"));
+    assertTrue(projSet.projections().isEmpty());
   }
 
+  /**
+   * Simple non-map columns
+   */
   @Test
   public void testProjectionSimple() {
-
-    // Simple non-map columns
-
-    RequestedTuple projSet = RequestedTupleImpl.parse(
+    RequestedTuple projSet = Projections.parse(
         RowSetTestUtils.projectList("a", "b", "c"));
-    assertTrue(projSet instanceof RequestedTupleImpl);
-    assertEquals(ProjectionType.GENERAL, projSet.projectionType("a"));
-    assertEquals(ProjectionType.GENERAL, projSet.projectionType("b"));
-    assertEquals(ProjectionType.UNPROJECTED, projSet.projectionType("d"));
+    assertSame(TupleProjectionType.SOME, projSet.type());
+    assertTrue(projSet.isProjected("a"));
+    assertTrue(projSet.isProjected("b"));
+    assertTrue(projSet.isProjected("c"));
+    assertFalse(projSet.isProjected("d"));
 
     List<RequestedColumn> cols = projSet.projections();
     assertEquals(3, cols.size());
 
     RequestedColumn a = cols.get(0);
     assertEquals("a", a.name());
-    assertEquals(ProjectionType.GENERAL, a.type());
     assertTrue(a.isSimple());
-    assertFalse(a.isWildcard());
-
-    // We don't know if a is a map or not (the simple term "a" under-determines
-    // the column type.) In case it is a map, we assume all of the map is
-    // projected.
-
-    assertNotNull(a.mapProjection());
-    assertEquals(TupleProjectionType.ALL, a.mapProjection().type());
-    assertNull(a.indexes());
-
-    assertEquals("b", cols.get(1).name());
-    assertEquals(ProjectionType.GENERAL, cols.get(1).type());
-    assertTrue(cols.get(1).isSimple());
+    assertFalse(a.isArray());
+    assertFalse(a.isTuple());
+  }
 
-    assertEquals("c", cols.get(2).name());
-    assertEquals(ProjectionType.GENERAL, cols.get(2).type());
-    assertTrue(cols.get(2).isSimple());
+  /**
+   * The projection set does not enforce uniqueness.
+   */
+  @Test
+  public void testSimpleDups() {
+    RequestedTuple projSet = Projections.parse(
+        RowSetTestUtils.projectList("a", "b", "a"));
+    assertSame(TupleProjectionType.SOME, projSet.type());
+    assertEquals(2, projSet.projections().size());
+    assertEquals(2, ((RequestedColumnImpl) projSet.get("a")).refCount());
   }
 
+  /**
+   * Whole-map projection. (Note: fully projected maps are
+   * identical to projected simple columns at this level of
+   * abstraction.)
+   */
   @Test
   public void testProjectionWholeMap() {
+    RequestedTuple projSet = Projections.parse(
+        RowSetTestUtils.projectList("map"));
 
-    // Whole-map projection (note, fully projected maps are
-    // identical to projected simple columns at this level of
-    // abstraction.)
-
-    List<SchemaPath> projCols = new ArrayList<>();
-    projCols.add(SchemaPath.getSimplePath("map"));
-    RequestedTuple projSet = RequestedTupleImpl.parse(projCols);
+    assertSame(TupleProjectionType.SOME, projSet.type());
+    assertTrue(projSet.isProjected("map"));
+    assertFalse(projSet.isProjected("another"));
 
-    assertTrue(projSet instanceof RequestedTupleImpl);
-    assertEquals(ProjectionType.GENERAL, projSet.projectionType("map"));
-    assertEquals(ProjectionType.UNPROJECTED, projSet.projectionType("another"));
     RequestedTuple mapProj = projSet.mapProjection("map");
     assertNotNull(mapProj);
-    assertTrue(mapProj instanceof ImpliedTupleRequest);
-    assertEquals(ProjectionType.GENERAL, mapProj.projectionType("foo"));
-    assertNotNull(projSet.mapProjection("another"));
-    assertEquals(ProjectionType.UNPROJECTED, projSet.mapProjection("another").projectionType("anyCol"));
+    assertSame(TupleProjectionType.ALL, mapProj.type());
+    assertTrue(mapProj.isProjected("foo"));
+
+    RequestedTuple anotherProj = projSet.mapProjection("another");
+    assertNotNull(anotherProj);
+    assertSame(TupleProjectionType.NONE, anotherProj.type());
+    assertFalse(anotherProj.isProjected("anyCol"));
   }
 
+  /**
+   * Selected map projection, multiple levels, full projection
+   * at leaf level.
+   */
   @Test
   public void testProjectionMapSubset() {
+    RequestedTuple projSet = Projections.parse(
+        RowSetTestUtils.projectList("map.a", "map.b", "map.map2.x"));
+    assertSame(TupleProjectionType.SOME, projSet.type());
 
-    // Selected map projection, multiple levels, full projection
-    // at leaf level.
-
-    List<SchemaPath> projCols = new ArrayList<>();
-    projCols.add(SchemaPath.getCompoundPath("map", "a"));
-    projCols.add(SchemaPath.getCompoundPath("map", "b"));
-    projCols.add(SchemaPath.getCompoundPath("map", "map2", "x"));
-    RequestedTuple projSet = RequestedTupleImpl.parse(projCols);
-    assertTrue(projSet instanceof RequestedTupleImpl);
-    assertEquals(ProjectionType.TUPLE, projSet.projectionType("map"));
+    // Map itself is projected and has a map qualifier
+    assertTrue(projSet.isProjected("map"));
 
     // Map: an explicit map at top level
 
     RequestedTuple mapProj = projSet.mapProjection("map");
-    assertTrue(mapProj instanceof RequestedTupleImpl);
-    assertEquals(ProjectionType.GENERAL, mapProj.projectionType("a"));
-    assertEquals(ProjectionType.GENERAL, mapProj.projectionType("b"));
-    assertEquals(ProjectionType.TUPLE, mapProj.projectionType("map2"));
-    assertEquals(ProjectionType.UNPROJECTED, mapProj.projectionType("bogus"));
+    assertSame(TupleProjectionType.SOME, mapProj.type());
+    assertTrue(mapProj.isProjected("a"));
+    assertTrue(mapProj.isProjected("b"));
+    assertTrue(mapProj.isProjected("map2"));
+    assertFalse(mapProj.isProjected("bogus"));
 
     // Map b: an implied nested map
 
+    assertTrue(mapProj.get("b").isSimple());
     RequestedTuple bMapProj = mapProj.mapProjection("b");
     assertNotNull(bMapProj);
-    assertTrue(bMapProj instanceof ImpliedTupleRequest);
-    assertEquals(ProjectionType.GENERAL, bMapProj.projectionType("foo"));
+    assertSame(TupleProjectionType.ALL, bMapProj.type());
+    assertTrue(bMapProj.isProjected("foo"));
 
     // Map2, a nested map, has an explicit projection
 
     RequestedTuple map2Proj = mapProj.mapProjection("map2");
     assertNotNull(map2Proj);
-    assertTrue(map2Proj instanceof RequestedTupleImpl);
-    assertEquals(ProjectionType.GENERAL, map2Proj.projectionType("x"));
-    assertEquals(ProjectionType.UNPROJECTED, map2Proj.projectionType("bogus"));
+    assertSame(TupleProjectionType.SOME, map2Proj.type());
+    assertTrue(map2Proj.isProjected("x"));
+    assertFalse(map2Proj.isProjected("bogus"));
   }
 
+  /**
+   * Project both a map member and the entire map.
+   */
   @Test
-  public void testProjectionMapFieldAndMap() {
-
-    // Project both a map member and the entire map.
+  public void testProjectionMapAndSimple() {
+    RequestedTuple projSet = Projections.parse(
+        RowSetTestUtils.projectList("map.a", "map"));
 
-    {
-      List<SchemaPath> projCols = new ArrayList<>();
-      projCols.add(SchemaPath.getCompoundPath("map", "a"));
-      projCols.add(SchemaPath.getCompoundPath("map"));
+    RequestedTuple mapProj = projSet.mapProjection("map");
+    assertSame(TupleProjectionType.ALL, mapProj.type());
+    assertTrue(mapProj.isProjected("a"));
+    assertTrue(mapProj.isProjected("b"));
+  }
 
-      RequestedTuple projSet = RequestedTupleImpl.parse(projCols);
-      assertTrue(projSet instanceof RequestedTupleImpl);
-      assertEquals(ProjectionType.TUPLE, projSet.projectionType("map"));
+  /**
+   * Project both an entire map and a map member.
+   */
+  @Test
+  public void testProjectionSimpleAndMap() {
+    RequestedTuple projSet = Projections.parse(
+        RowSetTestUtils.projectList("map", "map.a"));
 
-      RequestedTuple mapProj = projSet.mapProjection("map");
-      assertTrue(mapProj instanceof ImpliedTupleRequest);
-      assertEquals(ProjectionType.GENERAL, mapProj.projectionType("a"));
+    RequestedTuple mapProj = projSet.mapProjection("map");
+    assertSame(TupleProjectionType.ALL, mapProj.type());
+    assertTrue(mapProj.isProjected("a"));
+    assertTrue(mapProj.isProjected("b"));
+  }
 
-      // Didn't ask for b, but did ask for whole map.
+  /**
+   * Project both a map member and the map's wildcard.
+   */
+  @Test
+  public void testProjectionMapAndWildcard() {
 
-      assertEquals(ProjectionType.GENERAL, mapProj.projectionType("b"));
-    }
+    // Built up by hand because "map.*" is not valid Drill
+    // expression syntax.
+    List<SchemaPath> projCols = new ArrayList<>();
+    projCols.add(SchemaPath.getCompoundPath("map", "a"));
+    projCols.add(SchemaPath.getCompoundPath("map", SchemaPath.DYNAMIC_STAR));
 
-    // Now the other way around.
+    RequestedTuple projSet = Projections.parse(projCols);
+    RequestedTuple mapProj = projSet.mapProjection("map");
+    assertSame(TupleProjectionType.ALL, mapProj.type());
+    assertTrue(mapProj.isProjected("a"));
+    assertTrue(mapProj.isProjected("b"));
+  }
 
-    {
-      List<SchemaPath> projCols = new ArrayList<>();
-      projCols.add(SchemaPath.getCompoundPath("map"));
-      projCols.add(SchemaPath.getCompoundPath("map", "a"));
+  /**
+   * Project both the map's wildcard and a map member.
+   */
+  @Test
+  public void testProjectionWildcardAndMap() {
 
-      RequestedTuple projSet = RequestedTupleImpl.parse(projCols);
-      assertTrue(projSet instanceof RequestedTupleImpl);
-      assertEquals(ProjectionType.TUPLE, projSet.projectionType("map"));
+    List<SchemaPath> projCols = new ArrayList<>();
+    projCols.add(SchemaPath.getCompoundPath("map", SchemaPath.DYNAMIC_STAR));
+    projCols.add(SchemaPath.getCompoundPath("map", "a"));
 
-      RequestedTuple mapProj = projSet.mapProjection("map");
-      assertTrue(mapProj instanceof ImpliedTupleRequest);
-      assertEquals(ProjectionType.GENERAL, mapProj.projectionType("a"));
-      assertEquals(ProjectionType.GENERAL, mapProj.projectionType("b"));
-    }
+    RequestedTuple projSet = Projections.parse(projCols);
+    RequestedTuple mapProj = projSet.mapProjection("map");
+    assertSame(TupleProjectionType.ALL, mapProj.type());
+    assertTrue(mapProj.isProjected("a"));
+    assertTrue(mapProj.isProjected("b"));
   }
 
   @Test
   public void testMapDetails() {
-    RequestedTuple projSet = RequestedTupleImpl.parse(
+    RequestedTuple projSet = Projections.parse(
         RowSetTestUtils.projectList("a.b.c", "a.c", "d"));
     List<RequestedColumn> cols = projSet.projections();
     assertEquals(2, cols.size());
@@ -238,96 +260,62 @@ public class TestProjectedTuple extends BaseTest {
     assertFalse(a.isArray());
     assertTrue(a.isTuple());
 
-    {
-      assertNotNull(a.mapProjection());
-      List<RequestedColumn> aMembers = a.mapProjection().projections();
-      assertEquals(2, aMembers.size());
+    // a{}
+    assertNotNull(a.tuple());
+    List<RequestedColumn> aMembers = a.tuple().projections();
+    assertEquals(2, aMembers.size());
+
+    // a.b
+    RequestedColumn a_b = aMembers.get(0);
+    assertEquals("b", a_b.name());
+    assertTrue(a_b.isTuple());
 
-      RequestedColumn a_b = aMembers.get(0);
-      assertEquals("b", a_b.name());
-      assertTrue(a_b.isTuple());
+    // a.b{}
+    assertNotNull(a_b.tuple());
+    List<RequestedColumn> a_bMembers = a_b.tuple().projections();
+    assertEquals(1, a_bMembers.size());
 
-      {
-        assertNotNull(a_b.mapProjection());
-        List<RequestedColumn> a_bMembers = a_b.mapProjection().projections();
-        assertEquals(1, a_bMembers.size());
-        assertEquals("c", a_bMembers.get(0).name());
-        assertTrue(a_bMembers.get(0).isSimple());
-      }
+    // a.b.c
+    assertEquals("c", a_bMembers.get(0).name());
+    assertTrue(a_bMembers.get(0).isSimple());
 
-      assertEquals("c", aMembers.get(1).name());
-      assertTrue(aMembers.get(1).isSimple());
-    }
+    // a.c
+    assertEquals("c", aMembers.get(1).name());
+    assertTrue(aMembers.get(1).isSimple());
 
+    // d
     assertEquals("d", cols.get(1).name());
     assertTrue(cols.get(1).isSimple());
   }
 
-  @Test
-  public void testMapDups() {
-    try {
-      RequestedTupleImpl.parse(
-          RowSetTestUtils.projectList("a.b", "a.c", "a.b"));
-      fail();
-    } catch (UserException e) {
-      // Expected
-    }
-  }
-
   /**
-   * When the project list includes references to both the
-   * map as a whole, and members, then the parser is forgiving
-   * of duplicate map members since all members are projected.
+   * Duplicate column names are merged for projection.
    */
-
-  @Test
-  public void testMapDupsIgnored() {
-    RequestedTuple projSet = RequestedTupleImpl.parse(
-          RowSetTestUtils.projectList("a", "a.b", "a.c", "a.b"));
-    List<RequestedColumn> cols = projSet.projections();
-    assertEquals(1, cols.size());
-  }
-
   @Test
-  public void testWildcard() {
-    RequestedTuple projSet = RequestedTupleImpl.parse(
-        RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR));
-    List<RequestedColumn> cols = projSet.projections();
-    assertEquals(1, cols.size());
-
-    RequestedColumn wildcard = cols.get(0);
-    assertEquals(ProjectionType.WILDCARD, wildcard.type());
-    assertEquals(SchemaPath.DYNAMIC_STAR, wildcard.name());
-    assertFalse(wildcard.isSimple());
-    assertTrue(wildcard.isWildcard());
-    assertNull(wildcard.mapProjection());
-    assertNull(wildcard.indexes());
-  }
+  public void testMapDups() {
+    RequestedTuple projSet = Projections.parse(
+        RowSetTestUtils.projectList("a.b", "a.c", "a.b"));
 
-  @Test
-  public void testSimpleDups() {
-    try {
-      RequestedTupleImpl.parse(RowSetTestUtils.projectList("a", "b", "a"));
-      fail();
-    } catch (UserException e) {
-      // Expected
-    }
+    RequestedTuple aMap = projSet.mapProjection("a");
+    assertEquals(2, aMap.projections().size());
+    assertEquals(2, ((RequestedColumnImpl) aMap.get("b")).refCount());
   }
 
   @Test
   public void testArray() {
-    RequestedTuple projSet = RequestedTupleImpl.parse(
+    RequestedTuple projSet = Projections.parse(
         RowSetTestUtils.projectList("a[1]", "a[3]"));
     List<RequestedColumn> cols = projSet.projections();
     assertEquals(1, cols.size());
 
-    assertEquals(ProjectionType.ARRAY, projSet.projectionType("a"));
     RequestedColumn a = cols.get(0);
     assertEquals("a", a.name());
     assertTrue(a.isArray());
+    assertEquals(1, a.arrayDims());
     assertFalse(a.isSimple());
     assertFalse(a.isTuple());
-    boolean[] indexes = a.indexes();
+    assertTrue(a.hasIndexes());
+    boolean[] indexes = a.indexes();
     assertNotNull(indexes);
     assertEquals(4, indexes.length);
     assertFalse(indexes[0]);
@@ -336,15 +324,37 @@ public class TestProjectedTuple extends BaseTest {
     assertTrue(indexes[3]);
   }
 
+  @Test
+  public void testMultiDimArray() {
+    RequestedTuple projSet = Projections.parse(
+        RowSetTestUtils.projectList("a[0][1][2]", "a[2][3]"));
+    List<RequestedColumn> cols = projSet.projections();
+    assertEquals(1, cols.size());
+
+    RequestedColumn a = cols.get(0);
+    assertEquals("a", a.name());
+    assertTrue(a.isArray());
+    // Dimension count is the maximum seen.
+    assertEquals(3, a.arrayDims());
+    assertFalse(a.isSimple());
+    assertFalse(a.isTuple());
+    boolean[] indexes = a.indexes();
+    assertNotNull(indexes);
+    assertEquals(3, indexes.length);
+    assertTrue(indexes[0]);
+    assertFalse(indexes[1]);
+    assertTrue(indexes[2]);
+  }
+
   /**
    * Duplicate array entries are allowed to handle the
    * use case of a[1], a[1].z. Each element is reported once;
    * the project operator will create copies as needed.
    */
   @Test
-  public void testArrayDups() {
-    RequestedTuple projSet = RequestedTupleImpl.parse(
-      RowSetTestUtils.projectList("a[1]", "a[3]", "a[1]", "a[3].z"));
+  public void testArrayDupsIgnored() {
+    RequestedTuple projSet = Projections.parse(
+        RowSetTestUtils.projectList("a[1]", "a[3]", "a[1]", "a[3].z"));
 
     List<RequestedColumn> cols = projSet.projections();
     assertEquals(1, cols.size());
@@ -352,7 +362,7 @@ public class TestProjectedTuple extends BaseTest {
     RequestedColumn a = cols.get(0);
     assertEquals("a", a.name());
     assertTrue(a.isArray());
-    boolean[] indexes = a.indexes();
+    boolean[] indexes = a.indexes();
     assertNotNull(indexes);
     assertEquals(4, indexes.length);
     assertFalse(indexes[0]);
@@ -363,7 +373,7 @@ public class TestProjectedTuple extends BaseTest {
 
   @Test
   public void testArrayAndSimple() {
-    RequestedTuple projSet = RequestedTupleImpl.parse(
+    RequestedTuple projSet = Projections.parse(
         RowSetTestUtils.projectList("a[1]", "a"));
     List<RequestedColumn> cols = projSet.projections();
     assertEquals(1, cols.size());
@@ -376,7 +386,7 @@ public class TestProjectedTuple extends BaseTest {
 
   @Test
   public void testSimpleAndArray() {
-    RequestedTuple projSet = RequestedTupleImpl.parse(
+    RequestedTuple projSet = Projections.parse(
         RowSetTestUtils.projectList("a", "a[1]"));
     List<RequestedColumn> cols = projSet.projections();
     assertEquals(1, cols.size());
@@ -384,20 +394,87 @@ public class TestProjectedTuple extends BaseTest {
     RequestedColumn a = cols.get(0);
     assertEquals("a", a.name());
     assertTrue(a.isArray());
+    assertFalse(a.hasIndexes());
     assertNull(a.indexes());
-    assertEquals(ProjectionType.ARRAY, projSet.projectionType("a"));
-    assertEquals(ProjectionType.UNPROJECTED, projSet.projectionType("foo"));
   }
 
   @Test
   // Drill syntax does not support map arrays
   public void testMapArray() {
-    RequestedTuple projSet = RequestedTupleImpl.parse(
+    RequestedTuple projSet = Projections.parse(
         RowSetTestUtils.projectList("a[1].x"));
     List<RequestedColumn> cols = projSet.projections();
     assertEquals(1, cols.size());
 
-    assertEquals(ProjectionType.TUPLE_ARRAY, cols.get(0).type());
-    assertEquals(ProjectionType.TUPLE_ARRAY, projSet.projectionType("a"));
+    RequestedColumn a = cols.get(0);
+
+    // Column acts like an array
+    assertTrue(a.isArray());
+    assertTrue(a.hasIndexes());
+    assertEquals(1, a.arrayDims());
+
+    // And the column acts like a map
+    assertTrue(a.isTuple());
+    RequestedTuple aProj = a.tuple();
+    assertSame(TupleProjectionType.SOME, aProj.type());
+    assertTrue(aProj.isProjected("x"));
+    assertFalse(aProj.isProjected("y"));
+  }
+
+  @Test
+  // Drill syntax does not support map arrays
+  public void testMap2DArray() {
+    RequestedTuple projSet = Projections.parse(
+        RowSetTestUtils.projectList("a[1][2].x"));
+    List<RequestedColumn> cols = projSet.projections();
+    assertEquals(1, cols.size());
+
+    RequestedColumn a = cols.get(0);
+
+    // Column acts like an array
+    assertTrue(a.isArray());
+    assertTrue(a.hasIndexes());
+
+    // Note that the multiple dimensions are inferred only through
+    // the multiple levels of qualifiers.
+
+    // And the column acts like a map
+    assertTrue(a.isTuple());
+    RequestedTuple aProj = a.tuple();
+    assertSame(TupleProjectionType.SOME, aProj.type());
+    assertTrue(aProj.isProjected("x"));
+    assertFalse(aProj.isProjected("y"));
+  }
+
+  /**
+   * Projection does not enforce semantics; it just reports what it
+   * sees. This allows cases such as m.a and m[0], which might mean
+   * that m is a map array, m.a wants an array of a-member values, and
+   * m[0] wants the first map in the array. It is not clear that Drill
+   * actually supports these cases, however.
+   */
+  @Test
+  public void testArrayAndMap() {
+    RequestedTuple projSet = Projections.parse(
+        RowSetTestUtils.projectList("m.a", "m[0]"));
+    RequestedColumn m = projSet.get("m");
+    assertTrue(m.isArray());
+    assertEquals(1, m.arrayDims());
+    assertTrue(m.isTuple());
+    assertTrue(m.tuple().isProjected("a"));
+    assertFalse(m.tuple().isProjected("b"));
+  }
+
+  @Test
+  public void testMapAndArray() {
+    RequestedTuple projSet = Projections.parse(
+        RowSetTestUtils.projectList("m[0]", "m.a"));
+    RequestedColumn m = projSet.get("m");
+    assertTrue(m.isArray());
+    assertEquals(1, m.arrayDims());
+    assertTrue(m.isTuple());
+    assertTrue(m.tuple().isProjected("a"));
+    // m[0] requests the entire tuple
+    assertTrue(m.tuple().isProjected("b"));
   }
 }
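
Taken together, the tests above exercise the heart of DRILL-7574: a single
RequestedColumn can now carry both an array facet and a tuple facet, and
multiple references to the same name are merged into one column. Note that the
last two tests differ only in the order of the projection items, yet disagree
on whether the unmentioned member b is projected; the parser reports what it
sees rather than enforcing one interpretation. Below is a minimal illustrative
sketch, not part of this commit, of how client code might interrogate a parsed
projection. It assumes the Projections, RequestedTuple, and RequestedColumn
signatures used in the tests, and assumes SchemaPath.parseFromString accepts
the same path syntax that RowSetTestUtils.projectList does.

import java.util.Arrays;
import java.util.List;

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.resultSet.project.Projections;
import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;

public class ProjectionSketch {
  public static void main(String[] args) {
    // Parse "a[1][2].x": one column referenced as a 2D array of maps.
    // SchemaPath.parseFromString is assumed to accept this syntax, as
    // RowSetTestUtils.projectList appears to in the tests above.
    List<SchemaPath> projList =
        Arrays.asList(SchemaPath.parseFromString("a[1][2].x"));
    RequestedTuple proj = Projections.parse(projList);

    RequestedColumn a = proj.get("a");
    if (a.isArray()) {
      // Dimensions are inferred from the nested [] qualifiers.
      System.out.println("array dims: " + a.arrayDims());
    }
    if (a.isTuple()) {
      // Only members actually mentioned in the projection are projected.
      RequestedTuple members = a.tuple();
      System.out.println("x projected? " + members.isProjected("x"));
      System.out.println("y projected? " + members.isProjected("y"));
    }
  }
}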
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/project/TestProjectionType.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/project/TestProjectionType.java
deleted file mode 100644
index 341e10d..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/project/TestProjectionType.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.resultSet.project;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.drill.categories.RowSetTests;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.test.BaseTest;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category(RowSetTests.class)
-public class TestProjectionType extends BaseTest {
-
-  @Test
-  public void testQueries() {
-    assertFalse(ProjectionType.UNPROJECTED.isTuple());
-    assertFalse(ProjectionType.WILDCARD.isTuple());
-    assertFalse(ProjectionType.GENERAL.isTuple());
-    assertFalse(ProjectionType.SCALAR.isTuple());
-    assertTrue(ProjectionType.TUPLE.isTuple());
-    assertFalse(ProjectionType.ARRAY.isTuple());
-    assertTrue(ProjectionType.TUPLE_ARRAY.isTuple());
-
-    assertFalse(ProjectionType.UNPROJECTED.isArray());
-    assertFalse(ProjectionType.WILDCARD.isArray());
-    assertFalse(ProjectionType.GENERAL.isArray());
-    assertFalse(ProjectionType.SCALAR.isArray());
-    assertFalse(ProjectionType.TUPLE.isArray());
-    assertTrue(ProjectionType.ARRAY.isArray());
-    assertTrue(ProjectionType.TUPLE_ARRAY.isArray());
-
-    assertFalse(ProjectionType.UNPROJECTED.isMaybeScalar());
-    assertFalse(ProjectionType.WILDCARD.isMaybeScalar());
-    assertTrue(ProjectionType.GENERAL.isMaybeScalar());
-    assertTrue(ProjectionType.SCALAR.isMaybeScalar());
-    assertFalse(ProjectionType.TUPLE.isMaybeScalar());
-    assertFalse(ProjectionType.ARRAY.isMaybeScalar());
-    assertFalse(ProjectionType.TUPLE_ARRAY.isMaybeScalar());
-  }
-
-  @Test
-  public void testLabel() {
-
-    // Only worry about the types that could conflict and thus
-    // would show up in error messages.
-
-    assertEquals(ProjectionType.UNPROJECTED.name(), ProjectionType.UNPROJECTED.label());
-    assertEquals("wildcard (*)", ProjectionType.WILDCARD.label());
-    assertEquals(ProjectionType.GENERAL.name(), ProjectionType.GENERAL.label());
-    assertEquals("scalar (a)", ProjectionType.SCALAR.label());
-    assertEquals("tuple (a.x)", ProjectionType.TUPLE.label());
-    assertEquals("array (a[n])", ProjectionType.ARRAY.label());
-    assertEquals("tuple array (a[n].x)", ProjectionType.TUPLE_ARRAY.label());
-  }
-
-  @Test
-  public void testTypeFor() {
-
-    // Test the return of the projection type most specific
-    // for a data type. The projection type under-specifies
-    // the data type, but is a hint.
-
-    assertEquals(ProjectionType.TUPLE, ProjectionType.typeFor(Types.required(MinorType.MAP)));
-    assertEquals(ProjectionType.TUPLE_ARRAY, ProjectionType.typeFor(Types.repeated(MinorType.MAP)));
-    assertEquals(ProjectionType.ARRAY, ProjectionType.typeFor(Types.repeated(MinorType.INT)));
-    assertEquals(ProjectionType.ARRAY, ProjectionType.typeFor(Types.required(MinorType.LIST)));
-    assertEquals(ProjectionType.SCALAR, ProjectionType.typeFor(Types.required(MinorType.INT)));
-  }
-
-  @Test
-  public void testCompatibility() {
-
-    // Only SCALAR, TUPLE, ARRAY and TUPLE_ARRAY are expected for the
-    // argument, but we check all cases for completeness.
-    // Note that the cases are not always symmetrical:
-    // a map array column is compatible with a map projection,
-    // but a map column is not compatible with a map array projection.
-
-    assertTrue(ProjectionType.UNPROJECTED.isCompatible(ProjectionType.UNPROJECTED));
-    assertTrue(ProjectionType.UNPROJECTED.isCompatible(ProjectionType.WILDCARD));
-    assertTrue(ProjectionType.UNPROJECTED.isCompatible(ProjectionType.GENERAL));
-    assertTrue(ProjectionType.UNPROJECTED.isCompatible(ProjectionType.SCALAR));
-    assertTrue(ProjectionType.UNPROJECTED.isCompatible(ProjectionType.TUPLE));
-    assertTrue(ProjectionType.UNPROJECTED.isCompatible(ProjectionType.ARRAY));
-    assertTrue(ProjectionType.UNPROJECTED.isCompatible(ProjectionType.TUPLE_ARRAY));
-
-    assertTrue(ProjectionType.WILDCARD.isCompatible(ProjectionType.UNPROJECTED));
-    assertTrue(ProjectionType.WILDCARD.isCompatible(ProjectionType.WILDCARD));
-    assertTrue(ProjectionType.WILDCARD.isCompatible(ProjectionType.GENERAL));
-    assertTrue(ProjectionType.WILDCARD.isCompatible(ProjectionType.SCALAR));
-    assertTrue(ProjectionType.WILDCARD.isCompatible(ProjectionType.TUPLE));
-    assertTrue(ProjectionType.WILDCARD.isCompatible(ProjectionType.ARRAY));
-    assertTrue(ProjectionType.WILDCARD.isCompatible(ProjectionType.TUPLE_ARRAY));
-
-    assertTrue(ProjectionType.GENERAL.isCompatible(ProjectionType.UNPROJECTED));
-    assertTrue(ProjectionType.GENERAL.isCompatible(ProjectionType.WILDCARD));
-    assertTrue(ProjectionType.GENERAL.isCompatible(ProjectionType.GENERAL));
-    assertTrue(ProjectionType.GENERAL.isCompatible(ProjectionType.SCALAR));
-    assertTrue(ProjectionType.GENERAL.isCompatible(ProjectionType.TUPLE));
-    assertTrue(ProjectionType.GENERAL.isCompatible(ProjectionType.ARRAY));
-    assertTrue(ProjectionType.GENERAL.isCompatible(ProjectionType.TUPLE_ARRAY));
-
-    assertTrue(ProjectionType.SCALAR.isCompatible(ProjectionType.UNPROJECTED));
-    assertTrue(ProjectionType.SCALAR.isCompatible(ProjectionType.WILDCARD));
-    assertTrue(ProjectionType.SCALAR.isCompatible(ProjectionType.GENERAL));
-    assertTrue(ProjectionType.SCALAR.isCompatible(ProjectionType.SCALAR));
-    assertFalse(ProjectionType.SCALAR.isCompatible(ProjectionType.TUPLE));
-    assertFalse(ProjectionType.SCALAR.isCompatible(ProjectionType.ARRAY));
-    assertFalse(ProjectionType.SCALAR.isCompatible(ProjectionType.TUPLE_ARRAY));
-
-    assertTrue(ProjectionType.TUPLE.isCompatible(ProjectionType.UNPROJECTED));
-    assertTrue(ProjectionType.TUPLE.isCompatible(ProjectionType.WILDCARD));
-    assertTrue(ProjectionType.TUPLE.isCompatible(ProjectionType.GENERAL));
-    assertFalse(ProjectionType.TUPLE.isCompatible(ProjectionType.SCALAR));
-    assertTrue(ProjectionType.TUPLE.isCompatible(ProjectionType.TUPLE));
-    assertFalse(ProjectionType.TUPLE.isCompatible(ProjectionType.ARRAY));
-    assertTrue(ProjectionType.TUPLE.isCompatible(ProjectionType.TUPLE_ARRAY));
-
-    assertTrue(ProjectionType.ARRAY.isCompatible(ProjectionType.UNPROJECTED));
-    assertTrue(ProjectionType.ARRAY.isCompatible(ProjectionType.WILDCARD));
-    assertTrue(ProjectionType.ARRAY.isCompatible(ProjectionType.GENERAL));
-    assertFalse(ProjectionType.ARRAY.isCompatible(ProjectionType.SCALAR));
-    assertFalse(ProjectionType.ARRAY.isCompatible(ProjectionType.TUPLE));
-    assertTrue(ProjectionType.ARRAY.isCompatible(ProjectionType.ARRAY));
-    assertTrue(ProjectionType.ARRAY.isCompatible(ProjectionType.TUPLE_ARRAY));
-
-    assertTrue(ProjectionType.TUPLE_ARRAY.isCompatible(ProjectionType.UNPROJECTED));
-    assertTrue(ProjectionType.TUPLE_ARRAY.isCompatible(ProjectionType.WILDCARD));
-    assertTrue(ProjectionType.TUPLE_ARRAY.isCompatible(ProjectionType.GENERAL));
-    assertFalse(ProjectionType.TUPLE_ARRAY.isCompatible(ProjectionType.SCALAR));
-    assertFalse(ProjectionType.TUPLE_ARRAY.isCompatible(ProjectionType.TUPLE));
-    assertFalse(ProjectionType.TUPLE_ARRAY.isCompatible(ProjectionType.ARRAY));
-    assertTrue(ProjectionType.TUPLE_ARRAY.isCompatible(ProjectionType.TUPLE_ARRAY));
-  }
-}
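
The test deleted above pinned down ProjectionType's isCompatible() matrix. In
the new model that matrix reduces to boolean predicates on RequestedColumn.
The following is a hypothetical restatement of the old rules against the new
API, for orientation only: it is not the checker this commit actually adds,
and the MajorType-based signature is an assumption.

import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;

public class CompatibilitySketch {

  // Hypothetical restatement of the deleted matrix: a projection of the
  // form a.x demands a map (or map array); a[n] demands an array. A bare
  // name (the old GENERAL case) constrains nothing, and an unprojected
  // column has nothing to conflict with.
  public static boolean isConsistent(RequestedColumn col, MajorType type) {
    if (col == null) {
      return true;  // Unprojected: always consistent.
    }
    boolean isMap = type.getMinorType() == MinorType.MAP;
    boolean isArray = type.getMode() == DataMode.REPEATED
        || type.getMinorType() == MinorType.LIST;
    if (col.isTuple() && !isMap) {
      return false;  // e.g. a.x against a scalar: old TUPLE vs. SCALAR.
    }
    if (col.isArray() && !isArray) {
      return false;  // e.g. a[0] against a non-repeated, non-LIST column.
    }
    return true;
  }
}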
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
index 171669a..0bbdf78 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
@@ -60,7 +60,6 @@ import static org.junit.Assert.fail;
  *
  * @see TestHeaderBuilder
  */
-
 @Category(RowSetTests.class)
 public class TestCsvWithHeaders extends BaseCsvTest {
 
@@ -136,7 +135,6 @@ public class TestCsvWithHeaders extends BaseCsvTest {
   /**
    * Trivial case: empty header. This case should fail.
    */
-
   @Test
   public void testEmptyCsvHeaders() throws IOException {
     buildFile(EMPTY_HEADERS_FILE, emptyHeaders);
@@ -174,7 +172,6 @@ public class TestCsvWithHeaders extends BaseCsvTest {
     buildFile(EMPTY_BODY_FILE, emptyBody);
 
     // SELECT * query: expect schema-only result.
-
     RowSet rowSet = client.queryBuilder().sql(makeStatement(EMPTY_BODY_FILE)).rowSet();
     TupleMetadata expectedSchema = new SchemaBuilder()
         .add("a", MinorType.VARCHAR)
@@ -186,7 +183,6 @@ public class TestCsvWithHeaders extends BaseCsvTest {
     RowSetUtilities.verify(expected, rowSet);
 
     // Try again with COUNT(*)
-
     long count = client.queryBuilder().sql(COUNT_STAR, EMPTY_BODY_FILE).singletonLong();
     assertEquals(0, count);
   }
@@ -289,7 +285,6 @@ public class TestCsvWithHeaders extends BaseCsvTest {
    * of just one implicit column. V3 uses non-nullable VARCHAR for file
    * metadata columns.
    */
-
   @Test
   public void testImplicitColsExplicitSelect() throws IOException {
     String sql = "SELECT A, filename FROM `dfs.data`.`%s`";
@@ -311,7 +306,6 @@ public class TestCsvWithHeaders extends BaseCsvTest {
    * of just one implicit column. V3 uses non-nullable VARCHAR for file
    * metadata columns.
    */
-
   @Test
   public void testImplicitColWildcard() throws IOException {
     String sql = "SELECT *, filename FROM `dfs.data`.`%s`";
@@ -435,8 +429,8 @@ public class TestCsvWithHeaders extends BaseCsvTest {
 
     RowSet rowSet;
     if (SCHEMA_BATCH_ENABLED) {
-      // First batch is empty; just carries the schema.
 
+      // First batch is empty; just carries the schema.
       assertTrue(iter.hasNext());
       rowSet = iter.next();
       assertEquals(0, rowSet.rowCount());
@@ -444,13 +438,11 @@ public class TestCsvWithHeaders extends BaseCsvTest {
     }
 
     // Read the other two batches.
-
     for (int i = 0; i < 2; i++) {
       assertTrue(iter.hasNext());
       rowSet = iter.next();
 
       // Figure out which record this is and test accordingly.
-
       RowSetReader reader = rowSet.reader();
       assertTrue(reader.next());
       String col1 = reader.scalar(0).getString();
@@ -493,8 +485,8 @@ public class TestCsvWithHeaders extends BaseCsvTest {
 
     RowSet rowSet;
     if (SCHEMA_BATCH_ENABLED) {
-      // First batch is empty; just carries the schema.
 
+      // First batch is empty; just carries the schema.
       assertTrue(iter.hasNext());
       rowSet = iter.next();
       RowSetUtilities.verify(new RowSetBuilder(client.allocator(), expectedSchema).build(),
@@ -502,13 +494,11 @@ public class TestCsvWithHeaders extends BaseCsvTest {
     }
 
     // Read the two batches.
-
     for (int i = 0; i < 2; i++) {
       assertTrue(iter.hasNext());
       rowSet = iter.next();
 
       // Figure out which record this is and test accordingly.
-
       RowSetReader reader = rowSet.reader();
       assertTrue(reader.next());
       String aCol = reader.scalar("a").getString();
@@ -548,8 +538,8 @@ public class TestCsvWithHeaders extends BaseCsvTest {
 
     RowSet rowSet;
     if (SCHEMA_BATCH_ENABLED) {
-      // First batch is empty; just carries the schema.
 
+      // First batch is empty; just carries the schema.
       assertTrue(iter.hasNext());
       rowSet = iter.next();
       RowSetUtilities.verify(new RowSetBuilder(client.allocator(), expectedSchema).build(),
@@ -557,13 +547,11 @@ public class TestCsvWithHeaders extends BaseCsvTest {
     }
 
     // Read the two batches.
-
     for (int i = 0; i < 2; i++) {
       assertTrue(iter.hasNext());
       rowSet = iter.next();
 
       // Figure out which record this is and test accordingly.
-
       RowSetReader reader = rowSet.reader();
       assertTrue(reader.next());
       String aCol = reader.scalar("a").getString();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
index 9416748..363bb70 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
@@ -47,7 +47,6 @@ import org.junit.experimental.categories.Category;
  * and without an external schema. Data is represented with the
  * `columns` array column.
  */
-
 @Category(RowSetTests.class)
 public class TestCsvWithoutHeaders extends BaseCsvTest {
 
@@ -79,7 +78,6 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
   protected static void buildNestedTableWithoutHeaders() throws IOException {
 
     // Two-level partitioned table
-
     File rootDir = new File(testDir, PART_DIR);
     rootDir.mkdir();
     buildFile(new File(rootDir, ROOT_FILE), sampleData);
@@ -91,7 +89,6 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
   /**
    * Verify that the wildcard expands to the `columns` array
    */
-
   @Test
   public void testWildcard() throws IOException {
     String sql = "SELECT * FROM `dfs.data`.`%s`";
@@ -222,7 +219,6 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
    * data columns (so that data columns don't shift positions if
    * files are nested to another level.)
    */
-
   @Test
   public void testPartitionExpansion() throws IOException {
     String sql = "SELECT * FROM `dfs.data`.`%s`";
@@ -235,8 +231,8 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
 
     RowSet rowSet;
     if (SCHEMA_BATCH_ENABLED) {
-      // First batch is empty; just carries the schema.
 
+      // First batch is empty; just carries the schema.
       assertTrue(iter.hasNext());
       rowSet = iter.next();
       assertEquals(0, rowSet.rowCount());
@@ -244,13 +240,11 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
     }
 
     // Read the two data batches.
-
     for (int i = 0; i < 2; i++) {
       assertTrue(iter.hasNext());
       rowSet = iter.next();
 
       // Figure out which record this is and test accordingly.
-
       RowSetReader reader = rowSet.reader();
       assertTrue(reader.next());
       ArrayReader ar = reader.array(0);
@@ -279,7 +273,6 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
    * V2 message: DATA_READ ERROR: Selected column 'columns' must be an array index
    * @throws Exception
    */
-
   @Test
   public void testColumnsAsMap() throws Exception {
     String sql = "SELECT `%s`.columns.foo FROM `dfs.data`.`%s`";
@@ -298,7 +291,6 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
    * V2 message: INTERNAL_ERROR ERROR: 70000
    * @throws Exception
    */
-
   @Test
   public void testColumnsIndexOverflow() throws Exception {
     String sql = "SELECT columns[70000] FROM `dfs.data`.`%s`";