Posted to commits@drill.apache.org by ar...@apache.org on 2019/06/07 10:38:53 UTC

[drill] 04/05: DRILL-7278: Refactor result set loader projection mechanism

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 0dbd501c0e2b46073dc3308f81c293452139822b
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Thu May 16 17:34:59 2019 -0700

    DRILL-7278: Refactor result set loader projection mechanism
    
    Drill 1.16 added an enhanced scan framework based on the row set
    mechanisms, and a "provisioned schema" feature built on top
    of that framework. Converting the log reader plugin to use
    the framework identified additional features we wish to add,
    such as marking a column as "special" (not expanded in a wildcard
    query).
    
    This work showed that the code added for provisioned schemas in
    Drill 1.16 works, but is overly complex, making it hard to add
    the desired new features.
    
    This patch refactors the "reader" projection code:
    
    * Create a "projection set" mechanism that the reader can query to ask,
      "the caller just added a column. Should it be projected or not?"
    * Unifies the type conversion mechanism added as part of provisioned
      schemas.
    * Added the "special column" property for both "reader" and "provided"
      schemas.
    * Verified that provisioned schemas work with maps (at least on the scan
      framework side.)
    * Replaced the previous "schema transformer" mechanism with a new "type
      conversion" mechanism that unifies type conversion, provided schemas
      and an optional custom type conversion mechanism.
    * Column writers can report if they are projected. Moved this query
      from metadata to the column writer itself.
    * Extended and clarified documentation of the feature.
    * Revised and/or added unit tests.
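    
    A minimal sketch of the new mechanism (ProjectionSet and
    ColumnReadProjection are interfaces added by this patch; the
    reader-side variables are illustrative):
    
        // The reader announces a newly discovered column; the projection
        // set answers whether to materialize it.
        ColumnReadProjection colProj = projSet.readProjection(readColSchema);
        if (colProj.isProjected()) {
          // Create a real vector and writer for the column.
        } else {
          // Create a dummy writer; values written to it are discarded.
        }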
    
    closes #1797
---
 .../scan/project/AbstractUnresolvedColumn.java     |  38 +-
 .../scan/project/ReaderSchemaOrchestrator.java     |  18 +-
 .../impl/scan/project/ResolvedNullColumn.java      |  12 +
 .../impl/scan/project/ScanLevelProjection.java     |  63 ++-
 .../impl/scan/project/ScanSchemaOrchestrator.java  |  68 +--
 .../scan/project/WildcardSchemaProjection.java     |  35 +-
 .../physical/impl/scan/project/package-info.java   | 182 +++++-
 .../project/projSet/AbstractProjectionSet.java     |  80 +++
 .../scan/project/projSet/AbstractReadColProj.java  |  50 ++
 .../scan/project/projSet/EmptyProjectionSet.java   |  40 ++
 .../project/projSet/ExplicitProjectionSet.java     | 109 ++++
 .../scan/project/projSet/ProjectedMapColumn.java   |  39 ++
 .../scan/project/projSet/ProjectedReadColumn.java  |  78 +++
 .../scan/project/projSet/ProjectionSetBuilder.java | 101 ++++
 .../scan/project/projSet/ProjectionSetFactory.java |  79 +++
 .../impl/scan/project/projSet/TypeConverter.java   | 173 ++++++
 .../project/projSet/UnprojectedReadColumn.java     |  41 ++
 .../project/projSet/WildcardProjectionSet.java     |  55 ++
 .../impl/scan/project/projSet/package-info.java    |  99 ++++
 .../drill/exec/physical/rowSet/ProjectionSet.java  | 105 ++++
 .../exec/physical/rowSet/impl/ColumnBuilder.java   | 218 ++-----
 .../exec/physical/rowSet/impl/ColumnState.java     |   3 +-
 .../exec/physical/rowSet/impl/ContainerState.java  |  18 +-
 .../rowSet/impl/DefaultSchemaTransformer.java      |  77 ---
 .../drill/exec/physical/rowSet/impl/ListState.java |   7 +-
 .../exec/physical/rowSet/impl/LoaderInternals.java |   8 +
 .../exec/physical/rowSet/impl/OptionBuilder.java   |  47 +-
 .../physical/rowSet/impl/RepeatedListState.java    |   5 +-
 .../physical/rowSet/impl/ResultSetLoaderImpl.java  |  36 +-
 .../physical/rowSet/impl/SchemaTransformer.java    |  46 --
 .../rowSet/impl/SchemaTransformerImpl.java         | 194 -------
 .../exec/physical/rowSet/impl/TupleState.java      |  12 +-
 .../exec/physical/rowSet/impl/UnionState.java      |   5 +-
 .../rowSet/project/ImpliedTupleRequest.java        |   8 +-
 .../physical/rowSet/project/ProjectionType.java    | 178 ++++++
 .../rowSet/project/RequestedColumnImpl.java        |  29 +-
 .../physical/rowSet/project/RequestedTuple.java    |  14 +-
 .../rowSet/project/RequestedTupleImpl.java         |  30 +-
 .../record/metadata/AbstractColumnMetadata.java    |  34 +-
 .../exec/store/dfs/easy/EasyFormatPlugin.java      | 138 +----
 .../exec/store/easy/text/TextFormatPlugin.java     |  78 +--
 .../easy/text/compliant/v3/FieldVarCharOutput.java |   2 +-
 .../text/compliant/v3/RepeatedVarCharOutput.java   |   2 +-
 .../physical/impl/scan/TestFileScanFramework.java  |   1 -
 .../impl/scan/TestScanOperExecOuputSchema.java     |  75 ++-
 .../impl/scan/TestScanOrchestratorEarlySchema.java |   8 +-
 .../impl/scan/project/TestScanLevelProjection.java | 208 +++++--
 .../scan/project/projSet/TestProjectionSet.java    | 625 +++++++++++++++++++++
 .../impl/TestResultSetLoaderEmptyProject.java      |  13 +-
 .../rowSet/impl/TestResultSetLoaderProjection.java |  92 +--
 .../impl/TestResultSetLoaderTypeConversion.java    |  22 +-
 .../{impl => project}/TestProjectedTuple.java      |  65 ++-
 .../rowSet/project/TestProjectionType.java         | 154 +++++
 .../record/metadata/TestMetadataProperties.java    |  20 -
 .../store/easy/text/compliant/BaseCsvTest.java     |   2 -
 .../apache/drill/exec/store/log/TestLogReader.java |  65 ++-
 .../drill/test/rowSet/test/TestDummyWriter.java    |  20 +-
 .../exec/record/metadata/AbstractPropertied.java   |   9 +
 .../drill/exec/record/metadata/ColumnMetadata.java |  30 +-
 .../drill/exec/record/metadata/ProjectionType.java | 102 ----
 .../drill/exec/record/metadata/Propertied.java     |   2 +
 .../drill/exec/vector/accessor/ColumnWriter.java   |  10 +
 .../drill/exec/vector/accessor/TupleWriter.java    |  15 -
 .../accessor/convert/AbstractWriteConverter.java   |   5 +
 .../accessor/writer/AbstractArrayWriter.java       |   3 +
 .../accessor/writer/AbstractObjectWriter.java      |   3 +
 .../accessor/writer/AbstractScalarWriterImpl.java  |   3 +
 .../accessor/writer/AbstractTupleWriter.java       |  12 +-
 .../exec/vector/accessor/writer/MapWriter.java     |  20 +-
 .../vector/accessor/writer/UnionWriterImpl.java    |   3 +
 .../accessor/writer/dummy/DummyArrayWriter.java    |   3 +
 .../accessor/writer/dummy/DummyScalarWriter.java   |   3 +
 72 files changed, 2918 insertions(+), 1329 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/AbstractUnresolvedColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/AbstractUnresolvedColumn.java
index 0bbae00..8668407 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/AbstractUnresolvedColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/AbstractUnresolvedColumn.java
@@ -24,33 +24,47 @@ import org.apache.drill.exec.record.metadata.ColumnMetadata;
  * Represents a projected column that has not yet been bound to a
  * table column, special column or a null column. Once bound, this
  * column projection is replaced with the detailed binding.
+ * <p>
+ * Occurs in a scan-level projection to identify columns needed in
+ * the output batch. Once we see reader data, we create a
+ * {@link ResolvedColumn} to replace this unresolved form. The
+ * resolved form identifies how to map data from its source (reader,
+ * null column builder, etc.) to the output batch. Thus the columns
+ * here are placeholders to be rewritten once more data is available.
  */
+
 public abstract class AbstractUnresolvedColumn implements ColumnProjection {
 
+  /**
+   * Represents an unresolved table column to be provided by the
+   * reader (or filled in with nulls.) May be associated with
+   * a provided schema column.
+   */
+
   public static class UnresolvedColumn extends AbstractUnresolvedColumn {
 
+    private final ColumnMetadata colDefn;
+
     public UnresolvedColumn(RequestedColumn inCol) {
-      super(inCol);
+      this(inCol, null);
     }
-  }
 
-  public static class UnresolvedWildcardColumn extends AbstractUnresolvedColumn {
-
-    public UnresolvedWildcardColumn(RequestedColumn inCol) {
+    public UnresolvedColumn(RequestedColumn inCol, ColumnMetadata colDefn) {
       super(inCol);
+      this.colDefn = colDefn;
     }
-  }
 
-  public static class UnresolvedSchemaColumn extends AbstractUnresolvedColumn {
+    public ColumnMetadata metadata() { return colDefn; }
 
-    private final ColumnMetadata colDefn;
+    @Override
+    public String name() { return colDefn == null ? super.name() : colDefn.name(); }
+  }
+
+  public static class UnresolvedWildcardColumn extends AbstractUnresolvedColumn {
 
-    public UnresolvedSchemaColumn(RequestedColumn inCol, ColumnMetadata colDefn) {
+    public UnresolvedWildcardColumn(RequestedColumn inCol) {
       super(inCol);
-      this.colDefn = colDefn;
     }
-
-    public ColumnMetadata metadata() { return colDefn; }
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
index 49a39f2..7dfcdf0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.scan.project;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder.NullBuilderBuilder;
 import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
+import org.apache.drill.exec.physical.impl.scan.project.projSet.ProjectionSetBuilder;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.physical.rowSet.impl.OptionBuilder;
 import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl;
@@ -72,7 +73,6 @@ public class ReaderSchemaOrchestrator implements VectorSource {
     options.setRowCountLimit(Math.min(readerBatchSize, scanOrchestrator.options.scanBatchRecordLimit));
     options.setVectorCache(scanOrchestrator.vectorCache);
     options.setBatchSizeLimit(scanOrchestrator.options.scanBatchByteLimit);
-    options.setSchemaTransform(scanOrchestrator.options.schemaTransformer);
     options.setContext(errorContext);
 
     // Set up a selection list if available and is a subset of
@@ -82,9 +82,9 @@ public class ReaderSchemaOrchestrator implements VectorSource {
     // the odd case where the reader claims a fixed schema, but
     // adds a column later.
 
-    if (! scanOrchestrator.scanProj.projectAll()) {
-      options.setProjectionSet(scanOrchestrator.scanProj.readerProjection());
-    }
+    ProjectionSetBuilder projBuilder = scanOrchestrator.scanProj.projectionSet();
+    projBuilder.typeConverter(scanOrchestrator.options.typeConverter);
+    options.setProjection(projBuilder.build());
     options.setSchema(readerSchema);
 
     // Create the table loader
@@ -208,13 +208,11 @@ public class ReaderSchemaOrchestrator implements VectorSource {
   }
 
   private ResolvedRow newRootTuple() {
-    NullBuilderBuilder nullBuilder = new NullBuilderBuilder()
+    return new ResolvedRow(new NullBuilderBuilder()
         .setNullType(scanOrchestrator.options.nullType)
-        .allowRequiredNullColumns(scanOrchestrator.options.allowRequiredNullColumns);
-    if (scanOrchestrator.options.schemaTransformer != null) {
-      nullBuilder.setOutputSchema(scanOrchestrator.options.schemaTransformer.outputSchema());
-    }
-    return new ResolvedRow(nullBuilder.build());
+        .allowRequiredNullColumns(scanOrchestrator.options.allowRequiredNullColumns)
+        .setOutputSchema(scanOrchestrator.options.outputSchema())
+        .build());
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedNullColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedNullColumn.java
index 0048813..4e4c026 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedNullColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedNullColumn.java
@@ -78,4 +78,16 @@ public class ResolvedNullColumn extends ResolvedColumn implements NullColumnSpec
 
   @Override
   public String defaultValue() { return defaultValue; }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf
+      .append("[")
+      .append(getClass().getSimpleName())
+      .append(" name=")
+      .append(name())
+      .append("]");
+    return buf.toString();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
index 4a02b33..9593c22 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
@@ -23,14 +23,13 @@ import java.util.List;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedColumn;
-import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedSchemaColumn;
 import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedWildcardColumn;
-import org.apache.drill.exec.physical.rowSet.project.RequestedColumnImpl;
+import org.apache.drill.exec.physical.impl.scan.project.projSet.ProjectionSetBuilder;
+import org.apache.drill.exec.physical.rowSet.project.ImpliedTupleRequest;
 import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
 import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
 import org.apache.drill.exec.physical.rowSet.project.RequestedTupleImpl;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.record.metadata.ProjectionType;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 
@@ -203,11 +202,6 @@ public class ScanLevelProjection {
              this == SCHEMA_WILDCARD ||
              this == STRICT_SCHEMA_WILDCARD;
     }
-
-    public boolean hasSchema() {
-      return this == SCHEMA_WILDCARD ||
-             this == STRICT_SCHEMA_WILDCARD;
-    }
   }
 
   /**
@@ -399,18 +393,20 @@ public class ScanLevelProjection {
     // projection. With a schema, we want the schema columns (which may
     // or may not correspond to reader columns.)
 
-    List<RequestedColumn> outputProj;
-    if (projectionType == ScanProjectionType.WILDCARD) {
-      outputProj = null;
+    if (projectionType != ScanProjectionType.EMPTY &&
+        projectionType != ScanProjectionType.EXPLICIT) {
+
+      readerProjection = ImpliedTupleRequest.ALL_MEMBERS;
     } else {
-      outputProj = new ArrayList<>();
+
+      List<RequestedColumn> outputProj = new ArrayList<>();
       for (ColumnProjection col : outputCols) {
         if (col instanceof AbstractUnresolvedColumn) {
           outputProj.add(((AbstractUnresolvedColumn) col).element());
         }
       }
+      readerProjection = RequestedTupleImpl.build(outputProj);
     }
-    readerProjection = RequestedTupleImpl.build(outputProj);
   }
 
   /**
@@ -428,9 +424,9 @@ public class ScanLevelProjection {
       throw new IllegalArgumentException("Duplicate * entry in project list");
     }
 
-    // Expand schema columns, if provided
+    // Expand strict schema columns, if provided
 
-    expandOutputSchema();
+    boolean expanded = expandOutputSchema();
 
     // Remember the wildcard position, if we need to insert it.
     // Ensures that the main wildcard expansion occurs before add-on
@@ -454,7 +450,7 @@ public class ScanLevelProjection {
     // If not consumed, put the wildcard column into the projection list as a
     // placeholder to be filled in later with actual table columns.
 
-    if (hasOutputSchema()) {
+    if (expanded) {
       projectionType =
           outputSchema.booleanProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP)
           ? ScanProjectionType.STRICT_SCHEMA_WILDCARD
@@ -465,9 +461,9 @@ public class ScanLevelProjection {
     }
   }
 
-  private void expandOutputSchema() {
+  private boolean expandOutputSchema() {
     if (outputSchema == null) {
-      return;
+      return false;
     }
 
     // Expand the wildcard. From the perspective of the reader, this is an explicit
@@ -477,10 +473,16 @@ public class ScanLevelProjection {
 
     for (int i = 0; i < outputSchema.size(); i++) {
       ColumnMetadata col = outputSchema.metadata(i);
-      RequestedColumn projCol = new RequestedColumnImpl(readerProjection, col.name(),
-          ProjectionType.typeFor(col.majorType()));
-      outputCols.add(new UnresolvedSchemaColumn(projCol, col));
+
+      // Skip columns tagged as "special": those that should not be
+      // expanded automatically.
+
+      if (col.booleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD)) {
+        continue;
+      }
+      outputCols.add(new UnresolvedColumn(null, col));
     }
+    return true;
   }
 
   /**
@@ -524,7 +526,15 @@ public class ScanLevelProjection {
 
     // This is a desired table column.
 
-    addTableColumn(new UnresolvedColumn(inCol));
+    addTableColumn(inCol);
+  }
+
+  private void addTableColumn(RequestedColumn inCol) {
+    ColumnMetadata outputCol = null;
+    if (outputSchema != null) {
+      outputCol = outputSchema.metadata(inCol.name());
+    }
+    addTableColumn(new UnresolvedColumn(inCol, outputCol));
   }
 
   public void addTableColumn(ColumnProjection outCol) {
@@ -598,11 +608,16 @@ public class ScanLevelProjection {
    * the wildcard)
    */
 
-  public boolean projectNone() { return projectionType == ScanProjectionType.EMPTY; }
+  public boolean isEmptyProjection() { return projectionType == ScanProjectionType.EMPTY; }
 
   public RequestedTuple rootProjection() { return outputProjection; }
 
-  public RequestedTuple readerProjection() { return readerProjection; }
+  public ProjectionSetBuilder projectionSet() {
+    return new ProjectionSetBuilder()
+      .outputSchema(outputSchema)
+      .parsedProjection(readerProjection)
+      .errorContext(errorContext);
+  }
 
   public boolean hasOutputSchema() { return outputSchema != null; }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
index 37f7c75..12469ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
@@ -18,9 +18,7 @@
 package org.apache.drill.exec.physical.impl.scan.project;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.expression.SchemaPath;
@@ -28,9 +26,8 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.impl.scan.project.ReaderLevelProjection.ReaderProjectionResolver;
 import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
+import org.apache.drill.exec.physical.impl.scan.project.projSet.TypeConverter;
 import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
-import org.apache.drill.exec.physical.rowSet.impl.SchemaTransformer;
-import org.apache.drill.exec.physical.rowSet.impl.SchemaTransformerImpl;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.ValueVector;
@@ -166,14 +163,12 @@ public class ScanSchemaOrchestrator {
     private boolean useSchemaSmoothing;
     private boolean allowRequiredNullColumns;
     private List<SchemaPath> projection;
-    private TupleMetadata outputSchema;
-    private SchemaTransformer schemaTransformer;
-    private Map<String, String> conversionProps;
+    private TypeConverter.Builder typeConverterBuilder = TypeConverter.builder();
 
     /**
      * Context for error messages.
      */
-    private CustomErrorContext context;
+    private CustomErrorContext errorContext;
 
     /**
      * Specify an optional metadata manager. Metadata is a set of constant
@@ -247,30 +242,16 @@ public class ScanSchemaOrchestrator {
       this.projection = projection;
     }
 
-    public void setOutputSchema(TupleMetadata schema) {
-      outputSchema = schema;
-    }
-
-    public void setSchemaTransformer(SchemaTransformer transformer) {
-      this.schemaTransformer = transformer;
-    }
-
-    public void setConversionProperty(String key, String value) {
-      if (key == null || value == null) {
-        return;
-      }
-      if (conversionProps == null) {
-        conversionProps = new HashMap<>();
-      }
-      conversionProps.put(key, value);
+    public TypeConverter.Builder typeConverterBuilder() {
+      return typeConverterBuilder;
     }
 
     public void setContext(CustomErrorContext context) {
-      this.context = context;
+      this.errorContext = context;
     }
 
     public CustomErrorContext errorContext() {
-      return context;
+      return errorContext;
     }
   }
 
@@ -298,7 +279,7 @@ public class ScanSchemaOrchestrator {
     public final List<SchemaPath> projection;
     public final boolean useSchemaSmoothing;
     public final boolean allowRequiredNullColumns;
-    public final SchemaTransformer schemaTransformer;
+    public final TypeConverter typeConverter;
 
     /**
      * Context for error messages.
@@ -313,22 +294,15 @@ public class ScanSchemaOrchestrator {
       schemaResolvers = builder.schemaResolvers;
       projection = builder.projection;
       useSchemaSmoothing = builder.useSchemaSmoothing;
-      context = builder.context;
-      boolean allowRequiredNulls = builder.allowRequiredNullColumns;
-      if (builder.schemaTransformer != null) {
-        // Use client-provided conversions
-        schemaTransformer = builder.schemaTransformer;
-      } else if (builder.outputSchema != null) {
-        // Use only implicit conversions
-        schemaTransformer = new SchemaTransformerImpl(
-            builder.outputSchema, builder.conversionProps);
-        if (builder.outputSchema.booleanProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP)) {
-          allowRequiredNulls = true;
-        }
-      } else {
-        schemaTransformer = null;
-      }
-      allowRequiredNullColumns = allowRequiredNulls;
+      context = builder.errorContext;
+      typeConverter = builder.typeConverterBuilder
+        .errorContext(builder.errorContext)
+        .build();
+      allowRequiredNullColumns = builder.allowRequiredNullColumns;
+    }
+
+    protected TupleMetadata outputSchema() {
+      return typeConverter == null ? null : typeConverter.providedSchema();
     }
   }
 
@@ -393,14 +367,10 @@ public class ScanSchemaOrchestrator {
 
     // Parse the projection list.
 
-    TupleMetadata outputSchema = null;
-    if (options.schemaTransformer != null) {
-      outputSchema = options.schemaTransformer.outputSchema();
-    }
     scanProj = ScanLevelProjection.builder()
         .projection(options.projection)
         .parsers(options.parsers)
-        .outputSchema(outputSchema)
+        .outputSchema(options.outputSchema())
         .context(builder.errorContext())
         .build();
     if (scanProj.projectAll() && options.useSchemaSmoothing) {
@@ -421,7 +391,7 @@ public class ScanSchemaOrchestrator {
   }
 
   public boolean isProjectNone() {
-    return scanProj.projectNone();
+    return scanProj.isEmptyProjection();
   }
 
   public boolean hasSchema() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/WildcardSchemaProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/WildcardSchemaProjection.java
index ea5c750..a2dceb6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/WildcardSchemaProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/WildcardSchemaProjection.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.scan.project;
 
 import java.util.List;
 
-import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedSchemaColumn;
+import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedColumn;
 import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionType;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -30,9 +30,8 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
  * appears, it is projected into the output schema. If not found,
  * then a null column (as defined by the output schema) is projected.
  * <p>
- * If the schema is strict, then we stop here. If not strict, then
- * any unmatched reader schema columns are appended to the output
- * tuple.
+ * Note that we don't go down this path for a strict schema: in that
+ * case the columns were already expanded at the scan level.
  */
 
 public class WildcardSchemaProjection extends ReaderLevelProjection {
@@ -48,25 +47,25 @@ public class WildcardSchemaProjection extends ReaderLevelProjection {
 
     boolean readerProjectionMap[] = new boolean[readerSchema.size()];
     for (ColumnProjection col : scanProj.columns()) {
-      if (col instanceof UnresolvedSchemaColumn) {
+      if (col instanceof UnresolvedColumn) {
 
         // Look for a match in the reader schema
 
-        ColumnMetadata readerCol = readerSchema.metadata(col.name());
-        UnresolvedSchemaColumn schemaCol = (UnresolvedSchemaColumn) col;
-        if (readerCol == null) {
-
-          // No match, project a null column
-
-          rootTuple.add(rootTuple.nullBuilder.add(schemaCol.metadata()));
-        } else {
+        UnresolvedColumn tableCol = (UnresolvedColumn) col;
+        ColumnMetadata readerCol = readerSchema.metadata(tableCol.name());
+        if (readerCol != null) {
 
           // Is a match, project this reader column
 
           int index = readerSchema.index(col.name());
           readerProjectionMap[index] = true;
           rootTuple.add(
-              new ResolvedTableColumn(schemaCol.metadata(), rootTuple, index));
+              new ResolvedTableColumn(tableCol.metadata(), rootTuple, index));
+        } else {
+
+          // No match, project a null column
+
+          rootTuple.add(rootTuple.nullBuilder.add(tableCol.metadata()));
         }
       } else {
 
@@ -84,9 +83,11 @@ public class WildcardSchemaProjection extends ReaderLevelProjection {
           continue;
         }
         ColumnMetadata readerCol = readerSchema.metadata(i);
-        rootTuple.add(
-            new ResolvedTableColumn(readerCol.name(),
-                readerCol.schema(), rootTuple, i));
+        if (! readerCol.booleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD)) {
+          rootTuple.add(
+              new ResolvedTableColumn(readerCol.name(),
+                  readerCol.schema(), rootTuple, i));
+        }
       }
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java
index 155fcf8..0181e42 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java
@@ -21,10 +21,26 @@
  * variety of special columns. Requested columns can exist in the table,
  * or may be "missing" with null values applied. The code here prepares
  * a run-time projection plan based on the actual table schema.
+ *
+ * <h4>Overview</h4>
+ *
+ * The projection framework looks at schema as a set of transforms:
  * <p>
- * Looks at schema as a set of transforms.
  * <ul>
- * <li>Scan-level projection list from the query plan: The list of columns
+ * <li>Scan level: physical plan projection list and optional provided
+ * schema information.</li>
+ * <li>File level: materializes implicit file and partition columns.</li>
+ * <li>Reader level: integrates the actual schema discovered by the
+ * reader with the scan-level projection list.</li>
+ * </ul>
+ * <p>
+ * Projection turns out to be a complex operation in a schema-on-read
+ * system such as Drill. A provided schema helps resolve the ambiguities
+ * inherent in schema-on-read, but at the cost of some additional complexity.
+ *
+ * <h4>Background</h4>
+ *
+ * The scan-level projection holds the list of columns
  * (or the wildcard) as requested by the user in the query. The planner
  * determines which columns to project. In Drill, projection is speculative:
  * it is a list of names which the planner hopes will appear in the data
@@ -32,35 +48,89 @@
  * it turns out that no such column exists. Else, the reader must figure out
 * the data type for any columns that do exist.
  * <p>
+ * With the advent of provided schema in Drill 1.16, the scan-level projection
+ * integrates that schema information with the projection list provided in
+ * the physical operator. If a schema is provided, then each scan-level
+ * column tracks the schema information for that column.
+ * <p>
+ * The scan-level projection also
+ * implements the special rules for a "strict" provided schema: if the operator
+ * projection list contains a wildcard, a schema is provided, and the schema
+ * is strict, then the scan-level projection expands the wildcard into the
+ * set of columns in the provided schema. Doing so ensures that the scan
+ * output contains exactly those columns from the schema, even if the columns
+ * must be null or at a default value. (The result set loader does additional
+ * filtering as well.)
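+ * <p>
+ * A condensed sketch of the expansion rule, taken from the code in this
+ * patch (applied to each column of the provided schema):
+ * <pre>
+ * if (col.booleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD)) {
+ *   continue; // "special" columns are not expanded by the wildcard
+ * }
+ * outputCols.add(new UnresolvedColumn(null, col));
+ * </pre>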
+ * <p>
  * The scan project list defines the set of columns which the scan operator
  * is obliged to send downstream. Ideally, the scan operator sends exactly the
  * same schema (the project list with types filled in) for all batches. Since
  * batches may come from different files, the scan operator is obligated to
- * unify the schemas from those files (or blocks.)</ul>
- * <li>Reader (file)-level projection occurs for each reader. A single scan
- * may use multiple readers to read data. Each reader may offer more information
- * about the schema. For example, a Parquet reader can obtain schema information
+ * unify the schemas from those files (or blocks.)
+ * <p>
+ * Reader (file)-level projection occurs for each reader. A single scan
+ * may use multiple readers to read data. From the reader's perspective, it
+ * offers the schema it discovers in the file. The reader itself is rather
+ * inflexible: it must deal with the data it finds, of the type found in
+ * the data source.
+ * <p>
+ * The reader thus tells the result set loader that it has such-and-so schema.
+ * It does that either at open time (so-called "early" schema, such as for
+ * CSV, JDBC or Parquet) or as it discovers the columns (so-called "late"
+ * schema as in JSON.) Again, in each case, the data source schema is what
+ * it is; it can't be changed due to the wishes of the scan-level projection.
+ * <p>
+ * Readers obtain column schema from the file or data source. For example,
+ * a Parquet reader can obtain schema information
  * from the Parquet headers. A JDBC reader obtains schema information from the
- * returned schema. This is called "early schema." File-based readers can at least
- * add implicit file or partition columns.
+ * returned schema. As noted above, we use the term "early schema" when type
+ * information is available at open time, before reading the first row of data.
+ * <p>
+ * By contrast, readers such as JSON and CSV are "late schema": they don't know the data
+ * schema until they read the file. This is true "schema on read." Further, for
+ * JSON, the data may change from one batch to the next as the reader "discovers"
+ * fields that did not appear in earlier batches. This requires some amount of
+ * "schema smoothing": the ability to preserve a consistent output schema even
+ * as the input schema jiggles around some.
+ * <p>
+ * Drill supports many kinds of data sources via plugins. The DFS plugin works
+ * with files in a distributed store such as HDFS. Such file-based readers
+ * add implicit file or partition columns. Since these columns are generic to
+ * all format plugins, they are factored out into a file scan framework which
+ * inserts the "implicit" columns separate from the reader-provided columns.
+ *
+ * <h4>Design</h4>
+ *
+ * This leads to a multi-stage merge operation. The result set loader is
+ * presented with each column one-by-one (either at open time or during read.)
+ * When a column is presented, the projection framework makes a number of
+ * decisions:
+ * <p>
+ * <ul>
+ * <li>Is the column projected? For example, if a query is <tt>SELECT a, b, c</tt>
+ * and the reader offers column <tt>d</tt>, then <tt>d</tt> will not be projected.
+ * In the wildcard case, "special" columns will be omitted from the column
+ * expansion and will be unprojected.</li>
+ * <li>Is type conversion needed? If a schema is provided, and the type of the
+ * column requested in the provided schema differs from that offered by the
+ * reader, the framework can insert a type-conversion "shim", assuming that
+ * the framework knows how to do the conversion. Otherwise, an error is raised.</li>
+ * <li>Is the column type and mode consistent with the projection list?
+ * Suppose the query is <tt>SELECT a, b[10], c.d</tt>. Column `a` matches
+ * any reader column. But, column `b` is valid only for an array (not a map
+ * and not a scalar.) Column `c` must be a map (or array of maps.) And so on.</li>
+ * </ul>
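+ * <p>
+ * As a sketch of the last check (variable names illustrative), the test
+ * reduces to comparing the projection type implied by the project list
+ * with the type the reader actually offers:
+ * <pre>
+ * ProjectionType neededType = ProjectionType.typeFor(readCol.majorType());
+ * if (!reqCol.type().isCompatible(neededType)) {
+ *   // raise a validation error
+ * }
+ * </pre>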
  * <p>
  * The result is a refined schema: the scan level schema with more information
  * filled in. For Parquet, all projection information can be filled in. For
  * CSV or JSON, we can only add file metadata information, but not yet the
- * actual data schema.</ul>
- * <li>Batch-level schema: once a reader reads actual data, it now knows
+ * actual data schema.
+ * <p>
+ * Batch-level schema: once a reader reads actual data, it now knows
  * exactly what it read. This is the "schema on read model." Thus, after reading
  * a batch, any remaining uncertainty about the projected schema is removed.
 * The actual data defines the data types and so on.
  * <p>
- * Readers such as JSON and CSV are "late schema": they don't know the data
- * schema until they read the file. This is true "schema on read." Further, for
- * JSON, the data may change from one batch to the next as the reader "discovers"
- * fields that did not appear in earlier batches. This requires some amount of
- * "schema smoothing": the ability to preserve a consistent output schema even
- * as the input schema jiggles around some.</ul>
- * </ul>
- * <p>
  * The goal of this mechanism is to handle the above use cases cleanly, in a
  * common set of classes, and to avoid the need for each reader to figure out
  * all these issues for themselves (as was the case with earlier versions of
@@ -71,9 +141,72 @@
  * distinct class. Classes combine via composition to create a "framework"
  * suitable for each kind of reader: whether it be early or late schema,
  * file-based or something else, etc.
+ *
+ * <h4>Nuances of Reader-Level Projection</h4>
+ *
+ * We've said that the scan-level projection identifies what the query
+ * <i>wants</i>. We've said that the reader identifies what the external
+ * data actually <i>is</i>. We've mentioned how we bridge between the
+ * two. Here we explore this in more detail.
+ * <p>
+ * Run-time schema resolution occurs at various stages:
  * <p>
+ * <ul>
+ * <li>The per-column resolution identified earlier: matching types,
+ * type conversion, and so on.</li>
+ * <li>The reader provides some set of columns. We don't know which
+ * columns those are until the end of the first (or, more generally,
+ * every) batch. Suppose the query wants <tt>SELECT a, b, c</tt> but the
+ * reader turns out to provide only `a` and `b`. Only after the first
+ * batch do we realize that we need column `c` as a "null" column (of a
+ * type defined in the provided schema, specified by the plugin, or
+ * good-old nullable INT.)</li>
+ * <li>The result set loader will have created "dummy" columns for
+ * unprojected columns. The reader can still write to such columns
+ * (because they represent data in the file), but the associated column
+ * writer simply ignores the data. As a result, the result set loader
+ * should produce only a (possibly full) subset of projected columns.</li>
+ * <li>After each reader batch, the projection framework goes to work,
+ * filling in implicit columns and missing columns. It is
+ * important to remember that this pass *must* be done *after* a batch
+ * is read, since we don't know which columns the reader can provide
+ * until the batch has been read.</li>
+ * <li>Some readers, such as JSON, can "change their minds" about the
+ * schema across batches. For example, the first batch may include
+ * only columns a and b. Later in the JSON file, the reader may
+ * discover column c. This means that the above post-batch analysis
+ * must be repeated each time the reader changes the schema. (The result
+ * set loader tracks schema changes for this purpose.)</li>
+ * <li>File schemas evolve. The same changes noted above can occur
+ * across files. Maybe file 1 has column `x` as a BIGINT, while file 2
+ * has column `x` as an INT. A "smoothing" step attempts to avoid hard
+ * schema changes if they can be avoided. While smoothing is a clever
+ * idea, it only handles some cases. Provided schema is a more reliable
+ * solution (but is not yet widely deployed.)</li>
+ * </ul>
+ *
+ * <h4>Reader-Level Projection Set</h4>
+ *
+ * The Projection Set mechanism is designed to handle the increasing nuances
+ * of Drill run-time projection by providing a source of information about
+ * each column that the reader may discover:
+ * <ul>
+ * <li>Is the column projected?<ul>
+ *   <li>If the query is explicit (<tt>SELECT a, b, c</tt>), is the column
+ *   in the projection list?</li>
+ *   <li>If the query is a wildcard (<tt>SELECT *</tt>), is the column
+ *   marked as special (not included in the wildcard)?</li>
+ *   <li>If the query is wildcard, and a strict schema is provided, is
+ *   the column part of the provided schema?</li></ul></li>
+ * <li>Is the column consistent with the projection specification?</li>
+ * <li>Is type conversion needed?</li>
+ * </ul>
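+ * <p>
+ * A sketch of how a projection set is assembled (the builder methods are
+ * part of this patch; the surrounding variables are illustrative):
+ * <pre>
+ * ProjectionSet projSet = new ProjectionSetBuilder()
+ *     .parsedProjection(readerProjection) // explicit project list, if any
+ *     .outputSchema(providedSchema)       // provided schema, if any
+ *     .errorContext(errorContext)
+ *     .build();
+ * </pre>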
+ *
+ * <h4>Projection Via Rewrites</h4>
+ *
  * The core concept is one of successive refinement of the project
  * list through a set of rewrites:
+ * <p>
  * <ul>
  * <li>Scan-level rewrite: convert {@link SchemaPath} entries into
  * internal column nodes, tagging the nodes with the column type:
@@ -103,9 +236,9 @@
  *                       |
  *                       v
  *                +------------+
- *                | Scan Level |
- *                | Projection | -----------+
- *                +------------+            |
+ *                | Scan Level |     +----------------+
+ *                | Projection | --->| Projection Set |
+ *                +------------+     +----------------+
  *                       |                  |
  *                       v                  v
  *  +------+      +------------+     +------------+      +-----------+
@@ -114,8 +247,8 @@
  *  +------+      +------------+     +------------+      +-----------+
  *                       |                  |
  *                       v                  |
- *               +--------------+   Table   |
- *               | Schema Level |   Schema  |
+ *               +--------------+   Reader  |
+ *               | Reader Level |   Schema  |
  *               |  Projection  | <---------+
  *               +--------------+           |
  *                       |                  |
@@ -129,6 +262,9 @@
  *                 Output Batch
  * </pre>
  * <p>
+ * The left side can be thought of as the "what we want" description of the
+ * schema, with the right side being "what the reader actually discovered."
+ * <p>
  * The output mapper includes mechanisms to populate implicit columns, create
  * null columns, and to merge implicit, null and data columns, omitting
  * unprojected data columns.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/AbstractProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/AbstractProjectionSet.java
new file mode 100644
index 0000000..c6775d0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/AbstractProjectionSet.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project.projSet;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.exec.physical.rowSet.ProjectionSet;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
+
+/**
+ * Base class for projection set implementations. Handles an optional
+ * type conversion based on a provided schema, custom conversion, or both.
+ */
+
+public abstract class AbstractProjectionSet implements ProjectionSet {
+  protected final TypeConverter typeConverter;
+  protected final TupleMetadata providedSchema;
+  protected final boolean isStrict;
+  protected CustomErrorContext errorContext;
+
+  public AbstractProjectionSet(TypeConverter typeConverter) {
+    this.typeConverter = typeConverter;
+    providedSchema = typeConverter == null ? null :
+        typeConverter.providedSchema();
+    isStrict = providedSchema != null &&
+        typeConverter.providedSchema().booleanProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP);
+  }
+
+  public AbstractProjectionSet(TypeConverter typeConverter, boolean isStrict) {
+    this.typeConverter = typeConverter;
+    providedSchema = typeConverter == null ? null :
+        typeConverter.providedSchema();
+    this.isStrict = isStrict;
+  }
+
+  public AbstractProjectionSet() {
+    this(null);
+  }
+
+  @Override
+  public void setErrorContext(CustomErrorContext errorContext) {
+    this.errorContext = errorContext;
+  }
+
+  protected static boolean isSpecial(ColumnMetadata col) {
+    return col.booleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD);
+  }
+
+  protected ColumnMetadata outputSchema(ColumnMetadata col) {
+    return providedSchema == null ? null :
+      providedSchema.metadata(col.name());
+  }
+
+  protected ColumnConversionFactory conversion(ColumnMetadata inputSchema, ColumnMetadata outputCol) {
+    return typeConverter == null ? null :
+      typeConverter.conversionFactory(inputSchema, outputCol);
+  }
+
+  protected TypeConverter childConverter(ColumnMetadata outputSchema) {
+    TupleMetadata childSchema = outputSchema == null ? null : outputSchema.mapSchema();
+    return typeConverter == null ? null :
+      typeConverter.childConverter(childSchema);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/AbstractReadColProj.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/AbstractReadColProj.java
new file mode 100644
index 0000000..3b7158b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/AbstractReadColProj.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project.projSet;
+
+import org.apache.drill.exec.physical.rowSet.ProjectionSet;
+import org.apache.drill.exec.physical.rowSet.ProjectionSet.ColumnReadProjection;
+import org.apache.drill.exec.physical.rowSet.project.ProjectionType;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
+
+public abstract class AbstractReadColProj implements ColumnReadProjection {
+  protected final ColumnMetadata readSchema;
+
+  public AbstractReadColProj(ColumnMetadata readSchema) {
+    this.readSchema = readSchema;
+  }
+
+  @Override
+  public ColumnMetadata readSchema() { return readSchema; }
+
+  @Override
+  public boolean isProjected() { return true; }
+
+  @Override
+  public ColumnConversionFactory conversionFactory() { return null; }
+
+  @Override
+  public ColumnMetadata providedSchema() { return readSchema; }
+
+  @Override
+  public ProjectionSet mapProjection() { return ProjectionSetFactory.projectAll(); }
+
+  @Override
+  public ProjectionType projectionType() { return null; }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/EmptyProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/EmptyProjectionSet.java
new file mode 100644
index 0000000..af7b9e1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/EmptyProjectionSet.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project.projSet;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.exec.physical.rowSet.ProjectionSet;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+
+/**
+ * Handles the simple case in which no columns are projected:
+ * every column the reader offers is returned as unprojected.
+ */
+
+public class EmptyProjectionSet implements ProjectionSet {
+
+  public static final ProjectionSet PROJECT_NONE = new EmptyProjectionSet();
+
+  @Override
+  public ColumnReadProjection readProjection(ColumnMetadata col) {
+    return new UnprojectedReadColumn(col);
+  }
+
+  @Override
+  public void setErrorContext(CustomErrorContext errorContext) { }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ExplicitProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ExplicitProjectionSet.java
new file mode 100644
index 0000000..60c88f3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ExplicitProjectionSet.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project.projSet;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.rowSet.ProjectionSet;
+import org.apache.drill.exec.physical.rowSet.project.ProjectionType;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.TupleProjectionType;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
+
+/**
+ * Projection set based on an explicit set of columns provided
+ * in the physical plan. Columns in the list are projected, others
+ * are not.
+ */
+
+public class ExplicitProjectionSet extends AbstractProjectionSet {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExplicitProjectionSet.class);
+
+  private final RequestedTuple requestedProj;
+
+  public ExplicitProjectionSet(RequestedTuple requestedProj, TypeConverter typeConverter) {
+    super(typeConverter);
+    this.requestedProj = requestedProj;
+  }
+
+  @Override
+  public ColumnReadProjection readProjection(ColumnMetadata col) {
+    RequestedColumn reqCol = requestedProj.get(col.name());
+    if (reqCol == null) {
+      return new UnprojectedReadColumn(col);
+    }
+    ColumnMetadata outputSchema = outputSchema(col);
+    validateProjection(reqCol, outputSchema == null ? col : outputSchema);
+    if (!col.isMap()) {
+
+      // Non-map column.
+
+      ColumnConversionFactory conv = conversion(col, outputSchema);
+      return new ProjectedReadColumn(col, reqCol, outputSchema, conv);
+    } else {
+
+      // Maps are tuples. Create a tuple projection and wrap it in
+      // a column projection.
+
+      TypeConverter childConverter = childConverter(outputSchema);
+      ProjectionSet mapProjection;
+      if (! reqCol.type().isTuple() || reqCol.mapProjection().type() == TupleProjectionType.ALL) {
+
+        // Projection is simple: "m". This is equivalent to
+        // (non-SQL) m.*
+        // This may also be a projection of the form "m.a, m"; the
+        // general projection takes precedence.
+
+        mapProjection = new WildcardProjectionSet(childConverter, isStrict);
+      } else {
+
+        // Else, selected map items are projected, say m.a, m.c.
+        // (Here, we'll never hit the case where none of the map is
+        // projected; that case, while allowed in the RequestedTuple
+        // implementation, can never occur in a SELECT list.)
+
+        mapProjection = new ExplicitProjectionSet(reqCol.mapProjection(), childConverter);
+      }
+      return new ProjectedMapColumn(col, reqCol, outputSchema, mapProjection);
+    }
+  }
+
+  public void validateProjection(RequestedColumn colReq, ColumnMetadata readCol) {
+    if (colReq == null || readCol == null) {
+      return;
+    }
+    ProjectionType type = colReq.type();
+    if (type == null) {
+      return;
+    }
+    ProjectionType neededType = ProjectionType.typeFor(readCol.majorType());
+    if (type.isCompatible(neededType)) {
+      return;
+    }
+    throw UserException.validationError()
+      .message("Column type not compatible with projection specification")
+      .addContext("Column:", readCol.name())
+      .addContext("Projection type:", type.label())
+      .addContext("Column type:", Types.getSqlTypeName(readCol.majorType()))
+      .addContext(errorContext)
+      .build(logger);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedMapColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedMapColumn.java
new file mode 100644
index 0000000..26b3742
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedMapColumn.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project.projSet;
+
+import org.apache.drill.exec.physical.rowSet.ProjectionSet;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+
+public class ProjectedMapColumn extends ProjectedReadColumn {
+
+  private final ProjectionSet mapProjection;
+
+  public ProjectedMapColumn(ColumnMetadata readSchema,
+      RequestedColumn requestedCol, ColumnMetadata outputSchema,
+      ProjectionSet mapProjection) {
+    super(readSchema, requestedCol, outputSchema, null);
+    this.mapProjection = mapProjection;
+  }
+
+  @Override
+  public ProjectionSet mapProjection() {
+    return mapProjection;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedReadColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedReadColumn.java
new file mode 100644
index 0000000..1e866de
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedReadColumn.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project.projSet;
+
+import org.apache.drill.exec.physical.rowSet.ProjectionSet;
+import org.apache.drill.exec.physical.rowSet.project.ProjectionType;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
+
+/**
+ * Projected column. Includes at least the reader schema. May also
+ * include a projection specification, an output schema, and a type
+ * conversion.
+ */
+
+public class ProjectedReadColumn extends AbstractReadColProj {
+  private final RequestedColumn requestedCol;
+  private final ColumnMetadata outputSchema;
+  private final ColumnConversionFactory conversionFactory;
+
+  public ProjectedReadColumn(ColumnMetadata readSchema) {
+    this(readSchema, null, null, null);
+  }
+
+  public ProjectedReadColumn(ColumnMetadata readSchema,
+      RequestedColumn requestedCol) {
+    this(readSchema, requestedCol, null, null);
+  }
+
+  public ProjectedReadColumn(ColumnMetadata readSchema,
+      ColumnMetadata outputSchema, ColumnConversionFactory conversionFactory) {
+    this(readSchema, null, outputSchema, conversionFactory);
+  }
+
+  public ProjectedReadColumn(ColumnMetadata readSchema,
+      RequestedColumn requestedCol, ColumnMetadata outputSchema,
+      ColumnConversionFactory conversionFactory) {
+    super(readSchema);
+    this.requestedCol = requestedCol;
+    this.outputSchema = outputSchema;
+    this.conversionFactory = conversionFactory;
+  }
+
+  @Override
+  public ColumnMetadata providedSchema() {
+    return outputSchema == null ? readSchema : outputSchema;
+  }
+
+  @Override
+  public ProjectionSet mapProjection() {
+    // Should never occur: maps should use the map class.
+    return null;
+  }
+
+  @Override
+  public ProjectionType projectionType() {
+    return requestedCol == null ? null : requestedCol.type();
+  }
+
+  @Override
+  public ColumnConversionFactory conversionFactory() { return conversionFactory; }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectionSetBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectionSetBuilder.java
new file mode 100644
index 0000000..56034c7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectionSetBuilder.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project.projSet;
+
+import java.util.Collection;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.rowSet.ProjectionSet;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.TupleProjectionType;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTupleImpl;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
+public class ProjectionSetBuilder {
+
+  private RequestedTuple parsedProjection;
+  private TypeConverter typeConverter;
+  private CustomErrorContext errorContext;
+
+  /**
+   * Record (batch) readers often read a subset of available table columns,
+   * but want to use a writer schema that includes all columns for ease of
+   * writing. (For example, a CSV reader must read all columns, even if the user
+   * wants a subset. The unwanted columns are simply discarded.)
+   * <p>
+   * This option provides a projection list, in the form of column names, for
+   * those columns which are to be projected. Only those columns will be
+   * backed by value vectors; non-projected columns will be backed by "null"
+   * writers that discard all values.
+   *
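+   * <p>
+   * A minimal sketch of intended use (an illustration only; the column
+   * names are assumed):
+   * <pre><code>
+   *   ProjectionSet projSet = new ProjectionSetBuilder()
+   *       .projectionList(Arrays.asList(
+   *           SchemaPath.getSimplePath("a"),   // assumed column name
+   *           SchemaPath.getSimplePath("b")))  // assumed column name
+   *       .build();
+   * </code></pre>
+   *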
+   * @param projection the list of projected columns
+   * @return this builder
+   */
+
+  public ProjectionSetBuilder projectionList(Collection<SchemaPath> projection) {
+    if (projection == null) {
+      parsedProjection = null;
+    } else {
+      parsedProjection = RequestedTupleImpl.parse(projection);
+    }
+    return this;
+  }
+
+  public ProjectionSetBuilder parsedProjection(RequestedTuple projection) {
+    parsedProjection = projection;
+    return this;
+  }
+
+  public ProjectionSetBuilder outputSchema(TupleMetadata schema) {
+    typeConverter = TypeConverter.builder().providedSchema(schema).build();
+    return this;
+  }
+
+  public ProjectionSetBuilder typeConverter(TypeConverter converter) {
+    this.typeConverter = converter;
+    return this;
+  }
+
+  public ProjectionSetBuilder errorContext(CustomErrorContext errorContext) {
+    this.errorContext = errorContext;
+    return this;
+  }
+
+  public ProjectionSet build() {
+    TupleProjectionType projType = parsedProjection == null ?
+        TupleProjectionType.ALL : parsedProjection.type();
+
+    ProjectionSet projSet;
+    switch (projType) {
+    case ALL:
+      projSet = new WildcardProjectionSet(typeConverter);
+      break;
+    case NONE:
+      projSet = ProjectionSetFactory.projectNone();
+      break;
+    case SOME:
+      projSet = new ExplicitProjectionSet(parsedProjection, typeConverter);
+      break;
+    default:
+      throw new IllegalStateException("Unexpected projection type: " + projType.toString());
+    }
+    projSet.setErrorContext(errorContext);
+    return projSet;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectionSetFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectionSetFactory.java
new file mode 100644
index 0000000..7bc711a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectionSetFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project.projSet;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.impl.scan.project.projSet.TypeConverter.CustomTypeTransform;
+import org.apache.drill.exec.physical.rowSet.ProjectionSet;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTupleImpl;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
+import org.apache.drill.exec.vector.accessor.convert.StandardConversions.ConversionDefn;
+
+public class ProjectionSetFactory {
+
+  private static class SimpleTransform implements CustomTypeTransform {
+
+    private final ColumnConversionFactory colFactory;
+
+    public SimpleTransform(ColumnConversionFactory colFactory) {
+      this.colFactory = colFactory;
+    }
+
+    @Override
+    public ColumnConversionFactory transform(ColumnMetadata inputDefn,
+        Map<String, String> properties,
+        ColumnMetadata outputDefn, ConversionDefn defn) {
+      return colFactory;
+    }
+  }
+
+  public static ProjectionSet projectAll() { return new WildcardProjectionSet(null); }
+
+  public static ProjectionSet projectNone() { return EmptyProjectionSet.PROJECT_NONE; }
+
+  public static ProjectionSet wrap(RequestedTuple mapProjection) {
+    switch (mapProjection.type()) {
+    case ALL:
+      return projectAll();
+    case NONE:
+      return projectNone();
+    case SOME:
+      return new ExplicitProjectionSet(mapProjection, null);
+    default:
+      throw new IllegalStateException("Unexpected projection type: " +
+            mapProjection.type().toString());
+    }
+  }
+
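+  /**
+   * Create a projection set for a list of projected columns; a null
+   * list means "project all". A sketch of use (an illustration only;
+   * the column name is assumed):
+   * <pre><code>
+   *   ProjectionSet projSet = ProjectionSetFactory.build(
+   *       Collections.singletonList(SchemaPath.getSimplePath("a")));
+   * </code></pre>
+   */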
+  public static ProjectionSet build(List<SchemaPath> selection) {
+    if (selection == null) {
+      return projectAll();
+    }
+    return wrap(RequestedTupleImpl.parse(selection));
+  }
+
+  public static CustomTypeTransform simpleTransform(ColumnConversionFactory colFactory) {
+    return new SimpleTransform(colFactory);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TypeConverter.java
new file mode 100644
index 0000000..064b678
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TypeConverter.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project.projSet;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
+import org.apache.drill.exec.vector.accessor.convert.StandardConversions;
+import org.apache.drill.exec.vector.accessor.convert.StandardConversions.ConversionDefn;
+import org.apache.drill.exec.vector.accessor.convert.StandardConversions.ConversionType;
+
+public class TypeConverter {
+  private static final org.slf4j.Logger logger =
+      org.slf4j.LoggerFactory.getLogger(TypeConverter.class);
+
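+  /**
+   * Custom conversion hook. Given the reader's input column, and the
+   * output column from the provided schema (if any), return a factory
+   * for a conversion shim, or return null to fall back to the standard
+   * conversion, if any.
+   */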
+  public static interface CustomTypeTransform {
+    ColumnConversionFactory transform(ColumnMetadata inputDefn,
+        Map<String, String> properties,
+        ColumnMetadata outputDefn, ConversionDefn defn);
+  }
+
+  private static class NullTypeTransform implements CustomTypeTransform {
+    @Override
+    public ColumnConversionFactory transform(ColumnMetadata inputDefn,
+        Map<String, String> properties,
+        ColumnMetadata outputDefn, ConversionDefn defn) {
+      return null;
+    }
+  }
+
+  public static class Builder {
+    private TupleMetadata providedSchema;
+    private CustomTypeTransform transform;
+    private Map<String, String> properties;
+    private CustomErrorContext errorContext;
+
+    public Builder providedSchema(TupleMetadata schema) {
+      providedSchema = schema;
+      return this;
+    }
+
+    public Builder transform(TypeConverter.CustomTypeTransform transform) {
+      this.transform = transform;
+      return this;
+    }
+
+    public Builder properties(Map<String, String> properties) {
+      this.properties = properties;
+      return this;
+    }
+
+    public Builder setConversionProperty(String key, String value) {
+      if (key == null || value == null) {
+        return this;
+      }
+      if (properties == null) {
+        properties = new HashMap<>();
+      }
+      properties.put(key, value);
+      return this;
+    }
+
+    public Builder errorContext(CustomErrorContext errorContext) {
+      this.errorContext = errorContext;
+      return this;
+    }
+
+    public TypeConverter build() {
+      return new TypeConverter(this);
+    }
+  }
+
+  private final TupleMetadata providedSchema;
+  private final CustomTypeTransform customTransform;
+  private final Map<String, String> properties;
+  private final CustomErrorContext errorContext;
+
+  public static Builder builder() { return new Builder(); }
+
+  public TypeConverter(Builder builder) {
+    this.providedSchema = builder.providedSchema;
+    this.customTransform = builder.transform == null ?
+        new NullTypeTransform() : builder.transform;
+    this.properties = builder.properties;
+    this.errorContext = builder.errorContext;
+  }
+
+  public TypeConverter(TypeConverter parent,
+      TupleMetadata childSchema) {
+    this.providedSchema = childSchema;
+    this.customTransform = parent.customTransform;
+    this.properties = parent.properties;
+    this.errorContext = parent.errorContext;
+  }
+
+  public TupleMetadata providedSchema() { return providedSchema; }
+
+  public ColumnConversionFactory conversionFactory(ColumnMetadata inputSchema,
+      ColumnMetadata outputCol) {
+    if (outputCol == null) {
+      return customConversion(inputSchema);
+    } else {
+      return schemaBasedConversion(inputSchema, outputCol);
+    }
+  }
+
+  private ColumnConversionFactory customConversion(ColumnMetadata inputSchema) {
+    return customTransform.transform(inputSchema, properties, null, null);
+  }
+
+  public ColumnConversionFactory schemaBasedConversion(ColumnMetadata inputSchema,
+      ColumnMetadata outputCol) {
+
+    // Custom transforms take priority. Allows replacing the standard
+    // conversions. Also allows conversions between the same type, such
+    // as rescaling units.
+
+    ConversionDefn defn = StandardConversions.analyze(inputSchema, outputCol);
+    ColumnConversionFactory factory = customTransform.transform(inputSchema, properties, outputCol, defn);
+    if (factory != null) {
+      return factory;
+    }
+
+    // Some conversions are automatic.
+
+    if (defn.type != ConversionType.EXPLICIT) {
+      return null;
+    }
+
+    // If an explicit conversion is needed, but no standard conversion
+    // is available, we have no way to do the conversion.
+
+    if (defn.conversionClass == null) {
+      throw UserException.validationError()
+        .message("Runtime type conversion not available")
+        .addContext("Input type", inputSchema.typeString())
+        .addContext("Output type", outputCol.typeString())
+        .addContext(errorContext)
+        .build(logger);
+    }
+
+    // Return a factory for the conversion.
+
+    return StandardConversions.factory(defn.conversionClass, properties);
+  }
+
+  public TypeConverter childConverter(TupleMetadata childSchema) {
+    if (childSchema == null && providedSchema == null) {
+      return this;
+    }
+    return new TypeConverter(this, childSchema);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/UnprojectedReadColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/UnprojectedReadColumn.java
new file mode 100644
index 0000000..18332c6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/UnprojectedReadColumn.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project.projSet;
+
+import org.apache.drill.exec.physical.rowSet.ProjectionSet;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+
+/**
+ * Unprojected column. No validation needed, no type conversion.
+ * The reader column just "free-wheels": without a materialized vector,
+ * it accepts any data the reader cares to throw at it, then simply
+ * discards that data.
+ */
+
+public class UnprojectedReadColumn extends AbstractReadColProj {
+
+  public UnprojectedReadColumn(ColumnMetadata readSchema) {
+    super(readSchema);
+  }
+
+  @Override
+  public boolean isProjected() { return false; }
+
+  @Override
+  public ProjectionSet mapProjection() { return ProjectionSetFactory.projectNone(); }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/WildcardProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/WildcardProjectionSet.java
new file mode 100644
index 0000000..dc4858b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/WildcardProjectionSet.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project.projSet;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
+
+public class WildcardProjectionSet extends AbstractProjectionSet {
+
+  public WildcardProjectionSet(TypeConverter typeConverter) {
+    super(typeConverter);
+  }
+
+  public WildcardProjectionSet(TypeConverter typeConverter, boolean isStrict) {
+    super(typeConverter, isStrict);
+  }
+
+  @Override
+  public ColumnReadProjection readProjection(ColumnMetadata col) {
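+
+    // Columns marked as "special" are not expanded by the wildcard,
+    // whether marked in the reader schema or the provided schema.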
+    if (isSpecial(col)) {
+      return new UnprojectedReadColumn(col);
+    }
+    ColumnMetadata outputSchema = outputSchema(col);
+    if (outputSchema == null) {
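+
+      // A strict provided schema projects only its own columns: a
+      // reader column absent from the schema is unprojected.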
+      if (isStrict) {
+        return new UnprojectedReadColumn(col);
+      }
+    } else if (isSpecial(outputSchema)) {
+      return new UnprojectedReadColumn(col);
+    }
+    if (col.isMap()) {
+      return new ProjectedMapColumn(col, null, outputSchema,
+          new WildcardProjectionSet(childConverter(outputSchema), isStrict));
+    } else {
+      ColumnConversionFactory conv = conversion(col, outputSchema);
+      return new ProjectedReadColumn(col, null, outputSchema, conv);
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/package-info.java
new file mode 100644
index 0000000..55af6c1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/package-info.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The dynamic projection in Drill is complex. With the advent of
+ * provided schema, we now have many ways to manage projection. The
+ * classes here implement these many policies. They are implemented
+ * as distinct classes (rather than chains of if-statements) to
+ * make the classes easier to test and reason about.
+ * <p>
+ * Projection is a combination of three distinct policies:
+ * <ul>
+ * <li>Projection policy (all, none, explicit, etc.)</li>
+ * <li>Column policy (unprojected, explicit projection,
+ * projection with schema, etc.)</li>
+ * <li>Type conversion: none, based on a provided schema,
+ * custom.</li>
+ * </ul>
+ * Experience has shown that these must be separated: each is designed
+ * and tested separately to keep the problem tractable.
+ *
+ * <h4>Projection Set Cases</h4>
+ *
+ * The projection cases and their classes:
+ * <p>
+ * <dl>
+ * <dt>{@link EmptyProjectionSet}</dt>
+ * <dd><tt>SELECT COUNT(*)</tt>: Project nothing. Only count records.</dd>
+ * <dt>{@link WildcardProjectionSet}</dt>
+ * <dd><tt>SELECT *</tt>: Project everything, with an optional provided
+ * schema. If a schema is provided, and is strict, then project only
+ * reader columns that appear in the provided schema.
+ * However, don't project columns which have been marked as
+ * special: {@link ColumnMetadata#EXCLUDE_FROM_WILDCARD}, whether marked
+ * in the reader or provided schemas.</dd>
+ * <dt>{@link ExplicitProjectionSet}</dt>
+ * <dd><tt>SELECT a, b[10], c.d</tt>: Explicit projection with or without
+ * a schema. Project only the selected columns. Verify that the reader
+ * provides column types/modes consistent with the implied form in the
+ * projection list. That is, in this example, `b` must be an array.</dd>
+ * </dl>
+ *
+ * <h4>Column Projection Cases</h4>
+ *
+ * Each projection set answers a query: "the reader wants to add such-and-so
+ * column: what should I do?" Since the reader is free to add any column,
+ * we don't cache the list of columns as is done with the parsed project
+ * list, or the output schema. Instead, we handle each column on a
+ * case-by-case basis; we create a {@link ColumnReadProjection} instance
+ * to answer the query. Instances of this class are meant to be transient:
+ * use them and discard them. We answer the query differently depending on
+ * many factors, including:
+ * <p>
+ * <dl>
+ * <dt>{@link UnprojectedReadColumn}</dt>
+ * <dd>Column is not projected. Nothing to convert, no type checks
+ * needed. The result set loader should create a dummy writer for this
+ * case.</dd>
+ * <dt>{@link ProjectedReadColumn}</dt>
+ * <dd>Column is projected. It may have an associated projection list
+ * item, an output schema, or a type conversion. All these variations
+ * should be transparent to the consumer.</dd>
+ * </dl>
+ *
+ * <h4>Type Conversion</h4>
+ *
+ * The {@link TypeConverter} class handles a provided schema, custom type
+ * conversion, and custom properties passed to the conversion shims. A null
+ * type converter passed to a projection set means no conversion is done.
+ * (The mechanism creates a dummy projection in this case.)
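+ * <p>
+ * A sketch of building a converter from a provided schema (an
+ * illustration only; the schema variable is assumed):
+ * <pre><code>
+ *   TypeConverter converter = TypeConverter.builder()
+ *       .providedSchema(providedSchema)  // assumed TupleMetadata
+ *       .build();
+ * </code></pre>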
+ *
+ * <h4>Construction</h4>
+ *
+ * Two classes build the above complex cases:
+ * <p>
+ * <dl>
+ * <dt>{@link ProjectionSetFactory}</dt>
+ * <dd>Builds simple projection sets that take few parameters.</dd>
+ * <dt>{@link ProjectionSetBuilder}</dt>
+ * <dd>Handles the complex cases.</dd>
+ * </dl>
+ */
+
+package org.apache.drill.exec.physical.impl.scan.project.projSet;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ProjectionSet.java
new file mode 100644
index 0000000..4562b0c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ProjectionSet.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.exec.physical.rowSet.project.ProjectionType;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Provides a dynamic, run-time view of a projection set. Used by
+ * the result set loader to:
+ * <ul>
+ * <li>Determine if a column is projected according to some
+ * defined projection schema (see implementation for details.)</li>
+ * <li>Provide type conversions, either using built-in implicit
+ * conversions, or a custom conversion. Type conversions require
+ * the reader column and a "provided" column that gives the "to"
+ * type for the conversion. Without the "to" column, the reader
+ * column type is used as-is.</li>
+ * <li>Verify that the (possibly converted) type and mode are
+ * compatible with an explicit projection item. For example, if
+ * the query has `a.b`, but `a` is scalar, then there is an
+ * inconsistency.</li>
+ * </ul>
+ * <p>
+ * This interface filters columns added dynamically
+ * at scan time. The reader may offer a column (that is, ask to add a
+ * column writer for it). The projection mechanism says whether to
+ * materialize the column, or whether to ignore the column and
+ * return a dummy column writer.
+ * <p>
+ * The project-all case must handle several additional nuances:
+ * <ul>
+ * <li>External schema: If an external schema is provided, then that
+ * schema may be "strict" which causes the wildcard to expand to the
+ * set of columns defined within the schema. When used with columns
+ * added dynamically, a column may be excluded from the projection
+ * set if it is not part of the defined external schema.</li>
+ * <li>Metadata filtering: A reader may offer a special column which
+ * is available only in explicit projection, and behaves like Drill's
+ * implicit file columns. Such columns are not included in a "project
+ * all" projection.</li>
+ * <p>
+ * At present, only the top-level row supports these additional filtering
+ * options; they are not supported on maps (though could be with additional
+ * effort.)
+ * <p>
+ * Special columns are generic and thus handled here. External schema
+ * is handled in a subclass in the scan projection framework.
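+ * <p>
+ * A sketch of the intended call pattern in the result set loader (an
+ * illustration only; variable names are assumed):
+ * <pre><code>
+ *   ColumnReadProjection colProj = projSet.readProjection(colSchema);
+ *   if (colProj.isProjected()) {
+ *     // Materialize a vector; apply colProj.conversionFactory() if set.
+ *   } else {
+ *     // Create a dummy writer; data written to it is discarded.
+ *   }
+ * </code></pre>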
+ */
+public interface ProjectionSet {
+
+  /**
+   * Response to a query against a reader projection to indicate projection
+   * status of a reader-provided column. This is a transient object which
+   * indicates whether a reader column is projected, and if so, the attributes
+   * of that projection.
+   */
+
+  public interface ColumnReadProjection {
+
+    /**
+     * Determine if the given column is to be projected. Used when
+     * adding columns to the result set loader. Skips columns omitted
+     * from an explicit projection, or columns within a wildcard projection
+     * where the column is "special" and is not expanded in the wildcard.
+     */
+
+    boolean isProjected();
+
+    ColumnMetadata readSchema();
+    ColumnMetadata providedSchema();
+    ColumnConversionFactory conversionFactory();
+    ProjectionSet mapProjection();
+
+    /**
+     * The projection type from the parse of the projection list,
+     * if available. Used for testing only. Don't use this in production
+     * code, let this class do the checks itself.
+     */
+    @VisibleForTesting
+    ProjectionType projectionType();
+  }
+
+  void setErrorContext(CustomErrorContext errorContext);
+  ColumnReadProjection readProjection(ColumnMetadata col);
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java
index 88da7f1..de66a38 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java
@@ -19,16 +19,14 @@ package org.apache.drill.exec.physical.rowSet.impl;
 
 import java.util.ArrayList;
 
-import org.apache.drill.common.exceptions.CustomErrorContext;
-import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.rowSet.ProjectionSet.ColumnReadProjection;
 import org.apache.drill.exec.physical.rowSet.impl.ColumnState.PrimitiveColumnState;
 import org.apache.drill.exec.physical.rowSet.impl.ListState.ListVectorState;
 import org.apache.drill.exec.physical.rowSet.impl.RepeatedListState.RepeatedListColumnState;
 import org.apache.drill.exec.physical.rowSet.impl.RepeatedListState.RepeatedListVectorState;
-import org.apache.drill.exec.physical.rowSet.impl.SchemaTransformer.ColumnTransform;
 import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.OffsetVectorState;
 import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.SimpleVectorState;
 import org.apache.drill.exec.physical.rowSet.impl.TupleState.MapArrayState;
@@ -37,18 +35,13 @@ import org.apache.drill.exec.physical.rowSet.impl.TupleState.MapVectorState;
 import org.apache.drill.exec.physical.rowSet.impl.TupleState.SingleMapState;
 import org.apache.drill.exec.physical.rowSet.impl.UnionState.UnionColumnState;
 import org.apache.drill.exec.physical.rowSet.impl.UnionState.UnionVectorState;
-import org.apache.drill.exec.physical.rowSet.project.ImpliedTupleRequest;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.PrimitiveColumnMetadata;
-import org.apache.drill.exec.record.metadata.ProjectionType;
-import org.apache.drill.exec.record.metadata.PropertyAccessor;
 import org.apache.drill.exec.record.metadata.VariantMetadata;
 import org.apache.drill.exec.vector.NullableVector;
 import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.drill.exec.vector.accessor.convert.AbstractWriteConverter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
@@ -88,46 +81,6 @@ import org.apache.drill.exec.vector.complex.UnionVector;
  */
 public class ColumnBuilder {
 
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnBuilder.class);
-
-  /**
-   * Default column transform for an unprojected column. No type conversion
-   * is needed, unprojected columns just "free-wheel": they are along for the
-   * ride, but don't do anything. They do not cause a vector to be materialized.
-   * The client, however, can still write to them, though the data is ignored.
-   */
-  public static class NoOpTransform implements ColumnTransform {
-
-    private final ColumnMetadata columnSchema;
-
-    public NoOpTransform(ColumnMetadata columnSchema) {
-      this.columnSchema = columnSchema;
-    }
-
-    @Override
-    public AbstractWriteConverter newWriter(ScalarWriter baseWriter) {
-      assert false; // Should never be materialized
-      return null;
-    }
-
-    @Override
-    public ProjectionType projectionType() { return ProjectionType.UNPROJECTED; }
-
-    @Override
-    public ColumnMetadata inputSchema() { return columnSchema; }
-
-    @Override
-    public ColumnMetadata outputSchema() { return columnSchema; }
-  }
-
-  private final SchemaTransformer schemaTransformer;
-  private final CustomErrorContext context;
-
-  public ColumnBuilder(SchemaTransformer schemaTransformer, CustomErrorContext context) {
-    this.schemaTransformer = schemaTransformer;
-    this.context = context;
-  }
-
   /**
    * Implementation of the work to add a new column to this tuple given a
    * schema description of the column.
@@ -140,38 +93,23 @@ public class ColumnBuilder {
    */
   public ColumnState buildColumn(ContainerState parent, ColumnMetadata columnSchema) {
 
-    // Indicate projection in the metadata.
-
-    ProjectionType projType = parent.projectionType(columnSchema.name());
-    ColumnTransform outputCol;
-    if (projType == ProjectionType.UNPROJECTED) {
-      PropertyAccessor.set(columnSchema, ColumnMetadata.PROJECTED_PROP, false);
-      outputCol = new NoOpTransform(columnSchema);
-    } else {
-
-      // Transform the column from input to output type.
-
-      outputCol = schemaTransformer.transform(columnSchema, projType);
-    }
-
-    // Build the column
-
-    switch (outputCol.outputSchema().structureType()) {
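+    // Ask the projection set whether, and how, to materialize the new
+    // column: projection status, output schema and type conversion.
+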
+    ColumnReadProjection colProj = parent.projectionSet().readProjection(columnSchema);
+    switch (colProj.providedSchema().structureType()) {
     case TUPLE:
-      return buildMap(parent, outputCol);
+      return buildMap(parent, colProj);
     case VARIANT:
       // Variant: UNION or (non-repeated) LIST
       if (columnSchema.isArray()) {
         // (non-repeated) LIST (somewhat like a repeated UNION)
-        return buildList(parent, outputCol);
+        return buildList(parent, colProj);
       } else {
         // (Non-repeated) UNION
-        return buildUnion(parent, outputCol);
+        return buildUnion(parent, colProj);
       }
     case MULTI_ARRAY:
-      return buildRepeatedList(parent, outputCol);
+      return buildRepeatedList(parent, colProj);
     default:
-      return buildPrimitive(parent, outputCol);
+      return buildPrimitive(parent, colProj);
     }
   }
 
@@ -186,29 +124,11 @@ public class ColumnBuilder {
    * @return column state for the new column
    */
 
-  private ColumnState buildPrimitive(ContainerState parent, ColumnTransform outputCol) {
-    ProjectionType projType = outputCol.projectionType();
-    ColumnMetadata columnSchema = outputCol.outputSchema();
-
-    // Enforce correspondence between implied type from the projection list
-    // and the actual type of the column.
-
-    switch (projType) {
-    case ARRAY:
-      if (! columnSchema.isArray()) {
-        incompatibleProjection(projType, columnSchema);
-      }
-      break;
-    case TUPLE:
-    case TUPLE_ARRAY:
-      incompatibleProjection(projType, columnSchema);
-      break;
-    default:
-      break;
-    }
+  private ColumnState buildPrimitive(ContainerState parent, ColumnReadProjection colProj) {
+    ColumnMetadata columnSchema = colProj.providedSchema();
 
     ValueVector vector;
-    if (projType == ProjectionType.UNPROJECTED) {
+    if (!colProj.isProjected()) {
 
       // Column is not projected. No materialized backing for the column.
 
@@ -230,7 +150,7 @@ public class ColumnBuilder {
     // Create the writer.
 
     final AbstractObjectWriter colWriter = ColumnWriterFactory.buildColumnWriter(
-        columnSchema, outputCol, vector);
+        columnSchema, colProj.conversionFactory(), vector);
 
     // Build the vector state which manages the vector.
 
@@ -253,18 +173,6 @@ public class ColumnBuilder {
         vectorState);
   }
 
-  private void incompatibleProjection(ProjectionType projType,
-      ColumnMetadata columnSchema) {
-    throw UserException
-      .validationError()
-      .message("Incompatible projection type and data type for column `%s`", columnSchema.name())
-      .addContext("Column:", columnSchema.name())
-      .addContext("Type:", Types.getSqlTypeName(columnSchema.majorType()))
-      .addContext("Projection type:", projType.label())
-      .addContext(context)
-      .build(logger);
-  }
-
   /**
    * Build a new map (single or repeated) column. Except for maps nested inside
    * of unions, no map vector is created
@@ -275,8 +183,8 @@ public class ColumnBuilder {
    * @return column state for the map column
    */
 
-  private ColumnState buildMap(ContainerState parent, ColumnTransform outputCol) {
-    ColumnMetadata columnSchema = outputCol.outputSchema();
+  private ColumnState buildMap(ContainerState parent, ColumnReadProjection colProj) {
+    ColumnMetadata columnSchema = colProj.providedSchema();
 
     // When dynamically adding columns, must add the (empty)
     // map by itself, then add columns to the map via separate
@@ -288,27 +196,18 @@ public class ColumnBuilder {
     // Create the vector, vector state and writer.
 
     if (columnSchema.isArray()) {
-      return buildMapArray(parent, outputCol);
+      return buildMapArray(parent, colProj);
     } else {
-      return buildSingleMap(parent, outputCol);
+      return buildSingleMap(parent, colProj);
     }
   }
 
-  private ColumnState buildSingleMap(ContainerState parent, ColumnTransform outputCol) {
-    ProjectionType projType = outputCol.projectionType();
-    ColumnMetadata columnSchema = outputCol.outputSchema();
+  private ColumnState buildSingleMap(ContainerState parent, ColumnReadProjection colProj) {
+    ColumnMetadata columnSchema = colProj.providedSchema();
 
-    switch (projType) {
-    case ARRAY:
-    case TUPLE_ARRAY:
-      incompatibleProjection(projType, columnSchema);
-      break;
-    default:
-      break;
-    }
     MapVector vector;
     VectorState vectorState;
-    if (projType == ProjectionType.UNPROJECTED) {
+    if (!colProj.isProjected()) {
       vector = null;
       vectorState = new NullVectorState();
     } else {
@@ -324,19 +223,18 @@ public class ColumnBuilder {
     final TupleObjectWriter mapWriter = MapWriter.buildMap(columnSchema, vector, new ArrayList<>());
     final SingleMapState mapState = new SingleMapState(parent.loader(),
         parent.vectorCache().childCache(columnSchema.name()),
-        parent.projectionSet().mapProjection(columnSchema.name()));
+        colProj.mapProjection());
     return new MapColumnState(mapState, mapWriter, vectorState, parent.isVersioned());
   }
 
-  private ColumnState buildMapArray(ContainerState parent, ColumnTransform outputCol) {
-    ProjectionType projType = outputCol.projectionType();
-    ColumnMetadata columnSchema = outputCol.outputSchema();
+  private ColumnState buildMapArray(ContainerState parent, ColumnReadProjection colProj) {
+    ColumnMetadata columnSchema = colProj.providedSchema();
 
     // Create the map's offset vector.
 
     RepeatedMapVector mapVector;
     UInt4Vector offsetVector;
-    if (projType == ProjectionType.UNPROJECTED) {
+    if (!colProj.isProjected()) {
       mapVector = null;
       offsetVector = null;
     } else {
@@ -365,7 +263,7 @@ public class ColumnBuilder {
     // Wrap the offset vector in a vector state
 
     VectorState offsetVectorState;
-    if (projType == ProjectionType.UNPROJECTED) {
+    if (!colProj.isProjected()) {
       offsetVectorState = new NullVectorState();
     } else {
       offsetVectorState = new OffsetVectorState(
@@ -379,7 +277,7 @@ public class ColumnBuilder {
 
     final MapArrayState mapState = new MapArrayState(parent.loader(),
         parent.vectorCache().childCache(columnSchema.name()),
-        parent.projectionSet().mapProjection(columnSchema.name()));
+        colProj.mapProjection());
     return new MapColumnState(mapState, writer, mapVectorState, parent.isVersioned());
   }
 
@@ -400,24 +298,10 @@ public class ColumnBuilder {
    * @param columnSchema column schema
    * @return column
    */
-  private ColumnState buildUnion(ContainerState parent, ColumnTransform outputCol) {
-    ProjectionType projType = outputCol.projectionType();
-    ColumnMetadata columnSchema = outputCol.outputSchema();
+  private ColumnState buildUnion(ContainerState parent, ColumnReadProjection colProj) {
+    ColumnMetadata columnSchema = colProj.providedSchema();
     assert columnSchema.isVariant() && ! columnSchema.isArray();
 
-    switch (projType) {
-    case ARRAY:
-    case TUPLE:
-    case TUPLE_ARRAY:
-      incompatibleProjection(projType, columnSchema);
-      break;
-    case UNPROJECTED:
-      throw new UnsupportedOperationException("Drill does not currently support unprojected union columns: " +
-          columnSchema.name());
-    default:
-      break;
-    }
-
     // Create the union vector.
     // Don't get the union vector from the vector cache. Union vectors may
     // have content that varies from batch to batch. Only the leaf
@@ -438,7 +322,7 @@ public class ColumnBuilder {
     // Create the manager for the columns within the union.
 
     final UnionState unionState = new UnionState(parent.loader(),
-        parent.vectorCache().childCache(columnSchema.name()), new ImpliedTupleRequest(true));
+        parent.vectorCache().childCache(columnSchema.name()));
 
     // Bind the union state to the union writer to handle column additions.
 
@@ -449,18 +333,8 @@ public class ColumnBuilder {
     return new UnionColumnState(parent.loader(), writer, vectorState, unionState);
   }
 
-  private ColumnState buildList(ContainerState parent, ColumnTransform outputCol) {
-    ProjectionType projType = outputCol.projectionType();
-    ColumnMetadata columnSchema = outputCol.outputSchema();
-
-    switch (projType) {
-    case TUPLE:
-    case TUPLE_ARRAY:
-      incompatibleProjection(projType, columnSchema);
-      break;
-    default:
-      break;
-    }
+  private ColumnState buildList(ContainerState parent, ColumnReadProjection colProj) {
+    ColumnMetadata columnSchema = colProj.providedSchema();
 
     // If the list has declared a single type, and has indicated that this
     // is the only type expected, then build the list as a nullable array
@@ -470,12 +344,12 @@ public class ColumnBuilder {
     final VariantMetadata variant = columnSchema.variantSchema();
     if (variant.isSimple()) {
       if (variant.size() == 1) {
-        return buildSimpleList(parent, outputCol);
+        return buildSimpleList(parent, colProj);
       } else if (variant.size() == 0) {
         throw new IllegalArgumentException("Size of a non-expandable list can't be zero");
       }
     }
-    return buildUnionList(parent, outputCol);
+    return buildUnionList(parent, colProj);
   }
 
   /**
@@ -493,8 +367,8 @@ public class ColumnBuilder {
    * @return the column state for the list
    */
 
-  private ColumnState buildSimpleList(ContainerState parent, ColumnTransform outputCol) {
-    ColumnMetadata columnSchema = outputCol.outputSchema();
+  private ColumnState buildSimpleList(ContainerState parent, ColumnReadProjection colProj) {
+    ColumnMetadata columnSchema = colProj.providedSchema();
 
     // The variant must have the one and only type.
 
@@ -504,8 +378,7 @@ public class ColumnBuilder {
     // Create the manager for the one and only column within the list.
 
     final ListState listState = new ListState(parent.loader(),
-        parent.vectorCache().childCache(columnSchema.name()),
-        new ImpliedTupleRequest(true));
+        parent.vectorCache().childCache(columnSchema.name()));
 
     // Create the child vector, writer and state.
 
@@ -553,8 +426,8 @@ public class ColumnBuilder {
    * @return the column state for the list
    */
 
-  private ColumnState buildUnionList(ContainerState parent, ColumnTransform outputCol) {
-    ColumnMetadata columnSchema = outputCol.outputSchema();
+  private ColumnState buildUnionList(ContainerState parent, ColumnReadProjection colProj) {
+    ColumnMetadata columnSchema = colProj.providedSchema();
 
     // The variant must start out empty.
 
@@ -588,8 +461,7 @@ public class ColumnBuilder {
     // may not be grouped into a union.)
 
     final ListState listState = new ListState(parent.loader(),
-        parent.vectorCache().childCache(columnSchema.name()),
-        ImpliedTupleRequest.ALL_MEMBERS);
+        parent.vectorCache().childCache(columnSchema.name()));
 
     // Bind the union state to the union writer to handle column additions.
 
@@ -602,9 +474,8 @@ public class ColumnBuilder {
   }
 
   private ColumnState buildRepeatedList(ContainerState parent,
-      ColumnTransform outputCol) {
-    ProjectionType projType = outputCol.projectionType();
-    ColumnMetadata columnSchema = outputCol.outputSchema();
+      ColumnReadProjection colProj) {
+    ColumnMetadata columnSchema = colProj.providedSchema();
 
     assert columnSchema.type() == MinorType.LIST;
     assert columnSchema.mode() == DataMode.REPEATED;
@@ -614,15 +485,6 @@ public class ColumnBuilder {
 
     assert columnSchema.childSchema() == null;
 
-    switch (projType) {
-    case TUPLE:
-    case TUPLE_ARRAY:
-      incompatibleProjection(projType, columnSchema);
-      break;
-    default:
-      break;
-    }
-
     // Build the repeated vector.
 
     final RepeatedListVector vector = new RepeatedListVector(
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java
index 36b9db8..6625da2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java
@@ -191,7 +191,8 @@ public abstract class ColumnState {
       AbstractObjectWriter writer, VectorState vectorState) {
     this.loader = loader;
     this.vectorState = vectorState;
-    this.addVersion = loader.bumpVersion();
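+    // Only projected columns affect the visible schema; an unprojected
+    // column reuses the current schema version rather than bumping it.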
+    addVersion = writer.isProjected() ?
+        loader.bumpVersion() : loader.activeSchemaVersion();
     state = loader.hasOverflow() ?
         State.NEW_LOOK_AHEAD : State.NORMAL;
     this.writer = writer;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ContainerState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ContainerState.java
index c28f7ae..f1464cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ContainerState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ContainerState.java
@@ -19,10 +19,10 @@ package org.apache.drill.exec.physical.rowSet.impl;
 
 import java.util.Collection;
 
+import org.apache.drill.exec.physical.impl.scan.project.projSet.ProjectionSetFactory;
+import org.apache.drill.exec.physical.rowSet.ProjectionSet;
 import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
-import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.record.metadata.ProjectionType;
 
 /**
  * Abstract representation of a container of vectors: a row, a map, a
@@ -44,7 +44,7 @@ import org.apache.drill.exec.record.metadata.ProjectionType;
 public abstract class ContainerState {
 
   protected final LoaderInternals loader;
-  protected final RequestedTuple projectionSet;
+  protected final ProjectionSet projectionSet;
   protected ColumnState parentColumn;
 
   /**
@@ -54,12 +54,16 @@ public abstract class ContainerState {
 
   protected final ResultVectorCache vectorCache;
 
-  public ContainerState(LoaderInternals loader, ResultVectorCache vectorCache, RequestedTuple projectionSet) {
+  public ContainerState(LoaderInternals loader, ResultVectorCache vectorCache, ProjectionSet projectionSet) {
     this.loader = loader;
     this.vectorCache = vectorCache;
     this.projectionSet = projectionSet;
   }
 
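+  /**
+   * Convenience constructor for containers whose columns are always
+   * projected, such as the members of unions and lists.
+   */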
+  public ContainerState(LoaderInternals loader, ResultVectorCache vectorCache) {
+    this(loader, vectorCache, ProjectionSetFactory.projectAll());
+  }
+
   public void bindColumnState(ColumnState parentState) {
     this.parentColumn = parentState;
   }
@@ -80,11 +84,7 @@ public abstract class ContainerState {
 
   protected LoaderInternals loader() { return loader; }
   public ResultVectorCache vectorCache() { return vectorCache; }
-  public RequestedTuple projectionSet() { return projectionSet; }
-
-  public ProjectionType projectionType(String columnName) {
-    return projectionSet.projectionType(columnName);
-  }
+  public ProjectionSet projectionSet() { return projectionSet; }
 
   public ColumnState addColumn(ColumnMetadata columnSchema) {
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/DefaultSchemaTransformer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/DefaultSchemaTransformer.java
deleted file mode 100644
index d1d53ab..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/DefaultSchemaTransformer.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.rowSet.impl;
-
-import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.record.metadata.ProjectionType;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.drill.exec.vector.accessor.convert.AbstractWriteConverter;
-import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
-
-/**
- * Default schema transformer that maps input types to output types and
- * simply passes along the input schema and projection type. Provides
- * support for an ad-hoc column conversion factory (to create type
- * conversion shims), such as those used in unit tests.
- */
-public class DefaultSchemaTransformer implements SchemaTransformer {
-
-  public class DefaultColumnTransformer implements ColumnTransform {
-
-    private final ColumnMetadata columnSchema;
-    private final ProjectionType projType;
-
-    public DefaultColumnTransformer(ColumnMetadata inputSchema, ProjectionType projType) {
-      columnSchema = inputSchema;
-      this.projType = projType;
-    }
-
-    @Override
-    public AbstractWriteConverter newWriter(ScalarWriter baseWriter) {
-      if (conversionFactory == null) {
-        return null;
-      }
-      return conversionFactory.newWriter(baseWriter);
-    }
-
-    @Override
-    public ProjectionType projectionType() { return projType; }
-
-    @Override
-    public ColumnMetadata inputSchema() { return columnSchema; }
-
-    @Override
-    public ColumnMetadata outputSchema() { return columnSchema; }
-  }
-
-  private final ColumnConversionFactory conversionFactory;
-
-  public DefaultSchemaTransformer(ColumnConversionFactory conversionFactory) {
-    this.conversionFactory = conversionFactory;
-  }
-
-  @Override
-  public ColumnTransform transform(ColumnMetadata inputSchema,
-      ProjectionType projType) {
-    return new DefaultColumnTransformer(inputSchema, projType);
-  }
-
-  @Override
-  public TupleMetadata outputSchema() { return null; }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java
index 37104f4..780e1bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java
@@ -26,7 +26,6 @@ import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
 import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.IsSetVectorState;
 import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.OffsetVectorState;
 import org.apache.drill.exec.physical.rowSet.impl.UnionState.UnionVectorState;
-import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.VariantMetadata;
 import org.apache.drill.exec.record.metadata.VariantSchema;
@@ -40,6 +39,7 @@ import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl;
 import org.apache.drill.exec.vector.accessor.writer.WriterEvents;
 import org.apache.drill.exec.vector.complex.ListVector;
 import org.apache.drill.exec.vector.complex.UnionVector;
+import org.apache.drill.exec.vector.complex.impl.UnionWriter;
 
 /**
  * Represents the contents of a list vector. A list vector is an odd creature.
@@ -202,9 +202,8 @@ public class ListState extends ContainerState
 
   private final Map<MinorType, ColumnState> columns = new HashMap<>();
 
-  public ListState(LoaderInternals loader, ResultVectorCache vectorCache,
-      RequestedTuple projectionSet) {
-    super(loader, vectorCache, projectionSet);
+  public ListState(LoaderInternals loader, ResultVectorCache vectorCache) {
+    super(loader, vectorCache);
   }
 
   public VariantMetadata variantSchema() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/LoaderInternals.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/LoaderInternals.java
index f559c43..59d86b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/LoaderInternals.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/LoaderInternals.java
@@ -43,6 +43,14 @@ interface LoaderInternals {
   int bumpVersion();
 
   /**
+   * Reports the current schema version. Used when adding an unprojected
+   * column which should not affect the output schema.
+   *
+   * @return the current schema version
+   */
+  int activeSchemaVersion();
+
+  /**
    * Accumulate the initial vector allocation sizes.
    *
    * @param allocationBytes number of bytes allocated to a vector
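
As a rough sketch of how the new accessor pairs with bumpVersion() (only those two methods come from this interface; the surrounding helper is illustrative):

    // Choose a schema version for a newly added column. Projected
    // columns change the output schema, so bump the version;
    // unprojected columns are invisible downstream, so reuse the
    // current version.
    int versionFor(LoaderInternals loader, boolean isProjected) {
      return isProjected ? loader.bumpVersion()
                         : loader.activeSchemaVersion();
    }
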
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java
index 0079f50..72bbae6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java
@@ -17,17 +17,13 @@
  */
 package org.apache.drill.exec.physical.rowSet.impl;
 
-import java.util.Collection;
-
 import org.apache.drill.common.exceptions.CustomErrorContext;
-import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.rowSet.ProjectionSet;
 import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
 import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetOptions;
-import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 /**
  * Builder for the options for the row set loader. Reasonable defaults
@@ -38,12 +34,10 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 public class OptionBuilder {
   protected int vectorSizeLimit;
   protected int rowCountLimit;
-  protected Collection<SchemaPath> projection;
-  protected RequestedTuple projectionSet;
   protected ResultVectorCache vectorCache;
+  protected ProjectionSet projectionSet;
   protected TupleMetadata schema;
   protected long maxBatchSize;
-  protected SchemaTransformer schemaTransformer;
 
   /**
    * Error message context
@@ -80,31 +74,6 @@ public class OptionBuilder {
   }
 
   /**
-   * Record (batch) readers often read a subset of available table columns,
-   * but want to use a writer schema that includes all columns for ease of
-   * writing. (For example, a CSV reader must read all columns, even if the user
-   * wants a subset. The unwanted columns are simply discarded.)
-   * <p>
-   * This option provides a projection list, in the form of column names, for
-   * those columns which are to be projected. Only those columns will be
-   * backed by value vectors; non-projected columns will be backed by "null"
-   * writers that discard all values.
-   *
-   * @param projection the list of projected columns
-   * @return this builder
-   */
-
-  public OptionBuilder setProjection(Collection<SchemaPath> projection) {
-    this.projection = projection;
-    return this;
-  }
-
-  public OptionBuilder setProjectionSet(RequestedTuple projectionSet) {
-    this.projectionSet = projectionSet;
-    return this;
-  }
-
-  /**
    * Downstream operators require "vector persistence": the same vector
    * must represent the same column in every batch. For the scan operator,
    * which creates multiple readers, this can be a challenge. The vector
@@ -138,15 +107,8 @@ public class OptionBuilder {
     return this;
   }
 
-  /**
-   * Provide an optional higher-level schema transformer which can convert
-   * columns from one type to another.
-   *
-   * @param transform the column conversion factory
-   * @return this builder
-   */
-  public OptionBuilder setSchemaTransform(SchemaTransformer transform) {
-    schemaTransformer = transform;
+  public OptionBuilder setProjection(ProjectionSet projSet) {
+    this.projectionSet = projSet;
     return this;
   }
 
@@ -159,7 +121,6 @@ public class OptionBuilder {
   }
 
   public ResultSetOptions build() {
-    Preconditions.checkArgument(projection == null || projectionSet == null);
     return new ResultSetOptions(this);
   }
 }
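
A minimal sketch of the revised option wiring (setProjection(ProjectionSet) and ProjectionSetFactory appear in this patch; the allocator and the direct use of ResultSetLoaderImpl are illustrative):

    // Build loader options with an explicit projection set. If no
    // projection is set, the loader defaults to "project all".
    ResultSetOptions options = new OptionBuilder()
        .setProjection(ProjectionSetFactory.projectAll())
        .build();
    ResultSetLoader loader = new ResultSetLoaderImpl(allocator, options);
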
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedListState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedListState.java
index 39d7d44..7aa146d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedListState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedListState.java
@@ -20,10 +20,10 @@ package org.apache.drill.exec.physical.rowSet.impl;
 import java.util.ArrayList;
 import java.util.Collection;
 
+import org.apache.drill.exec.physical.impl.scan.project.projSet.ProjectionSetFactory;
 import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
 import org.apache.drill.exec.physical.rowSet.impl.ColumnState.BaseContainerColumnState;
 import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.OffsetVectorState;
-import org.apache.drill.exec.physical.rowSet.project.ImpliedTupleRequest;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.MetadataUtils;
@@ -33,7 +33,6 @@ import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
 import org.apache.drill.exec.vector.accessor.writer.RepeatedListWriter;
 import org.apache.drill.exec.vector.complex.RepeatedListVector;
-
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 /**
@@ -158,7 +157,7 @@ public class RepeatedListState extends ContainerState implements RepeatedListWri
 
   public RepeatedListState(LoaderInternals loader,
       ResultVectorCache vectorCache) {
-    super(loader, vectorCache, ImpliedTupleRequest.ALL_MEMBERS);
+    super(loader, vectorCache, ProjectionSetFactory.projectAll());
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java
index ab2bc48..e871db4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java
@@ -20,13 +20,12 @@ package org.apache.drill.exec.physical.rowSet.impl;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.scan.project.projSet.ProjectionSetFactory;
+import org.apache.drill.exec.physical.rowSet.ProjectionSet;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
 import org.apache.drill.exec.physical.rowSet.RowSetLoader;
 import org.apache.drill.exec.physical.rowSet.impl.TupleState.RowState;
-import org.apache.drill.exec.physical.rowSet.project.ImpliedTupleRequest;
-import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
-import org.apache.drill.exec.physical.rowSet.project.RequestedTupleImpl;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.ValueVector;
@@ -49,10 +48,9 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
     protected final int vectorSizeLimit;
     protected final int rowCountLimit;
     protected final ResultVectorCache vectorCache;
-    protected final RequestedTuple projectionSet;
+    protected final ProjectionSet projectionSet;
     protected final TupleMetadata schema;
     protected final long maxBatchSize;
-    protected final SchemaTransformer schemaTransformer;
 
     /**
      * Context for error messages.
@@ -62,11 +60,10 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
     public ResultSetOptions() {
       vectorSizeLimit = ValueVector.MAX_BUFFER_SIZE;
       rowCountLimit = DEFAULT_ROW_COUNT;
-      projectionSet = new ImpliedTupleRequest(true);
+      projectionSet = ProjectionSetFactory.projectAll();
       vectorCache = null;
       schema = null;
       maxBatchSize = -1;
-      schemaTransformer = null;
       errorContext = null;
     }
 
@@ -76,18 +73,10 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
       vectorCache = builder.vectorCache;
       schema = builder.schema;
       maxBatchSize = builder.maxBatchSize;
-      schemaTransformer = builder.schemaTransformer;
       errorContext = builder.errorContext;
-
-      // If projection, build the projection map.
-      // The caller might have already built the map. If so,
-      // use it.
-
-      if (builder.projectionSet != null) {
-        projectionSet = builder.projectionSet;
-      } else {
-        projectionSet = RequestedTupleImpl.parse(builder.projection);
-      }
+      projectionSet = builder.projectionSet == null ?
+          ProjectionSetFactory.projectAll() :
+          builder.projectionSet;
     }
 
     public void dump(HierarchicalFormatter format) {
@@ -284,18 +273,14 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
 
   protected int accumulatedBatchSize;
 
-  protected final RequestedTuple projectionSet;
+  protected final ProjectionSet projectionSet;
 
   public ResultSetLoaderImpl(BufferAllocator allocator, ResultSetOptions options) {
     this.allocator = allocator;
     this.options = options;
     targetRowCount = options.rowCountLimit;
     writerIndex = new WriterIndexImpl(this);
-    SchemaTransformer schemaTransformer = options.schemaTransformer;
-    if (schemaTransformer == null) {
-      schemaTransformer = new DefaultSchemaTransformer(null);
-    }
-    columnBuilder = new ColumnBuilder(schemaTransformer, options.errorContext);
+    columnBuilder = new ColumnBuilder();
 
     // Set the projections
 
@@ -364,6 +349,9 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
   }
 
   @Override
+  public int activeSchemaVersion() { return activeSchemaVersion; }
+
+  @Override
   public int schemaVersion() {
     switch (state) {
     case ACTIVE:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SchemaTransformer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SchemaTransformer.java
deleted file mode 100644
index cdcf8df..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SchemaTransformer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.rowSet.impl;
-
-import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.record.metadata.ProjectionType;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
-
-/**
- * Interface to a mechanism that transforms the schema desired by
- * a reader (or other client of the result set loader) to the schema
- * desired for the output batch. Automates conversions of multiple
- * types, such as parsing a Varchar into a date or INT, etc. The actual
- * conversion policy is provided by the implementation.
- */
-public interface SchemaTransformer {
-
-  /**
-   * Describes how to transform a column from input type to output type,
-   * including the associated projection type
-   */
-  public interface ColumnTransform extends ColumnConversionFactory {
-    ProjectionType projectionType();
-    ColumnMetadata inputSchema();
-    ColumnMetadata outputSchema();
-  }
-
-  TupleMetadata outputSchema();
-  ColumnTransform transform(ColumnMetadata inputSchema, ProjectionType projType);
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SchemaTransformerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SchemaTransformerImpl.java
deleted file mode 100644
index 4df02e8..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SchemaTransformerImpl.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.rowSet.impl;
-
-import java.util.Map;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionType;
-import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.record.metadata.ProjectionType;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.drill.exec.vector.accessor.convert.AbstractWriteConverter;
-import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
-import org.apache.drill.exec.vector.accessor.convert.StandardConversions;
-import org.apache.drill.exec.vector.accessor.convert.StandardConversions.ConversionDefn;
-
-/**
- * Base class for plugin-specific type transforms. Handles basic type
- * checking. Assumes a type conversion is needed only if the output
- * column is defined and has a type or mode different than the input.
- * Else, assumes no transform is needed. Subclasses can change or enhance
- * this policy. The subclass provides the actual per-column transform logic.
- * <p>
- * This class also handles setting default values for required vectors
- * when a default value is available from the column schema.
- */
-
-public class SchemaTransformerImpl implements SchemaTransformer {
-
-  private static final org.slf4j.Logger logger =
-      org.slf4j.LoggerFactory.getLogger(SchemaTransformerImpl.class);
-
-  public static abstract class AbstractColumnTransform implements ColumnTransform {
-
-    private final ColumnMetadata inputSchema;
-    private final ColumnMetadata outputSchema;
-    private final ProjectionType projType;
-
-    public AbstractColumnTransform(ColumnMetadata colDefn, ProjectionType projType,
-        ColumnMetadata outputDefn) {
-      inputSchema = colDefn;
-      outputSchema = outputDefn;
-      this.projType = projType;
-    }
-
-    @Override
-    public ProjectionType projectionType() { return projType; }
-
-    @Override
-    public ColumnMetadata inputSchema() { return inputSchema; }
-
-    @Override
-    public ColumnMetadata outputSchema() { return outputSchema; }
-  }
-
-  /**
-   * A no-op transform that simply keeps the input column schema and
-   * writer without any changes.
-   */
-  public static class PassThroughColumnTransform extends AbstractColumnTransform {
-
-    public PassThroughColumnTransform(ColumnMetadata colDefn, ProjectionType projType,
-        ColumnMetadata outputDefn) {
-      super(colDefn, projType, outputDefn);
-    }
-
-    @Override
-    public AbstractWriteConverter newWriter(ScalarWriter baseWriter) {
-      return null;
-    }
-  }
-
-  /**
-   * Full column transform that has separate input and output types
-   * and provides a type conversion writer to convert between the
-   * two. The conversion writer factory is provided via composition,
-   * not by subclassing this class.
-   */
-  public static class ColumnSchemaTransform extends AbstractColumnTransform {
-
-    private final ColumnConversionFactory conversionFactory;
-
-    public ColumnSchemaTransform(ColumnMetadata inputSchema, ColumnMetadata outputSchema,
-        ProjectionType projType, ColumnConversionFactory conversionFactory) {
-      super(inputSchema, projType, outputSchema);
-      this.conversionFactory = conversionFactory;
-    }
-
-    @Override
-    public AbstractWriteConverter newWriter(ScalarWriter baseWriter) {
-      if (conversionFactory == null) {
-        return null;
-      }
-      return conversionFactory.newWriter(baseWriter);
-    }
-  }
-
-  protected final TupleMetadata outputSchema;
-  protected final Map<String, String> properties;
-
-  public SchemaTransformerImpl(TupleMetadata outputSchema,
-      Map<String, String> properties) {
-    this.outputSchema = outputSchema;
-    this.properties = properties;
-  }
-
-  /**
-   * Implement a basic policy to pass through input columns for which there
-   * is no matching output column, and to do a type conversion only if types
-   * and modes differ.
-   * <p>
-   * Subclasses can change this behavior if, say, they want to do conversion
-   * even if the types are the same (such as parsing a VARCHAR field to produce
-   * another VARCHAR.)
-   */
-  @Override
-  public ColumnTransform transform(ColumnMetadata inputSchema,
-      ProjectionType projType) {
-
-    // Should never get an unprojected column; should be handled
-    // by the caller.
-
-    assert projType != ProjectionType.UNPROJECTED;
-
-    // If no matching column, assume a pass-through transform
-
-    ColumnMetadata outputCol = outputSchema.metadata(inputSchema.name());
-    if (outputCol == null) {
-      return new PassThroughColumnTransform(inputSchema, projType, inputSchema);
-    }
-
-    ConversionDefn defn = StandardConversions.analyze(inputSchema, outputCol);
-    ColumnConversionFactory factory = customTransform(inputSchema, outputCol, defn);
-    if (factory == null) {
-      switch (defn.type) {
-      case NONE:
-      case IMPLICIT:
-        return new PassThroughColumnTransform(inputSchema, projType, outputCol);
-      case EXPLICIT:
-        if (defn.conversionClass == null) {
-          throw UserException.validationError()
-            .message("Runtime type conversion not available")
-            .addContext("Column:", outputCol.name())
-            .addContext("Input type:", inputSchema.typeString())
-            .addContext("Output type:", outputCol.typeString())
-            .build(logger);
-        }
-        factory = StandardConversions.factory(defn.conversionClass, properties);
-        break;
-      default:
-        throw new IllegalStateException("Unexpected conversion type: " + defn.type);
-      }
-    }
-    return new ColumnSchemaTransform(inputSchema, outputCol, projType, factory);
-  }
-
-  /**
- * Overridden to provide a custom conversion between input and output types.
-   *
-   * @param inputDefn the column schema for the input column which the
-   * client code (e.g. reader) wants to produce
-   * @param outputDefn the column schema for the output vector to be produced
-   * by this operator
-   * @param defn a description of the required conversion. This method is
- * required to do nothing if the conversion type is
- * {@link ScanProjectionType#EXPLICIT} and the conversion class is null, meaning
-   * that no standard conversion is available
-   * @return a column transformer factory to implement a custom conversion,
-   * or null to use the standard conversion
-   */
-  private ColumnConversionFactory customTransform(ColumnMetadata inputDefn,
-      ColumnMetadata outputDefn, ConversionDefn defn) {
-    return null;
-  }
-
-  @Override
-  public TupleMetadata outputSchema() { return outputSchema; }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java
index 1eb441d..d0b44a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java
@@ -22,9 +22,9 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.rowSet.ProjectionSet;
 import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
 import org.apache.drill.exec.physical.rowSet.impl.ColumnState.BaseContainerColumnState;
-import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.VectorContainer;
@@ -316,7 +316,7 @@ public abstract class TupleState extends ContainerState
 
     public MapState(LoaderInternals events,
         ResultVectorCache vectorCache,
-        RequestedTuple projectionSet) {
+        ProjectionSet projectionSet) {
       super(events, vectorCache, projectionSet);
     }
 
@@ -388,7 +388,7 @@ public abstract class TupleState extends ContainerState
 
     public SingleMapState(LoaderInternals events,
         ResultVectorCache vectorCache,
-        RequestedTuple projectionSet) {
+        ProjectionSet projectionSet) {
       super(events, vectorCache, projectionSet);
     }
 
@@ -408,7 +408,7 @@ public abstract class TupleState extends ContainerState
 
     public MapArrayState(LoaderInternals events,
         ResultVectorCache vectorCache,
-        RequestedTuple projectionSet) {
+        ProjectionSet projectionSet) {
       super(events, vectorCache, projectionSet);
     }
 
@@ -459,7 +459,7 @@ public abstract class TupleState extends ContainerState
 
   protected TupleState(LoaderInternals events,
       ResultVectorCache vectorCache,
-      RequestedTuple projectionSet) {
+      ProjectionSet projectionSet) {
     super(events, vectorCache, projectionSet);
   }
 
@@ -533,7 +533,7 @@ public abstract class TupleState extends ContainerState
 
       // Ignore unprojected columns
 
-      if (! colState.schema().isProjected()) {
+      if (! colState.writer().isProjected()) {
         continue;
       }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/UnionState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/UnionState.java
index e504619..1d96e48 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/UnionState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/UnionState.java
@@ -26,7 +26,6 @@ import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
 import org.apache.drill.exec.physical.rowSet.impl.ColumnState.BaseContainerColumnState;
 import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.FixedWidthVectorState;
 import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.SimpleVectorState;
-import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.VariantMetadata;
 import org.apache.drill.exec.record.metadata.VariantSchema;
@@ -151,8 +150,8 @@ public class UnionState extends ContainerState
 
   private final Map<MinorType, ColumnState> columns = new HashMap<>();
 
-  public UnionState(LoaderInternals events, ResultVectorCache vectorCache, RequestedTuple projectionSet) {
-    super(events, vectorCache, projectionSet);
+  public UnionState(LoaderInternals events, ResultVectorCache vectorCache) {
+    super(events, vectorCache);
   }
 
   public UnionWriterImpl writer() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java
index f464bae..b03c949 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.drill.common.expression.PathSegment;
-import org.apache.drill.exec.record.metadata.ProjectionType;
 
 /**
  * Represents a wildcard: SELECT * when used at the root tuple.
@@ -46,7 +45,7 @@ public class ImpliedTupleRequest implements RequestedTuple {
   @Override
   public ProjectionType projectionType(String colName) {
     return allProjected
-      ? ProjectionType.UNSPECIFIED
+      ? ProjectionType.GENERAL
       : ProjectionType.UNPROJECTED;
   }
 
@@ -66,4 +65,9 @@ public class ImpliedTupleRequest implements RequestedTuple {
 
   @Override
   public void buildName(StringBuilder buf) { }
+
+  @Override
+  public TupleProjectionType type() {
+    return allProjected ? TupleProjectionType.ALL : TupleProjectionType.NONE;
+  }
 }
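
A short sketch of the two implied cases (ALL_MEMBERS and NO_MEMBERS are the existing static instances; imports and asserts are illustrative):

    // "Project all": any column name reports GENERAL; the tuple is ALL.
    RequestedTuple all = ImpliedTupleRequest.ALL_MEMBERS;
    assert all.projectionType("anyCol") == ProjectionType.GENERAL;
    assert all.type() == TupleProjectionType.ALL;

    // "Project none": any column name reports UNPROJECTED; the tuple is NONE.
    RequestedTuple none = ImpliedTupleRequest.NO_MEMBERS;
    assert none.projectionType("anyCol") == ProjectionType.UNPROJECTED;
    assert none.type() == TupleProjectionType.NONE;
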
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ProjectionType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ProjectionType.java
new file mode 100644
index 0000000..7e0d6fd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ProjectionType.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.project;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+
+/**
+ * Specifies the type of projection obtained by parsing the
+ * projection list. The type is returned from a query of the
+ * form "how is this column projected, if at all?"
+ * <p>
+ * The projection type allows the scan framework to catch
+ * inconsistencies, such as projecting an array as a map,
+ * and so on.
+ */
+
+public enum ProjectionType {
+
+  /**
+   * The column is not projected in the query.
+   */
+
+  UNPROJECTED,
+
+  /**
+   * Projection is a wildcard.
+   */
+  WILDCARD,     // *
+
+  /**
+   * Projection is by simple name. "General" means that
+   * we have no hints about the type of the column from
+   * the projection.
+   */
+
+  GENERAL,      // x
+
+  /**
+   * The column is projected as a scalar. This state
+   * requires metadata beyond the projection list and
+   * is returned only when that metadata is available.
+   */
+
+  SCALAR,       // x (from schema)
+
+  /**
+   * Applies to the parent of an x.y pair in projection: the
+   * existence of a dotted-member tells us that the parent
+   * must be a tuple (e.g. a Map.)
+   */
+
+  TUPLE,        // x.y
+
+  /**
+   * The projection includes an array suffix, so the column
+   * must be an array.
+   */
+
+  ARRAY,        // x[0]
+
+  /**
+   * Combination of array and map hints.
+   */
+
+  TUPLE_ARRAY;  // x[0].y
+
+  public boolean isTuple() {
+    return this == ProjectionType.TUPLE || this == ProjectionType.TUPLE_ARRAY;
+  }
+
+  public boolean isArray() {
+    return this == ProjectionType.ARRAY || this == ProjectionType.TUPLE_ARRAY;
+  }
+
+  /**
+   * We can't tell, just from the project list, if a column must
+   * be scalar. A column of the form "a" could be a scalar, but
+   * that form is also consistent with maps and arrays.
+   */
+  public boolean isMaybeScalar() {
+    return this == GENERAL || this == SCALAR;
+  }
+
+  public static ProjectionType typeFor(MajorType majorType) {
+    if (majorType.getMinorType() == MinorType.MAP) {
+      if (majorType.getMode() == DataMode.REPEATED) {
+        return TUPLE_ARRAY;
+      } else {
+        return TUPLE;
+      }
+    }
+    if (majorType.getMode() == DataMode.REPEATED) {
+      return ARRAY;
+    }
+    if (majorType.getMinorType() == MinorType.LIST) {
+      return ARRAY;
+    }
+    return SCALAR;
+  }
+
+  /**
+   * Reports if this type (representing an item in a projection list)
+   * is compatible with the projection type representing an actual
+   * column produced by an operator. The check is not symmetric.
+   * <p>
+   * For example, a column of type map array is compatible with a
+   * projection of type map "m.a" (project all a members of the map array),
+   * but a projection type of map array "m[1].a" is not compatible with
+   * a (non-array) map column.
+   *
+   * @param readType projection type, from {@link #typeFor(MajorType)},
+   * for an actual column
+   * @return true if this projection type is compatible with the
+   * column's projection type
+   */
+
+  public boolean isCompatible(ProjectionType readType) {
+    switch (readType) {
+    case UNPROJECTED:
+    case GENERAL:
+    case WILDCARD:
+      return true;
+    default:
+      break;
+    }
+
+    switch (this) {
+    case ARRAY:
+      return readType == ARRAY || readType == TUPLE_ARRAY;
+    case TUPLE_ARRAY:
+      return readType == TUPLE_ARRAY;
+    case SCALAR:
+      return readType == SCALAR;
+    case TUPLE:
+      return readType == TUPLE || readType == TUPLE_ARRAY;
+    case UNPROJECTED:
+    case GENERAL:
+    case WILDCARD:
+      return true;
+    default:
+      throw new IllegalStateException(toString());
+    }
+  }
+
+  public String label() {
+    switch (this) {
+    case SCALAR:
+      return "scalar (a)";
+    case ARRAY:
+      return "array (a[n])";
+    case TUPLE:
+      return "tuple (a.x)";
+    case TUPLE_ARRAY:
+      return "tuple array (a[n].x)";
+    case WILDCARD:
+      return "wildcard (*)";
+    default:
+      return name();
+    }
+  }
+}
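
To make the compatibility check concrete, a sketch using only methods defined above (Types.repeated() is Drill's standard MajorType helper; imports assumed):

    // A SELECT-list entry "m.a" implies a tuple; the reader produced
    // a repeated map, which typeFor() classifies as a tuple array.
    ProjectionType projType = ProjectionType.TUPLE;
    ProjectionType readType =
        ProjectionType.typeFor(Types.repeated(MinorType.MAP));
    assert readType == ProjectionType.TUPLE_ARRAY;

    // "m.a" is compatible with a map array (project member a of each
    // element), so no inconsistency is reported.
    assert projType.isCompatible(readType);
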
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedColumnImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedColumnImpl.java
index 59b5103..11e9187 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedColumnImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedColumnImpl.java
@@ -20,9 +20,9 @@ package org.apache.drill.exec.physical.rowSet.project;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.drill.common.expression.PathSegment.NameSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
-import org.apache.drill.exec.record.metadata.ProjectionType;
 
 /**
  * Represents one name element. Like a {@link NameSegment}, except that this
@@ -65,7 +65,7 @@ public class RequestedColumnImpl implements RequestedColumn {
   @Override
   public boolean isWildcard() { return type == ProjectionType.WILDCARD; }
   @Override
-  public boolean isSimple() { return type == ProjectionType.UNSPECIFIED; }
+  public boolean isSimple() { return type == ProjectionType.GENERAL; }
 
   @Override
   public boolean isArray() { return type.isArray(); }
@@ -157,7 +157,7 @@ public class RequestedColumnImpl implements RequestedColumn {
     } else if (members != null) {
       type = ProjectionType.TUPLE;
     } else {
-      type = ProjectionType.UNSPECIFIED;
+      type = ProjectionType.GENERAL;
     }
   }
 
@@ -190,6 +190,24 @@ public class RequestedColumnImpl implements RequestedColumn {
   }
 
   @Override
+  public RequestedTuple mapProjection() {
+    switch (type) {
+    case ARRAY:
+    case GENERAL:
+      // Don't know if the target is a tuple or not.
+
+      return ImpliedTupleRequest.ALL_MEMBERS;
+    case TUPLE:
+    case TUPLE_ARRAY:
+      return members == null ? ImpliedTupleRequest.ALL_MEMBERS : members;
+    case UNPROJECTED:
+      return ImpliedTupleRequest.NO_MEMBERS;
+    default:
+      return null;
+    }
+  }
+
+  @Override
   public String toString() {
     final StringBuilder buf = new StringBuilder();
     buf
@@ -212,7 +230,4 @@ public class RequestedColumnImpl implements RequestedColumn {
     buf.append("]");
     return buf.toString();
   }
-
-  @Override
-  public RequestedTuple mapProjection() { return members; }
-}
\ No newline at end of file
+}
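
A sketch of the new mapProjection() contract (ALL_MEMBERS and NO_MEMBERS are the implied-request constants; the tuple lookup is illustrative):

    // Ask a projected column how its map members are projected.
    RequestedColumn col = tuple.get("m");
    RequestedTuple members = col.mapProjection();
    // "m" or "m[1]" -> ALL_MEMBERS: the name alone does not say
    //                  whether the target is a tuple, so assume all.
    // "m.a"         -> the explicit member set {a}
    // unprojected   -> NO_MEMBERS
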
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTuple.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTuple.java
index fee0892..30fee3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTuple.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTuple.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.physical.rowSet.project;
 import java.util.List;
 
 import org.apache.drill.common.expression.PathSegment;
-import org.apache.drill.exec.record.metadata.ProjectionType;
 
 /**
  * Represents the set of columns projected for a tuple (row or map.)
@@ -30,11 +29,11 @@ import org.apache.drill.exec.record.metadata.ProjectionType;
  * <p>
  * Three variations exist:
  * <ul>
- * <li>Project all ({@link ImpliedTupleRequest#ALL_MEMBERS}): used for a tuple when
- * all columns are projected. Example: the root tuple (the row) in
+ * <li>Project all ({@link ImpliedTupleRequest#ALL_MEMBERS}): used for a tuple
+ * when all columns are projected. Example: the root tuple (the row) in
  * a <tt>SELECT *</tt> query.</li>
- * <li>Project none  (also {@link ImpliedTupleRequest#NO_MEMBERS}): used when no
- * columns are projected from a tuple, such as when a map itself is
+ * <li>Project none (also {@link ImpliedTupleRequest#NO_MEMBERS}): used when
+ * no columns are projected from a tuple, such as when a map itself is
  * not projected, so none of its member columns are projected.</li>
 * <li>Project some ({@link RequestedTupleImpl}): used in the
  * <tt>SELECT a, c, e</tt> case in which the query identifies which
@@ -86,6 +85,11 @@ public interface RequestedTuple {
     String summary();
   }
 
+  public enum TupleProjectionType {
+    ALL, NONE, SOME
+  }
+
+  TupleProjectionType type();
   void parseSegment(PathSegment child);
   RequestedColumn get(String colName);
   ProjectionType projectionType(String colName);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
index b93c0c0..90ef358 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
@@ -18,13 +18,14 @@
 package org.apache.drill.exec.physical.rowSet.project;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.PathSegment.ArraySegment;
 import org.apache.drill.common.expression.PathSegment.NameSegment;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.record.metadata.ProjectionType;
 import org.apache.drill.exec.record.metadata.TupleNameSpace;
 
 /**
@@ -74,6 +75,7 @@ import org.apache.drill.exec.record.metadata.TupleNameSpace;
 public class RequestedTupleImpl implements RequestedTuple {
 
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RequestedTupleImpl.class);
+  private static final Collection<SchemaPath> PROJECT_ALL = Collections.singletonList(SchemaPath.STAR_COLUMN);
 
   private final RequestedColumnImpl parent;
   private final TupleNameSpace<RequestedColumn> projection = new TupleNameSpace<>();
@@ -142,7 +144,7 @@ public class RequestedTupleImpl implements RequestedTuple {
       return new ImpliedTupleRequest(true);
     }
     if (projList.isEmpty()) {
-      return new ImpliedTupleRequest(false);
+      return ImpliedTupleRequest.NO_MEMBERS;
     }
     return new RequestedTupleImpl(projList);
   }
@@ -172,10 +174,10 @@ public class RequestedTupleImpl implements RequestedTuple {
 
   public static RequestedTuple parse(Collection<SchemaPath> projList) {
     if (projList == null) {
-      return new ImpliedTupleRequest(true);
+      projList = PROJECT_ALL;
     }
-    if (projList.isEmpty()) {
-      return new ImpliedTupleRequest(false);
+    else if (projList.isEmpty()) {
+      return ImpliedTupleRequest.NO_MEMBERS;
     }
     RequestedTupleImpl projSet = new RequestedTupleImpl();
     for (SchemaPath col : projList) {
@@ -301,4 +303,22 @@ public class RequestedTupleImpl implements RequestedTuple {
       parent.buildName(buf);
     }
   }
+
+  /**
+   * Tuple projection type. This is a rough approximation. A scan-level projection
+   * may include both a wildcard and implicit columns. This form is best used
+   * in testing where such ambiguities do not apply.
+   */
+  @Override
+  public TupleProjectionType type() {
+    if (projection.isEmpty()) {
+      return TupleProjectionType.NONE;
+    }
+    for (RequestedColumn col : projection) {
+      if (col.isWildcard()) {
+        return TupleProjectionType.ALL;
+      }
+    }
+    return TupleProjectionType.SOME;
+  }
 }
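
A sketch of the parse() conventions after this change (SchemaPath.getSimplePath() is Drill's standard path helper; imports assumed):

    // Null list: project everything (treated as a wildcard).
    assert RequestedTupleImpl.parse(null).type() == TupleProjectionType.ALL;

    // Empty list: project nothing (COUNT(*)-style queries).
    assert RequestedTupleImpl.parse(Collections.emptyList()).type()
        == TupleProjectionType.NONE;

    // Named columns: project just those.
    assert RequestedTupleImpl.parse(
        Collections.singletonList(SchemaPath.getSimplePath("a"))).type()
        == TupleProjectionType.SOME;
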
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
index b46f6b5..0eaaf3a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
@@ -17,11 +17,11 @@
  */
 package org.apache.drill.exec.record.metadata;
 
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -30,10 +30,11 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.schema.parser.SchemaExprParser;
 import org.joda.time.format.DateTimeFormatter;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 
 /**
  * Abstract definition of column metadata. Allows applications to create
@@ -204,21 +205,6 @@ public abstract class AbstractColumnMetadata extends AbstractPropertied implemen
   }
 
   @Override
-  public void setProjected(boolean projected) {
-    if (projected) {
-      // Projected is the default
-      setProperty(PROJECTED_PROP, null);
-    } else {
-      PropertyAccessor.set(this, PROJECTED_PROP, projected);
-    }
-  }
-
-  @Override
-  public boolean isProjected() {
-    return PropertyAccessor.getBoolean(this, PROJECTED_PROP, true);
-  }
-
-  @Override
   public void setFormat(String value) {
     setProperty(FORMAT_PROP, value);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 3c79aea..4a2790e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -147,142 +147,6 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
           .addContext(errorContext())
           .build(logger);
       }
-
-      return new ScanBatch(context, oContext, readers, implicitColumns);
-    }
-
-    /**
-     * Create a record reader given a file system, a file description and other
-     * information. For backward compatibility, calls the plugin method by
-     * default.
-     *
-     * @param plugin
-     *          the plugin creating the scan
-     * @param context
-     *          fragment context for the fragment running the scan
-     * @param dfs
-     *          Drill's distributed file system facade
-     * @param fileWork
-     *          description of the file to scan
-     * @param columns
-     *          list of columns to project
-     * @param userName
-     *          the name of the user performing the scan
-     * @return a scan operator
-     * @throws ExecutionSetupException
-     *           if anything goes wrong
-     */
-
-    public RecordReader getRecordReader(EasyFormatPlugin<? extends FormatPluginConfig> plugin,
-        FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
-        List<SchemaPath> columns, String userName) throws ExecutionSetupException {
-      return plugin.getRecordReader(context, dfs, fileWork, columns, userName);
-    }
-  }
-
-  /**
-   * Revised scanner based on the revised {@link org.apache.drill.exec.physical.rowSet.ResultSetLoader}
-   * and {@link org.apache.drill.exec.physical.impl.scan.RowBatchReader} classes.
-   * Handles most projection tasks automatically. Able to limit
-   * vector and batch sizes. Use this for new format plugins.
-   */
-
-  public abstract static class ScanFrameworkCreator
-      implements ScanBatchCreator {
-
-    protected EasyFormatPlugin<? extends FormatPluginConfig> plugin;
-
-    public ScanFrameworkCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin) {
-      this.plugin = plugin;
-    }
-
-    /**
-     * Builds the revised {@link FileBatchReader}-based scan batch.
-     *
-     * @param context
-     * @param scan
-     * @return
-     * @throws ExecutionSetupException
-     */
-
-    @Override
-    public CloseableRecordBatch buildScan(
-        final FragmentContext context,
-        final EasySubScan scan) throws ExecutionSetupException {
-
-      // Assemble the scan operator and its wrapper.
-
-      try {
-        final FileScanBuilder builder = frameworkBuilder(context.getOptions(), scan);
-        builder.setProjection(scan.getColumns());
-        builder.setFiles(scan.getWorkUnits());
-        builder.setConfig(plugin.easyConfig().fsConf);
-
-        // The text readers use required Varchar columns to represent null columns.
-
-        builder.allowRequiredNullColumns(true);
-        final Path selectionRoot = scan.getSelectionRoot();
-        if (selectionRoot != null) {
-          builder.metadataOptions().setSelectionRoot(selectionRoot);
-          builder.metadataOptions().setPartitionDepth(scan.getPartitionDepth());
-        }
-        FileScanFramework framework = builder.buildFileFramework();
-        return new OperatorRecordBatch(
-            context, scan,
-            new ScanOperatorExec(
-                framework));
-      } catch (final UserException e) {
-        // Rethrow user exceptions directly
-        throw e;
-      } catch (final Throwable e) {
-        // Wrap all others
-        throw new ExecutionSetupException(e);
-      }
-    }
-
-    /**
-     * Create the plugin-specific framework that manages the scan. The framework
-     * creates batch readers one by one for each file or block. It defines semantic
-     * rules for projection. It handles "early" or "late" schema readers. A typical
-     * framework builds on standardized frameworks for files in general or text
-     * files in particular.
-     *
-     * @param options system/session options which can be used to control or
-     * customize the scan framework
-     * @param scan the physical operation definition for the scan operation. Contains
-     * one or more files to read. (The Easy format plugin works only for files.)
-     * @return the scan framework which orchestrates the scan operation across
-     * potentially many files
-     * @throws ExecutionSetupException for all setup failures
-     */
-    protected abstract FileScanBuilder frameworkBuilder(
-        OptionManager options, EasySubScan scan) throws ExecutionSetupException;
-  }
-
-  /**
-   * Generic framework creator for files that just use the basic file
-   * support: metadata, etc. Specialized use cases (special "columns"
-   * column, say) will require a specialized implementation.
-   */
-
-  public abstract static class FileScanFrameworkCreator extends ScanFrameworkCreator {
-
-    private final FileReaderFactory readerCreator;
-
-    public FileScanFrameworkCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin,
-        FileReaderFactory readerCreator) {
-      super(plugin);
-      this.readerCreator = readerCreator;
-    }
-
-    @Override
-    protected FileScanBuilder frameworkBuilder(
-        OptionManager options, EasySubScan scan) throws ExecutionSetupException {
-
-      FileScanBuilder builder = new FileScanBuilder();
-      builder.setReaderFactory(readerCreator);
-      return builder;
->>>>>>> ea212504f... DRILL-7279: Enable provided schema for text files without headers
     }
   }
 
@@ -495,7 +359,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
 
       // Pass along the output schema, if any
 
-      builder.setOutputSchema(scan.getSchema());
+      builder.typeConverterBuilder().providedSchema(scan.getSchema());
       final Path selectionRoot = scan.getSelectionRoot();
       if (selectionRoot != null) {
         builder.metadataOptions().setSelectionRoot(selectionRoot);
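
A sketch of the revised wiring (typeConverterBuilder().providedSchema() replaces the former setOutputSchema() call; the surrounding statements mirror this diff):

    // Pass the provided ("output") schema through the type-converter
    // builder rather than through a dedicated setter on the scan builder.
    FileScanBuilder builder = frameworkBuilder(context.getOptions(), scan);
    builder.typeConverterBuilder().providedSchema(scan.getSchema());
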
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 7c3d950..f12fffb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -224,73 +224,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     }
   }
 
-  /**
-   * Builds the V3 text scan operator.
-   */
-  private static class TextScanBatchCreator extends ScanFrameworkCreator {
-
-    private final TextFormatPlugin textPlugin;
-
-    public TextScanBatchCreator(TextFormatPlugin plugin) {
-      super(plugin);
-      textPlugin = plugin;
-    }
-
-    @Override
-    protected FileScanBuilder frameworkBuilder(
-        OptionManager options,
-        EasySubScan scan) throws ExecutionSetupException {
-      ColumnsScanBuilder builder = new ColumnsScanBuilder();
-      TextParsingSettingsV3 settings = new TextParsingSettingsV3(textPlugin.getConfig(), scan, options);
-      builder.setReaderFactory(new ColumnsReaderFactory(settings));
-
-      // Provide custom error context
-      builder.setContext(
-          new CustomErrorContext() {
-            @Override
-            public void addContext(UserException.Builder builder) {
-              builder.addContext("Format plugin:", PLUGIN_NAME);
-              builder.addContext("Plugin config name:", textPlugin.getName());
-              builder.addContext("Extract headers:",
-                  Boolean.toString(settings.isHeaderExtractionEnabled()));
-              builder.addContext("Skip first line:",
-                  Boolean.toString(settings.isSkipFirstLine()));
-            }
-          });
-
-      // If this format has no headers, or wants to skip them,
-      // then we must use the columns column to hold the data.
-
-      builder.requireColumnsArray(settings.isUseRepeatedVarChar());
-
-      // Text files handle nulls in an unusual way. Missing columns
-      // are set to required Varchar and filled with blanks. Yes, this
-      // means that the SQL statement or code cannot differentiate missing
-      // columns from empty columns, but that is how CSV and other text
-      // files have been defined within Drill.
-
-      builder.setNullType(
-          MajorType.newBuilder()
-            .setMinorType(MinorType.VARCHAR)
-            .setMode(DataMode.REQUIRED)
-            .build());
-
-      // Pass along the output schema, if any
-
-      builder.setOutputSchema(scan.getSchema());
-
-      // CSV maps blank columns to nulls (for nullable non-string columns),
-      // or to the default value (for non-nullable non-string columns.)
-
-      builder.setConversionProperty(AbstractConvertFromString.BLANK_ACTION_PROP,
-          AbstractConvertFromString.BLANK_AS_NULL);
-
-      return builder;
-    }
-  }
-
-  public TextFormatPlugin(String name, DrillbitContext context,
-      Configuration fsConf, StoragePluginConfig storageConfig) {
+  public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
      this(name, context, fsConf, storageConfig, new TextFormatConfig());
   }
 
@@ -369,13 +303,14 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
   protected FileScanBuilder frameworkBuilder(
       OptionManager options, EasySubScan scan) throws ExecutionSetupException {
     ColumnsScanBuilder builder = new ColumnsScanBuilder();
-    builder.setReaderFactory(new ColumnsReaderFactory(this));
+    TextParsingSettingsV3 settings =
+        new TextParsingSettingsV3(getConfig(), scan, options);
+    builder.setReaderFactory(new ColumnsReaderFactory(settings));
 
     // If this format has no headers, or wants to skip them,
     // then we must use the columns column to hold the data.
 
-    builder.requireColumnsArray(
-        ! getConfig().isHeaderExtractionEnabled());
+    builder.requireColumnsArray(settings.isUseRepeatedVarChar());
 
     // Text files handle nulls in an unusual way. Missing columns
     // are set to required Varchar and filled with blanks. Yes, this
@@ -388,7 +323,8 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     // CSV maps blank columns to nulls (for nullable non-string columns),
     // or to the default value (for non-nullable non-string columns.)
 
-    builder.setConversionProperty(AbstractConvertFromString.BLANK_ACTION_PROP,
+    builder.typeConverterBuilder().setConversionProperty(
+        AbstractConvertFromString.BLANK_ACTION_PROP,
         AbstractConvertFromString.BLANK_AS_NULL);
 
     // The text readers use required Varchar columns to represent null columns.
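
To make the blank-handling policy concrete, a comment-only sketch of the effect described above (the column declarations are illustrative):

    // With BLANK_ACTION_PROP set to BLANK_AS_NULL, a blank CSV field:
    //   a,b,c
    //   1,,3
    // becomes NULL if column "b" is a nullable non-string column, or
    // the column's default value if "b" is declared non-nullable in
    // the provided schema.
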
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java
index 482c5cb..e291b3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java
@@ -46,7 +46,7 @@ class FieldVarCharOutput extends BaseFieldOutput {
     final TupleMetadata schema = writer.tupleSchema();
     final boolean projectionMask[] = new boolean[schema.size()];
     for (int i = 0; i < schema.size(); i++) {
-      projectionMask[i] = schema.metadata(i).isProjected();
+      projectionMask[i] = writer.column(i).isProjected();
     }
     return projectionMask;
   }
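
The mask is an optimization: unprojected columns are still backed by dummy writers that discard values, so a reader could safely write every field; the mask just skips the work. A sketch (the field-write call is illustrative):

    // Write a parsed field only if its column is projected. Writing to
    // an unprojected column is harmless (its dummy writer discards the
    // value) but wasteful.
    if (projectionMask[fieldIndex]) {
      writer.scalar(fieldIndex).setString(fieldValue);
    }
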
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java
index f7f1035..4c1a0b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java
@@ -53,7 +53,7 @@ public class RepeatedVarCharOutput extends BaseFieldOutput {
     // If the one and only field (`columns`) is not selected, then this
     // is a COUNT(*) or similar query. Select nothing.
 
-    if (! loader.tupleSchema().metadata(0).isProjected()) {
+    if (! loader.column(0).isProjected()) {
       return -1;
     }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
index 96ce354..0b3f9c8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
@@ -522,7 +522,6 @@ public class TestFileScanFramework extends SubOperatorTest {
 
   @Test
   public void testMapProject() {
-
     MockMapReader reader = new MockMapReader();
     reader.batchLimit = 1;
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOuputSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOuputSchema.java
index 4cd7fe6..413a3f0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOuputSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOuputSchema.java
@@ -21,18 +21,19 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.drill.categories.RowSetTests;
-import org.apache.drill.common.types.Types;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.physical.rowSet.RowSetLoader;
 import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -108,10 +109,9 @@ public class TestScanOperExecOuputSchema extends BaseScanOperatorExecTest {
     outputSchema.metadata("e").setDefaultValue("30");
 
     BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder();
-    // Project (schema + reader), (reader only), (schema only), (neither)
     builder.setProjection(new String[]{"a", "b", "d", "f"});
     builder.addReader(new MockSimpleReader());
-    builder.builder.setOutputSchema(outputSchema);
+    builder.builder.typeConverterBuilder().providedSchema(outputSchema);
     builder.builder.setNullType(Types.optional(MinorType.VARCHAR));
     ScanFixture scanFixture = builder.build();
     ScanOperatorExec scan = scanFixture.scanOp;
@@ -148,10 +148,15 @@ public class TestScanOperExecOuputSchema extends BaseScanOperatorExecTest {
     scanFixture.close();
   }
 
+  /**
+   * Test non-strict specified schema, with a wildcard, with extra
+   * reader columns. Reader columns are included in output.
+   */
+
   @Test
   public void testOutputSchemaWithWildcard() {
     TupleMetadata outputSchema = new SchemaBuilder()
-        .add("a", MinorType.INT) // Projected, in reader
+        .add("a", MinorType.INT)    // Projected, in reader
         .add("d", MinorType.BIGINT) // Projected, not in reader
         .add("e", MinorType.BIGINT) // Not projected, not in reader
         .buildSchema();
@@ -159,10 +164,9 @@ public class TestScanOperExecOuputSchema extends BaseScanOperatorExecTest {
     outputSchema.metadata("e").setDefaultValue("30");
 
     BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder();
-    // Project (schema + reader), (reader only), (schema only), (neither)
     builder.setProjection(RowSetTestUtils.projectAll());
     builder.addReader(new MockSimpleReader());
-    builder.builder.setOutputSchema(outputSchema);
+    builder.builder.typeConverterBuilder().providedSchema(outputSchema);
     builder.builder.setNullType(Types.optional(MinorType.VARCHAR));
     ScanFixture scanFixture = builder.build();
     ScanOperatorExec scan = scanFixture.scanOp;
@@ -203,7 +207,7 @@ public class TestScanOperExecOuputSchema extends BaseScanOperatorExecTest {
   @Test
   public void testStrictOutputSchemaWithWildcard() {
     TupleMetadata outputSchema = new SchemaBuilder()
-        .add("a", MinorType.INT) // Projected, in reader
+        .add("a", MinorType.INT)    // Projected, in reader
         .add("d", MinorType.BIGINT) // Projected, not in reader
         .add("e", MinorType.BIGINT) // Not projected, not in reader
         .buildSchema();
@@ -212,10 +216,10 @@ public class TestScanOperExecOuputSchema extends BaseScanOperatorExecTest {
     outputSchema.setProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP, Boolean.TRUE.toString());
 
     BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder();
-    // Project (schema + reader), (reader only), (schema only), (neither)
+    // Project schema only
     builder.setProjection(RowSetTestUtils.projectAll());
     builder.addReader(new MockSimpleReader());
-    builder.builder.setOutputSchema(outputSchema);
+    builder.builder.typeConverterBuilder().providedSchema(outputSchema);
     builder.builder.setNullType(Types.optional(MinorType.VARCHAR));
     ScanFixture scanFixture = builder.build();
     ScanOperatorExec scan = scanFixture.scanOp;
@@ -250,4 +254,55 @@ public class TestScanOperExecOuputSchema extends BaseScanOperatorExecTest {
     assertFalse(scan.next());
     scanFixture.close();
   }
+
+  @Test
+  public void testStrictOutputSchemaWithWildcardAndSpecialCols() {
+    TupleMetadata outputSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)    // Projected, in reader
+        .add("d", MinorType.BIGINT) // Projected, not in reader
+        .add("e", MinorType.BIGINT) // Not projected, not in reader
+        .buildSchema();
+    outputSchema.metadata("d").setDefaultValue("20");
+    outputSchema.metadata("e").setDefaultValue("30");
+    outputSchema.setProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP, Boolean.TRUE.toString());
+    outputSchema.metadata("a").setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+
+    BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder();
+    // Project schema only
+    builder.setProjection(RowSetTestUtils.projectAll());
+    builder.addReader(new MockSimpleReader());
+    builder.builder.typeConverterBuilder().providedSchema(outputSchema);
+    builder.builder.setNullType(Types.optional(MinorType.VARCHAR));
+    ScanFixture scanFixture = builder.build();
+    ScanOperatorExec scan = scanFixture.scanOp;
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("d", MinorType.BIGINT)
+        .add("e", MinorType.BIGINT)
+        .buildSchema();
+
+    // Initial schema
+
+    assertTrue(scan.buildSchema());
+    {
+      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+          .build();
+      RowSetUtilities.verify(expected,
+          fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+    }
+
+    // Batch with defaults and null types
+
+    assertTrue(scan.next());
+    {
+      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+          .addRow(20L, 30L)
+          .build();
+      RowSetUtilities.verify(expected,
+          fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+    }
+
+    assertFalse(scan.next());
+    scanFixture.close();
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java
index 193095a..d90efb3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java
@@ -34,9 +34,9 @@ import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.test.rowSet.RowSetUtilities;
@@ -403,7 +403,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     // Verify that unprojected column is unprojected in the
     // table loader.
 
-    assertFalse(loader.writer().column("b").schema().isProjected());
+    assertFalse(loader.writer().column("b").isProjected());
 
     BatchSchema expectedSchema = new SchemaBuilder()
         .add("a", MinorType.INT)
@@ -463,8 +463,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
     // table loader.
 
     assertTrue(loader.isProjectionEmpty());
-    assertFalse(loader.writer().column("a").schema().isProjected());
-    assertFalse(loader.writer().column("b").schema().isProjected());
+    assertFalse(loader.writer().column("a").isProjected());
+    assertFalse(loader.writer().column("b").isProjected());
 
     // Verify empty batch.
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
index a803bfc..18d9ac1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.scan.project;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -29,14 +30,15 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
 import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedColumn;
-import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedSchemaColumn;
 import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedWildcardColumn;
 import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionType;
+import org.apache.drill.exec.physical.rowSet.ProjectionSet;
+import org.apache.drill.exec.physical.rowSet.ProjectionSet.ColumnReadProjection;
 import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
-import org.apache.drill.exec.physical.rowSet.project.ImpliedTupleRequest;
+import org.apache.drill.exec.physical.rowSet.project.ProjectionType;
 import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
 import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
-import org.apache.drill.exec.record.metadata.ProjectionType;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.test.SubOperatorTest;
@@ -68,7 +70,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
         ScanTestUtils.parsers());
 
     assertFalse(scanProj.projectAll());
-    assertFalse(scanProj.projectNone());
+    assertFalse(scanProj.isEmptyProjection());
 
     assertEquals(3, scanProj.requestedCols().size());
     assertEquals("a", scanProj.requestedCols().get(0).rootName());
@@ -91,11 +93,18 @@ public class TestScanLevelProjection extends SubOperatorTest {
     assertNotNull(outputProj.get("a"));
     assertTrue(outputProj.get("a").isSimple());
 
-    RequestedTuple readerProj = scanProj.readerProjection();
-    assertEquals(3, readerProj.projections().size());
-    assertNotNull(readerProj.get("a"));
-    assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("a"));
-    assertEquals(ProjectionType.UNPROJECTED, readerProj.projectionType("d"));
+    // Make up a reader schema and test the projection set.
+
+    TupleMetadata readerSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.INT)
+        .add("c", MinorType.INT)
+        .add("d", MinorType.INT)
+        .buildSchema();
+
+    ProjectionSet projSet = scanProj.projectionSet().build();
+    assertTrue(projSet.readProjection(readerSchema.metadata("a")).isProjected());
+    assertFalse(projSet.readProjection(readerSchema.metadata("d")).isProjected());
   }
 
   /**
@@ -106,12 +115,16 @@ public class TestScanLevelProjection extends SubOperatorTest {
 
   @Test
   public void testMap() {
+
+    // SELECT a.x, b.x, a.y, b.y, c
+    // We infer a and b are maps.
+
     final ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList("a.x", "b.x", "a.y", "b.y", "c"),
         ScanTestUtils.parsers());
 
     assertFalse(scanProj.projectAll());
-    assertFalse(scanProj.projectNone());
+    assertFalse(scanProj.isEmptyProjection());
 
     assertEquals(3, scanProj.columns().size());
     assertEquals("a", scanProj.columns().get(0).name());
@@ -122,12 +135,12 @@ public class TestScanLevelProjection extends SubOperatorTest {
 
     assertTrue(scanProj.columns().get(0) instanceof UnresolvedColumn);
 
-    // Map structure
+    // Inferred map structure
 
     final RequestedColumn a = ((UnresolvedColumn) scanProj.columns().get(0)).element();
     assertTrue(a.isTuple());
-    assertEquals(ProjectionType.UNSPECIFIED, a.mapProjection().projectionType("x"));
-    assertEquals(ProjectionType.UNSPECIFIED, a.mapProjection().projectionType("y"));
+    assertEquals(ProjectionType.GENERAL, a.mapProjection().projectionType("x"));
+    assertEquals(ProjectionType.GENERAL, a.mapProjection().projectionType("y"));
     assertEquals(ProjectionType.UNPROJECTED,  a.mapProjection().projectionType("z"));
 
     final RequestedColumn c = ((UnresolvedColumn) scanProj.columns().get(2)).element();
@@ -140,12 +153,33 @@ public class TestScanLevelProjection extends SubOperatorTest {
     assertNotNull(outputProj.get("a"));
     assertTrue(outputProj.get("a").isTuple());
 
-    RequestedTuple readerProj = scanProj.readerProjection();
-    assertEquals(3, readerProj.projections().size());
-    assertNotNull(readerProj.get("a"));
-    assertEquals(ProjectionType.TUPLE, readerProj.projectionType("a"));
-    assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("c"));
-    assertEquals(ProjectionType.UNPROJECTED, readerProj.projectionType("d"));
+    // Make up a reader schema and test the projection set.
+
+    TupleMetadata readerSchema = new SchemaBuilder()
+        .addMap("a")
+          .add("x", MinorType.INT)
+          .add("y", MinorType.INT)
+          .resumeSchema()
+        .addMap("b")
+          .add("x", MinorType.INT)
+          .add("y", MinorType.INT)
+          .resumeSchema()
+        .add("c", MinorType.INT)
+        .add("d", MinorType.INT)
+        .buildSchema();
+
+    // Verify the projection set as if we were a reader. Note that the
+    // projection type is used here for testing; it should not be used by
+    // an actual reader.
+
+    ProjectionSet projSet = scanProj.projectionSet().build();
+    ColumnReadProjection aProj = projSet.readProjection(readerSchema.metadata("a"));
+    assertTrue(aProj.isProjected());
+    assertEquals(ProjectionType.TUPLE, aProj.projectionType());
+    ColumnReadProjection cProj = projSet.readProjection(readerSchema.metadata("c"));
+    assertTrue(cProj.isProjected());
+    assertEquals(ProjectionType.GENERAL, cProj.projectionType());
+    assertFalse(projSet.readProjection(readerSchema.metadata("d")).isProjected());
   }
 
   /**
@@ -160,7 +194,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
         ScanTestUtils.parsers());
 
     assertFalse(scanProj.projectAll());
-    assertFalse(scanProj.projectNone());
+    assertFalse(scanProj.isEmptyProjection());
 
     assertEquals(1, scanProj.columns().size());
     assertEquals("a", scanProj.columns().get(0).name());
@@ -185,11 +219,18 @@ public class TestScanLevelProjection extends SubOperatorTest {
     assertNotNull(outputProj.get("a"));
     assertTrue(outputProj.get("a").isArray());
 
-    RequestedTuple readerProj = scanProj.readerProjection();
-    assertEquals(1, readerProj.projections().size());
-    assertNotNull(readerProj.get("a"));
-    assertEquals(ProjectionType.ARRAY, readerProj.projectionType("a"));
-    assertEquals(ProjectionType.UNPROJECTED, readerProj.projectionType("c"));
+    // Make up a reader schema and test the projection set.
+
+    TupleMetadata readerSchema = new SchemaBuilder()
+        .addArray("a", MinorType.INT)
+        .add("c", MinorType.INT)
+        .buildSchema();
+
+    ProjectionSet projSet = scanProj.projectionSet().build();
+    ColumnReadProjection aProj = projSet.readProjection(readerSchema.metadata("a"));
+    assertTrue(aProj.isProjected());
+    assertEquals(ProjectionType.ARRAY, aProj.projectionType());
+    assertFalse(projSet.readProjection(readerSchema.metadata("c")).isProjected());
   }
 
   /**
@@ -204,7 +245,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
         ScanTestUtils.parsers());
 
     assertTrue(scanProj.projectAll());
-    assertFalse(scanProj.projectNone());
+    assertFalse(scanProj.isEmptyProjection());
     assertEquals(1, scanProj.requestedCols().size());
     assertTrue(scanProj.requestedCols().get(0).isDynamicStar());
 
@@ -223,12 +264,19 @@ public class TestScanLevelProjection extends SubOperatorTest {
 
     RequestedTuple outputProj = scanProj.rootProjection();
     assertEquals(1, outputProj.projections().size());
-    assertNotNull(outputProj.get("**"));
-    assertTrue(outputProj.get("**").isWildcard());
+    assertNotNull(outputProj.get(SchemaPath.DYNAMIC_STAR));
+    assertTrue(outputProj.get(SchemaPath.DYNAMIC_STAR).isWildcard());
+
+    // Make up a reader schema and test the projection set.
 
-    RequestedTuple readerProj = scanProj.readerProjection();
-    assertTrue(readerProj instanceof ImpliedTupleRequest);
-    assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("a"));
+    TupleMetadata readerSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("c", MinorType.INT)
+        .buildSchema();
+
+    ProjectionSet projSet = scanProj.projectionSet().build();
+    assertTrue(projSet.readProjection(readerSchema.metadata("a")).isProjected());
+    assertTrue(projSet.readProjection(readerSchema.metadata("c")).isProjected());
   }
 
   /**
@@ -243,7 +291,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
         ScanTestUtils.parsers());
 
     assertFalse(scanProj.projectAll());
-    assertTrue(scanProj.projectNone());
+    assertTrue(scanProj.isEmptyProjection());
     assertEquals(0, scanProj.requestedCols().size());
 
     // Verify tuple projection
@@ -251,9 +299,14 @@ public class TestScanLevelProjection extends SubOperatorTest {
     RequestedTuple outputProj = scanProj.rootProjection();
     assertEquals(0, outputProj.projections().size());
 
-    RequestedTuple readerProj = scanProj.readerProjection();
-    assertTrue(readerProj instanceof ImpliedTupleRequest);
-    assertEquals(ProjectionType.UNPROJECTED, readerProj.projectionType("a"));
+    // Make up a reader schema and test the projection set.
+
+    TupleMetadata readerSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .buildSchema();
+
+    ProjectionSet projSet = scanProj.projectionSet().build();
+    assertFalse(projSet.readProjection(readerSchema.metadata("a")).isProjected());
   }
 
   /**
@@ -269,7 +322,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
           ScanTestUtils.parsers());
 
     assertTrue(scanProj.projectAll());
-    assertFalse(scanProj.projectNone());
+    assertFalse(scanProj.isEmptyProjection());
     assertEquals(2, scanProj.requestedCols().size());
     assertEquals(1, scanProj.columns().size());
 
@@ -277,14 +330,20 @@ public class TestScanLevelProjection extends SubOperatorTest {
 
     RequestedTuple outputProj = scanProj.rootProjection();
     assertEquals(2, outputProj.projections().size());
-    assertNotNull(outputProj.get("**"));
-    assertTrue(outputProj.get("**").isWildcard());
+    assertNotNull(outputProj.get(SchemaPath.DYNAMIC_STAR));
+    assertTrue(outputProj.get(SchemaPath.DYNAMIC_STAR).isWildcard());
     assertNotNull(outputProj.get("a"));
 
-    RequestedTuple readerProj = scanProj.readerProjection();
-    assertTrue(readerProj instanceof ImpliedTupleRequest);
-    assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("a"));
-    assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("c"));
+    // Make up a reader schema and test the projection set.
+
+    TupleMetadata readerSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("c", MinorType.INT)
+        .buildSchema();
+
+    ProjectionSet projSet = scanProj.projectionSet().build();
+    assertTrue(projSet.readProjection(readerSchema.metadata("a")).isProjected());
+    assertTrue(projSet.readProjection(readerSchema.metadata("c")).isProjected());
   }
 
   /**
@@ -298,7 +357,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
           ScanTestUtils.parsers());
 
     assertTrue(scanProj.projectAll());
-    assertFalse(scanProj.projectNone());
+    assertFalse(scanProj.isEmptyProjection());
     assertEquals(2, scanProj.requestedCols().size());
     assertEquals(1, scanProj.columns().size());
   }
@@ -355,15 +414,42 @@ public class TestScanLevelProjection extends SubOperatorTest {
     assertEquals(ScanProjectionType.SCHEMA_WILDCARD, scanProj.projectionType());
 
     assertEquals(2, scanProj.columns().size());
-    assertEquals("a", scanProj.columns().get(0).name());
-    assertTrue(scanProj.columns().get(0) instanceof UnresolvedSchemaColumn);
-    assertEquals("b", scanProj.columns().get(1).name());
-    assertTrue(scanProj.columns().get(1) instanceof UnresolvedSchemaColumn);
+    ColumnProjection aCol = scanProj.columns().get(0);
+    assertEquals("a", aCol.name());
+    assertTrue(aCol instanceof UnresolvedColumn);
+    assertSame(outputSchema.metadata("a"), ((UnresolvedColumn) aCol).metadata());
+    ColumnProjection bCol = scanProj.columns().get(1);
+    assertEquals("b", bCol.name());
+    assertTrue(bCol instanceof UnresolvedColumn);
+    assertSame(outputSchema.metadata("b"), ((UnresolvedColumn) bCol).metadata());
+
+    ProjectionSet projSet = scanProj.projectionSet().build();
+    assertTrue(projSet.readProjection(outputSchema.metadata("a")).isProjected());
+    assertTrue(projSet.readProjection(outputSchema.metadata("b")).isProjected());
+  }
+
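+  /**
+   * Test a wildcard with a provided schema in which one column is
+   * marked as special (excluded from wildcard expansion); only the
+   * remaining schema columns appear in the scan projection.
+   */
+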
+  @Test
+  public void testOutputSchemaWildcardSpecialCols() {
+    TupleMetadata outputSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.BIGINT)
+        .add("c", MinorType.VARCHAR)
+        .buildSchema();
+
+    // Mark b as special; not expanded in wildcard.
+
+    outputSchema.metadata("b").setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+
+    final ScanLevelProjection scanProj = ScanLevelProjection.build(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers(),
+        outputSchema);
 
-    RequestedTuple readerProj = scanProj.readerProjection();
-    assertEquals(2, readerProj.projections().size());
-    assertEquals(ProjectionType.SCALAR, readerProj.projectionType("a"));
-    assertEquals(ProjectionType.SCALAR, readerProj.projectionType("b"));
+    assertEquals(ScanProjectionType.SCHEMA_WILDCARD, scanProj.projectionType());
+
+    assertEquals(2, scanProj.columns().size());
+    assertEquals("a", scanProj.columns().get(0).name());
+    assertEquals("c", scanProj.columns().get(1).name());
   }
 
   /**
@@ -387,13 +473,21 @@ public class TestScanLevelProjection extends SubOperatorTest {
 
     assertEquals(2, scanProj.columns().size());
     assertEquals("a", scanProj.columns().get(0).name());
-    assertTrue(scanProj.columns().get(0) instanceof UnresolvedSchemaColumn);
+    assertTrue(scanProj.columns().get(0) instanceof UnresolvedColumn);
     assertEquals("b", scanProj.columns().get(1).name());
-    assertTrue(scanProj.columns().get(1) instanceof UnresolvedSchemaColumn);
+    assertTrue(scanProj.columns().get(1) instanceof UnresolvedColumn);
+
+    // Make up a reader schema and test the projection set.
+
+    TupleMetadata readerSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.INT)
+        .add("c", MinorType.INT)
+        .buildSchema();
 
-    RequestedTuple readerProj = scanProj.readerProjection();
-    assertEquals(2, readerProj.projections().size());
-    assertEquals(ProjectionType.SCALAR, readerProj.projectionType("a"));
-    assertEquals(ProjectionType.SCALAR, readerProj.projectionType("b"));
+    ProjectionSet projSet = scanProj.projectionSet().build();
+    assertTrue(projSet.readProjection(readerSchema.metadata("a")).isProjected());
+    assertTrue(projSet.readProjection(readerSchema.metadata("b")).isProjected());
+    assertFalse(projSet.readProjection(readerSchema.metadata("c")).isProjected());
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TestProjectionSet.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TestProjectionSet.java
new file mode 100644
index 0000000..b2c5de2
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TestProjectionSet.java
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project.projSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.project.projSet.TypeConverter.CustomTypeTransform;
+import org.apache.drill.exec.physical.rowSet.ProjectionSet;
+import org.apache.drill.exec.physical.rowSet.ProjectionSet.ColumnReadProjection;
+import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
+import org.apache.drill.exec.physical.rowSet.project.ProjectionType;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
+import org.apache.drill.exec.vector.accessor.convert.ConvertStringToInt;
+import org.apache.drill.exec.vector.accessor.convert.StandardConversions;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test the projection set used by the result set loader as
+ * columns are added. The projection set combines information from
+ * the SELECT (project) list, from an optional provided schema, and
+ * from an optional type converter to decide whether a particular
+ * new column should be projected and, if so, whether any type
+ * conversion is needed.
+ * <p>
+ * The code and tests here keep the result set loader simple: it just
+ * asks a question about projection and gets an answer. The complexity
+ * of projection should be fully tested here, then just sanity-tested
+ * in the result set loader.
+ */
+
+@Category(RowSetTests.class)
+public class TestProjectionSet {
+
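+  // For orientation: the pattern exercised throughout, reduced to a
+  // minimal sketch (schemas and projection lists are placeholders).
+  // Per new column, the result set loader asks:
+  //
+  //   ProjectionSet projSet = ProjectionSetFactory.build(projectionList);
+  //   ColumnReadProjection col = projSet.readProjection(colSchema);
+  //   if (col.isProjected()) { /* create the column's writer */ }
+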
+  /**
+   * Empty projection, no schema
+   */
+
+  @Test
+  public void testEmptyProjection() {
+    ProjectionSet projSet = ProjectionSetFactory.projectNone();
+
+    TupleMetadata readSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .addMap("m")
+          .add("b", MinorType.VARCHAR)
+          .resumeSchema()
+        .buildSchema();
+
+    ColumnMetadata aSchema = readSchema.metadata("a");
+    ColumnReadProjection aCol = projSet.readProjection(aSchema);
+    assertFalse(aCol.isProjected());
+
+    // Verify properties of an unprojected column
+
+    assertSame(aSchema, aCol.readSchema());
+    assertSame(aSchema, aCol.providedSchema());
+    assertNull(aCol.conversionFactory());
+    assertSame(EmptyProjectionSet.PROJECT_NONE, aCol.mapProjection());
+    assertNull(aCol.projectionType());
+
+    ColumnReadProjection mCol = projSet.readProjection(readSchema.metadata("m"));
+    assertFalse(mCol.isProjected());
+
+    ColumnReadProjection bCol = mCol.mapProjection().readProjection(
+        readSchema.metadata("m").mapSchema().metadata("b"));
+    assertFalse(bCol.isProjected());
+  }
+
+  /**
+   * Wildcard projection, no schema
+   */
+
+  @Test
+  public void testWildcardProjection() {
+    ProjectionSet projSet = ProjectionSetFactory.projectAll();
+
+    TupleMetadata readSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .buildSchema();
+
+    ColumnMetadata aSchema = readSchema.metadata("a");
+    ColumnReadProjection aCol = projSet.readProjection(aSchema);
+    assertTrue(aCol.isProjected());
+    assertSame(aSchema, aCol.readSchema());
+    assertSame(aSchema, aCol.providedSchema());
+    assertNull(aCol.conversionFactory());
+    assertNull(aCol.mapProjection());
+    assertNull(aCol.projectionType());
+  }
+
+  /**
+   * Wildcard projection of a map, no schema
+   */
+
+  @Test
+  public void testWildcardMapProjection() {
+    ProjectionSet projSet = ProjectionSetFactory.projectAll();
+
+    TupleMetadata readSchema = new SchemaBuilder()
+        .addMap("m")
+          .add("b", MinorType.VARCHAR)
+          .resumeSchema()
+        .buildSchema();
+
+    ColumnReadProjection mCol = projSet.readProjection(readSchema.metadata("m"));
+    assertTrue(mCol.isProjected());
+
+    ColumnReadProjection bCol = mCol.mapProjection().readProjection(
+        readSchema.metadata("m").mapSchema().metadata("b"));
+    assertTrue(bCol.isProjected());
+  }
+
+  /**
+   * Wildcard projection, with schema. Some columns marked
+   * as special; not expanded by wildcard.
+   */
+
+  @Test
+  public void testWildcardAndSchemaProjection() {
+    TupleMetadata readSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .add("b", MinorType.VARCHAR)
+        .add("c", MinorType.INT)
+        .add("d", MinorType.INT)
+        .buildSchema();
+    readSchema.metadata("b").setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+
+    TupleMetadata outputSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("c", MinorType.INT)
+        .add("d", MinorType.INT)
+        .buildSchema();
+    outputSchema.metadata("c").setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+
+    TypeConverter converter = TypeConverter.builder()
+        .providedSchema(outputSchema)
+        .build();
+
+    ProjectionSet projSet = new WildcardProjectionSet(converter);
+
+    ColumnReadProjection aCol = projSet.readProjection(readSchema.metadata("a"));
+    assertTrue(aCol.isProjected());
+    assertSame(outputSchema.metadata("a"), aCol.providedSchema());
+    assertNotNull(aCol.conversionFactory());
+
+    // Column b marked as special by reader
+
+    ColumnReadProjection bCol = projSet.readProjection(readSchema.metadata("b"));
+    assertFalse(bCol.isProjected());
+    assertSame(readSchema.metadata("b"), bCol.providedSchema());
+    assertNull(bCol.conversionFactory());
+
+    // Column c marked as special by provided schema
+
+    ColumnReadProjection cCol = projSet.readProjection(readSchema.metadata("c"));
+    assertFalse(cCol.isProjected());
+    assertSame(readSchema.metadata("c"), cCol.providedSchema());
+    assertNull(cCol.conversionFactory());
+
+    // Column d needs no conversion
+
+    ColumnReadProjection dCol = projSet.readProjection(readSchema.metadata("d"));
+    assertTrue(dCol.isProjected());
+    assertSame(outputSchema.metadata("d"), dCol.providedSchema());
+    assertNull(dCol.conversionFactory());
+  }
+
+  /**
+   * Wildcard projection, with schema. Some columns marked
+   * as special; not expanded by wildcard.
+   */
+
+  @Test
+  public void testWildcardAndSchemaMapProjection() {
+    TupleMetadata readSchema = new SchemaBuilder()
+        .addMap("m")
+          .add("e", MinorType.VARCHAR)
+          .add("f", MinorType.VARCHAR)
+          .add("g", MinorType.VARCHAR)
+          .add("h", MinorType.VARCHAR)
+          .resumeSchema()
+        .buildSchema();
+    TupleMetadata mReadSchema = readSchema.metadata("m").mapSchema();
+    mReadSchema.metadata("f").setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+
+    TupleMetadata outputSchema = new SchemaBuilder()
+        .addMap("m")
+          .add("e", MinorType.INT)
+          .add("f", MinorType.VARCHAR)
+          .add("g", MinorType.VARCHAR)
+          .resumeSchema()
+        .buildSchema();
+    TupleMetadata mOutputSchema = outputSchema.metadata("m").mapSchema();
+    mOutputSchema.metadata("g").setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+
+    TypeConverter converter = TypeConverter.builder()
+        .providedSchema(outputSchema)
+        .build();
+
+    ProjectionSet projSet = new WildcardProjectionSet(converter);
+
+    // Column m is a map
+
+    ColumnReadProjection mCol = projSet.readProjection(readSchema.metadata("m"));
+    assertTrue(mCol.isProjected());
+    assertSame(outputSchema.metadata("m"), mCol.providedSchema());
+    assertNull(mCol.conversionFactory());
+    ProjectionSet mProj = mCol.mapProjection();
+
+    // Column m.e requires conversion
+
+    ColumnReadProjection eCol = mProj.readProjection(mReadSchema.metadata("e"));
+    assertTrue(eCol.isProjected());
+    assertSame(mReadSchema.metadata("e"), eCol.readSchema());
+    assertSame(mOutputSchema.metadata("e"), eCol.providedSchema());
+    assertNotNull(eCol.conversionFactory());
+
+    // Column m.f marked as special by reader
+
+    ColumnReadProjection fCol = mProj.readProjection(mReadSchema.metadata("f"));
+    assertFalse(fCol.isProjected());
+
+    // Column m.g marked as special by provided schema
+
+    ColumnReadProjection gCol = mProj.readProjection(mReadSchema.metadata("g"));
+    assertFalse(gCol.isProjected());
+
+    // Column m.h needs no conversion
+
+    ColumnReadProjection hCol = mProj.readProjection(mReadSchema.metadata("h"));
+    assertTrue(hCol.isProjected());
+    assertSame(mReadSchema.metadata("h"), hCol.providedSchema());
+    assertNull(hCol.conversionFactory());
+  }
+
+  /**
+   * Wildcard and strict schema
+   */
+
+  @Test
+  public void testWildcardAndStrictSchemaProjection() {
+    TupleMetadata readSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    TupleMetadata outputSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMap("m")
+          .add("c", MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    outputSchema.setBooleanProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP, true);
+
+    TypeConverter converter = TypeConverter.builder()
+        .providedSchema(outputSchema)
+        .build();
+
+    ProjectionSet projSet = new WildcardProjectionSet(converter);
+
+    ColumnReadProjection aCol = projSet.readProjection(readSchema.metadata("a"));
+    assertTrue(aCol.isProjected());
+    assertSame(outputSchema.metadata("a"), aCol.providedSchema());
+    assertNotNull(aCol.conversionFactory());
+
+    // Column b not in provided schema
+
+    ColumnReadProjection bCol = projSet.readProjection(readSchema.metadata("b"));
+    assertFalse(bCol.isProjected());
+    assertSame(readSchema.metadata("b"), bCol.providedSchema());
+    assertNull(bCol.conversionFactory());
+  }
+
+  /**
+   * Wildcard and strict schema
+   */
+
+  @Test
+  public void testWildcardAndStrictMapSchemaProjection() {
+    TupleMetadata readSchema = new SchemaBuilder()
+        .addMap("m")
+          .add("c", MinorType.INT)
+          .add("d", MinorType.VARCHAR)
+          .resumeSchema()
+        .addMap("m2")
+          .add("e", MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    TupleMetadata mReadSchema = readSchema.metadata("m").mapSchema();
+    TupleMetadata m2ReadSchema = readSchema.metadata("m2").mapSchema();
+
+    TupleMetadata outputSchema = new SchemaBuilder()
+        .addMap("m")
+          .add("c", MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    outputSchema.setBooleanProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP, true);
+    TupleMetadata mOutputSchema = outputSchema.metadata("m").mapSchema();
+
+    TypeConverter converter = TypeConverter.builder()
+        .providedSchema(outputSchema)
+        .build();
+
+    ProjectionSet projSet = new WildcardProjectionSet(converter);
+
+    // Column m is a map in provided schema
+
+    ColumnReadProjection mCol = projSet.readProjection(readSchema.metadata("m"));
+    assertTrue(mCol.isProjected());
+    assertSame(outputSchema.metadata("m"), mCol.providedSchema());
+    assertNull(mCol.conversionFactory());
+    ProjectionSet mProj = mCol.mapProjection();
+
+    // Column m.c is in the provided schema
+
+    ColumnReadProjection cCol = mProj.readProjection(mReadSchema.metadata("c"));
+    assertTrue(cCol.isProjected());
+    assertSame(mOutputSchema.metadata("c"), cCol.providedSchema());
+    assertNull(cCol.conversionFactory());
+
+    // Column m.d is not in the provided schema
+
+    ColumnReadProjection dCol = mProj.readProjection(mReadSchema.metadata("d"));
+    assertFalse(dCol.isProjected());
+
+    // Column m2, a map, is not in the provided schema
+
+    ColumnReadProjection m2Col = projSet.readProjection(readSchema.metadata("m2"));
+    assertFalse(m2Col.isProjected());
+    ProjectionSet m2Proj = m2Col.mapProjection();
+
+    // Since m2 is not in the provided schema, its members are not projected.
+
+    ColumnReadProjection eCol = m2Proj.readProjection(m2ReadSchema.metadata("e"));
+    assertFalse(eCol.isProjected());
+  }
+
+  /**
+   * Test explicit projection without a provided schema.
+   * Also, sanity test of the builder for the project all,
+   * project none cases.
+   */
+
+  @Test
+  public void testExplicitProjection() {
+    TupleMetadata readSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    ColumnMetadata aSchema = readSchema.metadata("a");
+
+    ProjectionSet projSet = ProjectionSetFactory.build(
+        RowSetTestUtils.projectList("a"));
+
+    ColumnReadProjection aCol = projSet.readProjection(aSchema);
+    assertTrue(aCol.isProjected());
+    assertSame(aSchema, aCol.readSchema());
+    assertSame(aSchema, aCol.providedSchema());
+    assertNull(aCol.conversionFactory());
+    assertNull(aCol.mapProjection());
+    assertEquals(ProjectionType.GENERAL, aCol.projectionType());
+
+    ColumnReadProjection bCol = projSet.readProjection(readSchema.metadata("b"));
+    assertFalse(bCol.isProjected());
+  }
+
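+  /**
+   * Test explicit projection of maps: a single member (m1.c),
+   * a whole map (m2), and an unprojected map (m3).
+   */
+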
+  @Test
+  public void testExplicitMapProjection() {
+
+    // Schema to allow us to use three kinds of map projection
+
+    TupleMetadata readSchema = new SchemaBuilder()
+        .addMap("m1")
+          .add("c", MinorType.INT)
+          .add("d", MinorType.VARCHAR)
+          .resumeSchema()
+        .addMap("m2")
+          .add("e", MinorType.INT)
+          .resumeSchema()
+        .addMap("m3")
+          .add("f", MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+
+    ColumnMetadata m1Schema = readSchema.metadata("m1");
+    ColumnMetadata m2Schema = readSchema.metadata("m2");
+    ColumnMetadata m3Schema = readSchema.metadata("m3");
+    TupleMetadata m1ReadSchema = m1Schema.mapSchema();
+    TupleMetadata m2ReadSchema = m2Schema.mapSchema();
+    TupleMetadata m3ReadSchema = m3Schema.mapSchema();
+
+    // Project one member of map m1, all of m2, none of m3
+
+    ProjectionSet projSet = ProjectionSetFactory.build(
+        RowSetTestUtils.projectList("m1.c", "m2"));
+
+    // Verify that m1 is projected as a tuple
+
+    ColumnReadProjection m1Col = projSet.readProjection(m1Schema);
+    assertTrue(m1Col.isProjected());
+    assertSame(m1Schema, m1Col.readSchema());
+    assertSame(m1Schema, m1Col.providedSchema());
+    assertNull(m1Col.conversionFactory());
+    assertEquals(ProjectionType.TUPLE, m1Col.projectionType());
+
+    // m1.c is projected
+
+    ColumnReadProjection cCol = m1Col.mapProjection().readProjection(m1ReadSchema.metadata("c"));
+    assertTrue(cCol.isProjected());
+    assertEquals(ProjectionType.GENERAL, cCol.projectionType());
+
+    // but m1.d is not projected
+
+    assertFalse(m1Col.mapProjection().readProjection(m1ReadSchema.metadata("d")).isProjected());
+
+    // m2 is entirely projected
+
+    ColumnReadProjection m2Col = projSet.readProjection(m2Schema);
+    assertEquals(ProjectionType.GENERAL, m2Col.projectionType());
+    assertTrue(m2Col.isProjected());
+    assertSame(m2Schema, m2Col.readSchema());
+    assertSame(m2Schema, m2Col.providedSchema());
+    assertNull(m2Col.conversionFactory());
+    assertTrue(m2Col.mapProjection() instanceof WildcardProjectionSet);
+    assertTrue(m2Col.mapProjection().readProjection(m2ReadSchema.metadata("e")).isProjected());
+
+    // m3 is not projected at all
+
+    ColumnReadProjection m3Col = projSet.readProjection(m3Schema);
+    assertFalse(m3Col.isProjected());
+    assertFalse(m3Col.mapProjection().readProjection(m3ReadSchema.metadata("f")).isProjected());
+  }
+
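+  /**
+   * Test a redundant projection list: SELECT m1.c, m1.
+   * Projecting the whole map wins; m1.c is along for the ride.
+   */
+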
+  @Test
+  public void testExplicitRedundantMapProjection() {
+
+    // Schema with a single map, projected twice in the list below
+
+    TupleMetadata readSchema = new SchemaBuilder()
+        .addMap("m1")
+          .add("c", MinorType.INT)
+          .add("d", MinorType.VARCHAR)
+          .resumeSchema()
+        .buildSchema();
+
+    ColumnMetadata m1Schema = readSchema.metadata("m1");
+    TupleMetadata m1ReadSchema = m1Schema.mapSchema();
+
+    // Project both one member of m1 and all of m1
+
+    ProjectionSet projSet = ProjectionSetFactory.build(
+        RowSetTestUtils.projectList("m1.c", "m1"));
+
+    // Verify that m1 is projected as a tuple
+
+    ColumnReadProjection m1Col = projSet.readProjection(m1Schema);
+    assertTrue(m1Col.isProjected());
+    assertEquals(ProjectionType.TUPLE, m1Col.projectionType());
+
+    // m1.c is projected
+
+    ColumnReadProjection cCol = m1Col.mapProjection().readProjection(m1ReadSchema.metadata("c"));
+    assertTrue(cCol.isProjected());
+
+    // m1.d is also projected because m1 was projected as a whole
+
+    assertTrue(m1Col.mapProjection().readProjection(m1ReadSchema.metadata("d")).isProjected());
+  }
+
+  /**
+   * Explicit projection with implied wildcard projection of the map.
+   * That is, SELECT m is logically equivalent to SELECT m.*
+   * and is subject to the strict schema projection rule.
+   */
+
+  @Test
+  public void testImpliedWildcardWithStrictSchema() {
+    TupleMetadata readSchema = new SchemaBuilder()
+        .addMap("m")
+          .add("a", MinorType.INT)
+          .add("b", MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+
+    ColumnMetadata mSchema = readSchema.metadata("m");
+    TupleMetadata mReadSchema = mSchema.mapSchema();
+
+    TupleMetadata outputSchema = new SchemaBuilder()
+        .addMap("m")
+          .add("a", MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+
+    outputSchema.setBooleanProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP, true);
+
+    ProjectionSet projSet = new ProjectionSetBuilder()
+        .typeConverter(TypeConverter.builder()
+            .providedSchema(outputSchema)
+            .build())
+        .projectionList(RowSetTestUtils.projectList("m"))
+        .build();
+
+    ColumnReadProjection mCol = projSet.readProjection(mSchema);
+    assertTrue(mCol.isProjected());
+    ProjectionSet mProj = mCol.mapProjection();
+    assertTrue(mProj.readProjection(mReadSchema.metadata("a")).isProjected());
+    assertFalse(mProj.readProjection(mReadSchema.metadata("b")).isProjected());
+  }
+
+  /**
+   * Projection in its three forms: wildcard, explicit, none.
+   * Wildcard and none are already tested above; here we test the
+   * builder, with a schema.
+   */
+
+  @Test
+  public void testExplicitSchemaProjection() {
+    TupleMetadata readSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    TupleMetadata outputSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .buildSchema();
+    outputSchema.setBooleanProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP, true);
+
+    TypeConverter converter = TypeConverter.builder()
+        .providedSchema(outputSchema)
+        .build();
+
+    ColumnMetadata readColSchema = readSchema.metadata("a");
+
+    // Project all
+
+    ProjectionSet projSet = new ProjectionSetBuilder()
+        .typeConverter(converter)
+        .build();
+
+    ColumnReadProjection col = projSet.readProjection(readColSchema);
+    assertTrue(col.isProjected());
+    assertSame(outputSchema.metadata("a"), col.providedSchema());
+    assertNotNull(col.conversionFactory());
+
+    // Project none
+
+    projSet = new ProjectionSetBuilder()
+        .typeConverter(converter)
+        .projectionList(new ArrayList<>())
+        .build();
+
+    col = projSet.readProjection(readColSchema);
+    assertFalse(col.isProjected());
+
+    // Project some
+
+    projSet = new ProjectionSetBuilder()
+        .typeConverter(converter)
+        .projectionList(RowSetTestUtils.projectList("a"))
+        .build();
+
+    col = projSet.readProjection(readColSchema);
+    assertTrue(col.isProjected());
+    assertSame(readColSchema, col.readSchema());
+    assertSame(outputSchema.metadata("a"), col.providedSchema());
+    assertNotNull(col.conversionFactory());
+
+    assertFalse(projSet.readProjection(readSchema.metadata("b")).isProjected());
+  }
+
+  /**
+   * Wildcard projection, no schema, custom column transform.
+   */
+
+  @Test
+  public void testTransformConversion() {
+    ColumnConversionFactory conv = StandardConversions.factory(ConvertStringToInt.class);
+    CustomTypeTransform customTransform = ProjectionSetFactory.simpleTransform(conv);
+    TypeConverter typeConverter = TypeConverter.builder()
+        .transform(customTransform)
+        .build();
+
+    ProjectionSet projSet = new WildcardProjectionSet(typeConverter);
+
+    TupleMetadata readSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .buildSchema();
+
+    ColumnMetadata readColSchema = readSchema.metadata("a");
+    ColumnReadProjection col = projSet.readProjection(readColSchema);
+    assertTrue(col.isProjected());
+    assertSame(conv, col.conversionFactory());
+  }
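+
+  // A sketch (not a test) of wiring the pieces together as a reader
+  // framework might, assuming the TypeConverter builder accepts a
+  // provided schema and a custom transform at the same time:
+  //
+  //   ProjectionSet projSet = new ProjectionSetBuilder()
+  //       .projectionList(RowSetTestUtils.projectList("a"))
+  //       .typeConverter(TypeConverter.builder()
+  //           .providedSchema(outputSchema)
+  //           .transform(customTransform)
+  //           .build())
+  //       .build();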
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderEmptyProject.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderEmptyProject.java
index 9150bc1..89c6ddc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderEmptyProject.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderEmptyProject.java
@@ -27,18 +27,19 @@ import java.util.List;
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.project.projSet.ProjectionSetFactory;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetOptions;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 @Category(RowSetTests.class)
 public class TestResultSetLoaderEmptyProject extends SubOperatorTest {
@@ -57,7 +58,7 @@ public class TestResultSetLoaderEmptyProject extends SubOperatorTest {
         .add("b", MinorType.INT)
         .buildSchema();
     ResultSetOptions options = new OptionBuilder()
-        .setProjection(selection)
+        .setProjection(ProjectionSetFactory.build(selection))
         .setSchema(schema)
         .build();
     ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
@@ -115,7 +116,7 @@ public class TestResultSetLoaderEmptyProject extends SubOperatorTest {
         .add("d", MinorType.INT)
         .buildSchema();
     ResultSetOptions options = new OptionBuilder()
-        .setProjection(selection)
+        .setProjection(ProjectionSetFactory.build(selection))
         .setSchema(schema)
         .build();
     ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
@@ -138,7 +139,7 @@ public class TestResultSetLoaderEmptyProject extends SubOperatorTest {
         .add("b", MinorType.INT)
         .buildSchema();
     ResultSetOptions options = new OptionBuilder()
-        .setProjection(selection)
+        .setProjection(ProjectionSetFactory.build(selection))
         .setSchema(schema)
         .build();
     ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
@@ -173,7 +174,7 @@ public class TestResultSetLoaderEmptyProject extends SubOperatorTest {
           .resumeSchema()
         .buildSchema();
     ResultSetOptions options = new OptionBuilder()
-        .setProjection(selection)
+        .setProjection(ProjectionSetFactory.build(selection))
         .setSchema(schema)
         .build();
     ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
@@ -214,7 +215,7 @@ public class TestResultSetLoaderEmptyProject extends SubOperatorTest {
           .resumeSchema()
         .buildSchema();
     ResultSetOptions options = new OptionBuilder()
-        .setProjection(selection)
+        .setProjection(ProjectionSetFactory.build(selection))
         .setSchema(schema)
         .build();
     ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java
index 035ae75..9a9afff 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java
@@ -17,9 +17,9 @@
  */
 package org.apache.drill.exec.physical.rowSet.impl;
 
+import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
 import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
 import static org.apache.drill.test.rowSet.RowSetUtilities.objArray;
-import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -33,6 +33,7 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.project.projSet.ProjectionSetFactory;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.physical.rowSet.RowSetLoader;
 import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetOptions;
@@ -42,6 +43,7 @@ import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
@@ -71,7 +73,7 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
         .add("d", MinorType.INT)
         .buildSchema();
     ResultSetOptions options = new OptionBuilder()
-        .setProjection(selection)
+        .setProjection(ProjectionSetFactory.build(selection))
         .setSchema(schema)
         .build();
     ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
@@ -83,7 +85,7 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
   public void testProjectionDynamic() {
     List<SchemaPath> selection = RowSetTestUtils.projectList("c", "b", "e");
     ResultSetOptions options = new OptionBuilder()
-        .setProjection(selection)
+        .setProjection(ProjectionSetFactory.build(selection))
         .build();
     ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
     RowSetLoader rootWriter = rsLoader.writer();
@@ -110,12 +112,12 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
     assertEquals(3, actualSchema.index("d"));
     assertEquals(-1, actualSchema.index("e"));
 
-    // Non-projected columns identify themselves via metadata
+    // Non-projected columns identify themselves
 
-    assertFalse(actualSchema.metadata("a").isProjected());
-    assertTrue(actualSchema.metadata("b").isProjected());
-    assertTrue(actualSchema.metadata("c").isProjected());
-    assertFalse(actualSchema.metadata("d").isProjected());
+    assertFalse(rootWriter.column("a").isProjected());
+    assertTrue(rootWriter.column("b").isProjected());
+    assertTrue(rootWriter.column("c").isProjected());
+    assertFalse(rootWriter.column("d").isProjected());
 
     // Write some data. Doesn't need much.
 
@@ -156,7 +158,7 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
         .addArray("a3", MinorType.INT)
         .buildSchema();
     ResultSetOptions options = new OptionBuilder()
-        .setProjection(selection)
+        .setProjection(ProjectionSetFactory.build(selection))
         .setSchema(schema)
         .build();
     ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
@@ -165,17 +167,14 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
     // Verify the projected columns
 
     TupleMetadata actualSchema = rootWriter.tupleSchema();
-    ColumnMetadata a1Md = actualSchema.metadata("a1");
-    assertTrue(a1Md.isArray());
-    assertTrue(a1Md.isProjected());
+    assertTrue(actualSchema.metadata("a1").isArray());
+    assertTrue(rootWriter.column("a1").isProjected());
 
-    ColumnMetadata a2Md = actualSchema.metadata("a2");
-    assertTrue(a2Md.isArray());
-    assertTrue(a2Md.isProjected());
+    assertTrue(actualSchema.metadata("a2").isArray());
+    assertTrue(rootWriter.column("a2").isProjected());
 
-    ColumnMetadata a3Md = actualSchema.metadata("a3");
-    assertTrue(a3Md.isArray());
-    assertFalse(a3Md.isProjected());
+    assertTrue(actualSchema.metadata("a3").isArray());
+    assertFalse(rootWriter.column("a3").isProjected());
 
     // Write a couple of rows.
 
@@ -217,7 +216,7 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
           .resumeSchema()
         .buildSchema();
     ResultSetOptions options = new OptionBuilder()
-        .setProjection(selection)
+        .setProjection(ProjectionSetFactory.build(selection))
         .setSchema(schema)
         .build();
     ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
@@ -227,25 +226,28 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
 
     TupleMetadata actualSchema = rootWriter.tupleSchema();
     ColumnMetadata m1Md = actualSchema.metadata("m1");
+    TupleWriter m1Writer = rootWriter.tuple("m1");
     assertTrue(m1Md.isMap());
-    assertTrue(m1Md.isProjected());
+    assertTrue(m1Writer.isProjected());
     assertEquals(2, m1Md.mapSchema().size());
-    assertTrue(m1Md.mapSchema().metadata("a").isProjected());
-    assertTrue(m1Md.mapSchema().metadata("b").isProjected());
+    assertTrue(m1Writer.column("a").isProjected());
+    assertTrue(m1Writer.column("b").isProjected());
 
     ColumnMetadata m2Md = actualSchema.metadata("m2");
+    TupleWriter m2Writer = rootWriter.tuple("m2");
     assertTrue(m2Md.isMap());
-    assertTrue(m2Md.isProjected());
+    assertTrue(m2Writer.isProjected());
     assertEquals(2, m2Md.mapSchema().size());
-    assertFalse(m2Md.mapSchema().metadata("c").isProjected());
-    assertTrue(m2Md.mapSchema().metadata("d").isProjected());
+    assertFalse(m2Writer.column("c").isProjected());
+    assertTrue(m2Writer.column("d").isProjected());
 
     ColumnMetadata m3Md = actualSchema.metadata("m3");
+    TupleWriter m3Writer = rootWriter.tuple("m3");
     assertTrue(m3Md.isMap());
-    assertFalse(m3Md.isProjected());
+    assertFalse(m3Writer.isProjected());
     assertEquals(2, m3Md.mapSchema().size());
-    assertFalse(m3Md.mapSchema().metadata("e").isProjected());
-    assertFalse(m3Md.mapSchema().metadata("f").isProjected());
+    assertFalse(m3Writer.column("e").isProjected());
+    assertFalse(m3Writer.column("f").isProjected());
 
     // Write a couple of rows.
 
@@ -276,7 +278,14 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
 
   @Test
   public void testMapProjectionMemberAndMap() {
+
+    // SELECT m1, m1.b
+    // This really means project all of m1; m1.b is along for the ride.
+
     List<SchemaPath> selection = RowSetTestUtils.projectList("m1", "m1.b");
+
+    // Define an "early" reader schema consistent with the projection.
+
     TupleMetadata schema = new SchemaBuilder()
         .addMap("m1")
           .add("a", MinorType.INT)
@@ -284,7 +293,7 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
           .resumeSchema()
         .buildSchema();
     ResultSetOptions options = new OptionBuilder()
-        .setProjection(selection)
+        .setProjection(ProjectionSetFactory.build(selection))
         .setSchema(schema)
         .build();
     ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
@@ -294,11 +303,12 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
 
     TupleMetadata actualSchema = rootWriter.tupleSchema();
     ColumnMetadata m1Md = actualSchema.metadata("m1");
+    TupleWriter m1Writer = rootWriter.tuple("m1");
     assertTrue(m1Md.isMap());
-    assertTrue(m1Md.isProjected());
+    assertTrue(m1Writer.isProjected());
     assertEquals(2, m1Md.mapSchema().size());
-    assertTrue(m1Md.mapSchema().metadata("a").isProjected());
-    assertTrue(m1Md.mapSchema().metadata("b").isProjected());
+    assertTrue(m1Writer.column("a").isProjected());
+    assertTrue(m1Writer.column("b").isProjected());
 
     // Write a couple of rows.
 
@@ -343,7 +353,7 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
           .resumeSchema()
         .buildSchema();
     ResultSetOptions options = new OptionBuilder()
-        .setProjection(selection)
+        .setProjection(ProjectionSetFactory.build(selection))
         .setSchema(schema)
         .build();
     ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
@@ -401,7 +411,7 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
         .buildSchema();
     ResultSetOptions options = new OptionBuilder()
         .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
-        .setProjection(selection)
+        .setProjection(ProjectionSetFactory.build(selection))
         .setSchema(schema)
         .build();
     ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
@@ -463,7 +473,7 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
         .add("col", MinorType.VARCHAR)
         .buildSchema();
     ResultSetOptions options = new OptionBuilder()
-        .setProjection(selection)
+        .setProjection(ProjectionSetFactory.build(selection))
         .setSchema(schema)
         .build();
     try {
@@ -481,7 +491,7 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
         .add("col", MinorType.VARCHAR)
         .buildSchema();
     ResultSetOptions options = new OptionBuilder()
-        .setProjection(selection)
+        .setProjection(ProjectionSetFactory.build(selection))
         .setSchema(schema)
         .build();
     try {
@@ -499,7 +509,7 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
         .add("col", MinorType.VARCHAR)
         .buildSchema();
     ResultSetOptions options = new OptionBuilder()
-        .setProjection(selection)
+        .setProjection(ProjectionSetFactory.build(selection))
         .setSchema(schema)
         .build();
     try {
@@ -517,7 +527,7 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
         .addArray("col", MinorType.VARCHAR)
         .buildSchema();
     ResultSetOptions options = new OptionBuilder()
-        .setProjection(selection)
+        .setProjection(ProjectionSetFactory.build(selection))
         .setSchema(schema)
         .build();
     try {
@@ -535,7 +545,7 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
         .addArray("col", MinorType.VARCHAR)
         .buildSchema();
     ResultSetOptions options = new OptionBuilder()
-        .setProjection(selection)
+        .setProjection(ProjectionSetFactory.build(selection))
         .setSchema(schema)
         .build();
     try {
@@ -555,7 +565,7 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
           .resumeSchema()
         .buildSchema();
     ResultSetOptions options = new OptionBuilder()
-        .setProjection(selection)
+        .setProjection(ProjectionSetFactory.build(selection))
         .setSchema(schema)
         .build();
     try {
@@ -576,7 +586,7 @@ public class TestResultSetLoaderProjection extends SubOperatorTest {
           .resumeSchema()
         .buildSchema();
     ResultSetOptions options = new OptionBuilder()
-        .setProjection(selection)
+        .setProjection(ProjectionSetFactory.build(selection))
         .setSchema(schema)
         .build();
     try {
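For reference, the recurring change in this test replaces a raw
List<SchemaPath> with a ProjectionSet built through ProjectionSetFactory.
A minimal sketch of the new pattern, reusing only names that appear in
the hunks above:

    // Parse a projection list and hand it to the loader as a ProjectionSet.
    List<SchemaPath> selection = RowSetTestUtils.projectList("a", "b");
    ResultSetOptions options = new OptionBuilder()
        .setProjection(ProjectionSetFactory.build(selection))
        .setSchema(schema)
        .build();
    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
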
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderTypeConversion.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderTypeConversion.java
index 76e8f96..673361d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderTypeConversion.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderTypeConversion.java
@@ -22,6 +22,10 @@ import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
 
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.project.projSet.ProjectionSetBuilder;
+import org.apache.drill.exec.physical.impl.scan.project.projSet.ProjectionSetFactory;
+import org.apache.drill.exec.physical.impl.scan.project.projSet.TypeConverter;
+import org.apache.drill.exec.physical.rowSet.ProjectionSet;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.physical.rowSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
@@ -66,11 +70,15 @@ public class TestResultSetLoaderTypeConversion extends SubOperatorTest {
     TestColumnConverter.setConverterProp(schema.metadata("n3"),
         TestColumnConverter.CONVERT_TO_INT);
 
-    SchemaTransformer schemaTransform = new DefaultSchemaTransformer(new ConverterFactory());
+    ProjectionSet projSet = new ProjectionSetBuilder()
+        .typeConverter(TypeConverter.builder()
+            .transform(ProjectionSetFactory.simpleTransform(new ConverterFactory()))
+            .build())
+        .build();
     ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
         .setSchema(schema)
         .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
-        .setSchemaTransform(schemaTransform)
+        .setProjection(projSet)
         .build();
     ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
     rsLoader.startBatch();
@@ -117,10 +125,13 @@ public class TestResultSetLoaderTypeConversion extends SubOperatorTest {
         .addArray("n3", MinorType.VARCHAR)
         .buildSchema();
 
+    ProjectionSet projSet = new ProjectionSetBuilder()
+        .outputSchema(outputSchema)
+        .build();
     ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
         .setSchema(inputSchema)
         .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
-        .setSchemaTransform(new SchemaTransformerImpl(outputSchema, null))
+        .setProjection(projSet)
         .build();
     ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
     rsLoader.startBatch();
@@ -167,10 +178,13 @@ public class TestResultSetLoaderTypeConversion extends SubOperatorTest {
         .add("n2", MinorType.VARCHAR)
         .buildSchema();
 
+    ProjectionSet projSet = new ProjectionSetBuilder()
+        .outputSchema(outputSchema)
+        .build();
     ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
         .setSchema(inputSchema)
         .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
-        .setSchemaTransform(new SchemaTransformerImpl(outputSchema, null))
+        .setProjection(projSet)
         .build();
     ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
     rsLoader.startBatch();
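Both conversion styles now flow through ProjectionSetBuilder: either a
custom transform wrapped by ProjectionSetFactory.simpleTransform(), or a
provided output schema. A condensed sketch of the two variants shown in
the hunks above (ConverterFactory is this test's own factory class):

    // Variant 1: a custom column-converter factory.
    ProjectionSet projSet = new ProjectionSetBuilder()
        .typeConverter(TypeConverter.builder()
            .transform(ProjectionSetFactory.simpleTransform(new ConverterFactory()))
            .build())
        .build();

    // Variant 2: conversions implied by a provided output schema.
    ProjectionSet bySchema = new ProjectionSetBuilder()
        .outputSchema(outputSchema)
        .build();
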
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestProjectedTuple.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/project/TestProjectedTuple.java
similarity index 83%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestProjectedTuple.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/project/TestProjectedTuple.java
index 424ee6f..91bbee2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestProjectedTuple.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/project/TestProjectedTuple.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.physical.rowSet.impl;
+package org.apache.drill.exec.physical.rowSet.project;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -30,14 +30,23 @@ import java.util.List;
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.rowSet.project.ImpliedTupleRequest;
-import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
+import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
 import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
-import org.apache.drill.exec.physical.rowSet.project.RequestedTupleImpl;
-import org.apache.drill.exec.record.metadata.ProjectionType;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.TupleProjectionType;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+/**
+ * Test the projection list parser: parses a list of SchemaPath
+ * items into a detailed structure, handling duplicate or overlapping
+ * items. Handles the special select-all (SELECT *) and select-none
+ * (SELECT COUNT(*)) cases.
+ * <p>
+ * These tests should verify everything about (runtime) projection
+ * parsing; the only bits not tested here are those that are
+ * inherently specific to some use case.
+ */
+
 @Category(RowSetTests.class)
 public class TestProjectedTuple {
 
@@ -47,8 +56,13 @@ public class TestProjectedTuple {
     // Null map means everything is projected
 
     RequestedTuple projSet = RequestedTupleImpl.parse(null);
+    assertEquals(TupleProjectionType.ALL, projSet.type());
+    // Not well defined; the tuple contains a wildcard
+    // assertEquals(ProjectionType.GENERAL, projSet.projectionType("foo"));
+
+    projSet = ImpliedTupleRequest.ALL_MEMBERS;
     assertTrue(projSet instanceof ImpliedTupleRequest);
-    assertEquals(ProjectionType.UNSPECIFIED, projSet.projectionType("foo"));
+    assertEquals(ProjectionType.GENERAL, projSet.projectionType("foo"));
   }
 
   /**
@@ -62,6 +76,7 @@ public class TestProjectedTuple {
     // Empty list means nothing is projected
 
     RequestedTuple projSet = RequestedTupleImpl.parse(new ArrayList<SchemaPath>());
+    assertEquals(TupleProjectionType.NONE, projSet.type());
     assertTrue(projSet instanceof ImpliedTupleRequest);
     List<RequestedColumn> cols = projSet.projections();
     assertEquals(0, cols.size());
@@ -76,8 +91,8 @@ public class TestProjectedTuple {
     RequestedTuple projSet = RequestedTupleImpl.parse(
         RowSetTestUtils.projectList("a", "b", "c"));
     assertTrue(projSet instanceof RequestedTupleImpl);
-    assertEquals(ProjectionType.UNSPECIFIED, projSet.projectionType("a"));
-    assertEquals(ProjectionType.UNSPECIFIED, projSet.projectionType("b"));
+    assertEquals(ProjectionType.GENERAL, projSet.projectionType("a"));
+    assertEquals(ProjectionType.GENERAL, projSet.projectionType("b"));
     assertEquals(ProjectionType.UNPROJECTED, projSet.projectionType("d"));
 
     List<RequestedColumn> cols = projSet.projections();
@@ -85,18 +100,24 @@ public class TestProjectedTuple {
 
     RequestedColumn a = cols.get(0);
     assertEquals("a", a.name());
-    assertEquals(ProjectionType.UNSPECIFIED, a.type());
+    assertEquals(ProjectionType.GENERAL, a.type());
     assertTrue(a.isSimple());
     assertFalse(a.isWildcard());
-    assertNull(a.mapProjection());
+
+    // We don't know whether a is a map (the simple term "a" under-determines
+    // the column type). If it is a map, we assume all of the map is
+    // projected.
+
+    assertNotNull(a.mapProjection());
+    assertEquals(TupleProjectionType.ALL, a.mapProjection().type());
     assertNull(a.indexes());
 
     assertEquals("b", cols.get(1).name());
-    assertEquals(ProjectionType.UNSPECIFIED, cols.get(1).type());
+    assertEquals(ProjectionType.GENERAL, cols.get(1).type());
     assertTrue(cols.get(1).isSimple());
 
     assertEquals("c", cols.get(2).name());
-    assertEquals(ProjectionType.UNSPECIFIED, cols.get(2).type());
+    assertEquals(ProjectionType.GENERAL, cols.get(2).type());
     assertTrue(cols.get(2).isSimple());
   }
 
@@ -112,12 +133,12 @@ public class TestProjectedTuple {
     RequestedTuple projSet = RequestedTupleImpl.parse(projCols);
 
     assertTrue(projSet instanceof RequestedTupleImpl);
-    assertEquals(ProjectionType.UNSPECIFIED, projSet.projectionType("map"));
+    assertEquals(ProjectionType.GENERAL, projSet.projectionType("map"));
     assertEquals(ProjectionType.UNPROJECTED, projSet.projectionType("another"));
     RequestedTuple mapProj = projSet.mapProjection("map");
     assertNotNull(mapProj);
     assertTrue(mapProj instanceof ImpliedTupleRequest);
-    assertEquals(ProjectionType.UNSPECIFIED, mapProj.projectionType("foo"));
+    assertEquals(ProjectionType.GENERAL, mapProj.projectionType("foo"));
     assertNotNull(projSet.mapProjection("another"));
     assertEquals(ProjectionType.UNPROJECTED, projSet.mapProjection("another").projectionType("anyCol"));
   }
@@ -140,8 +161,8 @@ public class TestProjectedTuple {
 
     RequestedTuple mapProj = projSet.mapProjection("map");
     assertTrue(mapProj instanceof RequestedTupleImpl);
-    assertEquals(ProjectionType.UNSPECIFIED, mapProj.projectionType("a"));
-    assertEquals(ProjectionType.UNSPECIFIED, mapProj.projectionType("b"));
+    assertEquals(ProjectionType.GENERAL, mapProj.projectionType("a"));
+    assertEquals(ProjectionType.GENERAL, mapProj.projectionType("b"));
     assertEquals(ProjectionType.TUPLE, mapProj.projectionType("map2"));
     assertEquals(ProjectionType.UNPROJECTED, mapProj.projectionType("bogus"));
 
@@ -150,14 +171,14 @@ public class TestProjectedTuple {
     RequestedTuple bMapProj = mapProj.mapProjection("b");
     assertNotNull(bMapProj);
     assertTrue(bMapProj instanceof ImpliedTupleRequest);
-    assertEquals(ProjectionType.UNSPECIFIED, bMapProj.projectionType("foo"));
+    assertEquals(ProjectionType.GENERAL, bMapProj.projectionType("foo"));
 
     // Map2, a nested map, has an explicit projection
 
     RequestedTuple map2Proj = mapProj.mapProjection("map2");
     assertNotNull(map2Proj);
     assertTrue(map2Proj instanceof RequestedTupleImpl);
-    assertEquals(ProjectionType.UNSPECIFIED, map2Proj.projectionType("x"));
+    assertEquals(ProjectionType.GENERAL, map2Proj.projectionType("x"));
     assertEquals(ProjectionType.UNPROJECTED, map2Proj.projectionType("bogus"));
   }
 
@@ -177,11 +198,11 @@ public class TestProjectedTuple {
 
       RequestedTuple mapProj = projSet.mapProjection("map");
       assertTrue(mapProj instanceof ImpliedTupleRequest);
-      assertEquals(ProjectionType.UNSPECIFIED, mapProj.projectionType("a"));
+      assertEquals(ProjectionType.GENERAL, mapProj.projectionType("a"));
 
       // Didn't ask for b, but did ask for whole map.
 
-      assertEquals(ProjectionType.UNSPECIFIED, mapProj.projectionType("b"));
+      assertEquals(ProjectionType.GENERAL, mapProj.projectionType("b"));
     }
 
     // Now the other way around.
@@ -197,8 +218,8 @@ public class TestProjectedTuple {
 
       RequestedTuple mapProj = projSet.mapProjection("map");
       assertTrue(mapProj instanceof ImpliedTupleRequest);
-      assertEquals(ProjectionType.UNSPECIFIED, mapProj.projectionType("a"));
-      assertEquals(ProjectionType.UNSPECIFIED, mapProj.projectionType("b"));
+      assertEquals(ProjectionType.GENERAL, mapProj.projectionType("a"));
+      assertEquals(ProjectionType.GENERAL, mapProj.projectionType("b"));
     }
   }
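The net semantic change in this test: the old UNSPECIFIED projection type
is now GENERAL, and a simple term such as "a" carries an implied
all-members map projection rather than a null one. A hedged sketch of the
resulting parser behavior, using only calls exercised above:

    // "SELECT a, m.x": a simple column plus one map member.
    RequestedTuple proj = RequestedTupleImpl.parse(
        RowSetTestUtils.projectList("a", "m.x"));
    assertEquals(ProjectionType.GENERAL, proj.projectionType("a"));
    assertEquals(ProjectionType.TUPLE, proj.projectionType("m"));
    assertEquals(ProjectionType.UNPROJECTED, proj.projectionType("other"));
    // If "a" turns out to be a map, all of its members are projected.
    assertEquals(TupleProjectionType.ALL,
        proj.projections().get(0).mapProjection().type());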
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/project/TestProjectionType.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/project/TestProjectionType.java
new file mode 100644
index 0000000..98de0c8
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/project/TestProjectionType.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.project;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(RowSetTests.class)
+public class TestProjectionType {
+
+  @Test
+  public void testQueries() {
+    assertFalse(ProjectionType.UNPROJECTED.isTuple());
+    assertFalse(ProjectionType.WILDCARD.isTuple());
+    assertFalse(ProjectionType.GENERAL.isTuple());
+    assertFalse(ProjectionType.SCALAR.isTuple());
+    assertTrue(ProjectionType.TUPLE.isTuple());
+    assertFalse(ProjectionType.ARRAY.isTuple());
+    assertTrue(ProjectionType.TUPLE_ARRAY.isTuple());
+
+    assertFalse(ProjectionType.UNPROJECTED.isArray());
+    assertFalse(ProjectionType.WILDCARD.isArray());
+    assertFalse(ProjectionType.GENERAL.isArray());
+    assertFalse(ProjectionType.SCALAR.isArray());
+    assertFalse(ProjectionType.TUPLE.isArray());
+    assertTrue(ProjectionType.ARRAY.isArray());
+    assertTrue(ProjectionType.TUPLE_ARRAY.isArray());
+
+    assertFalse(ProjectionType.UNPROJECTED.isMaybeScalar());
+    assertFalse(ProjectionType.WILDCARD.isMaybeScalar());
+    assertTrue(ProjectionType.GENERAL.isMaybeScalar());
+    assertTrue(ProjectionType.SCALAR.isMaybeScalar());
+    assertFalse(ProjectionType.TUPLE.isMaybeScalar());
+    assertFalse(ProjectionType.ARRAY.isMaybeScalar());
+    assertFalse(ProjectionType.TUPLE_ARRAY.isMaybeScalar());
+  }
+
+  @Test
+  public void testLabel() {
+
+    // Only worry about the types that could conflict and thus
+    // would show up in error messages.
+
+    assertEquals(ProjectionType.UNPROJECTED.name(), ProjectionType.UNPROJECTED.label());
+    assertEquals("wildcard (*)", ProjectionType.WILDCARD.label());
+    assertEquals(ProjectionType.GENERAL.name(), ProjectionType.GENERAL.label());
+    assertEquals("scalar (a)", ProjectionType.SCALAR.label());
+    assertEquals("tuple (a.x)", ProjectionType.TUPLE.label());
+    assertEquals("array (a[n])", ProjectionType.ARRAY.label());
+    assertEquals("tuple array (a[n].x)", ProjectionType.TUPLE_ARRAY.label());
+  }
+
+  @Test
+  public void testTypeFor() {
+
+    // Test that typeFor() returns the projection type most specific
+    // to a given data type. The projection type under-specifies the
+    // data type, but serves as a hint.
+
+    assertEquals(ProjectionType.TUPLE, ProjectionType.typeFor(Types.required(MinorType.MAP)));
+    assertEquals(ProjectionType.TUPLE_ARRAY, ProjectionType.typeFor(Types.repeated(MinorType.MAP)));
+    assertEquals(ProjectionType.ARRAY, ProjectionType.typeFor(Types.repeated(MinorType.INT)));
+    assertEquals(ProjectionType.ARRAY, ProjectionType.typeFor(Types.required(MinorType.LIST)));
+    assertEquals(ProjectionType.SCALAR, ProjectionType.typeFor(Types.required(MinorType.INT)));
+  }
+
+  @Test
+  public void testCompatibility() {
+
+    // Only SCALAR, TUPLE, ARRAY and TUPLE_ARRAY are expected for the
+    // argument, but we check all cases for completeness.
+    // Note that the cases are not always symmetrical:
+    // a map array column is compatible with a map projection,
+    // but a map column is not compatible with a map array projection.
+
+    assertTrue(ProjectionType.UNPROJECTED.isCompatible(ProjectionType.UNPROJECTED));
+    assertTrue(ProjectionType.UNPROJECTED.isCompatible(ProjectionType.WILDCARD));
+    assertTrue(ProjectionType.UNPROJECTED.isCompatible(ProjectionType.GENERAL));
+    assertTrue(ProjectionType.UNPROJECTED.isCompatible(ProjectionType.SCALAR));
+    assertTrue(ProjectionType.UNPROJECTED.isCompatible(ProjectionType.TUPLE));
+    assertTrue(ProjectionType.UNPROJECTED.isCompatible(ProjectionType.ARRAY));
+    assertTrue(ProjectionType.UNPROJECTED.isCompatible(ProjectionType.TUPLE_ARRAY));
+
+    assertTrue(ProjectionType.WILDCARD.isCompatible(ProjectionType.UNPROJECTED));
+    assertTrue(ProjectionType.WILDCARD.isCompatible(ProjectionType.WILDCARD));
+    assertTrue(ProjectionType.WILDCARD.isCompatible(ProjectionType.GENERAL));
+    assertTrue(ProjectionType.WILDCARD.isCompatible(ProjectionType.SCALAR));
+    assertTrue(ProjectionType.WILDCARD.isCompatible(ProjectionType.TUPLE));
+    assertTrue(ProjectionType.WILDCARD.isCompatible(ProjectionType.ARRAY));
+    assertTrue(ProjectionType.WILDCARD.isCompatible(ProjectionType.TUPLE_ARRAY));
+
+    assertTrue(ProjectionType.GENERAL.isCompatible(ProjectionType.UNPROJECTED));
+    assertTrue(ProjectionType.GENERAL.isCompatible(ProjectionType.WILDCARD));
+    assertTrue(ProjectionType.GENERAL.isCompatible(ProjectionType.GENERAL));
+    assertTrue(ProjectionType.GENERAL.isCompatible(ProjectionType.SCALAR));
+    assertTrue(ProjectionType.GENERAL.isCompatible(ProjectionType.TUPLE));
+    assertTrue(ProjectionType.GENERAL.isCompatible(ProjectionType.ARRAY));
+    assertTrue(ProjectionType.GENERAL.isCompatible(ProjectionType.TUPLE_ARRAY));
+
+    assertTrue(ProjectionType.SCALAR.isCompatible(ProjectionType.UNPROJECTED));
+    assertTrue(ProjectionType.SCALAR.isCompatible(ProjectionType.WILDCARD));
+    assertTrue(ProjectionType.SCALAR.isCompatible(ProjectionType.GENERAL));
+    assertTrue(ProjectionType.SCALAR.isCompatible(ProjectionType.SCALAR));
+    assertFalse(ProjectionType.SCALAR.isCompatible(ProjectionType.TUPLE));
+    assertFalse(ProjectionType.SCALAR.isCompatible(ProjectionType.ARRAY));
+    assertFalse(ProjectionType.SCALAR.isCompatible(ProjectionType.TUPLE_ARRAY));
+
+    assertTrue(ProjectionType.TUPLE.isCompatible(ProjectionType.UNPROJECTED));
+    assertTrue(ProjectionType.TUPLE.isCompatible(ProjectionType.WILDCARD));
+    assertTrue(ProjectionType.TUPLE.isCompatible(ProjectionType.GENERAL));
+    assertFalse(ProjectionType.TUPLE.isCompatible(ProjectionType.SCALAR));
+    assertTrue(ProjectionType.TUPLE.isCompatible(ProjectionType.TUPLE));
+    assertFalse(ProjectionType.TUPLE.isCompatible(ProjectionType.ARRAY));
+    assertTrue(ProjectionType.TUPLE.isCompatible(ProjectionType.TUPLE_ARRAY));
+
+    assertTrue(ProjectionType.ARRAY.isCompatible(ProjectionType.UNPROJECTED));
+    assertTrue(ProjectionType.ARRAY.isCompatible(ProjectionType.WILDCARD));
+    assertTrue(ProjectionType.ARRAY.isCompatible(ProjectionType.GENERAL));
+    assertFalse(ProjectionType.ARRAY.isCompatible(ProjectionType.SCALAR));
+    assertFalse(ProjectionType.ARRAY.isCompatible(ProjectionType.TUPLE));
+    assertTrue(ProjectionType.ARRAY.isCompatible(ProjectionType.ARRAY));
+    assertTrue(ProjectionType.ARRAY.isCompatible(ProjectionType.TUPLE_ARRAY));
+
+    assertTrue(ProjectionType.TUPLE_ARRAY.isCompatible(ProjectionType.UNPROJECTED));
+    assertTrue(ProjectionType.TUPLE_ARRAY.isCompatible(ProjectionType.WILDCARD));
+    assertTrue(ProjectionType.TUPLE_ARRAY.isCompatible(ProjectionType.GENERAL));
+    assertFalse(ProjectionType.TUPLE_ARRAY.isCompatible(ProjectionType.SCALAR));
+    assertFalse(ProjectionType.TUPLE_ARRAY.isCompatible(ProjectionType.TUPLE));
+    assertFalse(ProjectionType.TUPLE_ARRAY.isCompatible(ProjectionType.ARRAY));
+    assertTrue(ProjectionType.TUPLE_ARRAY.isCompatible(ProjectionType.TUPLE_ARRAY));
+  }
+}
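
This matrix is what the scan framework consults when a column discovered
by a reader must be checked against the projection path. A sketch of that
check; the exception wiring and the logger field are illustrative, not
the framework's actual code:

    // Validate a reader column against the type implied by its
    // projection path (e.g. a[n].x implies TUPLE_ARRAY).
    void validateProjection(ProjectionType projType, ColumnMetadata column) {
      ProjectionType colType = ProjectionType.typeFor(column.majorType());
      if (!projType.isCompatible(colType)) {
        throw UserException.validationError()
            .message("Column %s projected as %s but materialized as %s",
                column.name(), projType.label(), colType.label())
            .build(logger);
      }
    }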
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestMetadataProperties.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestMetadataProperties.java
index 0ac3638..4834ae7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestMetadataProperties.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestMetadataProperties.java
@@ -159,26 +159,6 @@ public class TestMetadataProperties {
   }
 
   @Test
-  public void testProjected() {
-    PrimitiveColumnMetadata col = new PrimitiveColumnMetadata("c", MinorType.INT, DataMode.OPTIONAL);
-    AbstractPropertied props = col;
-    assertTrue(col.isProjected());
-    col.setProjected(true);
-    assertTrue(col.isProjected());
-
-    // Projected is the default, so no properties set.
-    assertFalse(props.hasProperties());
-
-    col.setProjected(false);
-    assertFalse(col.isProjected());
-    assertTrue(props.hasProperties());
-
-    // Sanity check that the expected prop was set
-
-    assertEquals("false", col.property(ColumnMetadata.PROJECTED_PROP));
-  }
-
-  @Test
   public void testFormat() {
     PrimitiveColumnMetadata col = new PrimitiveColumnMetadata("c", MinorType.INT, DataMode.OPTIONAL);
     AbstractPropertied props = col;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
index 1b7efb2..3aa5584 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
@@ -161,6 +161,4 @@ public class BaseCsvTest extends ClusterTest {
     resetV3();
     resetSchema();
   }
-
->>>>>>> ea212504f... DRILL-7279: Enable provided schema for text files without headers
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
index b98f82d..9b5d109 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
@@ -17,6 +17,11 @@
  */
 package org.apache.drill.exec.store.log;
 
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.BatchSchema;
@@ -31,15 +36,14 @@ import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.rowSet.RowSet;
-import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
-import static org.junit.Assert.assertEquals;
-
-import java.util.List;
-
+// The log reader is now hosted on the row set framework
+@Category(RowSetTests.class)
 public class TestLogReader extends ClusterTest {
 
   public static final String DATE_ONLY_PATTERN = "(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d) .*";
@@ -92,7 +96,6 @@ public class TestLogReader extends ClusterTest {
     logConfig.getSchema().add( new LogFormatField("module"));
     logConfig.getSchema().add( new LogFormatField("message"));
 
-
     //Set up additional configs to check the time/date formats
     LogFormatConfig logDateConfig = new LogFormatConfig();
     logDateConfig.setExtension("log2");
@@ -111,7 +114,6 @@ public class TestLogReader extends ClusterTest {
     mysqlLogConfig.setExtension("sqllog");
     mysqlLogConfig.setRegex("(\\d{6})\\s(\\d{2}:\\d{2}:\\d{2})\\s+(\\d+)\\s(\\w+)\\s+(.+)");
 
-
     // Define a temporary format plugin for the "cp" storage plugin.
     Drillbit drillbit = cluster.drillbit();
     final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
@@ -122,7 +124,6 @@ public class TestLogReader extends ClusterTest {
     pluginConfig.getFormats().put("date-log",logDateConfig);
     pluginConfig.getFormats().put( "mysql-log", mysqlLogConfig);
     pluginRegistry.createOrUpdate("cp", pluginConfig, false);
-
   }
 
   @Test
@@ -142,7 +143,7 @@ public class TestLogReader extends ClusterTest {
         .addRow(2017, 12, 19)
         .build();
 
-    new RowSetComparison(expected).verifyAndClearAll(results);
+    RowSetUtilities.verify(expected, results);
   }
 
   @Test
@@ -150,12 +151,6 @@ public class TestLogReader extends ClusterTest {
     String sql = "SELECT * FROM cp.`regex/large.log1`";
     List<QueryDataBatch> batches = client.queryBuilder().sql(sql).results();
 
-    BatchSchema expectedSchema = new SchemaBuilder()
-        .addNullable("year", MinorType.INT)
-        .addNullable("month", MinorType.INT)
-        .addNullable("day", MinorType.INT)
-        .build();
-
     for (QueryDataBatch queryDataBatch : batches) {
       queryDataBatch.release();
     }
@@ -179,7 +174,7 @@ public class TestLogReader extends ClusterTest {
 
 //    results.print();
 //    expected.print();
-    new RowSetComparison(expected).verifyAndClearAll(results);
+    RowSetUtilities.verify(expected, results);
   }
 
   @Test
@@ -201,7 +196,7 @@ public class TestLogReader extends ClusterTest {
 
 //    results.print();
 //    expected.print();
-    new RowSetComparison(expected).verifyAndClearAll(results);
+    RowSetUtilities.verify(expected, results);
   }
 
   @Test
@@ -218,10 +213,9 @@ public class TestLogReader extends ClusterTest {
         .addRow("2017-12-18 10:52:37,652 [main] INFO  o.a.drill.common.config.DrillConfig - Configuration and plugin file(s) identified in 115ms.")
         .addRow("2017-12-19 11:12:27,278 [main] ERROR o.apache.drill.exec.server.Drillbit - Failure during initial startup of Drillbit.")
         .build();
-    new RowSetComparison(expected).verifyAndClearAll(results);
+    RowSetUtilities.verify(expected, results);
   }
 
-
   @Test
   public void testDate() throws RpcException {
     String sql = "SELECT TYPEOF(`entry_date`) AS entry_date FROM cp.`regex/simple.log2` LIMIT 1";
@@ -235,8 +229,7 @@ public class TestLogReader extends ClusterTest {
         .addRow("TIMESTAMP")
         .build();
 
-    new RowSetComparison(expected).verifyAndClearAll(results);
-
+    RowSetUtilities.verify(expected, results);
   }
 
   @Test
@@ -249,7 +242,21 @@ public class TestLogReader extends ClusterTest {
   @Test
   public void testFull() throws RpcException {
     String sql = "SELECT * FROM cp.`regex/simple.log1`";
-    client.queryBuilder().sql(sql).printCsv();
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("year", MinorType.INT)
+        .addNullable("month", MinorType.INT)
+        .addNullable("day", MinorType.INT)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow(2017, 12, 17)
+        .addRow(2017, 12, 18)
+        .addRow(2017, 12, 19)
+        .build();
+
+    RowSetUtilities.verify(expected, results);
   }
 
   //This section tests log queries without a defined schema
@@ -276,7 +283,7 @@ public class TestLogReader extends ClusterTest {
 
     //results.print();
     //expected.print();
-    new RowSetComparison(expected).verifyAndClearAll(results);
+    RowSetUtilities.verify(expected, results);
   }
 
   @Test
@@ -300,7 +307,7 @@ public class TestLogReader extends ClusterTest {
         .addRow("070917", "16:29:12", "21", "Query","select * from location where id = 1 LIMIT 1" )
         .build();
 
-    new RowSetComparison(expected).verifyAndClearAll(results);
+    RowSetUtilities.verify(expected, results);
   }
 
   @Test
@@ -321,7 +328,7 @@ public class TestLogReader extends ClusterTest {
         .addRow("070917", "select * from location where id = 1 LIMIT 1" )
         .build();
 
-    new RowSetComparison(expected).verifyAndClearAll(results);
+    RowSetUtilities.verify(expected, results);
   }
 
   @Test
@@ -341,13 +348,14 @@ public class TestLogReader extends ClusterTest {
         .addRow("070917 16:29:12      21 Query       select * from location where id = 1 LIMIT 1" )
         .build();
 
-    new RowSetComparison(expected).verifyAndClearAll(results);
+    RowSetUtilities.verify(expected, results);
   }
 
   @Test
   public void testUMNoSchema() throws RpcException {
     String sql = "SELECT _unmatched_rows FROM cp.`regex/mysql.sqllog`";
     RowSet results = client.queryBuilder().sql(sql).rowSet();
+    results.print();
 
     BatchSchema expectedSchema = new SchemaBuilder()
         .addNullable("_unmatched_rows", MinorType.VARCHAR)
@@ -357,7 +365,7 @@ public class TestLogReader extends ClusterTest {
         .addRow("dfadkfjaldkjafsdfjlksdjflksjdlkfjsldkfjslkjl")
         .build();
 
-    new RowSetComparison(expected).verifyAndClearAll(results);
+    RowSetUtilities.verify(expected, results);
   }
 
   @Test
@@ -379,7 +387,6 @@ public class TestLogReader extends ClusterTest {
         .addRow( null, "dfadkfjaldkjafsdfjlksdjflksjdlkfjsldkfjslkjl")
         .build();
 
-    new RowSetComparison(expected).verifyAndClearAll(results);
+    RowSetUtilities.verify(expected, results);
   }
-
 }
\ No newline at end of file
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestDummyWriter.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestDummyWriter.java
index 5f8dd16..a59b83c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestDummyWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestDummyWriter.java
@@ -18,10 +18,12 @@
 package org.apache.drill.test.rowSet.test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
@@ -33,7 +35,9 @@ import org.apache.drill.exec.vector.accessor.writer.ColumnWriterFactory;
 import org.apache.drill.exec.vector.accessor.writer.MapWriter;
 import org.apache.drill.test.SubOperatorTest;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
+@Category(RowSetTests.class)
 public class TestDummyWriter extends SubOperatorTest {
 
   /**
@@ -130,15 +134,9 @@ public class TestDummyWriter extends SubOperatorTest {
         .buildSchema();
     List<AbstractObjectWriter> writers = new ArrayList<>();
 
-    // Mark schema as non-projected
-
-    schema.metadata("m1").setProjected(false);
-    schema.metadata("m2").setProjected(false);
-
     // Create the writers
 
     {
-      schema.metadata("m1").setProjected(false);
       TupleMetadata mapSchema = schema.metadata("m1").mapSchema();
       List<AbstractObjectWriter> members = new ArrayList<>();
       members.add(ColumnWriterFactory.buildColumnWriter(mapSchema.metadata("a"), null, null));
@@ -147,7 +145,6 @@ public class TestDummyWriter extends SubOperatorTest {
     }
 
     {
-      schema.metadata("m2").setProjected(false);
       TupleMetadata mapSchema = schema.metadata("m2").mapSchema();
       List<AbstractObjectWriter> members = new ArrayList<>();
       members.add(ColumnWriterFactory.buildColumnWriter(mapSchema.metadata("c"), null, null));
@@ -161,6 +158,15 @@ public class TestDummyWriter extends SubOperatorTest {
     rootWriter.startWrite();
     rootWriter.startRow();
 
+    // Nothing is projected
+
+    assertFalse(rootWriter.tuple("m1").isProjected());
+    assertFalse(rootWriter.tuple("m1").scalar("a").isProjected());
+    assertFalse(rootWriter.tuple("m1").array("b").isProjected());
+    assertFalse(rootWriter.array("m2").isProjected());
+    assertFalse(rootWriter.array("m2").tuple().isProjected());
+    assertFalse(rootWriter.array("m2").tuple().scalar("c").isProjected());
+
     // Dummy columns seem real.
 
     rootWriter.tuple("m1").scalar("a").setInt(20);
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java
index 78eeefa..c77574a 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java
@@ -86,4 +86,13 @@ public class AbstractPropertied implements Propertied {
     String value = property(key);
     return value == null ? defaultValue : Boolean.parseBoolean(value);
   }
+
+  @Override
+  public void setBooleanProperty(String key, boolean value) {
+    if (value) {
+      setProperty(key, Boolean.toString(value));
+    } else {
+      setProperty(key, null);
+    }
+  }
 }
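The asymmetry in setBooleanProperty() is deliberate: boolean properties
default to false, so only true is worth storing, and setting false clears
the entry. A small sketch, assuming the one-argument booleanProperty()
defaults to false, and reusing the column constructor from the removed
testProjected() case above:

    PrimitiveColumnMetadata col =
        new PrimitiveColumnMetadata("c", MinorType.INT, DataMode.OPTIONAL);
    col.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
    assertTrue(col.booleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD));
    // Setting the default (false) stores nothing rather than "false".
    col.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, false);
    assertFalse(col.booleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD));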
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
index fda8fe9..c72de0b 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
@@ -52,12 +52,6 @@ public interface ColumnMetadata extends Propertied {
   String FORMAT_PROP = DRILL_PROP_PREFIX + "format";
 
   /**
-   * Indicates if the column is projected. Used only for internal
-   * reader-provided schemas.
-   */
-  String PROJECTED_PROP = DRILL_PROP_PREFIX + "projected";
-
-  /**
    * Indicates how to handle blanks. Must be one of the valid values defined
    * in AbstractConvertFromString. Normally set on the converter by the plugin
    * rather than by the user in the schema.
@@ -65,6 +59,21 @@ public interface ColumnMetadata extends Propertied {
   String BLANK_AS_PROP = DRILL_PROP_PREFIX + "blank-as";
 
   /**
+   * Indicates whether to project the column in a wildcard (*) query.
+   * Certain "special" columns are available only when explicitly
+   * requested, and are excluded from wildcard expansion. For example,
+   * the log reader has a "_raw" column which includes the entire input
+   * line before parsing. This column can be requested explicitly:<br>
+   * <tt>SELECT foo, bar, _raw FROM ...</tt><br>
+   * but the column will <i>not</i> be included when using the wildcard:<br>
+   * <tt>SELECT * FROM ...</tt>
+   * <p>
+   * Marking a column with this property (in either the provided schema
+   * or the reader schema) prevents it from appearing in a wildcard
+   * expansion.
+   */
+  String EXCLUDE_FROM_WILDCARD = DRILL_PROP_PREFIX + "special";
+
+  /**
    * Rough characterization of Drill types into metadata categories.
    * Various aspects of Drill's type system are very, very messy.
    * However, Drill is defined by its code, not some abstract design,
@@ -268,15 +277,6 @@ public interface ColumnMetadata extends Propertied {
 
   ColumnMetadata cloneEmpty();
 
-  /**
-   * Reports whether, in this context, the column is projected outside
-   * of the context. (That is, whether the column is backed by an actual
-   * value vector.)
-   */
-
-  boolean isProjected();
-  void setProjected(boolean projected);
-
   int precision();
   int scale();
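A reader or provided schema would mark such a column as follows; this is
a sketch, and the log reader's actual wiring may differ:

    // Exclude the raw-input column from SELECT * expansion while
    // leaving it available to explicit projection.
    ColumnMetadata rawCol = schema.metadata("_raw");
    rawCol.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);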
 
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ProjectionType.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ProjectionType.java
deleted file mode 100644
index 7b523ad..0000000
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ProjectionType.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.record.metadata;
-
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-
-public enum ProjectionType {
-  UNPROJECTED,
-  WILDCARD,     // *
-  UNSPECIFIED,  // x
-  SCALAR,       // x (from schema)
-  TUPLE,        // x.y
-  ARRAY,        // x[0]
-  TUPLE_ARRAY;  // x[0].y
-
-  public boolean isTuple() {
-    return this == ProjectionType.TUPLE || this == ProjectionType.TUPLE_ARRAY;
-  }
-
-  public boolean isArray() {
-    return this == ProjectionType.ARRAY || this == ProjectionType.TUPLE_ARRAY;
-  }
-
-  public boolean isMaybeScalar() {
-    return this == UNSPECIFIED || this == SCALAR;
-  }
-
-  public static ProjectionType typeFor(MajorType majorType) {
-    if (majorType.getMinorType() == MinorType.MAP) {
-      if (majorType.getMode() == DataMode.REPEATED) {
-        return TUPLE_ARRAY;
-      } else {
-        return TUPLE;
-      }
-    }
-    if (majorType.getMode() == DataMode.REPEATED) {
-      return ARRAY;
-    }
-    return SCALAR;
-  }
-
-  public boolean isCompatible(ProjectionType other) {
-    switch (other) {
-    case UNPROJECTED:
-    case UNSPECIFIED:
-    case WILDCARD:
-      return true;
-    default:
-      break;
-    }
-
-    switch (this) {
-    case ARRAY:
-    case TUPLE_ARRAY:
-      return other == ARRAY || other == TUPLE_ARRAY;
-    case SCALAR:
-      return other == SCALAR;
-    case TUPLE:
-      return other == TUPLE;
-    case UNPROJECTED:
-    case UNSPECIFIED:
-    case WILDCARD:
-      return true;
-    default:
-      throw new IllegalStateException(toString());
-    }
-  }
-
-  public String label() {
-    switch (this) {
-    case SCALAR:
-      return "scalar (a)";
-    case ARRAY:
-      return "array (a[n])";
-    case TUPLE:
-      return "tuple (a.x)";
-    case TUPLE_ARRAY:
-      return "tuple array (a[n].x)";
-    case WILDCARD:
-      return "wildcard (*)";
-    default:
-      return name();
-    }
-  }
-}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java
index fe39338..c13adb3 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java
@@ -46,4 +46,6 @@ public interface Propertied {
   void setProperty(String key, String value);
   boolean booleanProperty(String key);
   boolean booleanProperty(String key, boolean defaultValue);
+  void setBooleanProperty(String key, boolean value);
 }
+
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java
index 5117782..6466778 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java
@@ -53,6 +53,16 @@ public interface ColumnWriter {
   boolean nullable();
 
   /**
+   * Whether this writer is projected (backed by a materialized vector)
+   * or unprojected (backed by a dummy writer that discards values). In
+   * most cases, clients can ignore projection and simply write to the
+   * writer. This flag serves the special cases in which it is helpful
+   * to know whether the column is projected.
+   */
+
+  boolean isProjected();
+
+  /**
    * Returns the schema of the column associated with this writer.
    *
    * @return schema for this writer's column
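One of those special cases: a reader that can skip expensive work for a
column no one asked for. A sketch in which parseExpensiveField() is an
illustrative helper, not a real API:

    ScalarWriter w = rootWriter.scalar("message");
    if (w.isProjected()) {
      // Only pay the parsing cost when a real vector backs the column;
      // writing to a dummy writer would work, but wastes the effort.
      w.setString(parseExpensiveField(line));
    }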
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java
index 05ff2fc..2f5c5df 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.vector.accessor;
 
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.record.metadata.ProjectionType;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 
 /**
@@ -70,20 +69,6 @@ public interface TupleWriter extends ColumnWriter {
   }
 
   /**
-   * Allows a client to "sniff" the projection set to determine if a
-   * field is projected. Some clients can omit steps if they know that
-   * a field is not needed. Others will simply create the column, allowing
-   * the implementation to create a dummy writer if the column is not
-   * projected.
-   *
-   * @param columnName name of an existing or new column
-   * @return whether the column is projected, and, if so, the implied
-   * type of the projected column
-   */
-
-  ProjectionType projectionType(String columnName);
-
-  /**
    * Add a column to the tuple (row or map) that backs this writer. Support for
    * this operation depends on whether the client code has registered a listener
    * to implement the addition. Throws an exception if no listener is
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java
index f92eed6..a3f61e0 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java
@@ -59,6 +59,11 @@ public abstract class AbstractWriteConverter extends AbstractScalarWriter {
   }
 
   @Override
+  public boolean isProjected() {
+    return baseWriter.isProjected();
+  }
+
+  @Override
   public boolean nullable() {
     return baseWriter.nullable();
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
index b8ec266..9d966f7 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
@@ -322,6 +322,9 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents {
   public boolean nullable() { return false; }
 
   @Override
+  public boolean isProjected() { return true; }
+
+  @Override
   public void setNull() {
     throw new IllegalStateException("Not nullable");
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
index c4b5149..29bcef7 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
@@ -79,6 +79,9 @@ public abstract class AbstractObjectWriter implements ObjectWriter {
   @Override
   public void setObject(Object value) { writer().setObject(value); }
 
+  @Override
+  public boolean isProjected() { return writer().isProjected(); }
+
   public abstract void dump(HierarchicalFormatter format);
 
   protected static ScalarWriter convertWriter(
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriterImpl.java
index 135e46a..0ae37c3 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriterImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriterImpl.java
@@ -127,6 +127,9 @@ public abstract class AbstractScalarWriterImpl extends AbstractScalarWriter impl
   public void saveRow() { }
 
   @Override
+  public boolean isProjected() { return true; }
+
+  @Override
   public void dump(HierarchicalFormatter format) {
     format
       .startObject(this)
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
index 0a7c594..53a153f 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
@@ -22,7 +22,6 @@ import java.util.List;
 
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.record.metadata.ProjectionType;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.accessor.ArrayWriter;
 import org.apache.drill.exec.vector.accessor.ColumnWriter;
@@ -142,8 +141,6 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
     ObjectWriter addColumn(TupleWriter tuple, ColumnMetadata column);
 
     ObjectWriter addColumn(TupleWriter tuple, MaterializedField field);
-
-    ProjectionType projectionType(String columnName);
   }
 
   protected final TupleMetadata tupleSchema;
@@ -207,12 +204,6 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
   }
 
   @Override
-  public ProjectionType projectionType(String columnName) {
-    return listener == null ? ProjectionType.UNSPECIFIED
-        : listener.projectionType(columnName);
-  }
-
-  @Override
   public int addColumn(ColumnMetadata column) {
     if (listener == null) {
       throw new UnsupportedOperationException("addColumn");
@@ -422,6 +413,9 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
   }
 
   @Override
+  public boolean isProjected() { return true; }
+
+  @Override
   public int lastWriteIndex() {
     return vectorIndex.vectorIndex();
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
index fd6e7c4..e9e26d9 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.record.metadata.ProjectionType;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter;
 import org.apache.drill.exec.vector.accessor.writer.dummy.DummyArrayWriter;
@@ -106,6 +105,9 @@ public abstract class MapWriter extends AbstractTupleWriter {
       super.preRollover();
       mapVector.setMapValueCount(vectorIndex.rowStartIndex());
     }
+
+    @Override
+    public boolean isProjected() { return true; }
   }
 
   /**
@@ -133,6 +135,9 @@ public abstract class MapWriter extends AbstractTupleWriter {
 
       bindIndex(index, new MemberWriterIndex(index));
     }
+
+    @Override
+    public boolean isProjected() { return true; }
   }
 
   protected static class DummyMapWriter extends MapWriter {
@@ -143,7 +148,7 @@ public abstract class MapWriter extends AbstractTupleWriter {
     }
 
     @Override
-    public ProjectionType projectionType(String columnName) { return ProjectionType.UNPROJECTED; }
+    public boolean isProjected() { return false; }
   }
 
   protected static class DummyArrayMapWriter extends MapWriter {
@@ -154,7 +159,7 @@ public abstract class MapWriter extends AbstractTupleWriter {
     }
 
     @Override
-    public ProjectionType projectionType(String columnName) { return ProjectionType.UNPROJECTED; }
+    public boolean isProjected() { return false; }
   }
 
   protected final ColumnMetadata mapColumnSchema;
@@ -167,14 +172,13 @@ public abstract class MapWriter extends AbstractTupleWriter {
   public static TupleObjectWriter buildMap(ColumnMetadata schema, MapVector vector,
       List<AbstractObjectWriter> writers) {
     MapWriter mapWriter;
-    if (schema.isProjected()) {
+    if (vector != null) {
 
       // Vector is not required for a map writer; the map's columns
       // are written, but not the (non-array) map.
 
       mapWriter = new SingleMapWriter(schema, vector, writers);
     } else {
-      assert vector == null;
       mapWriter = new DummyMapWriter(schema, writers);
     }
     return new TupleObjectWriter(mapWriter);
@@ -184,16 +188,14 @@ public abstract class MapWriter extends AbstractTupleWriter {
       RepeatedMapVector mapVector,
       List<AbstractObjectWriter> writers) {
     MapWriter mapWriter;
-    if (schema.isProjected()) {
-      assert mapVector != null;
+    if (mapVector != null) {
       mapWriter = new ArrayMapWriter(schema, writers);
     } else {
-      assert mapVector == null;
       mapWriter = new DummyArrayMapWriter(schema, writers);
     }
     TupleObjectWriter mapArray = new TupleObjectWriter(mapWriter);
     AbstractArrayWriter arrayWriter;
-    if (schema.isProjected()) {
+    if (mapVector != null) {
       arrayWriter = new ObjectArrayWriter(schema,
           mapVector.getOffsetVector(),
           mapArray);
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java
index a308cea..4c599ce 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java
@@ -268,6 +268,9 @@ public class UnionWriterImpl implements VariantWriter, WriterEvents {
   }
 
   @Override
+  public boolean isProjected() { return true; }
+
+  @Override
   public void startWrite() {
     assert state == State.IDLE;
     state = State.IN_WRITE;
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java
index 3d64bb6..b5b01bb 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyArrayWriter.java
@@ -93,4 +93,7 @@ public class DummyArrayWriter extends AbstractArrayWriter {
 
   @Override
   public void bindIndex(ColumnWriterIndex index) { }
+
+  @Override
+  public boolean isProjected() { return false; }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
index 2d52c3e..d110d1b 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
@@ -118,4 +118,7 @@ public class DummyScalarWriter extends AbstractScalarWriterImpl {
 
   @Override
   public void setDefaultValue(Object value) { }
+
+  @Override
+  public boolean isProjected() { return false; }
 }