You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vi...@apache.org on 2019/03/05 15:38:28 UTC

[drill] 01/02: DRILL-7074: Scan framework fixes and enhancements

This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 7e3b45967dbb97da18ba49a2fa6a67a48e33b092
Author: Paul Rogers <pr...@cloudera.com>
AuthorDate: Sun Mar 3 17:55:22 2019 -0800

    DRILL-7074: Scan framework fixes and enhancements
    
    Roll-up of fixes and enhancements that emerged from the effort to host the CSV reader on the new framework.
    
    closes #1676
---
 .../drill/exec/physical/impl/scan/ReaderState.java |  26 +-
 .../exec/physical/impl/scan/RowBatchReader.java    |  20 +-
 .../physical/impl/scan/ScanOperatorEvents.java     |  42 +-
 .../impl/scan/columns/ColumnsArrayParser.java      |  52 +-
 .../impl/scan/file/FileMetadataColumnsParser.java  |  71 +-
 .../impl/scan/file/FileMetadataManager.java        |  34 +
 .../physical/impl/scan/file/FileScanFramework.java |  37 +-
 .../physical/impl/scan/file/MetadataColumn.java    |  13 +
 .../impl/scan/framework/AbstractScanFramework.java |  76 ++
 .../impl/scan/framework/SchemaNegotiator.java      |   6 +-
 .../impl/scan/framework/SchemaNegotiatorImpl.java  |   6 +-
 .../impl/scan/framework/ShimBatchReader.java       |  15 +-
 .../physical/impl/scan/framework/package-info.java |  26 +-
 .../exec/physical/impl/scan/package-info.java      |  85 +-
 .../scan/project/ExplicitSchemaProjection.java     |   1 +
 .../scan/project/ReaderSchemaOrchestrator.java     | 230 +++++
 .../impl/scan/project/ScanLevelProjection.java     |  66 +-
 .../impl/scan/project/ScanSchemaOrchestrator.java  | 241 +-----
 .../impl/scan/project/SchemaLevelProjection.java   |  11 +-
 .../physical/impl/scan/project/package-info.java   |  50 ++
 .../rowSet/project/ImpliedTupleRequest.java        |   2 +-
 .../rowSet/project/RequestedTupleImpl.java         |  44 +-
 .../exec/physical/impl/scan/TestColumnsArray.java  | 172 +++-
 .../impl/scan/TestColumnsArrayFramework.java       |  19 +-
 .../physical/impl/scan/TestColumnsArrayParser.java |  46 +-
 .../impl/scan/TestConstantColumnLoader.java        | 153 ----
 .../impl/scan/TestFileMetadataColumnParser.java    | 170 +++-
 .../impl/scan/TestFileMetadataProjection.java      |   4 +-
 .../physical/impl/scan/TestFileScanFramework.java  |  11 +-
 .../physical/impl/scan/TestNullColumnLoader.java   | 281 ------
 .../physical/impl/scan/TestRowBatchMerger.java     | 459 ----------
 .../physical/impl/scan/TestScanBatchWriters.java   |   3 +
 .../impl/scan/TestScanLevelProjection.java         | 223 -----
 .../physical/impl/scan/TestScanOperatorExec.java   |  11 +-
 .../impl/scan/TestScanOrchestratorEarlySchema.java | 255 ++----
 .../impl/scan/TestScanOrchestratorLateSchema.java  |   6 +-
 .../impl/scan/TestScanOrchestratorMetadata.java    |  77 +-
 .../impl/scan/TestSchemaLevelProjection.java       | 557 ------------
 .../physical/impl/scan/TestSchemaSmoothing.java    | 946 ---------------------
 .../scan/project/TestConstantColumnLoader.java     |  45 +
 .../impl/scan/project/TestScanLevelProjection.java | 113 ++-
 .../impl/scan/project/TestSchemaSmoothing.java     | 250 +++++-
 42 files changed, 1726 insertions(+), 3229 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
index bbf12d4..b366d34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
@@ -238,8 +238,11 @@ class ReaderState {
 
   /**
    * Prepare the schema for this reader. Called for the first reader within a
-   * scan batch, if the reader returns <tt>true</tt> from <tt>open()</tt>. If
-   * this is an early-schema reader, then the result set loader already has
+   * scan batch, if the reader returns <tt>true</tt> from <tt>open()</tt>.
+   * Asks the reader if it can provide a schema-only empty batch by calling
+   * the reader's <tt>defineSchema()</tt> method. If this is an early-schema
+   * reader, and it can provide a schema, then it should create an empty
+   * batch so that the result set loader already has
    * the proper value vectors set up. If this is a late-schema reader, we must
    * read one batch to get the schema, then set aside the data for the next
    * call to <tt>next()</tt>.
@@ -255,9 +258,10 @@ class ReaderState {
    * <li>If if turned out that the file was
    * empty when trying to read the schema, <tt>open()</tt> returned false
    * and this method should never be called.</tt>
-   * <li>Otherwise, if a schema was available, then the schema is already
-   * set up in the result set loader as the result of schema negotiation, and
-   * this method simply returns <tt>true</tt>.
+   * <li>Otherwise, the reader does not know if it is the first reader or
+   * not. The call to <tt>defineSchema()</tt> notifies the reader that it
+   * is the first one. The reader should set up the result set loader
+   * with an empty batch.
    * </ul>
    * <p>
    * Semantics for late-schema readers:
@@ -280,14 +284,12 @@ class ReaderState {
 
   protected boolean buildSchema() {
 
-    VectorContainer container = reader.output();
-
-    if (container != null) {
+    if (reader.defineSchema()) {
 
       // Bind the output container to the output of the scan operator.
       // This returns an empty batch with the schema filled in.
 
-      scanOp.containerAccessor.setContainer(container);
+      scanOp.containerAccessor.setContainer(reader.output());
       schemaVersion = reader.schemaVersion();
       return true;
     }
@@ -297,7 +299,8 @@ class ReaderState {
     if (! next()) {
       return false;
     }
-    container = reader.output();
+    VectorContainer container = reader.output();
+    schemaVersion = reader.schemaVersion();
     if (container.getRecordCount() == 0) {
       return true;
     }
@@ -374,8 +377,7 @@ class ReaderState {
 
   private boolean readBatch() {
 
-    // Try to read a batch. This may fail. If so, clean up the
-    // mess.
+    // Try to read a batch. This may fail. If so, clean up the mess.
 
     boolean more;
     try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
index c0985b5..61de584 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
@@ -84,8 +84,6 @@ import org.apache.drill.exec.record.VectorContainer;
 
 public interface RowBatchReader {
 
-  enum Result { OK, LAST_BATCH, EOF }
-
   /**
    * Name used when reporting errors. Can simply be the class name.
    *
@@ -111,6 +109,22 @@ public interface RowBatchReader {
   boolean open();
 
   /**
+   * Called for the first reader within a scan. Allows the reader to
+   * provide an empty batch with only the schema filled in. Readers that
+   * are "early schema" (know the schema up front) should return true
+   * and create an empty batch. Readers that are "late schema" should
+   * return false. In that case, the scan operator will ask the reader
+   * to load an actual data batch, and infer the schema from that batch.
+   * <p>
+   * This step is optional and is purely for performance.
+   *
+   * @return true if this reader can define (and has defined) an empty batch
+   * to describe the schema, false otherwise
+   */
+
+  boolean defineSchema();
+
+  /**
    * Read the next batch. Reading continues until either EOF,
    * or until the mutator indicates that the batch is full.
    * The batch is considered valid if it is non-empty. Returning
@@ -129,7 +143,7 @@ public interface RowBatchReader {
    * <tt>next()</tt> should be called again, <tt>false</tt> to indicate
    * that EOF was reached
    *
-   * @throws RutimeException (<tt>UserException</tt> preferred) if an
+   * @throws RuntimeException (<tt>UserException</tt> preferred) if an
    * error occurs that should fail the query.
    */
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java
index 9e17414..04b2c7e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java
@@ -20,15 +20,23 @@ package org.apache.drill.exec.physical.impl.scan;
 import org.apache.drill.exec.ops.OperatorContext;
 
 /**
- * Interface to the set of readers, and reader schema, that the
- * scan operator manages. The reader factory creates and returns
- * the readers to use for the scan, as determined by the specific
- * physical plan. The reader factory also
- * translates from the select list provided
- * in the physical plan to the actual columns returned from the
- * scan operator. The translation is reader-specific; this
- * interface allows the scan operator to trigger various
- * lifecycle events.
+ * Interface to the set of readers, and reader schema, that the scan operator
+ * manages. The reader factory creates and returns the readers to use for the
+ * scan, as determined by the specific physical plan. The reader factory also
+ * translates from the select list provided in the physical plan to the actual
+ * columns returned from the scan operator. The translation is reader-specific;
+ * this interface allows the scan operator to trigger various lifecycle events.
+ * <p>
+ * This interface decouples the scan implementation from the generic tasks
+ * needed to implement Drill's Volcano iterator protocol for operators, and
+ * Drill's schema and batch semantics. A scan implementation need only
+ * implement this interface to add plugin-specific scan behavior.
+ * <p>
+ * While this interface allows a wide variety of implementations, the intent is
+ * that most actual scanners will use the "managed" framework that handles the
+ * routine projection, vector management and other tasks that tend to be common
+ * across scanners. See {@link ScanSchemaOrchestrator} for the managed
+ * framework.
  */
 
 public interface ScanOperatorEvents {
@@ -46,11 +54,25 @@ public interface ScanOperatorEvents {
 
   void bind(OperatorContext context);
 
+  /**
+   * A scanner typically reads multiple data sources (such as files or
+   * file blocks.) A batch reader handles each read. This method returns
+   * the next reader in whatever sequence that this scan defines.
+   * <p>
+   * The preferred implementation is to create each batch reader in this
+   * call to minimize resource usage. Production queries may read
+   * thousands of files or blocks, so incremental reader creation can be
+   * far more efficient than creating readers at the start of the scan.
+   *
+   * @return a batch reader for one of the scan elements within the
+   * scan physical plan for this scan operator
+   */
+
   RowBatchReader nextReader();
 
   /**
    * Called when the scan operator itself is closed. Indicates that no more
-   * readers are available (or will be opened).
+   * readers are available.
    */
 
   void close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
index f6c72b1..966a039 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
@@ -35,14 +35,24 @@ import org.apache.drill.exec.store.easy.text.compliant.RepeatedVarCharOutput;
  * expands to `columns`.</li>
  * <li>If the columns array appears, then no other table columns
  * can appear.</li>
- * <li>If the columns array appears, then the wildcard cannot also
- * appear, unless that wildcard expanded to be `columns` as
- * described above.</li>
+ * <li>Both 'columns' and the wildcard can appear for queries such
+ * as:<code><pre>
+ * select * from dfs.`multilevel/csv`
+ * where columns[1] &lt; 1000</pre>
+ * </code></li>
  * <li>The query can select specific elements such as `columns`[2].
  * In this case, only array elements can appear, not the unindexed
  * `columns` column.</li>
+ * <li>It is possible for `columns` to appear twice. In this case,
+ * the project operator will make a copy.</li>
  * </ul>
  * <p>
+ * To handle these cases, the general rule is: allow any number
+ * of wildcard or `columns` appearances in the input projection, but
+ * collapse them all down to a single occurrence of `columns` in the
+ * output projection. (Upstream code will prevent `columns` from
+ * appearing twice in its non-indexed form.)
+ * <p>
  * It falls to this parser to detect a not-uncommon user error, a
  * query such as the following:<pre><code>
  * SELECT max(columns[1]) AS col1
@@ -83,7 +93,8 @@ public class ColumnsArrayParser implements ScanProjectionParser {
   @Override
   public boolean parse(RequestedColumn inCol) {
     if (requireColumnsArray && inCol.isWildcard()) {
-      expandWildcard();
+      createColumnsCol(
+          new RequestedColumnImpl(builder.rootProjection(), ColumnsArrayManager.COLUMNS_COL));
       return true;
     }
     if (! inCol.nameEquals(ColumnsArrayManager.COLUMNS_COL)) {
@@ -113,41 +124,24 @@ public class ColumnsArrayParser implements ScanProjectionParser {
           .build(logger);
       }
     }
-
-    // Special `columns` array column.
-
-    columnsArrayCol = new UnresolvedColumnsArrayColumn(inCol);
-    builder.addTableColumn(columnsArrayCol);
+    createColumnsCol(inCol);
     return true;
   }
 
-  /**
-   * Query contained SELECT *, and we know that the reader supports only
-   * the `columns` array; go ahead and expand the wildcard to the only
-   * possible column.
-   */
+  private void createColumnsCol(RequestedColumn inCol) {
+
+    // Special `columns` array column. Allow multiple, but
+    // project only one.
 
-  private void expandWildcard() {
     if (columnsArrayCol != null) {
-      throw UserException
-        .validationError()
-        .message("Cannot select columns[] and `*` together")
-        .build(logger);
+      return;
     }
-    columnsArrayCol = new UnresolvedColumnsArrayColumn(
-        new RequestedColumnImpl(builder.rootProjection(), ColumnsArrayManager.COLUMNS_COL));
+    columnsArrayCol = new UnresolvedColumnsArrayColumn(inCol);
     builder.addTableColumn(columnsArrayCol);
   }
 
   @Override
-  public void validate() {
-    if (builder.hasWildcard() && columnsArrayCol != null) {
-      throw UserException
-        .validationError()
-        .message("Cannot select `columns` and `*` together")
-        .build(logger);
-    }
-  }
+  public void validate() { }
 
   @Override
   public void validateColumn(ColumnProjection col) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
index f9674dc..ae8502b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.physical.impl.scan.file;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -38,6 +40,7 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
   private final FileMetadataManager metadataManager;
   private final Pattern partitionPattern;
   private ScanLevelProjection builder;
+  private final Set<Integer> referencedPartitions = new HashSet<>();
 
   // Output
 
@@ -64,6 +67,11 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
     if (defn != null) {
       return buildMetadataColumn(defn, inCol);
     }
+    if (inCol.isWildcard()) {
+      buildWildcard();
+
+      // Don't consider this a match.
+    }
     return false;
   }
 
@@ -80,11 +88,18 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
 
     // Partition column
 
-    builder.addMetadataColumn(
-        new PartitionColumn(
-          inCol.name(),
-          Integer.parseInt(m.group(1))));
-    hasImplicitCols = true;
+    int partitionIndex = Integer.parseInt(m.group(1));
+    if (! referencedPartitions.contains(partitionIndex)) {
+      builder.addMetadataColumn(
+          new PartitionColumn(
+            inCol.name(),
+            partitionIndex));
+
+      // Remember the partition for later wildcard expansion
+
+      referencedPartitions.add(partitionIndex);
+      hasImplicitCols = true;
+    }
     return true;
   }
 
@@ -107,8 +122,52 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
     return true;
   }
 
+  private void buildWildcard() {
+    if (metadataManager.useLegacyWildcardExpansion &&
+        metadataManager.useLegacyExpansionLocation) {
+
+      // Star column: this is a SELECT * query.
+
+      // Old-style wildcard handling inserts all partition columns in
+      // the scanner, removes them in Project.
+      // Fill in the file metadata columns. Can do here because the
+      // set is constant across all files.
+
+      expandPartitions();
+    }
+  }
+
   @Override
-  public void validate() { }
+  public void validate() {
+
+    // Expand partitions if a wildcard appears, if using the
+    // feature to expand partitions for wildcards, and we want the
+    // partitions after data columns.
+
+    if (builder.hasWildcard() && metadataManager.useLegacyWildcardExpansion &&
+        ! metadataManager.useLegacyExpansionLocation) {
+      expandPartitions();
+    }
+  }
+
+  private void expandPartitions() {
+
+    // Legacy wildcard expansion: include the file partitions for this file.
+    // This is a disadvantage for a * query: files at different directory
+    // levels will have different numbers of columns. Would be better to
+    // return this data as an array at some point.
+    // Append this after the *, keeping the * for later expansion.
+
+    for (int i = 0; i < metadataManager.partitionCount(); i++) {
+      if (referencedPartitions.contains(i)) {
+        continue;
+      }
+      builder.addMetadataColumn(new PartitionColumn(
+          metadataManager.partitionName(i), i));
+      referencedPartitions.add(i);
+    }
+    hasImplicitCols = true;
+  }
 
   @Override
   public void validateColumn(ColumnProjection outCol) { }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java
index fe4332a..ba49a9f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java
@@ -53,6 +53,31 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes
   protected final String partitionDesignator;
   protected List<FileMetadataColumnDefn> implicitColDefns = new ArrayList<>();
   protected Map<String, FileMetadataColumnDefn> fileMetadataColIndex = CaseInsensitiveMap.newHashMap();
+
+  /**
+   * Indicates whether to expand partition columns when the query contains a wildcard.
+   * Supports queries such as the following:<code><pre>
+   * select * from dfs.`partitioned-dir`
+   * </pre></code>
+   * In which the output columns will be (columns, dir0) if the partitioned directory
+   * has one level of nesting.
+   *
+   * See {@link TestImplicitFileColumns#testImplicitColumns}
+   */
+  protected final boolean useLegacyWildcardExpansion;
+
+  /**
+   * In legacy mode, above, Drill expands partition columns whenever the
+   * wildcard appears. Drill 1.1 - 1.11 put expanded partition columns after
+   * data columns. This is actually a better position as it minimizes changes to
+   * the row layout for files at different depths. Drill 1.12 moved them before
+   * data columns: at the location of the wildcard.
+   * <p>
+   * This flag, when set, uses the Drill 1.12 position. Later enhancements
+   * can unset this flag to go back to the future: use the preferred location
+   * after other columns.
+   */
+  protected final boolean useLegacyExpansionLocation;
   private final FileMetadataColumnsParser parser;
 
   // Internal state
@@ -84,7 +109,11 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes
    */
 
   public FileMetadataManager(OptionSet optionManager,
+      boolean useLegacyWildcardExpansion,
+      boolean useLegacyExpansionLocation,
       Path rootDir, List<Path> files) {
+    this.useLegacyWildcardExpansion = useLegacyWildcardExpansion;
+    this.useLegacyExpansionLocation = useLegacyExpansionLocation;
     scanRootDir = rootDir;
 
     partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
@@ -117,6 +146,11 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes
     }
   }
 
+  public FileMetadataManager(OptionSet optionManager,
+      Path rootDir, List<Path> files) {
+    this(optionManager, false, false, rootDir, files);
+  }
+
   private int computeMaxPartition(List<Path> files) {
     int maxLen = 0;
     for (Path filePath : files) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
index 609e9f0..6ecf0cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
@@ -32,19 +32,46 @@ import org.apache.hadoop.mapred.FileSplit;
 
 /**
  * The file scan framework adds into the scan framework support for implicit
- * file metadata columns.
+ * file metadata columns. The file scan framework brings together a number of
+ * components:
+ * <ul>
+ * <li>The projection list provided by the physical operator definition. This
+ * list identifies the set of "output" columns which this framework is obliged
+ * to produce.</li>
+ * <li>The set of files and/or blocks to read.</li>
+ * <li>The file system configuration to use for working with the files
+ * or blocks.</li>
+ * <li>The factory class to create a reader for each of the files or blocks
+ * defined above. (Readers are created one-by-one as files are read.)</li>
+ * <li>Options as defined by the base class.</li>
+ * </ul>
+ * <p>
+ * See {@code AbstractScanFramework} for details.
  */
 
 public class FileScanFramework extends BaseFileScanFramework<FileSchemaNegotiator> {
 
-  public interface FileReaderCreator {
+  /**
+   * Creates a batch reader on demand. Unlike earlier versions of Drill,
+   * this framework creates readers one by one, when they are needed.
+   * Doing so avoids excessive resource demands that come from creating
+   * potentially thousands of readers up front.
+   * <p>
+   * The reader itself is unique to each file type. This interface
+   * provides a common interface that this framework can use to create the
+   * file-specific reader on demand.
+   */
+
+  public interface FileReaderFactory {
     ManagedReader<FileSchemaNegotiator> makeBatchReader(
         DrillFileSystem dfs,
         FileSplit split) throws ExecutionSetupException;
   }
 
   /**
-   * Implementation of the file-level schema negotiator.
+   * Implementation of the file-level schema negotiator. At present, no
+   * file-specific features exist. This class shows, however, where we would
+   * add such features.
    */
 
   public static class FileSchemaNegotiatorImpl extends SchemaNegotiatorImpl
@@ -55,12 +82,12 @@ public class FileScanFramework extends BaseFileScanFramework<FileSchemaNegotiato
     }
   }
 
-  private final FileReaderCreator readerCreator;
+  private final FileReaderFactory readerCreator;
 
   public FileScanFramework(List<SchemaPath> projection,
       List<? extends FileWork> files,
       Configuration fsConf,
-      FileReaderCreator readerCreator) {
+      FileReaderFactory readerCreator) {
     super(projection, files, fsConf);
     this.readerCreator = readerCreator;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java
index 4a15ff7..bf13265 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java
@@ -44,4 +44,17 @@ public abstract class MetadataColumn extends ResolvedColumn implements ConstantC
   public String name() { return schema.getName(); }
 
   public abstract MetadataColumn resolve(FileMetadata fileInfo, VectorSource source, int sourceIndex);
+
+  @Override
+  public String toString() {
+    return new StringBuilder()
+        .append("[")
+        .append(getClass().getSimpleName())
+        .append(" schema=\"")
+        .append(schema.toString())
+        .append(", value=")
+        .append(value)
+        .append("]")
+        .toString();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java
index 5407901..d285261 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java
@@ -40,15 +40,91 @@ import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
  * config options, and implements the matching "managed reader". All details
  * of setup, projection, and so on are handled by the framework and the components
  * that the framework builds upon.
+ *
+ * <h4>Inputs</h4>
+ *
+ * At this basic level, a scan framework requires just a few simple inputs:
+ * <ul>
+ * <li>The projection list provided by the physical operator definition. This
+ * list identifies the set of "output" columns which this framework is obliged
+ * to produce.</li>
+ * <li>The operator context which provides access to a memory allocator and
+ * other plumbing items.</li>
+ * <li>A method to create a reader for each of the files or blocks
+ * defined above. (Readers are created one-by-one as files are read.)</li>
+ * <li>The data type to use for projected columns which the reader cannot
+ * provide. (Drill allows such columns and fills in null values: traditionally
+ * nullable Int, but customizable here.)</li>
+ * <li>Various other options.</li>
+ * </ul>
+ *
+ * <h4>Orchestration</h4>
+ *
+ * The above is sufficient to drive the entire scan operator functionality.
+ * Projection is done generically and is the same for all files. Only the
+ * reader (created via the factory class) differs from one type of file to
+ * another.
+ * <p>
+ * The framework achieves the work described below by composing a large
+ * set of detailed classes, each of which performs some specific task. This
+ * structure leaves the reader to simply infer schema and read data.
+ * <p>
+ * In particular, rather than do all the orchestration here (which would tie
+ * that logic to the scan operation), the detailed work is delegated to the
+ * {@link ScanSchemaOrchestrator} class, with this class as a "shim" between
+ * the Scan events API and the schema orchestrator implementation.
+ *
+ * <h4>Reader Integration</h4>
+ *
+ * The details of how a file is structured, how a schema is inferred, how
+ * data is decoded: all that is encapsulated in the reader. The only real
+ * interaction between the reader and the framework is:
+ * <ul>
+ * <li>The reader "negotiates" a schema with the framework. The framework
+ * knows the projection list from the query plan, knows something about
+ * data types (whether a column should be scalar, a map or an array), and
+ * knows about the schema already defined by prior readers. The reader knows
+ * what schema it can produce (if "early schema.") The schema negotiator
+ * class handles this task.</li>
+ * <li>The reader reads data from the file and populates value vectors a
+ * batch at a time. The framework creates the result set loader to use for
+ * this work. The schema negotiator returns that loader to the reader, which
+ * uses it during read.
+ * <p>
+ * It is important to note that the result set loader also defines a schema:
+ * the schema requested by the reader. If the reader wants to read three
+ * columns, a, b, and c, then that is the schema that the result set loader
+ * supports. This is true even if the query plan only wants column a, or
+ * wants columns c, a. The framework handles the projection task so the
+ * reader does not have to worry about it. Reading an unwanted column
+ * is low cost: the result set loader will have provided a "dummy" column
+ * writer that simply discards the value. This is just as fast as having the
+ * reader use if-statements or a table to determine which columns to save.
+ * <p>
+ * A reader may be "late schema", true "schema on read." In this case, the
+ * reader simply tells the result set loader to create a new column writer
+ * on the fly. The framework will work out if that new column is to be
+ * projected and will return either a real column writer (projected column)
+ * or a dummy column writer (unprojected column.)</li>
+ * <li>The reader then reads batches of data until all data is read. The
+ * result set loader signals when a batch is full; the reader should not
+ * worry about this detail itself.</li>
+ * <li>The reader then releases its resources.</li>
+ * </ul>
  */
 
 public abstract class AbstractScanFramework<T extends SchemaNegotiator> implements ScanOperatorEvents {
 
+  // Inputs
+
   protected final List<SchemaPath> projection;
   protected MajorType nullType;
   protected int maxBatchRowCount;
   protected int maxBatchByteCount;
   protected OperatorContext context;
+
+  // Internal state
+
   protected ScanSchemaOrchestrator scanOrchestrator;
 
   public AbstractScanFramework(List<SchemaPath> projection) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
index 6318812..dead9cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
@@ -65,9 +65,13 @@ public interface SchemaNegotiator {
    * columns during the read.
    *
    * @param schema the table schema if known at open time
+   * @param isComplete true if the schema is complete: if it can be used
+   * to define an empty schema-only batch for the first reader. Set to
+   * false if the schema is partial: if the reader must read rows to
+   * determine the full schema
    */
 
-  void setTableSchema(TupleMetadata schema);
+  void setTableSchema(TupleMetadata schema, boolean isComplete);
 
   /**
    * Set the preferred batch size (which may be overridden by the
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
index 46f363d..0841049 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
@@ -53,6 +53,7 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator {
   protected final AbstractScanFramework<?> basicFramework;
   private final ShimBatchReader<? extends SchemaNegotiator> shim;
   protected TupleMetadata tableSchema;
+  protected boolean isSchemaComplete;
   protected int batchSize = ValueVector.MAX_ROW_COUNT;
 
   public SchemaNegotiatorImpl(AbstractScanFramework<?> framework, ShimBatchReader<? extends SchemaNegotiator> shim) {
@@ -66,8 +67,9 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator {
   }
 
   @Override
-  public void setTableSchema(TupleMetadata schema) {
+  public void setTableSchema(TupleMetadata schema, boolean isComplete) {
     tableSchema = schema;
+    this.isSchemaComplete = schema != null && isComplete;
   }
 
   @Override
@@ -97,4 +99,6 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator {
   public boolean isProjectionEmpty() {
     return basicFramework.scanOrchestrator().isProjectNone();
   }
+
+  public boolean isSchemaComplete() { return isSchemaComplete; }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java
index 0dc3c57..a97b329 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.scan.framework;
 
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
-import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
+import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.record.VectorContainer;
 
@@ -44,6 +44,7 @@ public class ShimBatchReader<T extends SchemaNegotiator> implements RowBatchRead
   protected final AbstractScanFramework<T> manager;
   protected final ManagedReader<T> reader;
   protected final ReaderSchemaOrchestrator readerOrchestrator;
+  protected SchemaNegotiatorImpl schemaNegotiator;
   protected ResultSetLoader tableLoader;
 
   /**
@@ -96,10 +97,19 @@ public class ShimBatchReader<T extends SchemaNegotiator> implements RowBatchRead
   }
 
   @Override
+  public boolean defineSchema() {
+    if (schemaNegotiator.isSchemaComplete()) {
+      readerOrchestrator.defineSchema();
+      return true;
+    }
+    return false;
+  }
+
+  @Override
   public boolean next() {
 
     // The reader may report EOF, but the result set loader might
-    // have a lookhead row.
+    // have a lookahead row.
 
     if (eof && ! tableLoader.hasRows()) {
       return false;
@@ -181,6 +191,7 @@ public class ShimBatchReader<T extends SchemaNegotiator> implements RowBatchRead
   }
 
   public ResultSetLoader build(SchemaNegotiatorImpl schemaNegotiator) {
+    this.schemaNegotiator = schemaNegotiator;
     readerOrchestrator.setBatchSize(schemaNegotiator.batchSize);
     tableLoader = readerOrchestrator.makeTableLoader(schemaNegotiator.tableSchema);
     return tableLoader;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java
index efd881b..096b844 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java
@@ -29,7 +29,7 @@
  * is an old version without a new column c, while file B includes the column.
  * And so on.
  * <p>
- * The scan operator here works to ensure schema continuity as much as
+ * The scan operator works to ensure schema continuity as much as
  * possible, smoothing out "soft" schema changes that are simply artifacts of
  * reading a collection of files. Only "hard" changes (true changes) are
  * passed downstream.
@@ -157,5 +157,29 @@
  * output batch in the order specified by the original SELECT list (or table order,
  * if the original SELECT had a wildcard.) Fortunately, this is just involves
  * moving around pointers to vectors; no actual data is moved during projection.
+ *
+ * <h4>Class Structure</h4>
+ *
+ * Some of the key classes here include:
+ * <ul>
+ * <li>{@link RowBatchReader} an extremely simple interface for reading data.
+ * We would like many developers to create new plugins and readers. The simplified
+ * interface pushes all complexity into the scan framework, leaving the reader to
+ * just read.</li>
+ * <li>{@link ShimBatchReader} an implementation of the above that converts from
+ * the simplified API to add additional structure to work with the result set loader.
+ * (The base interface is agnostic about how rows are read.)</li>
+ * <li>{@link SchemaNegotiator} an interface that allows a batch reader to
+ * "negotiate" a schema with the scan framework. The scan framework knows the
+ * columns that are to be projected. The reader knows what columns it can offer.
+ * The schema negotiator works out how to combine the two. It expresses the result
+ * as a result set loader. Column writers are defined for all columns that the
+ * reader wants to read, but only the materialized (projected) columns have actual
+ * vectors behind them. The non-projected columns are "free-wheeling" "dummy"
+ * writers.
+ * </li>
+ * </ul>
+ * And, yes, sorry for the terminology. File "readers" read from files, but
+ * use column "writers" to write to value vectors.
  */
 package org.apache.drill.exec.physical.impl.scan.framework;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java
index 3e302a1..d7de30a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java
@@ -36,9 +36,90 @@
  * <p>
  * See {@link ScanOperatorExec} for details of the scan operator protocol
  * and components.
+ *
+ * <h4>Traditional Class Structure</h4>
+ * The original design was simple, but required each reader to handle many
+ * detailed tasks.
+ * <pre><code>
+ *  +------------+          +-----------+
+ *  | Scan Batch |    +---> | ScanBatch |
+ *  |  Creator   |    |     +-----------+
+ *  +------------+    |           |
+ *         |          |           |
+ *         v          |           |
+ *  +------------+    |           v
+ *  |   Format   | ---+   +---------------+
+ *  |   Plugin   | -----> | Record Reader |
+ *  +------------+        +---------------+
+ *
+ * </code></pre>
+ *
+ * The scan batch creator is unique to each storage plugin and is created
+ * based on the physical operator configuration ("pop config"). The
+ * scan batch creator delegates to the format plugin to create both the
+ * scan batch (the scan operator) and the set of readers which the scan
+ * batch will manage.
+ * <p>
+ * The scan batch
+ * provides a <code>Mutator</code> that creates the vectors used by the
+ * record readers. Schema continuity comes from reusing the Mutator from one
+ * file/block to the next.
+ * <p>
+ * One characteristic of this system is that all the record readers are
+ * created up front. If we must read 1000 blocks, we'll create 1000 record
+ * readers. Developers must be very careful to only allocate resources when
+ * the reader is opened, and release resources when the reader is closed.
+ * Else, resource bloat becomes a large problem.
+ *
+ * <h4>Revised Class Structure</h4>
+ *
+ * The new design is more complex because it divides tasks up into separate
+ * classes. The class structure is larger, but each class is smaller, more
+ * focused and does just one task.
+ * <pre><code>
+ *  +------------+          +---------------+
+ *  | Scan Batch | -------> | Format Plugin |
+ *  |  Creator   |          +---------------+
+ *  +------------+          /        |       \
+ *                         /         |        \
+ *    +---------------------+        |         \ +---------------+
+ *    | OperatorRecordBatch |        |     +---->| ScanFramework |
+ *    +---------------------+        |     |     +---------------+
+ *                                   v     |            |
+ *                         +------------------+         |
+ *                         | ScanOperatorExec |         |
+ *                         +------------------+         v
+ *                                   |            +--------------+
+ *                                   +----------> | Batch Reader |
+ *                                                +--------------+
+ * </code></pre>
+ *
+ * Here, the scan batch creator again delegates to the format plugin. The
+ * format plugin creates three objects:
+ * <ul>
+ * <li>The <code>OperatorRecordBatch</code>, which encapsulates the Volcano
+ * iterator protocol. It also holds onto the output batch. This allows the
+ * operator implementation to just focus on its specific job.</li>
+ * <li>The <code>ScanOperatorExec</code> is the operator implementation for
+ * the new result-set-loader based scan.</li>
+ * <li>The scan framework is specific to each kind of reader. It handles
+ * everything which is unique to that reader. Rather than inheriting from
+ * the scan itself, the framework follows the strategy pattern: it says how
+ * to do a scan for the target format.</li>
+ * </ul>
+ *
+ * The overall structure uses the "composition" pattern: what is combined
+ * into a small set of classes in the traditional model is broken out into
+ * focused classes in the revised model.
+ * <p>
+ * A key part of the scan strategy is the batch reader. ("Batch" because
+ * it reads an entire batch at a time, using the result set loader.) The
+ * framework creates batch readers one by one as needed. Resource bloat
+ * is less of an issue because only one batch reader instance exists at
+ * any time for each scan operator instance.
  * <p>
- * See the "managed" package for a reusable framework for handling the
- * details of batches, schema and so on.
+ * Each of the above is further broken down into additional classes to
+ * handle projection and so on.
  */
 
 package org.apache.drill.exec.physical.impl.scan;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
index 41cc595..c0bcfa3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
  */
 
 public class ExplicitSchemaProjection extends SchemaLevelProjection {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExplicitSchemaProjection.class);
 
   public ExplicitSchemaProjection(ScanLevelProjection scanProj,
       TupleMetadata tableSchema,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
new file mode 100644
index 0000000..029b6a0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.OptionBuilder;
+import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Orchestrates projection tasks for a single reader within the set that the
+ * scan operator manages. Vectors are reused across readers, but via a vector
+ * cache. All other state is distinct between readers.
+ */
+
+public class ReaderSchemaOrchestrator implements VectorSource {
+
+  private final ScanSchemaOrchestrator scanOrchestrator;
+  private int readerBatchSize;
+  private ResultSetLoaderImpl tableLoader;
+  private int prevTableSchemaVersion = -1;
+
+  /**
+   * Assembles the table, metadata and null columns into the final output
+   * batch to be sent downstream. The key goal of this class is to "smooth"
+   * schema changes in this output batch by absorbing trivial schema changes
+   * that occur across readers.
+   */
+
+  private ResolvedRow rootTuple;
+  private VectorContainer tableContainer;
+
+  public ReaderSchemaOrchestrator(ScanSchemaOrchestrator scanSchemaOrchestrator) {
+    scanOrchestrator = scanSchemaOrchestrator;
+    readerBatchSize = scanOrchestrator.scanBatchRecordLimit;
+  }
+
+  public void setBatchSize(int size) {
+    if (size > 0) {
+      readerBatchSize = Math.min(size, scanOrchestrator.scanBatchRecordLimit);
+    }
+  }
+
+  public ResultSetLoader makeTableLoader(TupleMetadata tableSchema) {
+    OptionBuilder options = new OptionBuilder();
+    options.setRowCountLimit(readerBatchSize);
+    options.setVectorCache(scanOrchestrator.vectorCache);
+    options.setBatchSizeLimit(scanOrchestrator.scanBatchByteLimit);
+
+    // Set up a selection list if available and is a subset of
+    // table columns. (Only needed for non-wildcard queries.)
+    // The projection list includes all candidate table columns
+    // whether or not they exist in the up-front schema. Handles
+    // the odd case where the reader claims a fixed schema, but
+    // adds a column later.
+
+    if (! scanOrchestrator.scanProj.projectAll()) {
+      options.setProjectionSet(scanOrchestrator.scanProj.readerProjection());
+    }
+    options.setSchema(tableSchema);
+
+    // Create the table loader
+
+    tableLoader = new ResultSetLoaderImpl(scanOrchestrator.allocator, options.build());
+    return tableLoader;
+  }
+
+  public boolean hasSchema() {
+    return prevTableSchemaVersion >= 0;
+  }
+
+  public void defineSchema() {
+    tableLoader.startEmptyBatch();
+    endBatch();
+  }
+
+  public void startBatch() {
+    tableLoader.startBatch();
+  }
+
+  /**
+   * Build the final output batch by projecting columns from the three input sources
+   * to the output batch. First, build the metadata and/or null columns for the
+   * table row count. Then, merge the sources.
+   */
+
+  public void endBatch() {
+
+    // Get the batch results in a container.
+
+    tableContainer = tableLoader.harvest();
+
+    // If the schema changed, set up the final projection based on
+    // the new (or first) schema.
+
+    if (prevTableSchemaVersion < tableLoader.schemaVersion()) {
+      reviseOutputProjection();
+    } else {
+
+      // Fill in the null and metadata columns.
+
+      populateNonDataColumns();
+    }
+    rootTuple.setRowCount(tableContainer.getRecordCount());
+  }
+
+  private void populateNonDataColumns() {
+    int rowCount = tableContainer.getRecordCount();
+    scanOrchestrator.metadataManager.load(rowCount);
+    rootTuple.loadNulls(rowCount);
+  }
+
+  /**
+   * Create the list of null columns by comparing the SELECT list against the
+   * columns available in the batch schema. Create null columns for those that
+   * are missing. This is done for the first batch, and any time the schema
+   * changes. (For early-schema, the projection occurs once as the schema is set
+   * up-front and does not change.) For a SELECT *, the null column check
+   * only need be done if null columns were created when mapping from a prior
+   * schema.
+   */
+
+  private void reviseOutputProjection() {
+
+    // Do the table-schema level projection; the final matching
+    // of projected columns to available columns.
+
+    TupleMetadata tableSchema = tableLoader.harvestSchema();
+    if (scanOrchestrator.schemaSmoother != null) {
+      doSmoothedProjection(tableSchema);
+    } else if (scanOrchestrator.scanProj.hasWildcard()) {
+      doWildcardProjection(tableSchema);
+    } else {
+      doExplicitProjection(tableSchema);
+    }
+
+    // Combine metadata, nulls and batch data to form the final
+    // output container. Columns are created by the metadata and null
+    // loaders only in response to a batch, so create the first batch.
+
+    rootTuple.buildNulls(scanOrchestrator.vectorCache);
+    scanOrchestrator.metadataManager.define();
+    populateNonDataColumns();
+    rootTuple.project(tableContainer, scanOrchestrator.outputContainer);
+    prevTableSchemaVersion = tableLoader.schemaVersion();
+  }
+
+  private void doSmoothedProjection(TupleMetadata tableSchema) {
+    rootTuple = new ResolvedRow(
+        new NullColumnBuilder(scanOrchestrator.nullType, scanOrchestrator.allowRequiredNullColumns));
+    scanOrchestrator.schemaSmoother.resolve(tableSchema, rootTuple);
+  }
+
+  /**
+   * Query contains a wildcard. The schema-level projection includes
+   * all columns provided by the reader.
+   */
+
+  private void doWildcardProjection(TupleMetadata tableSchema) {
+    rootTuple = new ResolvedRow(null);
+    new WildcardSchemaProjection(scanOrchestrator.scanProj,
+        tableSchema, rootTuple, scanOrchestrator.schemaResolvers);
+  }
+
+  /**
+   * Explicit projection: include only those columns actually
+   * requested by the query, which may mean filling in null
+   * columns for projected columns that don't actually exist
+   * in the table.
+   *
+   * @param tableSchema newly arrived schema
+   */
+
+  private void doExplicitProjection(TupleMetadata tableSchema) {
+    rootTuple = new ResolvedRow(
+        new NullColumnBuilder(scanOrchestrator.nullType, scanOrchestrator.allowRequiredNullColumns));
+    new ExplicitSchemaProjection(scanOrchestrator.scanProj,
+            tableSchema, rootTuple,
+            scanOrchestrator.schemaResolvers);
+  }
+
+  @Override
+  public ValueVector vector(int index) {
+    return tableContainer.getValueVector(index).getValueVector();
+  }
+
+  public void close() {
+    RuntimeException ex = null;
+    try {
+      if (tableLoader != null) {
+        tableLoader.close();
+        tableLoader = null;
+      }
+    }
+    catch (RuntimeException e) {
+      ex = e;
+    }
+    try {
+      if (rootTuple != null) {
+        rootTuple.close();
+        rootTuple = null;
+      }
+    }
+    catch (RuntimeException e) {
+      ex = ex == null ? e : ex;
+    }
+    scanOrchestrator.metadataManager.endFile();
+    if (ex != null) {
+      throw ex;
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
index 83d40a3..f90f722 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
@@ -100,8 +100,6 @@ import org.apache.drill.exec.physical.rowSet.project.RequestedTupleImpl;
 
 public class ScanLevelProjection {
 
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanLevelProjection.class);
-
   /**
    * Interface for add-on parsers, avoids the need to create
    * a single, tightly-coupled parser for all types of columns.
@@ -128,12 +126,26 @@ public class ScanLevelProjection {
 
   // Internal state
 
+  protected boolean includesWildcard;
   protected boolean sawWildcard;
 
   // Output
 
   protected List<ColumnProjection> outputCols = new ArrayList<>();
+
+  /**
+   * Projection definition for the scan as a whole. Parsed form of the input
+   * projection list.
+   */
+
   protected RequestedTuple outputProjection;
+
+  /**
+   * Projection definition passed to each reader. This is the set of
+   * columns that the reader is asked to provide.
+   */
+
+  protected RequestedTuple readerProjection;
   protected boolean hasWildcard;
   protected boolean emptyProjection = true;
 
@@ -158,6 +170,18 @@ public class ScanLevelProjection {
     for (ScanProjectionParser parser : parsers) {
       parser.bind(this);
     }
+
+    // First pass: check if a wildcard exists.
+
+    for (RequestedColumn inCol : outputProjection.projections()) {
+      if (inCol.isWildcard()) {
+        includesWildcard = true;
+        break;
+      }
+    }
+
+    // Second pass: process remaining columns.
+
     for (RequestedColumn inCol : outputProjection.projections()) {
       if (inCol.isWildcard()) {
         mapWildcard(inCol);
@@ -169,6 +193,23 @@ public class ScanLevelProjection {
     for (ScanProjectionParser parser : parsers) {
       parser.build();
     }
+
+    // Create the reader projection which includes either all columns
+    // (saw a wildcard) or just the unresolved columns (which excludes
+    // implicit columns.)
+
+    List<RequestedColumn> outputProj;
+    if (hasWildcard()) {
+      outputProj = null;
+    } else {
+      outputProj = new ArrayList<>();
+      for (ColumnProjection col : outputCols) {
+        if (col instanceof UnresolvedColumn) {
+          outputProj.add(((UnresolvedColumn) col).element());
+        }
+      }
+    }
+    readerProjection = RequestedTupleImpl.build(outputProj);
   }
 
   /**
@@ -181,6 +222,7 @@ public class ScanLevelProjection {
 
     // Wildcard column: this is a SELECT * query.
 
+    assert includesWildcard;
     if (sawWildcard) {
       throw new IllegalArgumentException("Duplicate * entry in project list");
     }
@@ -245,6 +287,15 @@ public class ScanLevelProjection {
       }
     }
 
+    // If the project list has a wildcard, and the column is not one recognized
+    // by the specialized parsers above, then just ignore it. It is likely a duplicate
+    // column name. In any event, it will be processed by the Project operator on
+    // top of this scan.
+
+    if (includesWildcard) {
+      return;
+    }
+
     // This is a desired table column.
 
     addTableColumn(
@@ -281,15 +332,6 @@ public class ScanLevelProjection {
       for (ScanProjectionParser parser : parsers) {
         parser.validateColumn(outCol);
       }
-      switch (outCol.nodeType()) {
-      case UnresolvedColumn.UNRESOLVED:
-        if (hasWildcard()) {
-          throw new IllegalArgumentException("Cannot select table columns and * together");
-        }
-        break;
-      default:
-        break;
-      }
     }
   }
 
@@ -333,6 +375,8 @@ public class ScanLevelProjection {
 
   public RequestedTuple rootProjection() { return outputProjection; }
 
+  public RequestedTuple readerProjection() { return readerProjection; }
+
   @Override
   public String toString() {
     return new StringBuilder()
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
index fe78f5a..a5d6ca2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
@@ -23,15 +23,10 @@ import java.util.List;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
 import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
 import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver;
-import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
-import org.apache.drill.exec.physical.rowSet.impl.OptionBuilder;
-import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl;
 import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
 import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.ValueVector;
 
 /**
@@ -154,212 +149,6 @@ public class ScanSchemaOrchestrator {
   public static final int DEFAULT_BATCH_BYTE_COUNT = ValueVector.MAX_BUFFER_SIZE;
   public static final int MAX_BATCH_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
 
-  /**
-   * Orchestrates projection tasks for a single reader with the set that the
-   * scan operator manages. Vectors are reused across readers, but via a vector
-   * cache. All other state is distinct between readers.
-   */
-
-  public class ReaderSchemaOrchestrator implements VectorSource {
-
-    private int readerBatchSize;
-    private ResultSetLoaderImpl tableLoader;
-    private int prevTableSchemaVersion = -1;
-
-    /**
-     * Assembles the table, metadata and null columns into the final output
-     * batch to be sent downstream. The key goal of this class is to "smooth"
-     * schema changes in this output batch by absorbing trivial schema changes
-     * that occur across readers.
-     */
-
-    private ResolvedRow rootTuple;
-    private VectorContainer tableContainer;
-
-    public ReaderSchemaOrchestrator() {
-      readerBatchSize = scanBatchRecordLimit;
-    }
-
-    public void setBatchSize(int size) {
-      if (size > 0) {
-        readerBatchSize = Math.min(size, scanBatchRecordLimit);
-      }
-    }
-
-    public ResultSetLoader makeTableLoader(TupleMetadata tableSchema) {
-      OptionBuilder options = new OptionBuilder();
-      options.setRowCountLimit(readerBatchSize);
-      options.setVectorCache(vectorCache);
-      options.setBatchSizeLimit(scanBatchByteLimit);
-
-      // Set up a selection list if available and is a subset of
-      // table columns. (Only needed for non-wildcard queries.)
-      // The projection list includes all candidate table columns
-      // whether or not they exist in the up-front schema. Handles
-      // the odd case where the reader claims a fixed schema, but
-      // adds a column later.
-
-      if (! scanProj.projectAll()) {
-        options.setProjectionSet(scanProj.rootProjection());
-      }
-      options.setSchema(tableSchema);
-
-      // Create the table loader
-
-      tableLoader = new ResultSetLoaderImpl(allocator, options.build());
-
-      // If a schema is given, create a zero-row batch to announce the
-      // schema downstream in the form of an empty batch.
-
-      if (tableSchema != null) {
-        tableLoader.startEmptyBatch();
-        endBatch();
-      }
-
-      return tableLoader;
-    }
-
-    public boolean hasSchema() {
-      return prevTableSchemaVersion >= 0;
-    }
-
-    public void startBatch() {
-      tableLoader.startBatch();
-    }
-
-    /**
-     * Build the final output batch by projecting columns from the three input sources
-     * to the output batch. First, build the metadata and/or null columns for the
-     * table row count. Then, merge the sources.
-     */
-
-    public void endBatch() {
-
-      // Get the batch results in a container.
-
-      tableContainer = tableLoader.harvest();
-
-      // If the schema changed, set up the final projection based on
-      // the new (or first) schema.
-
-      if (prevTableSchemaVersion < tableLoader.schemaVersion()) {
-        reviseOutputProjection();
-      } else {
-
-        // Fill in the null and metadata columns.
-
-        populateNonDataColumns();
-      }
-      rootTuple.setRowCount(tableContainer.getRecordCount());
-    }
-
-    private void populateNonDataColumns() {
-      int rowCount = tableContainer.getRecordCount();
-      metadataManager.load(rowCount);
-      rootTuple.loadNulls(rowCount);
-    }
-
-    /**
-     * Create the list of null columns by comparing the SELECT list against the
-     * columns available in the batch schema. Create null columns for those that
-     * are missing. This is done for the first batch, and any time the schema
-     * changes. (For early-schema, the projection occurs once as the schema is set
-     * up-front and does not change.) For a SELECT *, the null column check
-     * only need be done if null columns were created when mapping from a prior
-     * schema.
-     */
-
-    private void reviseOutputProjection() {
-
-      // Do the table-schema level projection; the final matching
-      // of projected columns to available columns.
-
-      TupleMetadata tableSchema = tableLoader.harvestSchema();
-      if (schemaSmoother != null) {
-        doSmoothedProjection(tableSchema);
-      } else if (scanProj.hasWildcard()) {
-        doWildcardProjection(tableSchema);
-      } else {
-        doExplicitProjection(tableSchema);
-      }
-
-      // Combine metadata, nulls and batch data to form the final
-      // output container. Columns are created by the metadata and null
-      // loaders only in response to a batch, so create the first batch.
-
-      rootTuple.buildNulls(vectorCache);
-      metadataManager.define();
-      populateNonDataColumns();
-      rootTuple.project(tableContainer, outputContainer);
-      prevTableSchemaVersion = tableLoader.schemaVersion();
-    }
-
-    private void doSmoothedProjection(TupleMetadata tableSchema) {
-      rootTuple = new ResolvedRow(
-          new NullColumnBuilder(nullType, allowRequiredNullColumns));
-      schemaSmoother.resolve(tableSchema, rootTuple);
-    }
-
-    /**
-     * Query contains a wildcard. The schema-level projection includes
-     * all columns provided by the reader.
-     */
-
-    private void doWildcardProjection(TupleMetadata tableSchema) {
-      rootTuple = new ResolvedRow(null);
-      new WildcardSchemaProjection(scanProj,
-          tableSchema, rootTuple, schemaResolvers);
-    }
-
-    /**
-     * Explicit projection: include only those columns actually
-     * requested by the query, which may mean filling in null
-     * columns for projected columns that don't actually exist
-     * in the table.
-     *
-     * @param tableSchema newly arrived schema
-     */
-
-    private void doExplicitProjection(TupleMetadata tableSchema) {
-      rootTuple = new ResolvedRow(
-          new NullColumnBuilder(nullType, allowRequiredNullColumns));
-      new ExplicitSchemaProjection(scanProj,
-              tableSchema, rootTuple,
-              schemaResolvers);
-    }
-
-    @Override
-    public ValueVector vector(int index) {
-      return tableContainer.getValueVector(index).getValueVector();
-    }
-
-    public void close() {
-      RuntimeException ex = null;
-      try {
-        if (tableLoader != null) {
-          tableLoader.close();
-          tableLoader = null;
-        }
-      }
-      catch (RuntimeException e) {
-        ex = e;
-      }
-      try {
-        if (rootTuple != null) {
-          rootTuple.close();
-          rootTuple = null;
-        }
-      }
-      catch (RuntimeException e) {
-        ex = ex == null ? e : ex;
-      }
-      metadataManager.endFile();
-      if (ex != null) {
-        throw ex;
-      }
-    }
-  }
-
   // Configuration
 
   /**
@@ -367,16 +156,16 @@ public class ScanSchemaOrchestrator {
    * not set, the null type is the Drill default.
    */
 
-  private MajorType nullType;
+  MajorType nullType;
 
   /**
    * Creates the metadata (file and directory) columns, if needed.
    */
 
-  private MetadataManager metadataManager;
-  private final BufferAllocator allocator;
-  private int scanBatchRecordLimit = DEFAULT_BATCH_ROW_COUNT;
-  private int scanBatchByteLimit = DEFAULT_BATCH_BYTE_COUNT;
+  MetadataManager metadataManager;
+  final BufferAllocator allocator;
+  int scanBatchRecordLimit = DEFAULT_BATCH_ROW_COUNT;
+  int scanBatchByteLimit = DEFAULT_BATCH_BYTE_COUNT;
   private final List<ScanProjectionParser> parsers = new ArrayList<>();
 
   /**
@@ -389,7 +178,7 @@ public class ScanSchemaOrchestrator {
   List<SchemaProjectionResolver> schemaResolvers = new ArrayList<>();
 
   private boolean useSchemaSmoothing;
-  private boolean allowRequiredNullColumns;
+  boolean allowRequiredNullColumns;
 
   // Internal state
 
@@ -402,14 +191,14 @@ public class ScanSchemaOrchestrator {
    * vectors rather than vector instances, this cache can be deprecated.
    */
 
-  private ResultVectorCacheImpl vectorCache;
-  private ScanLevelProjection scanProj;
+  ResultVectorCacheImpl vectorCache;
+  ScanLevelProjection scanProj;
   private ReaderSchemaOrchestrator currentReader;
-  private SchemaSmoother schemaSmoother;
+  SchemaSmoother schemaSmoother;
 
   // Output
 
-  private VectorContainer outputContainer;
+  VectorContainer outputContainer;
 
   public ScanSchemaOrchestrator(BufferAllocator allocator) {
     this.allocator = allocator;
@@ -493,20 +282,12 @@ public class ScanSchemaOrchestrator {
 
     ScanProjectionParser parser = metadataManager.projectionParser();
     if (parser != null) {
-
-      // For compatibility with Drill 1.12, insert the file metadata
-      // parser before others so that, in a wildcard query, metadata
-      // columns appear before others (such as the `columns` column.)
-      // This is temporary and should be removed once the test framework
-      // is restored to Drill 1.11 functionality.
-
       parsers.add(parser);
     }
 
     // Parse the projection list.
 
     scanProj = new ScanLevelProjection(projection, parsers);
-
     if (scanProj.hasWildcard() && useSchemaSmoothing) {
       schemaSmoother = new SchemaSmoother(scanProj, schemaResolvers);
     }
@@ -526,7 +307,7 @@ public class ScanSchemaOrchestrator {
 
   public ReaderSchemaOrchestrator startReader() {
     closeReader();
-    currentReader = new ReaderSchemaOrchestrator();
+    currentReader = new ReaderSchemaOrchestrator(this);
     return currentReader;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java
index c7bae27..a756114 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java
@@ -61,8 +61,6 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
 
 public class SchemaLevelProjection {
 
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaLevelProjection.class);
-
   /**
    * Schema-level projection is customizable. Implement this interface, and
    * add an instance to the scan orchestrator, to perform custom mappings
@@ -81,10 +79,7 @@ public class SchemaLevelProjection {
 
   protected SchemaLevelProjection(
         List<SchemaProjectionResolver> resolvers) {
-    if (resolvers == null) {
-      resolvers = new ArrayList<>();
-    }
-    this.resolvers = resolvers;
-    for (SchemaProjectionResolver resolver : resolvers) {
+    this.resolvers = resolvers == null ? new ArrayList<>() : resolvers;
+    for (SchemaProjectionResolver resolver : this.resolvers) { // parameter may be null
       resolver.startResolution();
     }
@@ -97,6 +92,8 @@ public class SchemaLevelProjection {
         return;
       }
     }
-    throw new IllegalStateException("No resolver for column: " + col.nodeType());
+    throw new IllegalStateException(
+        String.format("No resolver for column `%s` of type %d",
+            col.name(), col.nodeType()));
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java
index a5a52c0..155fcf8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java
@@ -22,6 +22,56 @@
  * or may be "missing" with null values applied. The code here prepares
  * a run-time projection plan based on the actual table schema.
  * <p>
+ * Looks at schema as a set of transforms.
+ * <ul>
+ * <li>Scan-level projection list from the query plan: The list of columns
+ * (or the wildcard) as requested by the user in the query. The planner
+ * determines which columns to project. In Drill, projection is speculative:
+ * it is a list of names which the planner hopes will appear in the data
+ * files. The reader must make up columns (the infamous nullable INT) when
+ * it turns out that no such column exists. Else, the reader must figure out
+ * the data type for any column that does exist.
+ * <p>
+ * The scan project list defines the set of columns which the scan operator
+ * is obliged to send downstream. Ideally, the scan operator sends exactly the
+ * same schema (the project list with types filled in) for all batches. Since
+ * batches may come from different files, the scan operator is obligated to
+ * unify the schemas from those files (or blocks.)</li>
+ * <li>Reader (file)-level projection occurs for each reader. A single scan
+ * may use multiple readers to read data. Each reader may offer more information
+ * about the schema. For example, a Parquet reader can obtain schema information
+ * from the Parquet headers. A JDBC reader obtains schema information from the
+ * returned schema. This is called "early schema." File-based readers can at least
+ * add implicit file or partition columns.
+ * <p>
+ * The result is a refined schema: the scan level schema with more information
+ * filled in. For Parquet, all projection information can be filled in. For
+ * CSV or JSON, we can only add file metadata information, but not yet the
+ * actual data schema.</li>
+ * <li>Batch-level schema: once a reader reads actual data, it now knows
+ * exactly what it read. This is the "schema on read" model. Thus, after reading
+ * a batch, any remaining uncertainty about the projected schema is removed.
+ * The actual data defines the data types and so on.
+ * <p>
+ * Readers such as JSON and CSV are "late schema": they don't know the data
+ * schema until they read the file. This is true "schema on read." Further, for
+ * JSON, the data may change from one batch to the next as the reader "discovers"
+ * fields that did not appear in earlier batches. This requires some amount of
+ * "schema smoothing": the ability to preserve a consistent output schema even
+ * as the input schema jiggles around some.</li>
+ * </ul>
+ * <p>
+ * The goal of this mechanism is to handle the above use cases cleanly, in a
+ * common set of classes, and to avoid the need for each reader to figure out
+ * all these issues for themselves (as was the case with earlier versions of
+ * Drill.)
+ * <p>
+ * Because these issues are complex, the code itself is complex. To make the
+ * code easier to manage, each bit of functionality is encapsulated in a
+ * distinct class. Classes combine via composition to create a "framework"
+ * suitable for each kind of reader: whether it be early or late schema,
+ * file-based or something else, etc.
+ * <p>
  * The core concept is one of successive refinement of the project
  * list through a set of rewrites:
  * <ul>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java
index c1e383e..f464bae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java
@@ -37,7 +37,7 @@ public class ImpliedTupleRequest implements RequestedTuple {
       new ImpliedTupleRequest(false);
   public static final List<RequestedColumn> EMPTY_COLS = new ArrayList<>();
 
-  private boolean allProjected;
+  private final boolean allProjected;
 
   public ImpliedTupleRequest(boolean allProjected) {
     this.allProjected = allProjected;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
index cd782c7..4643c57 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
@@ -73,7 +73,7 @@ import org.apache.drill.exec.record.metadata.TupleNameSpace;
 
 public class RequestedTupleImpl implements RequestedTuple {
 
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RequestedTupleImpl.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RequestedTupleImpl.class);
 
   private final RequestedColumnImpl parent;
   private final TupleNameSpace<RequestedColumn> projection = new TupleNameSpace<>();
@@ -86,6 +86,13 @@ public class RequestedTupleImpl implements RequestedTuple {
     this.parent = parent;
   }
 
+  public RequestedTupleImpl(List<RequestedColumn> cols) {
+    parent = null;
+    for (RequestedColumn col : cols) {
+      projection.add(col.name(), col);
+    }
+  }
+
   @Override
   public RequestedColumn get(String colName) {
     return projection.get(colName.toLowerCase());
@@ -119,10 +126,43 @@ public class RequestedTupleImpl implements RequestedTuple {
   }
 
   /**
+   * Create a requested tuple projection from a rewritten top-level
+   * projection list. The columns within the list have already been parsed to
+   * pick out arrays, maps and scalars. The list must not include the
+   * wildcard: a wildcard list must be passed in as a null list. An
+   * empty list means project nothing. Null list means project all, else
+   * project only the columns in the list.
+   *
+   * @param projList top-level, parsed columns
+   * @return the tuple projection for the top-level row
+   */
+
+  public static RequestedTuple build(List<RequestedColumn> projList) {
+    if (projList == null) {
+      return new ImpliedTupleRequest(true);
+    }
+    if (projList.isEmpty()) {
+      return new ImpliedTupleRequest(false);
+    }
+    return new RequestedTupleImpl(projList);
+  }
+
+  /**
    * Parse a projection list. The list should consist of a list of column names;
-   * any wildcards should have been processed by the caller. An empty list means
+   * or wildcards. An empty list means
    * nothing is projected. A null list means everything is projected (that is, a
    * null list here is equivalent to a wildcard in the SELECT statement.)
+   * <p>
+   * The projection list may include both a wildcard and column names (as in
+   * the case of implicit columns.) This results in a final list that both
+   * says that everything is projected, and provides the list of columns.
+   * <p>
+   * Parsing is used at two different times. First, to parse the list from
+   * the physical operator. This has the case above: an explicit wildcard
+   * and/or additional columns. Then, this class is used again to prepare the
+   * physical projection used when reading. In this case, wildcards should
+   * be removed, implicit columns pulled out, and just the list of read-level
+   * columns should remain.
    *
    * @param projList
    *          the list of projected columns, or null if no projection is to be
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
index 3587e27..88ccd3c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
@@ -22,33 +22,42 @@ import static org.junit.Assert.fail;
 
 import java.util.List;
 
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
+import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator;
 import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
-import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
-
+import org.junit.experimental.categories.Category;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
+/**
+ * Test the "columns" array mechanism integrated with the scan schema
+ * orchestrator including simulating reading data.
+ */
+
+@Category(RowSetTests.class)
 public class TestColumnsArray extends SubOperatorTest {
 
-  /**
-   * Test columns array. The table must be able to support it by having a
-   * matching column.
-   */
+  private static class MockScanner {
+    ScanSchemaOrchestrator scanner;
+    ReaderSchemaOrchestrator reader;
+    ResultSetLoader loader;
+  }
 
-  @Test
-  public void testColumnsArray() {
+  private MockScanner buildScanner(List<SchemaPath> projList) {
+
+    MockScanner mock = new MockScanner();
 
     // Set up the file metadata manager
 
@@ -64,21 +73,19 @@ public class TestColumnsArray extends SubOperatorTest {
 
     // Configure the schema orchestrator
 
-    ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(fixture.allocator());
-    scanner.withMetadata(metadataManager);
-    scanner.addParser(colsManager.projectionParser());
-    scanner.addResolver(colsManager.resolver());
+    mock.scanner = new ScanSchemaOrchestrator(fixture.allocator());
+    mock.scanner.withMetadata(metadataManager);
+    mock.scanner.addParser(colsManager.projectionParser());
+    mock.scanner.addResolver(colsManager.resolver());
 
-    // SELECT filename, columns, dir0 ...
+    // SELECT <proj list> ...
 
-    scanner.build(RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL,
-        ColumnsArrayManager.COLUMNS_COL,
-        ScanTestUtils.partitionColName(0)));
+    mock.scanner.build(projList);
 
     // FROM z.csv
 
     metadataManager.startFile(filePath);
-    ReaderSchemaOrchestrator reader = scanner.startReader();
+    mock.reader = mock.scanner.startReader();
 
     // Table schema (columns: VARCHAR[])
 
@@ -86,7 +93,25 @@ public class TestColumnsArray extends SubOperatorTest {
         .addArray(ColumnsArrayManager.COLUMNS_COL, MinorType.VARCHAR)
         .buildSchema();
 
-    ResultSetLoader loader = reader.makeTableLoader(tableSchema);
+    mock.loader = mock.reader.makeTableLoader(tableSchema);
+
+    // First empty batch
+
+    mock.reader.defineSchema();
+    return mock;
+  }
+
+  /**
+   * Test columns array. The table must be able to support it by having a
+   * matching column.
+   */
+
+  @Test
+  public void testColumnsArray() {
+
+    MockScanner mock = buildScanner(RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL,
+        ColumnsArrayManager.COLUMNS_COL,
+        ScanTestUtils.partitionColName(0)));
 
     // Verify empty batch.
 
@@ -99,18 +124,18 @@ public class TestColumnsArray extends SubOperatorTest {
       SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
          .build();
 
-      assertNotNull(scanner.output());
-      new RowSetComparison(expected)
-         .verifyAndClearAll(fixture.wrap(scanner.output()));
+      assertNotNull(mock.scanner.output());
+      RowSetUtilities.verify(expected,
+         fixture.wrap(mock.scanner.output()));
     }
 
     // Create a batch of data.
 
-    reader.startBatch();
-    loader.writer()
+    mock.reader.startBatch();
+    mock.loader.writer()
       .addRow(new Object[] {new String[] {"fred", "flintstone"}})
       .addRow(new Object[] {new String[] {"barney", "rubble"}});
-    reader.endBatch();
+    mock. reader.endBatch();
 
     // Verify
 
@@ -120,11 +145,100 @@ public class TestColumnsArray extends SubOperatorTest {
         .addRow("z.csv", new String[] {"barney", "rubble"}, "x")
         .build();
 
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
+      RowSetUtilities.verify(expected,
+          fixture.wrap(mock.scanner.output()));
     }
 
-    scanner.close();
+    mock.scanner.close();
+  }
+
+  @Test
+  public void testWildcard() {
+
+    MockScanner mock = buildScanner(RowSetTestUtils.projectAll());
+
+    // Verify empty batch.
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("columns", MinorType.VARCHAR)
+        .buildSchema();
+    {
+      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+         .build();
+
+      assertNotNull(mock.scanner.output());
+      RowSetUtilities.verify(expected,
+         fixture.wrap(mock.scanner.output()));
+    }
+
+    // Create a batch of data.
+
+    mock.reader.startBatch();
+    mock.loader.writer()
+      .addRow(new Object[] {new String[] {"fred", "flintstone"}})
+      .addRow(new Object[] {new String[] {"barney", "rubble"}});
+    mock. reader.endBatch();
+
+    // Verify
+
+    {
+      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addSingleCol(new String[] {"fred", "flintstone"})
+        .addSingleCol(new String[] {"barney", "rubble"})
+        .build();
+
+      RowSetUtilities.verify(expected,
+          fixture.wrap(mock.scanner.output()));
+    }
+
+    mock.scanner.close();
+  }
+
+  @Test
+  public void testWildcardAndFileMetadata() {
+
+    MockScanner mock = buildScanner(RowSetTestUtils.projectList(
+        ScanTestUtils.FILE_NAME_COL,
+        SchemaPath.DYNAMIC_STAR,
+        ScanTestUtils.partitionColName(0)));
+
+    // Verify empty batch.
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("filename", MinorType.VARCHAR)
+        .addArray("columns", MinorType.VARCHAR)
+        .addNullable("dir0", MinorType.VARCHAR)
+        .buildSchema();
+    {
+      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+         .build();
+
+      assertNotNull(mock.scanner.output());
+      RowSetUtilities.verify(expected,
+         fixture.wrap(mock.scanner.output()));
+    }
+
+    // Create a batch of data.
+
+    mock.reader.startBatch();
+    mock.loader.writer()
+      .addRow(new Object[] {new String[] {"fred", "flintstone"}})
+      .addRow(new Object[] {new String[] {"barney", "rubble"}});
+    mock. reader.endBatch();
+
+    // Verify
+
+    {
+      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow("z.csv", new String[] {"fred", "flintstone"}, "x")
+        .addRow("z.csv", new String[] {"barney", "rubble"}, "x")
+        .build();
+
+      RowSetUtilities.verify(expected,
+          fixture.wrap(mock.scanner.output()));
+    }
+
+    mock.scanner.close();
   }
 
   private ScanSchemaOrchestrator buildScan(List<SchemaPath> cols) {
@@ -160,6 +274,7 @@ public class TestColumnsArray extends SubOperatorTest {
     try {
       ReaderSchemaOrchestrator reader = scanner.startReader();
       reader.makeTableLoader(tableSchema);
+      reader.defineSchema();
       fail();
     } catch (IllegalStateException e) {
       // Expected
@@ -180,6 +295,7 @@ public class TestColumnsArray extends SubOperatorTest {
     try {
       ReaderSchemaOrchestrator reader = scanner.startReader();
       reader.makeTableLoader(tableSchema);
+      reader.defineSchema();
       fail();
     } catch (IllegalStateException e) {
       // Expected
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java
index 4f32b56..e7a0188 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -46,7 +47,7 @@ import org.apache.drill.test.SubOperatorTest;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
 import org.junit.Test;
-
+import org.junit.experimental.categories.Category;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 
@@ -54,6 +55,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
  * Test the columns-array specific behavior in the columns scan framework.
  */
 
+@Category(RowSetTests.class)
 public class TestColumnsArrayFramework extends SubOperatorTest {
 
   private static final Path MOCK_FILE_PATH = new Path("file:/w/x/y/z.csv");
@@ -101,7 +103,7 @@ public class TestColumnsArrayFramework extends SubOperatorTest {
     @Override
     public boolean open(ColumnsSchemaNegotiator negotiator) {
       this.negotiator = negotiator;
-      negotiator.setTableSchema(schema);
+      negotiator.setTableSchema(schema, true);
       negotiator.build();
       return true;
     }
@@ -115,6 +117,11 @@ public class TestColumnsArrayFramework extends SubOperatorTest {
     public void close() { }
   }
 
+  /**
+   * Test including a column other than "columns". Occurs when
+   * using implicit columns.
+   */
+
   @Test
   public void testNonColumnsProjection() {
 
@@ -143,6 +150,10 @@ public class TestColumnsArrayFramework extends SubOperatorTest {
     scanFixture.close();
   }
 
+  /**
+   * Test projecting just the `columns` column.
+   */
+
   @Test
   public void testColumnsProjection() {
 
@@ -171,6 +182,10 @@ public class TestColumnsArrayFramework extends SubOperatorTest {
     scanFixture.close();
   }
 
+  /**
+   * Test including a specific index of `columns` such as
+   * `columns`[1].
+   */
   @Test
   public void testColumnsIndexProjection() {
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java
index e2c8a21..d1e91a2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager;
@@ -35,9 +36,10 @@ import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
-
+import org.junit.experimental.categories.Category;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
+@Category(RowSetTests.class)
 public class TestColumnsArrayParser extends SubOperatorTest {
 
   /**
@@ -255,5 +257,45 @@ public class TestColumnsArrayParser extends SubOperatorTest {
     assertEquals(FileMetadataColumn.ID, scanProj.columns().get(2).nodeType());
   }
 
-  // TODO: Test Columns element projection
+  /**
+   * If a query is of the form:
+   * <pre><code>
+   * select * from dfs.`multilevel/csv` where columns[1] &lt; 1000
+   * </code></pre>
+   * Then the projection list passed to the scan operator
+   * includes both the wildcard and the `columns` array.
+   * We can ignore one of them.
+   */
+
+  @Test
+  public void testWildcardAndColumns() {
+    ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList(
+            SchemaPath.DYNAMIC_STAR,
+            ColumnsArrayManager.COLUMNS_COL),
+        ScanTestUtils.parsers(new ColumnsArrayParser(true)));
+
+    assertFalse(scanProj.projectAll());
+    assertEquals(2, scanProj.requestedCols().size());
+
+    assertEquals(1, scanProj.columns().size());
+    assertEquals(ColumnsArrayManager.COLUMNS_COL, scanProj.columns().get(0).name());
+
+    // Verify column type
+
+    assertEquals(UnresolvedColumnsArrayColumn.ID, scanProj.columns().get(0).nodeType());
+  }
+
+  @Test
+  public void testColumnsAsMap() {
+    try {
+        new ScanLevelProjection(
+          RowSetTestUtils.projectList("columns.x"),
+          ScanTestUtils.parsers(new ColumnsArrayParser(true)));
+        fail();
+    }
+    catch (UserException e) {
+      // Expected
+    }
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestConstantColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestConstantColumnLoader.java
deleted file mode 100644
index 330a661..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestConstantColumnLoader.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.scan;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.scan.file.FileMetadata;
-import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn;
-import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumnDefn;
-import org.apache.drill.exec.physical.impl.scan.file.PartitionColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ConstantColumnLoader;
-import org.apache.drill.exec.physical.impl.scan.project.ConstantColumnLoader.ConstantColumnSpec;
-import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.store.ColumnExplorer.ImplicitFileColumns;
-import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.test.rowSet.RowSetComparison;
-import org.apache.hadoop.fs.Path;
-import org.junit.Test;
-
-public class TestConstantColumnLoader extends SubOperatorTest {
-
-  private static class DummyColumn implements ConstantColumnSpec {
-
-    private String name;
-    private MaterializedField schema;
-    private String value;
-
-    public DummyColumn(String name, MajorType type, String value) {
-      this.name = name;
-      this.schema = MaterializedField.create(name, type);
-      this.value = value;
-    }
-
-    @Override
-    public String name() { return name; }
-
-    @Override
-    public MaterializedField schema() { return schema; }
-
-    @Override
-    public String value() { return value; }
-  }
-
-  /**
-   * Test the static column loader using one column of each type.
-   * The null column is of type int, but the associated value is of
-   * type string. This is a bit odd, but works out because we detect that
-   * the string value is null and call setNull on the writer, and avoid
-   * using the actual data.
-   */
-
-  @Test
-  public void testConstantColumnLoader() {
-
-    MajorType aType = MajorType.newBuilder()
-        .setMinorType(MinorType.VARCHAR)
-        .setMode(DataMode.REQUIRED)
-        .build();
-    MajorType bType = MajorType.newBuilder()
-        .setMinorType(MinorType.VARCHAR)
-        .setMode(DataMode.OPTIONAL)
-        .build();
-
-    List<ConstantColumnSpec> defns = new ArrayList<>();
-    defns.add(
-        new DummyColumn("a", aType, "a-value" ));
-    defns.add(
-        new DummyColumn("b", bType, "b-value" ));
-
-    ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
-    ConstantColumnLoader staticLoader = new ConstantColumnLoader(cache, defns);
-
-    // Create a batch
-
-    staticLoader.load(2);
-
-    // Verify
-
-    BatchSchema expectedSchema = new SchemaBuilder()
-        .add("a", aType)
-        .add("b", bType)
-        .build();
-    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addRow("a-value", "b-value")
-        .addRow("a-value", "b-value")
-        .build();
-
-    new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(staticLoader.load(2)));
-    staticLoader.close();
-  }
-
-  @Test
-  public void testFileMetadata() {
-
-    FileMetadata fileInfo = new FileMetadata(new Path("hdfs:///w/x/y/z.csv"), new Path("hdfs:///w"));
-    List<ConstantColumnSpec> defns = new ArrayList<>();
-    FileMetadataColumnDefn iDefn = new FileMetadataColumnDefn(
-        ScanTestUtils.SUFFIX_COL, ImplicitFileColumns.SUFFIX);
-    FileMetadataColumn iCol = new FileMetadataColumn(ScanTestUtils.SUFFIX_COL,
-        iDefn, fileInfo, null, 0);
-    defns.add(iCol);
-
-    String partColName = ScanTestUtils.partitionColName(1);
-    PartitionColumn pCol = new PartitionColumn(partColName, 1, fileInfo, null, 0);
-    defns.add(pCol);
-
-    ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
-    ConstantColumnLoader staticLoader = new ConstantColumnLoader(cache, defns);
-
-    // Create a batch
-
-    staticLoader.load(2);
-
-    // Verify
-
-    BatchSchema expectedSchema = new SchemaBuilder()
-        .add(ScanTestUtils.SUFFIX_COL, MinorType.VARCHAR)
-        .addNullable(partColName, MinorType.VARCHAR)
-        .build();
-    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addRow("csv", "y")
-        .addRow("csv", "y")
-        .build();
-
-    new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(staticLoader.load(2)));
-    staticLoader.close();
-  }
-}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java
index 08aeed5..a6de5e6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import java.util.List;
 
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
@@ -33,9 +34,10 @@ import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
-
+import org.junit.experimental.categories.Category;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
+@Category(RowSetTests.class)
 public class TestFileMetadataColumnParser extends SubOperatorTest {
 
   @Test
@@ -135,8 +137,12 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
     assertEquals(PartitionColumn.ID, scanProj.columns().get(0).nodeType());
   }
 
+  /**
+   * Test wildcard expansion.
+   */
+
   @Test
-  public void testWildcard() {
+  public void testRevisedWildcard() {
     Path filePath = new Path("hdfs:///w/x/y/z.csv");
     FileMetadataManager metadataManager = new FileMetadataManager(
         fixture.getOptionManager(),
@@ -153,15 +159,45 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
   }
 
   /**
+   * Legacy (prior version) wildcard expansion always expands partition
+   * columns.
+   */
+
+  @Test
+  public void testLegacyWildcard() {
+    Path filePath = new Path("hdfs:///w/x/y/z.csv");
+    FileMetadataManager metadataManager = new FileMetadataManager(
+        fixture.getOptionManager(),
+        true, // Use legacy wildcard expansion
+        true, // Put partitions at end
+        new Path("hdfs:///w"),
+        Lists.newArrayList(filePath));
+
+    ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        Lists.newArrayList(metadataManager.projectionParser()));
+
+    List<ColumnProjection> cols = scanProj.columns();
+    assertEquals(3, cols.size());
+    assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType());
+    assertEquals(PartitionColumn.ID, cols.get(1).nodeType());
+    assertEquals(0, ((PartitionColumn) cols.get(1)).partition());
+    assertEquals(PartitionColumn.ID, cols.get(2).nodeType());
+    assertEquals(1, ((PartitionColumn) cols.get(2)).partition());
+  }
+
+  /**
    * Combine wildcard and file metadata columms. The wildcard expands
    * table columns but not metadata columns.
    */
 
   @Test
-  public void testWildcardAndFileMetadata() {
+  public void testLegacyWildcardAndFileMetadata() {
     Path filePath = new Path("hdfs:///w/x/y/z.csv");
     FileMetadataManager metadataManager = new FileMetadataManager(
         fixture.getOptionManager(),
+        true, // Use legacy wildcard expansion
+        false, // Put partitions at end
         new Path("hdfs:///w"),
         Lists.newArrayList(filePath));
 
@@ -173,10 +209,12 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
         Lists.newArrayList(metadataManager.projectionParser()));
 
     List<ColumnProjection> cols = scanProj.columns();
-    assertEquals(3, cols.size());
+    assertEquals(5, cols.size());
     assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType());
     assertEquals(FileMetadataColumn.ID, cols.get(1).nodeType());
     assertEquals(FileMetadataColumn.ID, cols.get(2).nodeType());
+    assertEquals(PartitionColumn.ID, cols.get(3).nodeType());
+    assertEquals(PartitionColumn.ID, cols.get(4).nodeType());
   }
 
   /**
@@ -185,10 +223,12 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
    */
 
   @Test
-  public void testWildcardAndFileMetadataMixed() {
+  public void testLegacyWildcardAndFileMetadataMixed() {
     Path filePath = new Path("hdfs:///w/x/y/z.csv");
     FileMetadataManager metadataManager = new FileMetadataManager(
         fixture.getOptionManager(),
+        true, // Use legacy wildcard expansion
+        false, // Put partitions at end
         new Path("hdfs:///w"),
         Lists.newArrayList(filePath));
 
@@ -200,18 +240,25 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
         Lists.newArrayList(metadataManager.projectionParser()));
 
     List<ColumnProjection> cols = scanProj.columns();
-    assertEquals(3, cols.size());
+    assertEquals(5, cols.size());
     assertEquals(FileMetadataColumn.ID, cols.get(0).nodeType());
     assertEquals(UnresolvedColumn.WILDCARD, cols.get(1).nodeType());
     assertEquals(FileMetadataColumn.ID, cols.get(2).nodeType());
+    assertEquals(PartitionColumn.ID, cols.get(3).nodeType());
+    assertEquals(PartitionColumn.ID, cols.get(4).nodeType());
   }
 
   /**
-   * Include both a wildcard and a partition column.
+   * Include both a wildcard and a partition column. The wildcard, in
+   * legacy mode, will create partition columns for any partitions not
+   * mentioned in the project list.
+   * <p>
+   * Tests proposed functionality: include only requested partition
+   * columns.
    */
 
   @Test
-  public void testWildcardAndPartition() {
+  public void testRevisedWildcardAndPartition() {
     Path filePath = new Path("hdfs:///w/x/y/z.csv");
     FileMetadataManager metadataManager = new FileMetadataManager(
         fixture.getOptionManager(),
@@ -229,6 +276,113 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
       assertEquals(PartitionColumn.ID, cols.get(1).nodeType());
   }
 
+  @Test
+  public void testLegacyWildcardAndPartition() {
+    Path filePath = new Path("hdfs:///w/x/y/z.csv");
+    FileMetadataManager metadataManager = new FileMetadataManager(
+        fixture.getOptionManager(),
+        true, // Use legacy wildcard expansion
+        true, // Put partitions at end
+        new Path("hdfs:///w"),
+        Lists.newArrayList(filePath));
+
+    ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR,
+            ScanTestUtils.partitionColName(8)),
+        Lists.newArrayList(metadataManager.projectionParser()));
+
+      List<ColumnProjection> cols = scanProj.columns();
+      assertEquals(4, cols.size());
+      assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType());
+      assertEquals(PartitionColumn.ID, cols.get(1).nodeType());
+      assertEquals(0, ((PartitionColumn) cols.get(1)).partition());
+      assertEquals(PartitionColumn.ID, cols.get(2).nodeType());
+      assertEquals(1, ((PartitionColumn) cols.get(2)).partition());
+      assertEquals(PartitionColumn.ID, cols.get(3).nodeType());
+      assertEquals(8, ((PartitionColumn) cols.get(3)).partition());
+  }
+
+  @Test
+  public void testPreferredPartitionExpansion() {
+    Path filePath = new Path("hdfs:///w/x/y/z.csv");
+    FileMetadataManager metadataManager = new FileMetadataManager(
+        fixture.getOptionManager(),
+        true, // Use legacy wildcard expansion
+        false, // Put partitions at end
+        new Path("hdfs:///w"),
+        Lists.newArrayList(filePath));
+
+    ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR,
+            ScanTestUtils.partitionColName(8)),
+        Lists.newArrayList(metadataManager.projectionParser()));
+
+      List<ColumnProjection> cols = scanProj.columns();
+      assertEquals(4, cols.size());
+      assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType());
+      assertEquals(PartitionColumn.ID, cols.get(1).nodeType());
+      assertEquals(8, ((PartitionColumn) cols.get(1)).partition());
+      assertEquals(PartitionColumn.ID, cols.get(2).nodeType());
+      assertEquals(0, ((PartitionColumn) cols.get(2)).partition());
+      assertEquals(PartitionColumn.ID, cols.get(3).nodeType());
+      assertEquals(1, ((PartitionColumn) cols.get(3)).partition());
+  }
+
+  /**
+   * Test a case like:<br>
+   * <code>SELECT *, dir1 FROM ...</code><br>
+   * The projection list includes "dir1". The wildcard will
+   * fill in "dir0".
+   */
+
+  @Test
+  public void testLegacyWildcardAndPartitionWithOverlap() {
+    Path filePath = new Path("hdfs:///w/x/y/z.csv");
+    FileMetadataManager metadataManager = new FileMetadataManager(
+        fixture.getOptionManager(),
+        true, // Use legacy wildcard expansion
+        true, // Put partitions at end
+        new Path("hdfs:///w"),
+        Lists.newArrayList(filePath));
+
+    ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR,
+            ScanTestUtils.partitionColName(1)),
+        Lists.newArrayList(metadataManager.projectionParser()));
+
+      List<ColumnProjection> cols = scanProj.columns();
+      assertEquals(3, cols.size());
+      assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType());
+      assertEquals(PartitionColumn.ID, cols.get(1).nodeType());
+      assertEquals(0, ((PartitionColumn) cols.get(1)).partition());
+      assertEquals(PartitionColumn.ID, cols.get(2).nodeType());
+      assertEquals(1, ((PartitionColumn) cols.get(2)).partition());
+  }
+
+  @Test
+  public void testPreferedWildcardExpansionWithOverlap() {
+    Path filePath = new Path("hdfs:///w/x/y/z.csv");
+    FileMetadataManager metadataManager = new FileMetadataManager(
+        fixture.getOptionManager(),
+        true, // Use legacy wildcard expansion
+        false, // Put partitions at end
+        new Path("hdfs:///w"),
+        Lists.newArrayList(filePath));
+
+    ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR,
+            ScanTestUtils.partitionColName(1)),
+        Lists.newArrayList(metadataManager.projectionParser()));
+
+      List<ColumnProjection> cols = scanProj.columns();
+      assertEquals(3, cols.size());
+      assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType());
+      assertEquals(PartitionColumn.ID, cols.get(1).nodeType());
+      assertEquals(1, ((PartitionColumn) cols.get(1)).partition());
+      assertEquals(PartitionColumn.ID, cols.get(2).nodeType());
+      assertEquals(0, ((PartitionColumn) cols.get(2)).partition());
+  }
+
   /**
    * Verify that names that look like metadata columns, but appear
    * to be maps or arrays, are not interpreted as metadata. That is,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java
index 9161932..314bc2a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
 
 import java.util.List;
 
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadata;
@@ -46,9 +47,10 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
-
+import org.junit.experimental.categories.Category;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
+@Category(RowSetTests.class)
 public class TestFileMetadataProjection extends SubOperatorTest {
 
   @Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
index a10e766..2fdf3e6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -32,7 +33,7 @@ import org.apache.drill.exec.physical.impl.scan.TestScanOperatorExec.AbstractSca
 import org.apache.drill.exec.physical.impl.scan.file.BaseFileScanFramework;
 import org.apache.drill.exec.physical.impl.scan.file.BaseFileScanFramework.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderCreator;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.physical.rowSet.RowSetLoader;
@@ -50,6 +51,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 /**
  * Tests the file metadata extensions to the file operator framework.
@@ -57,6 +59,7 @@ import org.junit.Test;
  * verified the underlying mechanisms.
  */
 
+@Category(RowSetTests.class)
 public class TestFileScanFramework extends SubOperatorTest {
 
   private static final String MOCK_FILE_NAME = "foo.csv";
@@ -117,7 +120,7 @@ public class TestFileScanFramework extends SubOperatorTest {
     }
   }
 
-  public static class FileScanOpFixture extends BaseFileScanOpFixture implements FileReaderCreator {
+  public static class FileScanOpFixture extends BaseFileScanOpFixture implements FileReaderFactory {
 
     protected final List<MockFileReader> readers = new ArrayList<>();
     protected Iterator<MockFileReader> readerIter;
@@ -252,7 +255,7 @@ public class TestFileScanFramework extends SubOperatorTest {
           .add("a", MinorType.INT)
           .addNullable("b", MinorType.VARCHAR, 10)
           .buildSchema();
-      schemaNegotiator.setTableSchema(schema);
+      schemaNegotiator.setTableSchema(schema, true);
       tableLoader = schemaNegotiator.build();
       return true;
     }
@@ -486,7 +489,7 @@ public class TestFileScanFramework extends SubOperatorTest {
             .add("b", MinorType.INT)
             .resumeSchema()
           .buildSchema();
-      schemaNegotiator.setTableSchema(schema);
+      schemaNegotiator.setTableSchema(schema, true);
       tableLoader = schemaNegotiator.build();
       return true;
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestNullColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestNullColumnLoader.java
deleted file mode 100644
index a413052..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestNullColumnLoader.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.scan;
-
-import static org.junit.Assert.assertSame;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
-import org.apache.drill.exec.physical.impl.scan.project.NullColumnLoader;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedNullColumn;
-import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
-import org.apache.drill.exec.physical.rowSet.impl.NullResultVectorCacheImpl;
-import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.test.rowSet.RowSetComparison;
-import org.junit.Test;
-
-public class TestNullColumnLoader extends SubOperatorTest {
-
-  private ResolvedNullColumn makeNullCol(String name, MajorType nullType) {
-
-    // For this test, we don't need the projection, so just
-    // set it to null.
-
-    return new ResolvedNullColumn(name, nullType, null, 0);
-  }
-
-  private ResolvedNullColumn makeNullCol(String name) {
-    return makeNullCol(name, null);
-  }
-
-  /**
-   * Test the simplest case: default null type, nothing in the vector
-   * cache. Specify no column type, the special NULL type, or a
-   * predefined type. Output types should be set accordingly.
-   */
-
-  @Test
-  public void testBasics() {
-
-    List<ResolvedNullColumn> defns = new ArrayList<>();
-    defns.add(makeNullCol("unspecified", null));
-    defns.add(makeNullCol("nullType", Types.optional(MinorType.NULL)));
-    defns.add(makeNullCol("specifiedOpt", Types.optional(MinorType.VARCHAR)));
-    defns.add(makeNullCol("specifiedReq", Types.required(MinorType.VARCHAR)));
-    defns.add(makeNullCol("specifiedArray", Types.repeated(MinorType.VARCHAR)));
-
-    ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
-    NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, null, false);
-
-    // Create a batch
-
-    VectorContainer output = staticLoader.load(2);
-
-    // Verify values and types
-
-    BatchSchema expectedSchema = new SchemaBuilder()
-        .add("unspecified", NullColumnLoader.DEFAULT_NULL_TYPE)
-        .add("nullType", NullColumnLoader.DEFAULT_NULL_TYPE)
-        .addNullable("specifiedOpt", MinorType.VARCHAR)
-        .addNullable("specifiedReq", MinorType.VARCHAR)
-        .addArray("specifiedArray", MinorType.VARCHAR)
-        .build();
-    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addRow(null, null, null, null, new String[] {})
-        .addRow(null, null, null, null, new String[] {})
-        .build();
-
-    new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(output));
-    staticLoader.close();
-  }
-
-  @Test
-  public void testCustomNullType() {
-
-    List<ResolvedNullColumn> defns = new ArrayList<>();
-    defns.add(makeNullCol("unspecified", null));
-    defns.add(makeNullCol("nullType", MajorType.newBuilder()
-        .setMinorType(MinorType.NULL)
-        .setMode(DataMode.OPTIONAL)
-        .build()));
-    defns.add(makeNullCol("nullTypeReq", MajorType.newBuilder()
-        .setMinorType(MinorType.NULL)
-        .setMode(DataMode.REQUIRED)
-        .build()));
-
-    // Null type array does not make sense, so is not tested.
-
-    ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
-    MajorType nullType = MajorType.newBuilder()
-        .setMinorType(MinorType.VARCHAR)
-        .setMode(DataMode.OPTIONAL)
-        .build();
-    NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, false);
-
-    // Create a batch
-
-    VectorContainer output = staticLoader.load(2);
-
-    // Verify values and types
-
-    BatchSchema expectedSchema = new SchemaBuilder()
-        .add("unspecified", nullType)
-        .add("nullType", nullType)
-        .add("nullTypeReq", nullType)
-        .build();
-    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addRow(null, null, null)
-        .addRow(null, null, null)
-        .build();
-
-    new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(output));
-    staticLoader.close();
-  }
-
-  @Test
-  public void testCachedTypesMapToNullable() {
-
-    List<ResolvedNullColumn> defns = new ArrayList<>();
-    defns.add(makeNullCol("req"));
-    defns.add(makeNullCol("opt"));
-    defns.add(makeNullCol("rep"));
-    defns.add(makeNullCol("unk"));
-
-    // Populate the cache with a column of each mode.
-
-    ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
-    cache.addOrGet(SchemaBuilder.columnSchema("req", MinorType.FLOAT8, DataMode.REQUIRED));
-    ValueVector opt = cache.addOrGet(SchemaBuilder.columnSchema("opt", MinorType.FLOAT8, DataMode.OPTIONAL));
-    ValueVector rep = cache.addOrGet(SchemaBuilder.columnSchema("rep", MinorType.FLOAT8, DataMode.REPEATED));
-
-    // Use nullable Varchar for unknown null columns.
-
-    MajorType nullType = MajorType.newBuilder()
-        .setMinorType(MinorType.VARCHAR)
-        .setMode(DataMode.OPTIONAL)
-        .build();
-    NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, false);
-
-    // Create a batch
-
-    VectorContainer output = staticLoader.load(2);
-
-    // Verify vectors are reused
-
-    assertSame(opt, output.getValueVector(1).getValueVector());
-    assertSame(rep, output.getValueVector(2).getValueVector());
-
-    // Verify values and types
-
-    BatchSchema expectedSchema = new SchemaBuilder()
-        .addNullable("req", MinorType.FLOAT8)
-        .addNullable("opt", MinorType.FLOAT8)
-        .addArray("rep", MinorType.FLOAT8)
-        .addNullable("unk", MinorType.VARCHAR)
-        .build();
-    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addRow(null, null, new int[] { }, null)
-        .addRow(null, null, new int[] { }, null)
-        .build();
-
-    new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(output));
-    staticLoader.close();
-  }
-
-  @Test
-  public void testCachedTypesAllowRequired() {
-
-    List<ResolvedNullColumn> defns = new ArrayList<>();
-    defns.add(makeNullCol("req"));
-    defns.add(makeNullCol("opt"));
-    defns.add(makeNullCol("rep"));
-    defns.add(makeNullCol("unk"));
-
-    // Populate the cache with a column of each mode.
-
-    ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
-    cache.addOrGet(SchemaBuilder.columnSchema("req", MinorType.FLOAT8, DataMode.REQUIRED));
-    ValueVector opt = cache.addOrGet(SchemaBuilder.columnSchema("opt", MinorType.FLOAT8, DataMode.OPTIONAL));
-    ValueVector rep = cache.addOrGet(SchemaBuilder.columnSchema("rep", MinorType.FLOAT8, DataMode.REPEATED));
-
-    // Use nullable Varchar for unknown null columns.
-
-    MajorType nullType = MajorType.newBuilder()
-        .setMinorType(MinorType.VARCHAR)
-        .setMode(DataMode.OPTIONAL)
-        .build();
-    NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, true);
-
-    // Create a batch
-
-    VectorContainer output = staticLoader.load(2);
-
-    // Verify vectors are reused
-
-    assertSame(opt, output.getValueVector(1).getValueVector());
-    assertSame(rep, output.getValueVector(2).getValueVector());
-
-    // Verify values and types
-
-    BatchSchema expectedSchema = new SchemaBuilder()
-        .add("req", MinorType.FLOAT8)
-        .addNullable("opt", MinorType.FLOAT8)
-        .addArray("rep", MinorType.FLOAT8)
-        .addNullable("unk", MinorType.VARCHAR)
-        .build();
-    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addRow(0.0, null, new int[] { }, null)
-        .addRow(0.0, null, new int[] { }, null)
-        .build();
-
-    new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(output));
-    staticLoader.close();
-  }
-
-  @Test
-  public void testNullColumnBuilder() {
-
-    ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
-    NullColumnBuilder builder = new NullColumnBuilder(null, false);
-
-    builder.add("unspecified");
-    builder.add("nullType", Types.optional(MinorType.NULL));
-    builder.add("specifiedOpt", Types.optional(MinorType.VARCHAR));
-    builder.add("specifiedReq", Types.required(MinorType.VARCHAR));
-    builder.add("specifiedArray", Types.repeated(MinorType.VARCHAR));
-    builder.build(cache);
-
-    // Create a batch
-
-    builder.load(2);
-
-    // Verify values and types
-
-    BatchSchema expectedSchema = new SchemaBuilder()
-        .add("unspecified", NullColumnLoader.DEFAULT_NULL_TYPE)
-        .add("nullType", NullColumnLoader.DEFAULT_NULL_TYPE)
-        .addNullable("specifiedOpt", MinorType.VARCHAR)
-        .addNullable("specifiedReq", MinorType.VARCHAR)
-        .addArray("specifiedArray", MinorType.VARCHAR)
-        .build();
-    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addRow(null, null, null, null, new String[] {})
-        .addRow(null, null, null, null, new String[] {})
-        .build();
-
-    new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(builder.output()));
-    builder.close();
-  }
-}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestRowBatchMerger.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestRowBatchMerger.java
deleted file mode 100644
index 0794557..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestRowBatchMerger.java
+++ /dev/null
@@ -1,459 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.scan;
-
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedMapColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
-import org.apache.drill.exec.physical.impl.scan.project.VectorSource;
-import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
-import org.apache.drill.exec.physical.rowSet.impl.NullResultVectorCacheImpl;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.test.rowSet.RowSet;
-import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.test.rowSet.RowSetComparison;
-import org.junit.Test;
-
-import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
-import static org.apache.drill.test.rowSet.RowSetUtilities.singleMap;
-import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
-
-
-/**
- * Test the row batch merger by merging two batches. Tests both the
- * "direct" and "exchange" cases. Direct means that the output container
- * contains the source vector directly: they are the same vectors.
- * Exchange means we have two vectors, but we swap the underlying
- * Drillbufs to effectively shift data from source to destination
- * vector.
- */
-
-public class TestRowBatchMerger extends SubOperatorTest {
-
-  public static class RowSetSource implements VectorSource {
-
-    private SingleRowSet rowSet;
-
-    public RowSetSource(SingleRowSet rowSet) {
-      this.rowSet = rowSet;
-    }
-
-    public RowSet rowSet() { return rowSet; }
-
-    public void clear() {
-      rowSet.clear();
-    }
-
-    @Override
-    public ValueVector vector(int index) {
-      return rowSet.container().getValueVector(index).getValueVector();
-    }
-  }
-
-  private RowSetSource makeFirst() {
-    BatchSchema firstSchema = new SchemaBuilder()
-        .add("d", MinorType.VARCHAR)
-        .add("a", MinorType.INT)
-        .build();
-    return new RowSetSource(
-        fixture.rowSetBuilder(firstSchema)
-          .addRow("barney", 10)
-          .addRow("wilma", 20)
-          .build());
-  }
-
-  private RowSetSource makeSecond() {
-    BatchSchema secondSchema = new SchemaBuilder()
-        .add("b", MinorType.INT)
-        .add("c", MinorType.VARCHAR)
-        .build();
-    return new RowSetSource(
-        fixture.rowSetBuilder(secondSchema)
-          .addRow(1, "foo.csv")
-          .addRow(2, "foo.csv")
-          .build());
-  }
-
-  public static class TestProjection extends ResolvedColumn {
-
-    public TestProjection(VectorSource source, int sourceIndex) {
-      super(source, sourceIndex);
-    }
-
-    @Override
-    public String name() { return null; }
-
-    @Override
-    public int nodeType() { return -1; }
-
-    @Override
-    public MaterializedField schema() { return null; }
-  }
-
-  @Test
-  public void testSimpleFlat() {
-
-    // Create the first batch
-
-    RowSetSource first = makeFirst();
-
-    // Create the second batch
-
-    RowSetSource second = makeSecond();
-
-    ResolvedRow resolvedTuple = new ResolvedRow(null);
-    resolvedTuple.add(new TestProjection(first, 1));
-    resolvedTuple.add(new TestProjection(second, 0));
-    resolvedTuple.add(new TestProjection(second, 1));
-    resolvedTuple.add(new TestProjection(first, 0));
-
-    // Do the merge
-
-    VectorContainer output = new VectorContainer(fixture.allocator());
-    resolvedTuple.project(null, output);
-    output.setRecordCount(first.rowSet().rowCount());
-    RowSet result = fixture.wrap(output);
-
-    // Verify
-
-    BatchSchema expectedSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .add("b", MinorType.INT)
-        .add("c", MinorType.VARCHAR)
-        .add("d", MinorType.VARCHAR)
-        .build();
-    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addRow(10, 1, "foo.csv", "barney")
-        .addRow(20, 2, "foo.csv", "wilma")
-        .build();
-
-    new RowSetComparison(expected)
-      .verifyAndClearAll(result);
-  }
-
-  @Test
-  public void testImplicitFlat() {
-
-    // Create the first batch
-
-    RowSetSource first = makeFirst();
-
-    // Create the second batch
-
-    RowSetSource second = makeSecond();
-
-    ResolvedRow resolvedTuple = new ResolvedRow(null);
-    resolvedTuple.add(new TestProjection(resolvedTuple, 1));
-    resolvedTuple.add(new TestProjection(second, 0));
-    resolvedTuple.add(new TestProjection(second, 1));
-    resolvedTuple.add(new TestProjection(resolvedTuple, 0));
-
-    // Do the merge
-
-    VectorContainer output = new VectorContainer(fixture.allocator());
-    resolvedTuple.project(first.rowSet().container(), output);
-    output.setRecordCount(first.rowSet().rowCount());
-    RowSet result = fixture.wrap(output);
-
-    // Verify
-
-    BatchSchema expectedSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .add("b", MinorType.INT)
-        .add("c", MinorType.VARCHAR)
-        .add("d", MinorType.VARCHAR)
-        .build();
-    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addRow(10, 1, "foo.csv", "barney")
-        .addRow(20, 2, "foo.csv", "wilma")
-        .build();
-
-    new RowSetComparison(expected)
-      .verifyAndClearAll(result);
-  }
-
-  @Test
-  public void testFlatWithNulls() {
-
-    // Create the first batch
-
-    RowSetSource first = makeFirst();
-
-    // Create null columns
-
-    NullColumnBuilder builder = new NullColumnBuilder(null, false);
-
-    ResolvedRow resolvedTuple = new ResolvedRow(builder);
-    resolvedTuple.add(new TestProjection(resolvedTuple, 1));
-    resolvedTuple.add(resolvedTuple.nullBuilder().add("null1"));
-    resolvedTuple.add(resolvedTuple.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR)));
-    resolvedTuple.add(new TestProjection(resolvedTuple, 0));
-
-    // Build the null values
-
-    ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
-    builder.build(cache);
-    builder.load(first.rowSet().rowCount());
-
-    // Do the merge
-
-    VectorContainer output = new VectorContainer(fixture.allocator());
-    resolvedTuple.project(first.rowSet().container(), output);
-    output.setRecordCount(first.rowSet().rowCount());
-    RowSet result = fixture.wrap(output);
-
-    // Verify
-
-    BatchSchema expectedSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .addNullable("null1", MinorType.INT)
-        .addNullable("null2", MinorType.VARCHAR)
-        .add("d", MinorType.VARCHAR)
-        .build();
-    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addRow(10, null, null, "barney")
-        .addRow(20, null, null, "wilma")
-        .build();
-
-    new RowSetComparison(expected)
-      .verifyAndClearAll(result);
-    builder.close();
-  }
-
-  /**
-   * Test the ability to create maps from whole cloth if requested in
-   * the projection list, and the map is not available from the data
-   * source.
-   */
-
-  @Test
-  public void testNullMaps() {
-
-    // Create the first batch
-
-    RowSetSource first = makeFirst();
-
-    // Create null columns
-
-    NullColumnBuilder builder = new NullColumnBuilder(null, false);
-    ResolvedRow resolvedTuple = new ResolvedRow(builder);
-
-    resolvedTuple.add(new TestProjection(resolvedTuple, 1));
-
-    ResolvedMapColumn nullMapCol = new ResolvedMapColumn(resolvedTuple, "map1");
-    ResolvedTuple nullMap = nullMapCol.members();
-    nullMap.add(nullMap.nullBuilder().add("null1"));
-    nullMap.add(nullMap.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR)));
-
-    ResolvedMapColumn nullMapCol2 = new ResolvedMapColumn(nullMap, "map2");
-    ResolvedTuple nullMap2 = nullMapCol2.members();
-    nullMap2.add(nullMap2.nullBuilder().add("null3"));
-    nullMap.add(nullMapCol2);
-
-    resolvedTuple.add(nullMapCol);
-    resolvedTuple.add(new TestProjection(resolvedTuple, 0));
-
-    // Build the null values
-
-    ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
-    resolvedTuple.buildNulls(cache);
-
-    // LoadNulls
-
-    resolvedTuple.loadNulls(first.rowSet().rowCount());
-
-    // Do the merge
-
-    VectorContainer output = new VectorContainer(fixture.allocator());
-    resolvedTuple.project(first.rowSet().container(), output);
-    resolvedTuple.setRowCount(first.rowSet().rowCount());
-    RowSet result = fixture.wrap(output);
-
-    // Verify
-
-    BatchSchema expectedSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .addMap("map1")
-          .addNullable("null1", MinorType.INT)
-          .addNullable("null2", MinorType.VARCHAR)
-          .addMap("map2")
-            .addNullable("null3", MinorType.INT)
-            .resumeMap()
-          .resumeSchema()
-        .add("d", MinorType.VARCHAR)
-        .build();
-    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addRow(10, mapValue(null, null, singleMap(null)), "barney")
-        .addRow(20, mapValue(null, null, singleMap(null)), "wilma")
-        .build();
-
-    new RowSetComparison(expected)
-      .verifyAndClearAll(result);
-    resolvedTuple.close();
-  }
-
-  /**
-   * Test that the merger mechanism can rewrite a map to include
-   * projected null columns.
-   */
-
-  @Test
-  public void testMapRevision() {
-
-    // Create the first batch
-
-    BatchSchema inputSchema = new SchemaBuilder()
-        .add("b", MinorType.VARCHAR)
-        .addMap("a")
-          .add("c", MinorType.INT)
-          .resumeSchema()
-        .build();
-    RowSetSource input = new RowSetSource(
-        fixture.rowSetBuilder(inputSchema)
-          .addRow("barney", singleMap(10))
-          .addRow("wilma", singleMap(20))
-          .build());
-
-    // Create mappings
-
-    NullColumnBuilder builder = new NullColumnBuilder(null, false);
-    ResolvedRow resolvedTuple = new ResolvedRow(builder);
-
-    resolvedTuple.add(new TestProjection(resolvedTuple, 0));
-    ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple,
-        inputSchema.getColumn(1), 1);
-    resolvedTuple.add(mapCol);
-    ResolvedTuple map = mapCol.members();
-    map.add(new TestProjection(map, 0));
-    map.add(map.nullBuilder().add("null1"));
-
-    // Build the null values
-
-    ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
-    resolvedTuple.buildNulls(cache);
-
-    // LoadNulls
-
-    resolvedTuple.loadNulls(input.rowSet().rowCount());
-
-    // Do the merge
-
-    VectorContainer output = new VectorContainer(fixture.allocator());
-    resolvedTuple.project(input.rowSet().container(), output);
-    output.setRecordCount(input.rowSet().rowCount());
-    RowSet result = fixture.wrap(output);
-
-    // Verify
-
-    BatchSchema expectedSchema = new SchemaBuilder()
-        .add("b", MinorType.VARCHAR)
-        .addMap("a")
-          .add("c", MinorType.INT)
-          .addNullable("null1", MinorType.INT)
-          .resumeSchema()
-        .build();
-    RowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addRow("barney", mapValue(10, null))
-        .addRow("wilma", mapValue(20, null))
-        .build();
-
-    new RowSetComparison(expected)
-      .verifyAndClearAll(result);
-  }
-
-  /**
-   * Test that the merger mechanism can rewrite a map array to include
-   * projected null columns.
-   */
-
-  @Test
-  public void testMapArrayRevision() {
-
-    // Create the first batch
-
-    BatchSchema inputSchema = new SchemaBuilder()
-        .add("b", MinorType.VARCHAR)
-        .addMapArray("a")
-          .add("c", MinorType.INT)
-          .resumeSchema()
-        .build();
-    RowSetSource input = new RowSetSource(
-        fixture.rowSetBuilder(inputSchema)
-          .addRow("barney", mapArray(singleMap(10), singleMap(11), singleMap(12)))
-          .addRow("wilma", mapArray(singleMap(20), singleMap(21)))
-          .build());
-
-    // Create mappings
-
-    NullColumnBuilder builder = new NullColumnBuilder(null, false);
-    ResolvedRow resolvedTuple = new ResolvedRow(builder);
-
-    resolvedTuple.add(new TestProjection(resolvedTuple, 0));
-    ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple,
-        inputSchema.getColumn(1), 1);
-    resolvedTuple.add(mapCol);
-    ResolvedTuple map = mapCol.members();
-    map.add(new TestProjection(map, 0));
-    map.add(map.nullBuilder().add("null1"));
-
-    // Build the null values
-
-    ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
-    resolvedTuple.buildNulls(cache);
-
-    // LoadNulls
-
-    resolvedTuple.loadNulls(input.rowSet().rowCount());
-
-    // Do the merge
-
-    VectorContainer output = new VectorContainer(fixture.allocator());
-    resolvedTuple.project(input.rowSet().container(), output);
-    output.setRecordCount(input.rowSet().rowCount());
-    RowSet result = fixture.wrap(output);
-
-    // Verify
-
-    BatchSchema expectedSchema = new SchemaBuilder()
-        .add("b", MinorType.VARCHAR)
-        .addMapArray("a")
-          .add("c", MinorType.INT)
-          .addNullable("null1", MinorType.INT)
-          .resumeSchema()
-        .build();
-    RowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addRow("barney", mapArray(
-            mapValue(10, null), mapValue(11, null), mapValue(12, null)))
-        .addRow("wilma", mapArray(
-            mapValue(20, null), mapValue(21, null)))
-        .build();
-
-    new RowSetComparison(expected)
-      .verifyAndClearAll(result);
-  }
-
-}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanBatchWriters.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanBatchWriters.java
index 912a717..03587cb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanBatchWriters.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanBatchWriters.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.scan;
 
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.AbstractSubScan;
@@ -34,6 +35,7 @@ import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetComparison;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import io.netty.buffer.DrillBuf;
 
@@ -42,6 +44,7 @@ import io.netty.buffer.DrillBuf;
  * set follows the same semantics as the original set.
  */
 
+@Category(RowSetTests.class)
 public class TestScanBatchWriters extends SubOperatorTest {
 
   @Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanLevelProjection.java
deleted file mode 100644
index 558f761..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanLevelProjection.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.scan;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
-import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn;
-import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
-import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
-import org.apache.drill.exec.record.metadata.ProjectionType;
-import org.apache.drill.test.SubOperatorTest;
-import org.junit.Test;
-
-/**
- * Test the level of projection done at the level of the scan as a whole;
- * before knowledge of table "implicit" columns or the specific table schema.
- */
-
-public class TestScanLevelProjection extends SubOperatorTest {
-
-  /**
-   * Basic test: select a set of columns (a, b, c) when the
-   * data source has an early schema of (a, c, d). (a, c) are
-   * projected, (d) is null.
-   */
-
-  @Test
-  public void testBasics() {
-
-    // Simulate SELECT a, b, c ...
-    // Build the projection plan and verify
-
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectList("a", "b", "c"),
-        ScanTestUtils.parsers());
-    assertFalse(scanProj.projectAll());
-    assertFalse(scanProj.projectNone());
-
-    assertEquals(3, scanProj.requestedCols().size());
-    assertEquals("a", scanProj.requestedCols().get(0).rootName());
-    assertEquals("b", scanProj.requestedCols().get(1).rootName());
-    assertEquals("c", scanProj.requestedCols().get(2).rootName());
-
-    assertEquals(3, scanProj.columns().size());
-    assertEquals("a", scanProj.columns().get(0).name());
-    assertEquals("b", scanProj.columns().get(1).name());
-    assertEquals("c", scanProj.columns().get(2).name());
-
-    // Verify column type
-
-    assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
-  }
-
-  @Test
-  public void testMap() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectList("a.x", "b.x", "a.y", "b.y", "c"),
-        ScanTestUtils.parsers());
-    assertFalse(scanProj.projectAll());
-    assertFalse(scanProj.projectNone());
-
-    assertEquals(3, scanProj.columns().size());
-    assertEquals("a", scanProj.columns().get(0).name());
-    assertEquals("b", scanProj.columns().get(1).name());
-    assertEquals("c", scanProj.columns().get(2).name());
-
-    // Verify column type
-
-    assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
-
-    // Map structure
-
-    RequestedColumn a = ((UnresolvedColumn) scanProj.columns().get(0)).element();
-    assertTrue(a.isTuple());
-    assertEquals(ProjectionType.UNSPECIFIED, a.mapProjection().projectionType("x"));
-    assertEquals(ProjectionType.UNSPECIFIED, a.mapProjection().projectionType("y"));
-    assertEquals(ProjectionType.UNPROJECTED,  a.mapProjection().projectionType("z"));
-
-    RequestedColumn c = ((UnresolvedColumn) scanProj.columns().get(2)).element();
-    assertTrue(c.isSimple());
-  }
-
-  @Test
-  public void testArray() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectList("a[1]", "a[3]"),
-        ScanTestUtils.parsers());
-    assertFalse(scanProj.projectAll());
-    assertFalse(scanProj.projectNone());
-
-    assertEquals(1, scanProj.columns().size());
-    assertEquals("a", scanProj.columns().get(0).name());
-
-    // Verify column type
-
-    assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
-
-    // Map structure
-
-    RequestedColumn a = ((UnresolvedColumn) scanProj.columns().get(0)).element();
-    assertTrue(a.isArray());
-    assertFalse(a.hasIndex(0));
-    assertTrue(a.hasIndex(1));
-    assertFalse(a.hasIndex(2));
-    assertTrue(a.hasIndex(3));
-  }
-
-  /**
-   * Simulate a SELECT * query by passing "**" (Drill's internal version
-   * of the wildcard) as a column name.
-   */
-
-  @Test
-  public void testWildcard() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectAll(),
-        ScanTestUtils.parsers());
-
-    assertTrue(scanProj.projectAll());
-    assertFalse(scanProj.projectNone());
-    assertEquals(1, scanProj.requestedCols().size());
-    assertTrue(scanProj.requestedCols().get(0).isDynamicStar());
-
-    assertEquals(1, scanProj.columns().size());
-    assertEquals(SchemaPath.DYNAMIC_STAR, scanProj.columns().get(0).name());
-
-    // Verify bindings
-
-    assertEquals(scanProj.columns().get(0).name(), scanProj.requestedCols().get(0).rootName());
-
-    // Verify column type
-
-    assertEquals(UnresolvedColumn.WILDCARD, scanProj.columns().get(0).nodeType());
-  }
-
-  /**
-   * Test an empty projection which occurs in a
-   * SELECT COUNT(*) query.
-   */
-
-  @Test
-  public void testEmptyProjection() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectList(),
-        ScanTestUtils.parsers());
-
-    assertFalse(scanProj.projectAll());
-    assertTrue(scanProj.projectNone());
-    assertEquals(0, scanProj.requestedCols().size());
-  }
-
-  /**
-   * Can't include both a wildcard and a column name.
-   */
-
-  @Test
-  public void testErrorWildcardAndColumns() {
-    try {
-      new ScanLevelProjection(
-          RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, "a"),
-          ScanTestUtils.parsers());
-      fail();
-    } catch (IllegalArgumentException e) {
-      // Expected
-    }
-  }
-
-  /**
-   * Can't include both a column name and a wildcard.
-   */
-
-  @Test
-  public void testErrorColumnAndWildcard() {
-    try {
-      new ScanLevelProjection(
-          RowSetTestUtils.projectList("a", SchemaPath.DYNAMIC_STAR),
-          ScanTestUtils.parsers());
-      fail();
-    } catch (IllegalArgumentException e) {
-      // Expected
-    }
-  }
-
-  /**
-   * Can't include a wildcard twice.
-   * <p>
-   * Note: Drill actually allows this, but the work should be done
-   * in the project operator; scan should see at most one wildcard.
-   */
-
-  @Test
-  public void testErrorTwoWildcards() {
-    try {
-      new ScanLevelProjection(
-          RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, SchemaPath.DYNAMIC_STAR),
-          ScanTestUtils.parsers());
-      fail();
-    } catch (UserException e) {
-      // Expected
-    }
-  }
-}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperatorExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperatorExec.java
index 33752a7..386849b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperatorExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperatorExec.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -54,6 +55,7 @@ import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.test.rowSet.RowSetComparison;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 /**
  * Test of the scan operator framework. Here the focus is on the
@@ -66,6 +68,7 @@ import org.junit.Test;
  * appear elsewhere.
  */
 
+@Category(RowSetTests.class)
 public class TestScanOperatorExec extends SubOperatorTest {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestScanOperatorExec.class);
 
@@ -187,7 +190,7 @@ public class TestScanOperatorExec extends SubOperatorTest {
           .add("a", MinorType.INT)
           .addNullable("b", MinorType.VARCHAR, 10)
           .buildSchema();
-      schemaNegotiator.setTableSchema(schema);
+      schemaNegotiator.setTableSchema(schema, true);
       tableLoader = schemaNegotiator.build();
       return true;
     }
@@ -213,7 +216,7 @@ public class TestScanOperatorExec extends SubOperatorTest {
           .add("a", MinorType.VARCHAR)
           .addNullable("b", MinorType.VARCHAR, 10)
           .buildSchema();
-      schemaNegotiator.setTableSchema(schema);
+      schemaNegotiator.setTableSchema(schema, true);
       schemaNegotiator.build();
       tableLoader = schemaNegotiator.build();
       return true;
@@ -1367,7 +1370,7 @@ public class TestScanOperatorExec extends SubOperatorTest {
       TupleMetadata schema = new SchemaBuilder()
           .add("a", MinorType.VARCHAR)
           .buildSchema();
-      schemaNegotiator.setTableSchema(schema);
+      schemaNegotiator.setTableSchema(schema, true);
       tableLoader = schemaNegotiator.build();
       return true;
     }
@@ -1493,7 +1496,7 @@ public class TestScanOperatorExec extends SubOperatorTest {
       TupleMetadata schema = new SchemaBuilder()
           .add("a", MinorType.INT)
           .buildSchema();
-      schemaNegotiator.setTableSchema(schema);
+      schemaNegotiator.setTableSchema(schema, true);
       tableLoader = schemaNegotiator.build();
       return true;
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java
index d8c9a65..ef79b0e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java
@@ -22,12 +22,13 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.protocol.SchemaTracker;
+import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator;
 import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
-import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
 import org.apache.drill.exec.record.BatchSchema;
@@ -37,8 +38,9 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 /**
  * Test the early-schema support of the scan orchestrator. "Early schema"
@@ -49,6 +51,7 @@ import org.junit.Test;
  * that tests for lower-level components have already passed.
  */
 
+@Category(RowSetTests.class)
 public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
   /**
@@ -78,16 +81,17 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
     ResultSetLoader loader = reader.makeTableLoader(tableSchema);
 
-    // Schema provided, so an empty batch is available to
-    // send downstream.
+    // Simulate a first reader in a scan that can provide an
+    // empty batch to define schema.
 
     {
+      reader.defineSchema();
       SingleRowSet expected = fixture.rowSetBuilder(tableSchema)
           .build();
 
       assertNotNull(scanner.output());
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
+      RowSetUtilities.verify(expected,
+          fixture.wrap(scanner.output()));
     }
 
     // Create a batch of data.
@@ -107,8 +111,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
           .addRow(2, "wilma")
           .build();
 
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
+      RowSetUtilities.verify(expected,
+          fixture.wrap(scanner.output()));
     }
 
     // Second batch.
@@ -128,8 +132,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
           .addRow(4, "betty")
           .build();
 
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
+      RowSetUtilities.verify(expected,
+          fixture.wrap(scanner.output()));
     }
 
     // Explicit reader close. (All other tests are lazy, they
@@ -167,16 +171,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
     ResultSetLoader loader = reader.makeTableLoader(tableSchema);
 
-    // Verify empty batch.
-
-    {
-      SingleRowSet expected = fixture.rowSetBuilder(tableSchema)
-          .build();
-
-      assertNotNull(scanner.output());
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
-    }
+    // Don't bother with an empty batch here or in other tests.
+    // Simulates the second reader in a scan.
 
     // Create a batch of data.
 
@@ -188,15 +184,13 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
     // Verify
 
-    {
-      SingleRowSet expected = fixture.rowSetBuilder(tableSchema)
-          .addRow(1, "fred")
-          .addRow(2, "wilma")
-          .build();
+    SingleRowSet expected = fixture.rowSetBuilder(tableSchema)
+        .addRow(1, "fred")
+        .addRow(2, "wilma")
+        .build();
 
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
-    }
+    RowSetUtilities.verify(expected,
+        fixture.wrap(scanner.output()));
 
     scanner.close();
   }
@@ -228,20 +222,10 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
     ResultSetLoader loader = reader.makeTableLoader(tableSchema);
 
-    // Verify empty batch.
-
     BatchSchema expectedSchema = new SchemaBuilder()
         .add("b", MinorType.VARCHAR)
         .add("a", MinorType.INT)
         .build();
-   {
-      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-          .build();
-
-      assertNotNull(scanner.output());
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
-   }
 
     // Create a batch of data.
 
@@ -253,17 +237,15 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
     // Verify
 
-   {
-     SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addRow("fred", 1)
-        .addRow("wilma", 2)
-        .build();
+   SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+      .addRow("fred", 1)
+      .addRow("wilma", 2)
+      .build();
 
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
-   }
+   RowSetUtilities.verify(expected,
+       fixture.wrap(scanner.output()));
 
-    scanner.close();
+   scanner.close();
   }
 
   /**
@@ -294,21 +276,11 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
     ResultSetLoader loader = reader.makeTableLoader(tableSchema);
 
-    // Verify empty batch
-
     BatchSchema expectedSchema = new SchemaBuilder()
         .add("a", MinorType.INT)
         .add("b", MinorType.VARCHAR)
         .addNullable("c", MinorType.INT)
         .build();
-    {
-      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-          .build();
-
-      assertNotNull(scanner.output());
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
-   }
 
    // Create a batch of data.
 
@@ -320,15 +292,13 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
     // Verify
 
-    {
-      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addRow(1, "fred", null)
-        .addRow(2, "wilma", null)
-        .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+      .addRow(1, "fred", null)
+      .addRow(2, "wilma", null)
+      .build();
 
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
-    }
+    RowSetUtilities.verify(expected,
+        fixture.wrap(scanner.output()));
 
     scanner.close();
   }
@@ -369,21 +339,11 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
     ResultSetLoader loader = reader.makeTableLoader(tableSchema);
 
-    // Verify empty batch
-
     BatchSchema expectedSchema = new SchemaBuilder()
         .add("a", MinorType.INT)
         .add("b", MinorType.VARCHAR)
         .addNullable("c", MinorType.VARCHAR)
         .build();
-    {
-      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-          .build();
-
-      assertNotNull(scanner.output());
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
-   }
 
     // Create a batch of data.
 
@@ -395,15 +355,13 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
     // Verify
 
-    {
-      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addRow(1, "fred", null)
-        .addRow(2, "wilma", null)
-        .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+      .addRow(1, "fred", null)
+      .addRow(2, "wilma", null)
+      .build();
 
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
-    }
+    RowSetUtilities.verify(expected,
+        fixture.wrap(scanner.output()));
 
     scanner.close();
   }
@@ -440,19 +398,9 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
     assertFalse(loader.writer().column("b").schema().isProjected());
 
-    // Verify empty batch.
-
     BatchSchema expectedSchema = new SchemaBuilder()
         .add("a", MinorType.INT)
         .build();
-    {
-      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-          .build();
-
-      assertNotNull(scanner.output());
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
-    }
 
     // Create a batch of data.
 
@@ -464,15 +412,13 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
     // Verify
 
-    {
-      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addRow(1)
-        .addRow(2)
-        .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+      .addRow(1)
+      .addRow(2)
+      .build();
 
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
-    }
+    RowSetUtilities.verify(expected,
+        fixture.wrap(scanner.output()));
 
     scanner.close();
   }
@@ -516,16 +462,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
     BatchSchema expectedSchema = new SchemaBuilder()
         .build();
-    {
-      // Expect an empty schema
-
-      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-          .build();
-
-      assertNotNull(scanner.output());
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
-    }
 
     // Create a batch of data.
 
@@ -545,8 +481,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
         .addRow()
         .build();
 
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
+      RowSetUtilities.verify(expected,
+          fixture.wrap(scanner.output()));
     }
 
     // Fast path to fill in empty rows
@@ -592,18 +528,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
     reader.makeTableLoader(tableSchema);
 
-    // Schema provided, so an empty batch is available to
-    // send downstream.
-
-    {
-      SingleRowSet expected = fixture.rowSetBuilder(tableSchema)
-          .build();
-
-      assertNotNull(scanner.output());
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
-    }
-
     // Create a batch of data. Because there are no columns, it does
     // not make sense to read any rows.
 
@@ -616,8 +540,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
       SingleRowSet expected = fixture.rowSetBuilder(tableSchema)
           .build();
 
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
+      RowSetUtilities.verify(expected,
+          fixture.wrap(scanner.output()));
     }
 
     scanner.close();
@@ -650,19 +574,9 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
     reader.makeTableLoader(tableSchema);
 
-    // Verify initial empty batch.
-
     BatchSchema expectedSchema = new SchemaBuilder()
         .addNullable("a", MinorType.INT)
         .build();
-    {
-      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-          .build();
-
-      assertNotNull(scanner.output());
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
-    }
 
     // Create a batch of data. Because there are no columns, it does
     // not make sense to read any rows.
@@ -672,16 +586,14 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
     // Verify
 
-    {
-      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-          .build();
-
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
-    }
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .build();
+    RowSetUtilities.verify(expected,
+        fixture.wrap(scanner.output()));
 
     scanner.close();
   }
+
   /**
    * The projection mechanism provides "type smoothing": null
    * columns prefer the type of previously-seen non-null columns.
@@ -718,6 +630,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
       ReaderSchemaOrchestrator reader = scanner.startReader();
       reader.makeTableLoader(table1Schema);
+      reader.defineSchema();
       VectorContainer output = scanner.output();
       tracker.trackSchema(output);
       schemaVersion = tracker.schemaVersion();
@@ -737,6 +650,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
           .buildSchema();
       ReaderSchemaOrchestrator reader = scanner.startReader();
       reader.makeTableLoader(table2Schema);
+      reader.defineSchema();
       VectorContainer output = scanner.output();
       tracker.trackSchema(output);
       assertEquals(schemaVersion, tracker.schemaVersion());
@@ -756,6 +670,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
           .buildSchema();
       ReaderSchemaOrchestrator reader = scanner.startReader();
       reader.makeTableLoader(table3Schema);
+      reader.defineSchema();
       VectorContainer output = scanner.output();
       tracker.trackSchema(output);
       assertEquals(schemaVersion, tracker.schemaVersion());
@@ -776,6 +691,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
           .buildSchema();
       ReaderSchemaOrchestrator reader = scanner.startReader();
       reader.makeTableLoader(table2Schema);
+      reader.defineSchema();
       VectorContainer output = scanner.output();
       tracker.trackSchema(output);
       assertEquals(MinorType.BIGINT, output.getSchema().getColumn(0).getType().getMinorType());
@@ -843,8 +759,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
           .addRow(10, "fred")
           .addRow(20, "wilma")
           .build();
-      new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(projector.output()));
+      RowSetUtilities.verify(expected,
+          fixture.wrap(projector.output()));
     }
     {
       // ... FROM table 2
@@ -874,8 +790,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
           .addRow(30, null)
           .addRow(40, null)
           .build();
-      new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(projector.output()));
+      RowSetUtilities.verify(expected,
+          fixture.wrap(projector.output()));
     }
     {
       // ... FROM table 3
@@ -899,8 +815,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
           .addRow(50, "dino")
           .addRow(60, "barney")
           .build();
-      new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(projector.output()));
+      RowSetUtilities.verify(expected,
+          fixture.wrap(projector.output()));
     }
 
     projector.close();
@@ -926,9 +842,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
       ReaderSchemaOrchestrator reader = scanner.startReader();
       ResultSetLoader loader = reader.makeTableLoader(schema1);
 
-      tracker.trackSchema(scanner.output());
-      schemaVersion = tracker.schemaVersion();
-
       // Create a batch
 
       reader.startBatch();
@@ -936,6 +849,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
         .addRow("fred")
         .addRow("wilma");
       reader.endBatch();
+      tracker.trackSchema(scanner.output());
+      schemaVersion = tracker.schemaVersion();
 
       // Verify
 
@@ -944,8 +859,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
         .addRow("wilma")
         .build();
 
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
+      RowSetUtilities.verify(expected,
+          fixture.wrap(scanner.output()));
       scanner.closeReader();
     }
     {
@@ -960,8 +875,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
       ReaderSchemaOrchestrator reader = scanner.startReader();
       ResultSetLoader loader = reader.makeTableLoader(schema2);
 
-      tracker.trackSchema(scanner.output());
-      assertEquals(schemaVersion, tracker.schemaVersion());
 
       // Create a batch
 
@@ -973,13 +886,15 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
       // Verify, using persistent schema
 
+      tracker.trackSchema(scanner.output());
+      assertEquals(schemaVersion, tracker.schemaVersion());
       SingleRowSet expected = fixture.rowSetBuilder(schema1)
         .addRow("barney")
         .addRow("betty")
         .build();
 
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
+      RowSetUtilities.verify(expected,
+          fixture.wrap(scanner.output()));
       scanner.closeReader();
     }
     {
@@ -994,9 +909,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
       ReaderSchemaOrchestrator reader = scanner.startReader();
       ResultSetLoader loader = reader.makeTableLoader(schema3);
 
-      tracker.trackSchema(scanner.output());
-      assertEquals(schemaVersion, tracker.schemaVersion());
-
       // Create a batch
 
       reader.startBatch();
@@ -1007,13 +919,16 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
 
       // Verify, using persistent schema
 
+      tracker.trackSchema(scanner.output());
+      assertEquals(schemaVersion, tracker.schemaVersion());
+
       SingleRowSet expected = fixture.rowSetBuilder(schema1)
         .addRow("bam-bam")
         .addRow("pebbles")
         .build();
 
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
+      RowSetUtilities.verify(expected,
+          fixture.wrap(scanner.output()));
       scanner.closeReader();
     }
 
@@ -1073,8 +988,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
           .addRow(10, "fred", 110L)
           .addRow(20, "wilma", 110L)
           .build();
-      new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(scanner.output()));
+      RowSetUtilities.verify(expected,
+          fixture.wrap(scanner.output()));
 
       scanner.closeReader();
     }
@@ -1100,8 +1015,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
           .addRow(30, "bambam", 330L)
           .addRow(40, "betty", 440L)
           .build();
-      new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(scanner.output()));
+      RowSetUtilities.verify(expected,
+          fixture.wrap(scanner.output()));
     }
     {
       // ... FROM table 3
@@ -1125,8 +1040,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
           .addRow(50, "dino", 550L)
           .addRow(60, "barney", 660L)
           .build();
-      new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(scanner.output()));
+      RowSetUtilities.verify(expected,
+          fixture.wrap(scanner.output()));
     }
 
     scanner.close();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorLateSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorLateSchema.java
index 0cf2cba..84ffc4e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorLateSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorLateSchema.java
@@ -18,10 +18,12 @@
 package org.apache.drill.exec.physical.impl.scan;
 
 import static org.junit.Assert.assertFalse;
+
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator;
 import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
-import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.physical.rowSet.RowSetLoader;
 import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
@@ -32,6 +34,7 @@ import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.test.rowSet.RowSetComparison;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 /**
  * Test the late-schema support in the scan orchestrator. "Late schema" is the case
@@ -43,6 +46,7 @@ import org.junit.Test;
  * that tests for lower-level components have already passed.
  */
 
+@Category(RowSetTests.class)
 public class TestScanOrchestratorLateSchema extends SubOperatorTest {
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java
index e100551..c7b52e2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java
@@ -20,14 +20,15 @@ package org.apache.drill.exec.physical.impl.scan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.protocol.SchemaTracker;
 import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
+import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator;
 import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
-import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
 import org.apache.drill.exec.record.BatchSchema;
@@ -35,10 +36,10 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
-
+import org.junit.experimental.categories.Category;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 /**
@@ -46,6 +47,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
  * with implicit file columns provided by the file metadata manager.
  */
 
+@Category(RowSetTests.class)
 public class TestScanOrchestratorMetadata extends SubOperatorTest {
 
   /**
@@ -104,8 +106,8 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
         .addRow(2, "wilma", "/w/x/y/z.csv", "/w/x/y", "z.csv", "csv", "x", "y")
         .build();
 
-    new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(scanner.output()));
+    RowSetUtilities.verify(expected,
+        fixture.wrap(scanner.output()));
 
     scanner.close();
   }
@@ -146,19 +148,9 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
 
     ResultSetLoader loader = reader.makeTableLoader(tableSchema);
 
-    // Verify empty batch.
-
     BatchSchema expectedSchema = new SchemaBuilder()
         .addNullable("c", MinorType.INT)
         .build();
-    {
-      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-         .build();
-
-      assertNotNull(scanner.output());
-      new RowSetComparison(expected)
-         .verifyAndClearAll(fixture.wrap(scanner.output()));
-    }
 
     // Create a batch of data.
 
@@ -170,15 +162,13 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
 
     // Verify
 
-    {
-      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addSingleCol(null)
-        .addSingleCol(null)
-        .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+      .addSingleCol(null)
+      .addSingleCol(null)
+      .build();
 
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
-    }
+    RowSetUtilities.verify(expected,
+        fixture.wrap(scanner.output()));
 
     scanner.close();
   }
@@ -229,6 +219,7 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
 
     // Verify empty batch.
 
+    reader.defineSchema();
     BatchSchema expectedSchema = new SchemaBuilder()
         .add("a", MinorType.INT)
         .add("b", MinorType.VARCHAR)
@@ -240,8 +231,8 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
          .build();
 
       assertNotNull(scanner.output());
-      new RowSetComparison(expected)
-         .verifyAndClearAll(fixture.wrap(scanner.output()));
+      RowSetUtilities.verify(expected,
+          fixture.wrap(scanner.output()));
     }
 
     // Create a batch of data.
@@ -260,8 +251,8 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
         .addRow(2, "wilma", "x", "csv")
         .build();
 
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
+      RowSetUtilities.verify(expected,
+          fixture.wrap(scanner.output()));
     }
 
     scanner.close();
@@ -302,22 +293,12 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
 
     ResultSetLoader loader = reader.makeTableLoader(tableSchema);
 
-    // Verify empty batch.
-
     BatchSchema expectedSchema = new SchemaBuilder()
         .addNullable("dir0", MinorType.VARCHAR)
         .add("b", MinorType.VARCHAR)
         .add("suffix", MinorType.VARCHAR)
         .addNullable("c", MinorType.INT)
         .build();
-    {
-      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-         .build();
-
-      assertNotNull(scanner.output());
-      new RowSetComparison(expected)
-         .verifyAndClearAll(fixture.wrap(scanner.output()));
-    }
 
     // Create a batch of data.
 
@@ -329,15 +310,13 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
 
     // Verify
 
-    {
-      SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .addRow("x", "fred", "csv", null)
-        .addRow("x", "wilma", "csv", null)
-        .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+      .addRow("x", "fred", "csv", null)
+      .addRow("x", "wilma", "csv", null)
+      .build();
 
-      new RowSetComparison(expected)
-          .verifyAndClearAll(fixture.wrap(scanner.output()));
-    }
+    RowSetUtilities.verify(expected,
+        fixture.wrap(scanner.output()));
 
     scanner.close();
   }
@@ -403,8 +382,8 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
           .addRow("x", "y", "a.csv", "fred")
           .addRow("x", "y", "a.csv", "wilma")
           .build();
-      new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(scanner.output()));
+      RowSetUtilities.verify(expected,
+          fixture.wrap(scanner.output()));
 
       // Do explicit close (as in real code) to avoid an implicit
       // close which will blow away the current file info...
@@ -431,8 +410,8 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
           .addRow("x", null, "b.csv", "bambam")
           .addRow("x", null, "b.csv", "betty")
           .build();
-      new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(scanner.output()));
+      RowSetUtilities.verify(expected,
+          fixture.wrap(scanner.output()));
 
       scanner.closeReader();
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaLevelProjection.java
deleted file mode 100644
index 021d7e3..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaLevelProjection.java
+++ /dev/null
@@ -1,557 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.scan;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.List;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedMapColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
-import org.apache.drill.exec.physical.impl.scan.project.ExplicitSchemaProjection;
-import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedNullColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedTableColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple;
-import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
-import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn;
-import org.apache.drill.exec.physical.impl.scan.project.VectorSource;
-import org.apache.drill.exec.physical.impl.scan.project.WildcardSchemaProjection;
-import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.test.SubOperatorTest;
-import org.junit.Test;
-
-public class TestSchemaLevelProjection extends SubOperatorTest {
-
-  @Test
-  public void testWildcard() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectAll(),
-        ScanTestUtils.parsers());
-    assertEquals(1, scanProj.columns().size());
-
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .add("a", MinorType.VARCHAR)
-        .addNullable("c", MinorType.INT)
-        .addArray("d", MinorType.FLOAT8)
-        .buildSchema();
-
-    NullColumnBuilder builder = new NullColumnBuilder(null, false);
-    ResolvedRow rootTuple = new ResolvedRow(builder);
-    new WildcardSchemaProjection(
-        scanProj, tableSchema, rootTuple,
-        ScanTestUtils.resolvers());
-
-    List<ResolvedColumn> columns = rootTuple.columns();
-    assertEquals(3, columns.size());
-
-    assertEquals("a", columns.get(0).name());
-    assertEquals(0, columns.get(0).sourceIndex());
-    assertSame(rootTuple, columns.get(0).source());
-    assertEquals("c", columns.get(1).name());
-    assertEquals(1, columns.get(1).sourceIndex());
-    assertSame(rootTuple, columns.get(1).source());
-    assertEquals("d", columns.get(2).name());
-    assertEquals(2, columns.get(2).sourceIndex());
-    assertSame(rootTuple, columns.get(2).source());
-  }
-
-  /**
-   * Test SELECT list with columns defined in an order and with
-   * name case different than the early-schema table.
-   */
-
-  @Test
-  public void testFullList() {
-
-    // Simulate SELECT c, b, a ...
-
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectList("c", "b", "a"),
-        ScanTestUtils.parsers());
-    assertEquals(3, scanProj.columns().size());
-
-    // Simulate a data source, with early schema, of (a, b, c)
-
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .add("A", MinorType.VARCHAR)
-        .add("B", MinorType.VARCHAR)
-        .add("C", MinorType.VARCHAR)
-        .buildSchema();
-
-    NullColumnBuilder builder = new NullColumnBuilder(null, false);
-    ResolvedRow rootTuple = new ResolvedRow(builder);
-    new ExplicitSchemaProjection(
-        scanProj, tableSchema, rootTuple,
-        ScanTestUtils.resolvers());
-
-    List<ResolvedColumn> columns = rootTuple.columns();
-    assertEquals(3, columns.size());
-
-    assertEquals("c", columns.get(0).name());
-    assertEquals(2, columns.get(0).sourceIndex());
-    assertSame(rootTuple, columns.get(0).source());
-
-    assertEquals("b", columns.get(1).name());
-    assertEquals(1, columns.get(1).sourceIndex());
-    assertSame(rootTuple, columns.get(1).source());
-
-    assertEquals("a", columns.get(2).name());
-    assertEquals(0, columns.get(2).sourceIndex());
-    assertSame(rootTuple, columns.get(2).source());
-  }
-
-  /**
-   * Test SELECT list with columns missing from the table schema.
-   */
-
-  @Test
-  public void testMissing() {
-
-    // Simulate SELECT c, v, b, w ...
-
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectList("c", "v", "b", "w"),
-        ScanTestUtils.parsers());
-    assertEquals(4, scanProj.columns().size());
-
-    // Simulate a data source, with early schema, of (a, b, c)
-
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .add("A", MinorType.VARCHAR)
-        .add("B", MinorType.VARCHAR)
-        .add("C", MinorType.VARCHAR)
-        .buildSchema();
-
-    NullColumnBuilder builder = new NullColumnBuilder(null, false);
-    ResolvedRow rootTuple = new ResolvedRow(builder);
-    new ExplicitSchemaProjection(
-        scanProj, tableSchema, rootTuple,
-        ScanTestUtils.resolvers());
-
-    List<ResolvedColumn> columns = rootTuple.columns();
-    assertEquals(4, columns.size());
-    VectorSource nullBuilder = rootTuple.nullBuilder();
-
-    assertEquals("c", columns.get(0).name());
-    assertEquals(2, columns.get(0).sourceIndex());
-    assertSame(rootTuple, columns.get(0).source());
-
-    assertEquals("v", columns.get(1).name());
-    assertEquals(0, columns.get(1).sourceIndex());
-    assertSame(nullBuilder, columns.get(1).source());
-
-    assertEquals("b", columns.get(2).name());
-    assertEquals(1, columns.get(2).sourceIndex());
-    assertSame(rootTuple, columns.get(2).source());
-
-    assertEquals("w", columns.get(3).name());
-    assertEquals(1, columns.get(3).sourceIndex());
-    assertSame(nullBuilder, columns.get(3).source());
-  }
-
-  @Test
-  public void testSubset() {
-
-    // Simulate SELECT c, a ...
-
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectList("c", "a"),
-        ScanTestUtils.parsers());
-    assertEquals(2, scanProj.columns().size());
-
-    // Simulate a data source, with early schema, of (a, b, c)
-
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .add("A", MinorType.VARCHAR)
-        .add("B", MinorType.VARCHAR)
-        .add("C", MinorType.VARCHAR)
-        .buildSchema();
-
-    NullColumnBuilder builder = new NullColumnBuilder(null, false);
-    ResolvedRow rootTuple = new ResolvedRow(builder);
-    new ExplicitSchemaProjection(
-        scanProj, tableSchema, rootTuple,
-        ScanTestUtils.resolvers());
-
-    List<ResolvedColumn> columns = rootTuple.columns();
-    assertEquals(2, columns.size());
-
-    assertEquals("c", columns.get(0).name());
-    assertEquals(2, columns.get(0).sourceIndex());
-    assertSame(rootTuple, columns.get(0).source());
-
-    assertEquals("a", columns.get(1).name());
-    assertEquals(0, columns.get(1).sourceIndex());
-    assertSame(rootTuple, columns.get(1).source());
-  }
-
-  @Test
-  public void testDisjoint() {
-
-    // Simulate SELECT b ...
-
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectList("b"),
-        ScanTestUtils.parsers());
-    assertEquals(1, scanProj.columns().size());
-
-    // Simulate a data source, with early schema, of (a)
-
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .add("A", MinorType.VARCHAR)
-        .buildSchema();
-
-    NullColumnBuilder builder = new NullColumnBuilder(null, false);
-    ResolvedRow rootTuple = new ResolvedRow(builder);
-    new ExplicitSchemaProjection(
-        scanProj, tableSchema, rootTuple,
-        ScanTestUtils.resolvers());
-
-    List<ResolvedColumn> columns = rootTuple.columns();
-    assertEquals(1, columns.size());
-    VectorSource nullBuilder = rootTuple.nullBuilder();
-
-    assertEquals("b", columns.get(0).name());
-    assertEquals(0, columns.get(0).sourceIndex());
-    assertSame(nullBuilder, columns.get(0).source());
-  }
-
-  @Test
-  public void testOmittedMap() {
-
-    // Simulate SELECT a, b.c.d ...
-
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectList("a", "b.c.d"),
-        ScanTestUtils.parsers());
-    assertEquals(2, scanProj.columns().size());
-    {
-      assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(1).nodeType());
-      UnresolvedColumn bCol = (UnresolvedColumn) (scanProj.columns().get(1));
-      assertTrue(bCol.element().isTuple());
-    }
-
-    // Simulate a data source, with early schema, of (a)
-
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .add("a", MinorType.VARCHAR)
-        .buildSchema();
-
-    NullColumnBuilder builder = new NullColumnBuilder(null, false);
-    ResolvedRow rootTuple = new ResolvedRow(builder);
-    new ExplicitSchemaProjection(
-        scanProj, tableSchema, rootTuple,
-        ScanTestUtils.resolvers());
-
-    List<ResolvedColumn> columns = rootTuple.columns();
-    assertEquals(2, columns.size());
-
-    // Should have resolved a to a table column, b to a missing map.
-
-    // A is projected
-
-    ResolvedColumn aCol = columns.get(0);
-    assertEquals("a", aCol.name());
-    assertEquals(ResolvedTableColumn.ID, aCol.nodeType());
-
-    // B is not in the table, is implicitly a map
-
-    ResolvedColumn bCol = columns.get(1);
-    assertEquals("b", bCol.name());
-    assertEquals(ResolvedMapColumn.ID, bCol.nodeType());
-
-    ResolvedMapColumn bMap = (ResolvedMapColumn) bCol;
-    ResolvedTuple bMembers = bMap.members();
-    assertNotNull(bMembers);
-    assertEquals(1, bMembers.columns().size());
-
-    // C is a map within b
-
-    ResolvedColumn cCol = bMembers.columns().get(0);
-    assertEquals(ResolvedMapColumn.ID, cCol.nodeType());
-
-    ResolvedMapColumn cMap = (ResolvedMapColumn) cCol;
-    ResolvedTuple cMembers = cMap.members();
-    assertNotNull(cMembers);
-    assertEquals(1, cMembers.columns().size());
-
-    // D is an unknown column type (not a map)
-
-    ResolvedColumn dCol = cMembers.columns().get(0);
-    assertEquals(ResolvedNullColumn.ID, dCol.nodeType());
-  }
-
-  /**
-   * Test of a map with missing columns.
-   * table of (a{b, c}), project a.c, a.d, a.e.f
-   */
-
-  @Test
-  public void testOmittedMapMembers() {
-
-    // Simulate SELECT x, a.c, a.d, a.e.f, y ...
-
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectList("x", "a.c", "a.d", "a.e.f", "y"),
-        ScanTestUtils.parsers());
-    assertEquals(3, scanProj.columns().size());
-
-    // Simulate a data source, with early schema, of (x, y, a{b, c})
-
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .add("x", MinorType.VARCHAR)
-        .add("y", MinorType.INT)
-        .addMap("a")
-          .add("b", MinorType.BIGINT)
-          .add("c", MinorType.FLOAT8)
-          .resumeSchema()
-        .buildSchema();
-
-    NullColumnBuilder builder = new NullColumnBuilder(null, false);
-    ResolvedRow rootTuple = new ResolvedRow(builder);
-    new ExplicitSchemaProjection(
-        scanProj, tableSchema, rootTuple,
-        ScanTestUtils.resolvers());
-
-    List<ResolvedColumn> columns = rootTuple.columns();
-    assertEquals(3, columns.size());
-
-    // Should have resolved a.c to a column within the map,
-    // a.d to a missing (null) map member, and a.e.f to a
-    // null member of a missing nested map
-
-    // X is projected
-
-    ResolvedColumn xCol = columns.get(0);
-    assertEquals("x", xCol.name());
-    assertEquals(ResolvedTableColumn.ID, xCol.nodeType());
-    assertSame(rootTuple, ((ResolvedTableColumn) (xCol)).source());
-    assertEquals(0, ((ResolvedTableColumn) (xCol)).sourceIndex());
-
-    // Y is projected
-
-    ResolvedColumn yCol = columns.get(2);
-    assertEquals("y", yCol.name());
-    assertEquals(ResolvedTableColumn.ID, yCol.nodeType());
-    assertSame(rootTuple, ((ResolvedTableColumn) (yCol)).source());
-    assertEquals(1, ((ResolvedTableColumn) (yCol)).sourceIndex());
-
-    // A is projected
-
-    ResolvedColumn aCol = columns.get(1);
-    assertEquals("a", aCol.name());
-    assertEquals(ResolvedMapColumn.ID, aCol.nodeType());
-
-    ResolvedMapColumn aMap = (ResolvedMapColumn) aCol;
-    ResolvedTuple aMembers = aMap.members();
-    assertFalse(aMembers.isSimpleProjection());
-    assertNotNull(aMembers);
-    assertEquals(3, aMembers.columns().size());
-
-    // a.c is projected
-
-    ResolvedColumn acCol = aMembers.columns().get(0);
-    assertEquals("c", acCol.name());
-    assertEquals(ResolvedTableColumn.ID, acCol.nodeType());
-    assertEquals(1, ((ResolvedTableColumn) (acCol)).sourceIndex());
-
-    // a.d is not in the table, is null
-
-    ResolvedColumn adCol = aMembers.columns().get(1);
-    assertEquals("d", adCol.name());
-    assertEquals(ResolvedNullColumn.ID, adCol.nodeType());
-
-    // a.e is not in the table, is implicitly a map
-
-    ResolvedColumn aeCol = aMembers.columns().get(2);
-    assertEquals("e", aeCol.name());
-    assertEquals(ResolvedMapColumn.ID, aeCol.nodeType());
-
-    ResolvedMapColumn aeMap = (ResolvedMapColumn) aeCol;
-    ResolvedTuple aeMembers = aeMap.members();
-    assertFalse(aeMembers.isSimpleProjection());
-    assertNotNull(aeMembers);
-    assertEquals(1, aeMembers.columns().size());
-
-    // a.e.f is a null column
-
-    ResolvedColumn aefCol = aeMembers.columns().get(0);
-    assertEquals("f", aefCol.name());
-    assertEquals(ResolvedNullColumn.ID, aefCol.nodeType());
-  }
-
-  /**
-   * Simple map project. This is an internal case in which the
-   * query asks for a set of columns inside a map, and the table
-   * loader produces exactly that set. No special projection is
-   * needed, the map is projected as a whole.
-   */
-
-  @Test
-  public void testSimpleMapProject() {
-
-    // Simulate SELECT a.b, a.c ...
-
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectList("a.b", "a.c"),
-        ScanTestUtils.parsers());
-    assertEquals(1, scanProj.columns().size());
-
-    // Simulate a data source, with early schema, of (a{b, c})
-
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .addMap("a")
-          .add("b", MinorType.BIGINT)
-          .add("c", MinorType.FLOAT8)
-          .resumeSchema()
-        .buildSchema();
-
-    NullColumnBuilder builder = new NullColumnBuilder(null, false);
-    ResolvedRow rootTuple = new ResolvedRow(builder);
-    new ExplicitSchemaProjection(
-        scanProj, tableSchema, rootTuple,
-        ScanTestUtils.resolvers());
-
-    List<ResolvedColumn> columns = rootTuple.columns();
-    assertEquals(1, columns.size());
-
-    // The projected members (a.b, a.c) are exactly the members
-    // of the table's map a, so no member-by-member resolution
-    // is expected
-
-    // a is projected as a vector, not as a structured map
-
-    ResolvedColumn aCol = columns.get(0);
-    assertEquals("a", aCol.name());
-    assertEquals(ResolvedTableColumn.ID, aCol.nodeType());
-    assertSame(rootTuple, ((ResolvedTableColumn) (aCol)).source());
-    assertEquals(0, ((ResolvedTableColumn) (aCol)).sourceIndex());
-  }
-
-  /**
-   * Project of a non-map as a map
-   */
-
-  @Test
-  public void testMapMismatch() {
-
-    // Simulate SELECT a.b ...
-
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectList("a.b"),
-        ScanTestUtils.parsers());
-
-    // Simulate a data source, with early schema, of (a)
-    // where a is not a map.
-
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .add("a", MinorType.VARCHAR)
-        .buildSchema();
-
-    NullColumnBuilder builder = new NullColumnBuilder(null, false);
-    ResolvedRow rootTuple = new ResolvedRow(builder);
-    try {
-      new ExplicitSchemaProjection(
-          scanProj, tableSchema, rootTuple,
-          ScanTestUtils.resolvers());
-      fail();
-    } catch (UserException e) {
-      // Expected
-    }
-  }
-
-  /**
-   * Test project of an array. At the scan level, we just verify
-   * that the requested column is, indeed, an array.
-   */
-
-  @Test
-  public void testArrayProject() {
-
-    // Simulate SELECT a[0] ...
-
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectList("a[0]"),
-        ScanTestUtils.parsers());
-
-    // Simulate a data source, with early schema, of (a)
-    // where a is an array.
-
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .addArray("a", MinorType.VARCHAR)
-        .buildSchema();
-
-    NullColumnBuilder builder = new NullColumnBuilder(null, false);
-    ResolvedRow rootTuple = new ResolvedRow(builder);
-    new ExplicitSchemaProjection(
-          scanProj, tableSchema, rootTuple,
-          ScanTestUtils.resolvers());
-
-    List<ResolvedColumn> columns = rootTuple.columns();
-    assertEquals(1, columns.size());
-
-    ResolvedColumn aCol = columns.get(0);
-    assertEquals("a", aCol.name());
-    assertEquals(ResolvedTableColumn.ID, aCol.nodeType());
-    assertSame(rootTuple, ((ResolvedTableColumn) (aCol)).source());
-    assertEquals(0, ((ResolvedTableColumn) (aCol)).sourceIndex());
-  }
-
-  /**
-   * Project of a non-array as an array
-   */
-
-  @Test
-  public void testArrayMismatch() {
-
-    // Simulate SELECT a[0] ...
-
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectList("a[0]"),
-        ScanTestUtils.parsers());
-
-    // Simulate a data source, with early schema, of (a)
-    // where a is not an array.
-
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .add("a", MinorType.VARCHAR)
-        .buildSchema();
-
-    NullColumnBuilder builder = new NullColumnBuilder(null, false);
-    ResolvedRow rootTuple = new ResolvedRow(builder);
-    try {
-      new ExplicitSchemaProjection(
-          scanProj, tableSchema, rootTuple,
-          ScanTestUtils.resolvers());
-      fail();
-    } catch (UserException e) {
-      // Expected
-    }
-  }
-}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaSmoothing.java
deleted file mode 100644
index a30321b..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaSmoothing.java
+++ /dev/null
@@ -1,946 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.scan;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.List;
-
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.protocol.SchemaTracker;
-import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn;
-import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
-import org.apache.drill.exec.physical.impl.scan.project.ExplicitSchemaProjection;
-import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedNullColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedTableColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
-import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
-import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
-import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
-import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother;
-import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother.IncompatibleSchemaException;
-import org.apache.drill.exec.physical.impl.scan.project.SmoothingProjection;
-import org.apache.drill.exec.physical.impl.scan.project.WildcardSchemaProjection;
-import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
-import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.test.rowSet.RowSetComparison;
-import org.apache.hadoop.fs.Path;
-import org.junit.Test;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-
-/**
- * Tests schema smoothing at the schema projection level.
- * This level handles reusing prior types when filling null
- * values. But, because no actual vectors are involved, it
- * does not handle the schema chosen for a table ahead of
- * time, only the schema as it is merged with prior schema to
- * detect missing columns.
- * <p>
- * Focuses on the <tt>SmoothingProjection</tt> class itself.
- * <p>
- * Note that, at present, schema smoothing does not work for entire
- * maps. That is, if file 1 has, say <tt>{a: {b: 10, c: "foo"}}</tt>
- * and file 2 has, say, <tt>{a: null}</tt>, then schema smoothing does
- * not currently know how to recreate the map. The same is true of
- * lists and unions. Handling such cases is complex and is probably
- * better handled via a system that allows the user to specify their
- * intent by providing a schema to apply to the two files.
- */
-
-public class TestSchemaSmoothing extends SubOperatorTest {
-
-  /**
-   * Sanity test for the simple, discrete case. The purpose of
-   * discrete is just to run the basic lifecycle in a way that
-   * is compatible with the schema-persistence version.
-   */
-
-  @Test
-  public void testDiscrete() {
-
-    // Set up the file metadata manager
-
-    Path filePathA = new Path("hdfs:///w/x/y/a.csv");
-    Path filePathB = new Path("hdfs:///w/x/y/b.csv");
-    FileMetadataManager metadataManager = new FileMetadataManager(
-        fixture.getOptionManager(),
-        new Path("hdfs:///w"),
-        Lists.newArrayList(filePathA, filePathB));
-
-    // Set up the scan level projection
-
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL, "a", "b"),
-        ScanTestUtils.parsers(metadataManager.projectionParser()));
-
-    {
-      // Define a file a.csv
-
-      metadataManager.startFile(filePathA);
-
-      // Build the output schema from the (a, b) table schema
-
-      TupleMetadata twoColSchema = new SchemaBuilder()
-          .add("a", MinorType.INT)
-          .addNullable("b", MinorType.VARCHAR, 10)
-          .buildSchema();
-      NullColumnBuilder builder = new NullColumnBuilder(null, false);
-      ResolvedRow rootTuple = new ResolvedRow(builder);
-      new ExplicitSchemaProjection(
-          scanProj, twoColSchema, rootTuple,
-          ScanTestUtils.resolvers(metadataManager));
-
-      // Verify the full output schema
-
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .add("filename", MinorType.VARCHAR)
-          .add("a", MinorType.INT)
-          .addNullable("b", MinorType.VARCHAR, 10)
-          .buildSchema();
-
-      // Verify
-
-      List<ResolvedColumn> columns = rootTuple.columns();
-      assertEquals(3, columns.size());
-      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
-      assertEquals(ScanTestUtils.FILE_NAME_COL, columns.get(0).name());
-      assertEquals("a.csv", ((FileMetadataColumn) columns.get(0)).value());
-      assertEquals(ResolvedTableColumn.ID, columns.get(1).nodeType());
-    }
-    {
-      // Define a file b.csv
-
-      metadataManager.startFile(filePathB);
-
-      // Build the output schema from the (a) table schema
-
-      TupleMetadata oneColSchema = new SchemaBuilder()
-          .add("a", MinorType.INT)
-          .buildSchema();
-      NullColumnBuilder builder = new NullColumnBuilder(null, false);
-      ResolvedRow rootTuple = new ResolvedRow(builder);
-      new ExplicitSchemaProjection(
-          scanProj, oneColSchema, rootTuple,
-          ScanTestUtils.resolvers(metadataManager));
-
-      // Verify the full output schema
-      // Since this mode is "discrete", we don't remember the type
-      // of the missing column. (Instead, it is filled in at the
-      // vector level as part of vector persistence.) During projection, it is
-      // marked with type NULL so that the null column builder will fill in
-      // the proper type.
-
-      TupleMetadata expectedSchema = new SchemaBuilder()
-          .add("filename", MinorType.VARCHAR)
-          .add("a", MinorType.INT)
-          .addNullable("b", MinorType.NULL)
-          .buildSchema();
-
-      // Verify
-
-      List<ResolvedColumn> columns = rootTuple.columns();
-      assertEquals(3, columns.size());
-      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
-      assertEquals(ScanTestUtils.FILE_NAME_COL, columns.get(0).name());
-      assertEquals("b.csv", ((FileMetadataColumn) columns.get(0)).value());
-      assertEquals(ResolvedTableColumn.ID, columns.get(1).nodeType());
-      assertEquals(ResolvedNullColumn.ID, columns.get(2).nodeType());
-    }
-  }
-
-  /**
-   * Low-level test of the smoothing projection, including the exceptions
-   * it throws when things are not going its way.
-   */
-
-  @Test
-  public void testSmoothingProjection() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectAll(),
-        ScanTestUtils.parsers());
-
-    // Table 1: (a: nullable bigint, b, c)
-
-    TupleMetadata schema1 = new SchemaBuilder()
-        .addNullable("a", MinorType.BIGINT)
-        .addNullable("b", MinorType.VARCHAR)
-        .add("c", MinorType.FLOAT8)
-        .buildSchema();
-    ResolvedRow priorSchema;
-    {
-      NullColumnBuilder builder = new NullColumnBuilder(null, false);
-      ResolvedRow rootTuple = new ResolvedRow(builder);
-      new WildcardSchemaProjection(
-          scanProj, schema1, rootTuple,
-          ScanTestUtils.resolvers());
-      priorSchema = rootTuple;
-    }
-
-    // Table 2: (a: nullable bigint, c), column omitted, original schema preserved
-
-    TupleMetadata schema2 = new SchemaBuilder()
-        .addNullable("a", MinorType.BIGINT)
-        .add("c", MinorType.FLOAT8)
-        .buildSchema();
-    try {
-      NullColumnBuilder builder = new NullColumnBuilder(null, false);
-      ResolvedRow rootTuple = new ResolvedRow(builder);
-      new SmoothingProjection(
-          scanProj, schema2, priorSchema, rootTuple,
-          ScanTestUtils.resolvers());
-      assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
-      priorSchema = rootTuple;
-    } catch (IncompatibleSchemaException e) {
-      fail();
-    }
-
-    // Table 3: (a, b, c, d), column added, must replan schema
-
-    TupleMetadata schema3 = new SchemaBuilder()
-        .addNullable("a", MinorType.BIGINT)
-        .addNullable("b", MinorType.VARCHAR)
-        .add("c", MinorType.FLOAT8)
-        .add("d", MinorType.INT)
-        .buildSchema();
-    try {
-      NullColumnBuilder builder = new NullColumnBuilder(null, false);
-      ResolvedRow rootTuple = new ResolvedRow(builder);
-      new SmoothingProjection(
-          scanProj, schema3, priorSchema, rootTuple,
-          ScanTestUtils.resolvers());
-      fail();
-    } catch (IncompatibleSchemaException e) {
-      // Expected
-    }
-
-    // Table 4: (a: double), change type must replan schema
-
-    TupleMetadata schema4 = new SchemaBuilder()
-        .addNullable("a", MinorType.FLOAT8)
-        .addNullable("b", MinorType.VARCHAR)
-        .add("c", MinorType.FLOAT8)
-        .buildSchema();
-    try {
-      NullColumnBuilder builder = new NullColumnBuilder(null, false);
-      ResolvedRow rootTuple = new ResolvedRow(builder);
-      new SmoothingProjection(
-          scanProj, schema4, priorSchema, rootTuple,
-          ScanTestUtils.resolvers());
-      fail();
-    } catch (IncompatibleSchemaException e) {
-      // Expected
-    }
-
-//    // Table 5: (a: not-nullable bigint): convert to nullable for consistency
-//
-//    TupleMetadata schema5 = new SchemaBuilder()
-//        .addNullable("a", MinorType.BIGINT)
-//        .add("c", MinorType.FLOAT8)
-//        .buildSchema();
-//    try {
-//      SmoothingProjection schemaProj = new SmoothingProjection(
-//          scanProj, schema5, dummySource, dummySource,
-//          new ArrayList<>(), priorSchema);
-//      assertTrue(schema1.isEquivalent(ScanTestUtils.schema(schemaProj.columns())));
-//    } catch (IncompatibleSchemaException e) {
-//      fail();
-//    }
-
-    // Table 6: Drop a non-nullable column, must replan
-
-    TupleMetadata schema6 = new SchemaBuilder()
-        .addNullable("a", MinorType.BIGINT)
-        .addNullable("b", MinorType.VARCHAR)
-        .buildSchema();
-    try {
-      NullColumnBuilder builder = new NullColumnBuilder(null, false);
-      ResolvedRow rootTuple = new ResolvedRow(builder);
-      new SmoothingProjection(
-          scanProj, schema6, priorSchema, rootTuple,
-          ScanTestUtils.resolvers());
-      fail();
-    } catch (IncompatibleSchemaException e) {
-      // Expected
-    }
-  }
-
-  /**
-   * Case in which the table schema is a superset of the prior
-   * schema. Discard prior schema. Turn off auto expansion of
-   * metadata for a simpler test.
-   */
-
-  @Test
-  public void testSmaller() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectAll(),
-        ScanTestUtils.parsers());
-
-    SchemaSmoother smoother = new SchemaSmoother(scanProj,
-        ScanTestUtils.resolvers());
-
-    TupleMetadata priorSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .buildSchema();
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .add("b", MinorType.VARCHAR)
-        .buildSchema();
-
-    {
-      NullColumnBuilder builder = new NullColumnBuilder(null, false);
-      ResolvedRow rootTuple = new ResolvedRow(builder);
-      smoother.resolve(priorSchema, rootTuple);
-      assertEquals(1, smoother.schemaVersion());
-      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
-    }
-    {
-      NullColumnBuilder builder = new NullColumnBuilder(null, false);
-      ResolvedRow rootTuple = new ResolvedRow(builder);
-      smoother.resolve(tableSchema, rootTuple);
-      assertEquals(2, smoother.schemaVersion());
-      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
-    }
-  }
-
-  /**
-   * Case in which the table schema and prior are disjoint
-   * sets. Discard the prior schema.
-   */
-
-  @Test
-  public void testDisjoint() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectAll(),
-        ScanTestUtils.parsers());
-
-    SchemaSmoother smoother = new SchemaSmoother(scanProj,
-        ScanTestUtils.resolvers());
-
-    TupleMetadata priorSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .buildSchema();
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .add("b", MinorType.VARCHAR)
-        .buildSchema();
-
-    {
-      doResolve(smoother, priorSchema);
-    }
-    {
-      ResolvedRow rootTuple = doResolve(smoother, tableSchema);
-      assertEquals(2, smoother.schemaVersion());
-      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
-    }
-  }
-
-  private ResolvedRow doResolve(SchemaSmoother smoother, TupleMetadata schema) {
-    NullColumnBuilder builder = new NullColumnBuilder(null, false);
-    ResolvedRow rootTuple = new ResolvedRow(builder);
-    smoother.resolve(schema, rootTuple);
-    return rootTuple;
-  }
-
-  /**
-   * Column names match, but types differ. Discard the prior schema.
-   */
-
-  @Test
-  public void testDifferentTypes() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectAll(),
-        ScanTestUtils.parsers());
-
-    SchemaSmoother smoother = new SchemaSmoother(scanProj,
-        ScanTestUtils.resolvers());
-
-    TupleMetadata priorSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .add("b", MinorType.VARCHAR)
-        .buildSchema();
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .addNullable("b", MinorType.VARCHAR)
-        .buildSchema();
-
-    {
-      doResolve(smoother, priorSchema);
-    }
-    {
-      ResolvedRow rootTuple = doResolve(smoother, tableSchema);
-      assertEquals(2, smoother.schemaVersion());
-      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
-    }
-  }
-
-  /**
-   * The prior and table schemas are identical. Preserve the prior
-   * schema (though, the output is no different than if we discarded
-   * the prior schema...)
-   */
-
-  @Test
-  public void testSameSchemas() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectAll(),
-        ScanTestUtils.parsers());
-
-    SchemaSmoother smoother = new SchemaSmoother(scanProj,
-        ScanTestUtils.resolvers());
-
-    TupleMetadata priorSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .add("b", MinorType.VARCHAR)
-        .buildSchema();
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .add("b", MinorType.VARCHAR)
-        .buildSchema();
-
-    {
-      doResolve(smoother, priorSchema);
-    }
-    {
-      ResolvedRow rootTuple = doResolve(smoother, tableSchema);
-      assertEquals(1, smoother.schemaVersion());
-      TupleMetadata actualSchema = ScanTestUtils.schema(rootTuple);
-      assertTrue(actualSchema.isEquivalent(tableSchema));
-      assertTrue(actualSchema.isEquivalent(priorSchema));
-    }
-  }
-
-  /**
-   * The prior and table schemas are identical, but the cases of names differ.
-   * Preserve the case of the first schema.
-   */
-
-  @Test
-  public void testDifferentCase() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectAll(),
-        ScanTestUtils.parsers());
-
-    SchemaSmoother smoother = new SchemaSmoother(scanProj,
-        ScanTestUtils.resolvers());
-
-    TupleMetadata priorSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .add("b", MinorType.VARCHAR)
-        .buildSchema();
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .add("A", MinorType.INT)
-        .add("B", MinorType.VARCHAR)
-        .buildSchema();
-
-    {
-      doResolve(smoother, priorSchema);
-    }
-    {
-      ResolvedRow rootTuple = doResolve(smoother, tableSchema);
-      assertEquals(1, smoother.schemaVersion());
-      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
-      List<ResolvedColumn> columns = rootTuple.columns();
-      assertEquals("a", columns.get(0).name());
-    }
-  }
-
-  /**
-   * Can't preserve the prior schema if it had required columns
-   * that are missing from the new schema.
-   */
-
-  @Test
-  public void testRequired() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectAll(),
-        ScanTestUtils.parsers());
-
-    SchemaSmoother smoother = new SchemaSmoother(scanProj,
-        ScanTestUtils.resolvers());
-
-    TupleMetadata priorSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .addNullable("b", MinorType.VARCHAR)
-        .buildSchema();
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .addNullable("b", MinorType.VARCHAR)
-        .buildSchema();
-
-    {
-      doResolve(smoother, priorSchema);
-    }
-    {
-      ResolvedRow rootTuple = doResolve(smoother, tableSchema);
-      assertEquals(2, smoother.schemaVersion());
-      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
-    }
-  }
-
-  /**
-   * Preserve the prior schema if table is a subset and missing columns
-   * are nullable or repeated.
-   */
-
-  @Test
-  public void testMissingNullableColumns() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectAll(),
-        ScanTestUtils.parsers());
-
-    SchemaSmoother smoother = new SchemaSmoother(scanProj,
-        ScanTestUtils.resolvers());
-
-    TupleMetadata priorSchema = new SchemaBuilder()
-        .addNullable("a", MinorType.INT)
-        .add("b", MinorType.VARCHAR)
-        .addArray("c", MinorType.BIGINT)
-        .buildSchema();
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .add("b", MinorType.VARCHAR)
-        .buildSchema();
-
-    {
-      doResolve(smoother, priorSchema);
-    }
-    {
-      ResolvedRow rootTuple = doResolve(smoother, tableSchema);
-      assertEquals(1, smoother.schemaVersion());
-      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
-    }
-  }
-
-  /**
-   * Preserve the prior schema if table is a subset. Map the table
-   * columns to the output using the prior schema ordering.
-   */
-
-  @Test
-  public void testReordering() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectAll(),
-        ScanTestUtils.parsers());
-
-    SchemaSmoother smoother = new SchemaSmoother(scanProj,
-        ScanTestUtils.resolvers());
-
-    TupleMetadata priorSchema = new SchemaBuilder()
-        .addNullable("a", MinorType.INT)
-        .add("b", MinorType.VARCHAR)
-        .addArray("c", MinorType.BIGINT)
-        .buildSchema();
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .add("b", MinorType.VARCHAR)
-        .addNullable("a", MinorType.INT)
-        .buildSchema();
-
-    {
-      doResolve(smoother, priorSchema);
-    }
-    {
-      ResolvedRow rootTuple = doResolve(smoother, tableSchema);
-      assertEquals(1, smoother.schemaVersion());
-      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
-    }
-  }
-
-  /**
-   * If using the legacy wildcard expansion, reuse schema if partition paths
-   * are the same length.
-   */
-
-  @Test
-  public void testSamePartitionLength() {
-
-    // Set up the file metadata manager
-
-    Path filePathA = new Path("hdfs:///w/x/y/a.csv");
-    Path filePathB = new Path("hdfs:///w/x/y/b.csv");
-    FileMetadataManager metadataManager = new FileMetadataManager(
-        fixture.getOptionManager(),
-        new Path("hdfs:///w"),
-        Lists.newArrayList(filePathA, filePathB));
-
-    // Set up the scan level projection
-
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        ScanTestUtils.projectAllWithMetadata(2),
-        ScanTestUtils.parsers(metadataManager.projectionParser()));
-
-    // Define the schema smoother
-
-    SchemaSmoother smoother = new SchemaSmoother(scanProj,
-        ScanTestUtils.resolvers(metadataManager));
-
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .add("b", MinorType.VARCHAR)
-        .buildSchema();
-
-    TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
-    {
-      metadataManager.startFile(filePathA);
-      ResolvedRow rootTuple = doResolve(smoother, tableSchema);
-      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
-    }
-    {
-      metadataManager.startFile(filePathB);
-      ResolvedRow rootTuple = doResolve(smoother, tableSchema);
-      assertEquals(1, smoother.schemaVersion());
-      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
-    }
-  }
-
-  /**
-   * If using the legacy wildcard expansion, reuse schema if the new partition path
-   * is shorter than the previous. (Unneeded partitions will be set to null by the
-   * scan projector.)
-   */
-
-  @Test
-  public void testShorterPartitionLength() {
-
-    // Set up the file metadata manager
-
-    Path filePathA = new Path("hdfs:///w/x/y/a.csv");
-    Path filePathB = new Path("hdfs:///w/x/b.csv");
-    FileMetadataManager metadataManager = new FileMetadataManager(
-        fixture.getOptionManager(),
-        new Path("hdfs:///w"),
-        Lists.newArrayList(filePathA, filePathB));
-
-    // Set up the scan level projection
-
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        ScanTestUtils.projectAllWithMetadata(2),
-        ScanTestUtils.parsers(metadataManager.projectionParser()));
-
-    // Define the schema smoother
-
-    SchemaSmoother smoother = new SchemaSmoother(scanProj,
-        ScanTestUtils.resolvers(metadataManager));
-
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .add("b", MinorType.VARCHAR)
-        .buildSchema();
-
-    TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
-    {
-      metadataManager.startFile(filePathA);
-      ResolvedRow rootTuple = doResolve(smoother, tableSchema);
-      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
-    }
-    {
-      metadataManager.startFile(filePathB);
-      ResolvedRow rootTuple = doResolve(smoother, tableSchema);
-      assertEquals(1, smoother.schemaVersion());
-      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
-    }
-  }
-
-  /**
-   * If using the legacy wildcard expansion, we are able to use the same
-   * schema even if the new partition path is longer than the previous,
-   * because all file names are provided up front.
-   */
-
-  @Test
-  public void testLongerPartitionLength() {
-
-    // Set up the file metadata manager
-
-    Path filePathA = new Path("hdfs:///w/x/a.csv");
-    Path filePathB = new Path("hdfs:///w/x/y/b.csv");
-    FileMetadataManager metadataManager = new FileMetadataManager(
-        fixture.getOptionManager(),
-        new Path("hdfs:///w"),
-        Lists.newArrayList(filePathA, filePathB));
-
-    // Set up the scan level projection
-
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        ScanTestUtils.projectAllWithMetadata(2),
-        ScanTestUtils.parsers(metadataManager.projectionParser()));
-
-    // Define the schema smoother
-
-    SchemaSmoother smoother = new SchemaSmoother(scanProj,
-        ScanTestUtils.resolvers(metadataManager));
-
-    TupleMetadata tableSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .add("b", MinorType.VARCHAR)
-        .buildSchema();
-
-    TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
-    {
-      metadataManager.startFile(filePathA);
-      ResolvedRow rootTuple = doResolve(smoother, tableSchema);
-      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
-    }
-    {
-      metadataManager.startFile(filePathB);
-      ResolvedRow rootTuple = doResolve(smoother, tableSchema);
-      assertEquals(1, smoother.schemaVersion());
-      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
-    }
-  }
-
-  /**
-   * Integrated test across multiple schemas at the batch level.
-   */
-
-  @Test
-  public void testSmoothableSchemaBatches() {
-    ScanLevelProjection scanProj = new ScanLevelProjection(
-        RowSetTestUtils.projectAll(),
-        ScanTestUtils.parsers());
-
-    SchemaSmoother smoother = new SchemaSmoother(scanProj,
-        ScanTestUtils.resolvers());
-
-    // Table 1: (a: bigint, b)
-
-    TupleMetadata schema1 = new SchemaBuilder()
-        .addNullable("a", MinorType.BIGINT)
-        .addNullable("b", MinorType.VARCHAR)
-        .add("c", MinorType.FLOAT8)
-        .buildSchema();
-    {
-      ResolvedRow rootTuple = doResolve(smoother, schema1);
-
-      // Just use the original schema.
-
-      assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
-      assertEquals(1, smoother.schemaVersion());
-    }
-
-    // Table 2: (a: nullable bigint, c), column ommitted, original schema preserved
-
-    TupleMetadata schema2 = new SchemaBuilder()
-        .addNullable("a", MinorType.BIGINT)
-        .add("c", MinorType.FLOAT8)
-        .buildSchema();
-    {
-      ResolvedRow rootTuple = doResolve(smoother, schema2);
-      assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
-      assertEquals(1, smoother.schemaVersion());
-    }
-
-    // Table 3: (a, c, d), column added, must replan schema
-
-    TupleMetadata schema3 = new SchemaBuilder()
-        .addNullable("a", MinorType.BIGINT)
-        .addNullable("b", MinorType.VARCHAR)
-        .add("c", MinorType.FLOAT8)
-        .add("d", MinorType.INT)
-        .buildSchema();
-    {
-      ResolvedRow rootTuple = doResolve(smoother, schema3);
-      assertTrue(schema3.isEquivalent(ScanTestUtils.schema(rootTuple)));
-      assertEquals(2, smoother.schemaVersion());
-    }
-
-    // Table 4: Drop a non-nullable column, must replan
-
-    TupleMetadata schema4 = new SchemaBuilder()
-        .addNullable("a", MinorType.BIGINT)
-        .addNullable("b", MinorType.VARCHAR)
-        .buildSchema();
-    {
-      ResolvedRow rootTuple = doResolve(smoother, schema4);
-      assertTrue(schema4.isEquivalent(ScanTestUtils.schema(rootTuple)));
-      assertEquals(3, smoother.schemaVersion());
-    }
-
-    // Table 5: (a: double), change type must replan schema
-
-    TupleMetadata schema5 = new SchemaBuilder()
-        .addNullable("a", MinorType.FLOAT8)
-        .addNullable("b", MinorType.VARCHAR)
-        .buildSchema();
-    {
-      ResolvedRow rootTuple = doResolve(smoother, schema5);
-      assertTrue(schema5.isEquivalent(ScanTestUtils.schema(rootTuple)));
-       assertEquals(4, smoother.schemaVersion());
-    }
-
-//    // Table 6: (a: not-nullable bigint): convert to nullable for consistency
-//
-//    TupleMetadata schema6 = new SchemaBuilder()
-//        .add("a", MinorType.FLOAT8)
-//        .add("b", MinorType.VARCHAR)
-//        .buildSchema();
-//    {
-//      SchemaLevelProjection schemaProj = smoother.resolve(schema3, dummySource);
-//      assertTrue(schema5.isEquivalent(ScanTestUtils.schema(schemaProj.columns())));
-//    }
-  }
-
-  /**
-   * A SELECT * query uses the schema of the table as the output schema.
-   * This is trivial when the scanner has one table. But, if two or more
-   * tables occur, then things get interesting. The first table sets the
-   * schema. The second table then has:
-   * <ul>
-   * <li>The same schema, trivial case.</li>
-   * <li>A subset of the first table. The type of the "missing" column
-   * from the first table is used for a null column in the second table.</li>
-   * <li>A superset or disjoint set of the first schema. This triggers a hard schema
-   * change.</li>
-   * </ul>
-   * <p>
-   * It is an open question whether previous columns should be preserved on
-   * a hard reset. For now, the code implements, and this test verifies, that a
-   * hard reset clears the "memory" of prior schemas.
-   */
-
-  @Test
-  public void testWildcardSmoothing() {
-    ScanSchemaOrchestrator projector = new ScanSchemaOrchestrator(fixture.allocator());
-    projector.enableSchemaSmoothing(true);
-    projector.build(RowSetTestUtils.projectAll());
-
-    TupleMetadata firstSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .addNullable("b", MinorType.VARCHAR, 10)
-        .addNullable("c", MinorType.BIGINT)
-        .buildSchema();
-    TupleMetadata subsetSchema = new SchemaBuilder()
-        .addNullable("b", MinorType.VARCHAR, 10)
-        .add("a", MinorType.INT)
-        .buildSchema();
-    TupleMetadata disjointSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .addNullable("b", MinorType.VARCHAR, 10)
-        .add("d", MinorType.VARCHAR)
-        .buildSchema();
-
-    SchemaTracker tracker = new SchemaTracker();
-    int schemaVersion;
-    {
-      // First table, establishes the baseline
-      // ... FROM table 1
-
-      ReaderSchemaOrchestrator reader = projector.startReader();
-      ResultSetLoader loader = reader.makeTableLoader(firstSchema);
-
-      reader.startBatch();
-      loader.writer()
-          .addRow(10, "fred", 110L)
-          .addRow(20, "wilma", 110L);
-      reader.endBatch();
-
-      tracker.trackSchema(projector.output());
-      schemaVersion = tracker.schemaVersion();
-
-      SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
-          .addRow(10, "fred", 110L)
-          .addRow(20, "wilma", 110L)
-          .build();
-      new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(projector.output()));
-    }
-    {
-      // Second table, same schema, the trivial case
-      // ... FROM table 2
-
-      ReaderSchemaOrchestrator reader = projector.startReader();
-      ResultSetLoader loader = reader.makeTableLoader(firstSchema);
-
-      reader.startBatch();
-      loader.writer()
-          .addRow(70, "pebbles", 770L)
-          .addRow(80, "hoppy", 880L);
-      reader.endBatch();
-
-      tracker.trackSchema(projector.output());
-      assertEquals(schemaVersion, tracker.schemaVersion());
-
-      SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
-          .addRow(70, "pebbles", 770L)
-          .addRow(80, "hoppy", 880L)
-          .build();
-      new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(projector.output()));
-    }
-    {
-      // Third table: subset schema of first two
-      // ... FROM table 3
-
-      ReaderSchemaOrchestrator reader = projector.startReader();
-      ResultSetLoader loader = reader.makeTableLoader(subsetSchema);
-
-      reader.startBatch();
-      loader.writer()
-          .addRow("bambam", 30)
-          .addRow("betty", 40);
-      reader.endBatch();
-
-      tracker.trackSchema(projector.output());
-      assertEquals(schemaVersion, tracker.schemaVersion());
-
-      SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
-          .addRow(30, "bambam", null)
-          .addRow(40, "betty", null)
-          .build();
-      new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(projector.output()));
-    }
-    {
-      // Fourth table: disjoint schema, cases a schema reset
-      // ... FROM table 4
-
-      ReaderSchemaOrchestrator reader = projector.startReader();
-      ResultSetLoader loader = reader.makeTableLoader(disjointSchema);
-
-      reader.startBatch();
-      loader.writer()
-          .addRow(50, "dino", "supporting")
-          .addRow(60, "barney", "main");
-      reader.endBatch();
-
-      tracker.trackSchema(projector.output());
-      assertNotEquals(schemaVersion, tracker.schemaVersion());
-
-      SingleRowSet expected = fixture.rowSetBuilder(disjointSchema)
-          .addRow(50, "dino", "supporting")
-          .addRow(60, "barney", "main")
-          .build();
-      new RowSetComparison(expected)
-        .verifyAndClearAll(fixture.wrap(projector.output()));
-    }
-
-    projector.close();
-  }
-
-  // TODO: Test schema smoothing with repeated
-  // TODO: Test hard schema change
-  // TODO: Typed null column tests (resurrect)
-  // TODO: Test maps and arrays of maps
-}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java
index b45374b..086da96 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java
@@ -24,14 +24,21 @@ import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
+import org.apache.drill.exec.physical.impl.scan.file.FileMetadata;
+import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn;
+import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumnDefn;
+import org.apache.drill.exec.physical.impl.scan.file.PartitionColumn;
 import org.apache.drill.exec.physical.impl.scan.project.ConstantColumnLoader.ConstantColumnSpec;
 import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.store.ColumnExplorer.ImplicitFileColumns;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -114,4 +121,42 @@ public class TestConstantColumnLoader extends SubOperatorTest {
         .verifyAndClearAll(fixture.wrap(staticLoader.load(2)));
     staticLoader.close();
   }
+
+  @Test
+  public void testFileMetadata() {
+
+    FileMetadata fileInfo = new FileMetadata(new Path("hdfs:///w/x/y/z.csv"), new Path("hdfs:///w"));
+    List<ConstantColumnSpec> defns = new ArrayList<>();
+    FileMetadataColumnDefn iDefn = new FileMetadataColumnDefn(
+        ScanTestUtils.SUFFIX_COL, ImplicitFileColumns.SUFFIX);
+    FileMetadataColumn iCol = new FileMetadataColumn(ScanTestUtils.SUFFIX_COL,
+        iDefn, fileInfo, null, 0);
+    defns.add(iCol);
+
+    String partColName = ScanTestUtils.partitionColName(1);
+    PartitionColumn pCol = new PartitionColumn(partColName, 1, fileInfo, null, 0);
+    defns.add(pCol);
+
+    ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
+    ConstantColumnLoader staticLoader = new ConstantColumnLoader(cache, defns);
+
+    // Create a batch
+
+    staticLoader.load(2);
+
+    // Verify
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add(ScanTestUtils.SUFFIX_COL, MinorType.VARCHAR)
+        .addNullable(partColName, MinorType.VARCHAR)
+        .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow("csv", "y")
+        .addRow("csv", "y")
+        .build();
+
+    new RowSetComparison(expected)
+        .verifyAndClearAll(fixture.wrap(staticLoader.load(2)));
+    staticLoader.close();
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
index 5b49ab3..f40e847 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.scan.project;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -27,6 +28,8 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
 import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
+import org.apache.drill.exec.physical.rowSet.project.ImpliedTupleRequest;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
 import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
 import org.apache.drill.exec.record.metadata.ProjectionType;
 import org.apache.drill.test.SubOperatorTest;
@@ -56,6 +59,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
     final ScanLevelProjection scanProj = new ScanLevelProjection(
         RowSetTestUtils.projectList("a", "b", "c"),
         ScanTestUtils.parsers());
+
     assertFalse(scanProj.projectAll());
     assertFalse(scanProj.projectNone());
 
@@ -72,6 +76,19 @@ public class TestScanLevelProjection extends SubOperatorTest {
     // Verify column type
 
     assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
+
+    // Verify tuple projection
+
+    RequestedTuple outputProj = scanProj.rootProjection();
+    assertEquals(3, outputProj.projections().size());
+    assertNotNull(outputProj.get("a"));
+    assertTrue(outputProj.get("a").isSimple());
+
+    RequestedTuple readerProj = scanProj.readerProjection();
+    assertEquals(3, readerProj.projections().size());
+    assertNotNull(readerProj.get("a"));
+    assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("a"));
+    assertEquals(ProjectionType.UNPROJECTED, readerProj.projectionType("d"));
   }
 
   /**
@@ -85,6 +102,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
     final ScanLevelProjection scanProj = new ScanLevelProjection(
         RowSetTestUtils.projectList("a.x", "b.x", "a.y", "b.y", "c"),
         ScanTestUtils.parsers());
+
     assertFalse(scanProj.projectAll());
     assertFalse(scanProj.projectNone());
 
@@ -107,6 +125,20 @@ public class TestScanLevelProjection extends SubOperatorTest {
 
     final RequestedColumn c = ((UnresolvedColumn) scanProj.columns().get(2)).element();
     assertTrue(c.isSimple());
+
+    // Verify tuple projection
+
+    RequestedTuple outputProj = scanProj.rootProjection();
+    assertEquals(3, outputProj.projections().size());
+    assertNotNull(outputProj.get("a"));
+    assertTrue(outputProj.get("a").isTuple());
+
+    RequestedTuple readerProj = scanProj.readerProjection();
+    assertEquals(3, readerProj.projections().size());
+    assertNotNull(readerProj.get("a"));
+    assertEquals(ProjectionType.TUPLE, readerProj.projectionType("a"));
+    assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("c"));
+    assertEquals(ProjectionType.UNPROJECTED, readerProj.projectionType("d"));
   }
 
   /**
@@ -119,6 +151,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
     final ScanLevelProjection scanProj = new ScanLevelProjection(
         RowSetTestUtils.projectList("a[1]", "a[3]"),
         ScanTestUtils.parsers());
+
     assertFalse(scanProj.projectAll());
     assertFalse(scanProj.projectNone());
 
@@ -137,6 +170,19 @@ public class TestScanLevelProjection extends SubOperatorTest {
     assertTrue(a.hasIndex(1));
     assertFalse(a.hasIndex(2));
     assertTrue(a.hasIndex(3));
+
+    // Verify tuple projection
+
+    RequestedTuple outputProj = scanProj.rootProjection();
+    assertEquals(1, outputProj.projections().size());
+    assertNotNull(outputProj.get("a"));
+    assertTrue(outputProj.get("a").isArray());
+
+    RequestedTuple readerProj = scanProj.readerProjection();
+    assertEquals(1, readerProj.projections().size());
+    assertNotNull(readerProj.get("a"));
+    assertEquals(ProjectionType.ARRAY, readerProj.projectionType("a"));
+    assertEquals(ProjectionType.UNPROJECTED, readerProj.projectionType("c"));
   }
 
   /**
@@ -165,6 +211,17 @@ public class TestScanLevelProjection extends SubOperatorTest {
     // Verify column type
 
     assertEquals(UnresolvedColumn.WILDCARD, scanProj.columns().get(0).nodeType());
+
+    // Verify tuple projection
+
+    RequestedTuple outputProj = scanProj.rootProjection();
+    assertEquals(1, outputProj.projections().size());
+    assertNotNull(outputProj.get("**"));
+    assertTrue(outputProj.get("**").isWildcard());
+
+    RequestedTuple readerProj = scanProj.readerProjection();
+    assertTrue(readerProj instanceof ImpliedTupleRequest);
+    assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("a"));
   }
 
   /**
@@ -181,38 +238,62 @@ public class TestScanLevelProjection extends SubOperatorTest {
     assertFalse(scanProj.projectAll());
     assertTrue(scanProj.projectNone());
     assertEquals(0, scanProj.requestedCols().size());
+
+    // Verify tuple projection
+
+    RequestedTuple outputProj = scanProj.rootProjection();
+    assertEquals(0, outputProj.projections().size());
+
+    RequestedTuple readerProj = scanProj.readerProjection();
+    assertTrue(readerProj instanceof ImpliedTupleRequest);
+    assertEquals(ProjectionType.UNPROJECTED, readerProj.projectionType("a"));
   }
 
   /**
-   * Can't include both a wildcard and a column name.
+   * Can include both a wildcard and a column name. The Project
+   * operator will fill in the column; the scan framework just ignores
+   * the extra column.
    */
 
   @Test
-  public void testErrorWildcardAndColumns() {
-    try {
-      new ScanLevelProjection(
+  public void testWildcardAndColumns() {
+    ScanLevelProjection scanProj = new ScanLevelProjection(
           RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, "a"),
           ScanTestUtils.parsers());
-      fail();
-    } catch (final IllegalArgumentException e) {
-      // Expected
-    }
+
+    assertTrue(scanProj.projectAll());
+    assertFalse(scanProj.projectNone());
+    assertEquals(2, scanProj.requestedCols().size());
+    assertEquals(1, scanProj.columns().size());
+
+    // Verify tuple projection
+
+    RequestedTuple outputProj = scanProj.rootProjection();
+    assertEquals(2, outputProj.projections().size());
+    assertNotNull(outputProj.get("**"));
+    assertTrue(outputProj.get("**").isWildcard());
+    assertNotNull(outputProj.get("a"));
+
+    RequestedTuple readerProj = scanProj.readerProjection();
+    assertTrue(readerProj instanceof ImpliedTupleRequest);
+    assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("a"));
+    assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("c"));
   }
 
   /**
-   * Can't include both a column name and a wildcard.
+   * Can also include a column name followed by a wildcard.
    */
 
   @Test
-  public void testErrorColumnAndWildcard() {
-    try {
-      new ScanLevelProjection(
+  public void testColumnAndWildcard() {
+    ScanLevelProjection scanProj = new ScanLevelProjection(
           RowSetTestUtils.projectList("a", SchemaPath.DYNAMIC_STAR),
           ScanTestUtils.parsers());
-      fail();
-    } catch (final IllegalArgumentException e) {
-      // Expected
-    }
+
+    assertTrue(scanProj.projectAll());
+    assertFalse(scanProj.projectNone());
+    assertEquals(2, scanProj.requestedCols().size());
+    assertEquals(1, scanProj.columns().size());
   }
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
index a21b1e4..8adc037 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
@@ -28,8 +28,9 @@ import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.protocol.SchemaTracker;
 import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
+import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn;
+import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
 import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
-import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
 import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother.IncompatibleSchemaException;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
@@ -38,8 +39,10 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 /**
  * Tests schema smoothing at the schema projection level.
@@ -63,7 +66,7 @@ import org.junit.experimental.categories.Category;
  * to a fundamental limitation in Drill:
  * <ul>
  * <li>Drill cannot predict the future: each file (or batch)
- * may have a different schema.</ul>
+ * may have a different schema.</li>
  * <li>Drill does not know about these differences until they
  * occur.</li>
  * <li>The scan operator is obligated to return the same schema
@@ -85,6 +88,105 @@ import org.junit.experimental.categories.Category;
 public class TestSchemaSmoothing extends SubOperatorTest {
 
   /**
+   * Sanity test for the simple, discrete case. The purpose of
+   * discrete is just to run the basic lifecycle in a way that
+   * is compatible with the schema-persistence version.
+   */
+
+  @Test
+  public void testDiscrete() {
+
+    // Set up the file metadata manager
+
+    Path filePathA = new Path("hdfs:///w/x/y/a.csv");
+    Path filePathB = new Path("hdfs:///w/x/y/b.csv");
+    FileMetadataManager metadataManager = new FileMetadataManager(
+        fixture.getOptionManager(),
+        new Path("hdfs:///w"),
+        Lists.newArrayList(filePathA, filePathB));
+
+    // Set up the scan level projection
+
+    ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL, "a", "b"),
+        ScanTestUtils.parsers(metadataManager.projectionParser()));
+
+    {
+      // Define a file a.csv
+
+      metadataManager.startFile(filePathA);
+
+      // Build the output schema from the (a, b) table schema
+
+      TupleMetadata twoColSchema = new SchemaBuilder()
+          .add("a", MinorType.INT)
+          .addNullable("b", MinorType.VARCHAR, 10)
+          .buildSchema();
+      NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      ResolvedRow rootTuple = new ResolvedRow(builder);
+      new ExplicitSchemaProjection(
+          scanProj, twoColSchema, rootTuple,
+          ScanTestUtils.resolvers(metadataManager));
+
+      // Verify the full output schema
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("filename", MinorType.VARCHAR)
+          .add("a", MinorType.INT)
+          .addNullable("b", MinorType.VARCHAR, 10)
+          .buildSchema();
+
+      // Verify
+
+      List<ResolvedColumn> columns = rootTuple.columns();
+      assertEquals(3, columns.size());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
+      assertEquals(ScanTestUtils.FILE_NAME_COL, columns.get(0).name());
+      assertEquals("a.csv", ((FileMetadataColumn) columns.get(0)).value());
+      assertEquals(ResolvedTableColumn.ID, columns.get(1).nodeType());
+    }
+    {
+      // Define a file b.csv
+
+      metadataManager.startFile(filePathB);
+
+      // Build the output schema from the (a) table schema
+
+      TupleMetadata oneColSchema = new SchemaBuilder()
+          .add("a", MinorType.INT)
+          .buildSchema();
+      NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      ResolvedRow rootTuple = new ResolvedRow(builder);
+      new ExplicitSchemaProjection(
+          scanProj, oneColSchema, rootTuple,
+          ScanTestUtils.resolvers(metadataManager));
+
+      // Verify the full output schema
+      // Since this mode is "discrete", we don't remember the type
+      // of the missing column. (Instead, it is filled in at the
+      // vector level as part of vector persistence.) During projection, it is
+      // marked with type NULL so that the null column builder will fill in
+      // the proper type.
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("filename", MinorType.VARCHAR)
+          .add("a", MinorType.INT)
+          .addNullable("b", MinorType.NULL)
+          .buildSchema();
+
+      // Verify
+
+      List<ResolvedColumn> columns = rootTuple.columns();
+      assertEquals(3, columns.size());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
+      assertEquals(ScanTestUtils.FILE_NAME_COL, columns.get(0).name());
+      assertEquals("b.csv", ((FileMetadataColumn) columns.get(0)).value());
+      assertEquals(ResolvedTableColumn.ID, columns.get(1).nodeType());
+      assertEquals(ResolvedNullColumn.ID, columns.get(2).nodeType());
+    }
+  }
+
+  /**
    * Low-level test of the smoothing projection, including the exceptions
    * it throws when things are not going its way.
    */
@@ -463,6 +565,150 @@ public class TestSchemaSmoothing extends SubOperatorTest {
       assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
     }
   }
+
+  /**
+   * If using the legacy wildcard expansion, reuse schema if partition paths
+   * are the same length.
+   */
+
+  @Test
+  public void testSamePartitionLength() {
+
+    // Set up the file metadata manager
+
+    Path filePathA = new Path("hdfs:///w/x/y/a.csv");
+    Path filePathB = new Path("hdfs:///w/x/y/b.csv");
+    FileMetadataManager metadataManager = new FileMetadataManager(
+        fixture.getOptionManager(),
+        new Path("hdfs:///w"),
+        Lists.newArrayList(filePathA, filePathB));
+
+    // Set up the scan level projection
+
+    ScanLevelProjection scanProj = new ScanLevelProjection(
+        ScanTestUtils.projectAllWithMetadata(2),
+        ScanTestUtils.parsers(metadataManager.projectionParser()));
+
+    // Define the schema smoother
+
+    SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers(metadataManager));
+
+    TupleMetadata tableSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
+    {
+      metadataManager.startFile(filePathA);
+      ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
+    }
+    {
+      metadataManager.startFile(filePathB);
+      ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(1, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
+    }
+  }
+
+  /**
+   * If using the legacy wildcard expansion, reuse schema if the new partition path
+   * is shorter than the previous. (Unneeded partitions will be set to null by the
+   * scan projector.)
+   */
+
+  @Test
+  public void testShorterPartitionLength() {
+
+    // Set up the file metadata manager
+
+    Path filePathA = new Path("hdfs:///w/x/y/a.csv");
+    Path filePathB = new Path("hdfs:///w/x/b.csv");
+    FileMetadataManager metadataManager = new FileMetadataManager(
+        fixture.getOptionManager(),
+        new Path("hdfs:///w"),
+        Lists.newArrayList(filePathA, filePathB));
+
+    // Set up the scan level projection
+
+    ScanLevelProjection scanProj = new ScanLevelProjection(
+        ScanTestUtils.projectAllWithMetadata(2),
+        ScanTestUtils.parsers(metadataManager.projectionParser()));
+
+    // Define the schema smoother
+
+    SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers(metadataManager));
+
+    TupleMetadata tableSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
+    {
+      metadataManager.startFile(filePathA);
+      ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
+    }
+    {
+      metadataManager.startFile(filePathB);
+      ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(1, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
+    }
+  }
+
+  /**
+   * If using the legacy wildcard expansion, we are able to use the same
+   * schema even if the new partition path is longer than the previous,
+   * because all file names are provided up front.
+   */
+
+  @Test
+  public void testLongerPartitionLength() {
+
+    // Set up the file metadata manager
+
+    Path filePathA = new Path("hdfs:///w/x/a.csv");
+    Path filePathB = new Path("hdfs:///w/x/y/b.csv");
+    FileMetadataManager metadataManager = new FileMetadataManager(
+        fixture.getOptionManager(),
+        new Path("hdfs:///w"),
+        Lists.newArrayList(filePathA, filePathB));
+
+    // Set up the scan level projection
+
+    ScanLevelProjection scanProj = new ScanLevelProjection(
+        ScanTestUtils.projectAllWithMetadata(2),
+        ScanTestUtils.parsers(metadataManager.projectionParser()));
+
+    // Define the schema smoother
+
+    SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers(metadataManager));
+
+    TupleMetadata tableSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
+    {
+      metadataManager.startFile(filePathA);
+      ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
+    }
+    {
+      metadataManager.startFile(filePathB);
+      ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(1, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
+    }
+  }
+
   /**
    * Integrated test across multiple schemas at the batch level.
    */