Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/12/10 12:03:08 UTC

[GitHub] arina-ielchiieva closed pull request #1501: DRILL-6791: Scan projection framework

arina-ielchiieva closed pull request #1501: DRILL-6791: Scan projection framework
URL: https://github.com/apache/drill/pull/1501
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/.gitignore b/.gitignore
index 00a7620bee9..37901c65b99 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,3 +24,4 @@ install_manifest.txt
 *.swp
 # TODO - DRILL-4336
 exec/jdbc-all/dependency-reduced-pom.xml
+.*.html
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ColumnProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ColumnProjection.java
new file mode 100644
index 00000000000..0540d0be1d7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ColumnProjection.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+/**
+ * Core interface for a projected column. Models a column throughout the
+ * projection lifecycle. Columns evolve from unresolved to resolved at
+ * different times. Each class that derives from this interface can act
+ * as a column "node", while declaring its type so it may be processed
+ * easily at the proper type.
+ * <p>
+ * For example, an implicit column is processed at the file schema
+ * resolution phase, converting from unresolved to resolved. At the same
+ * time, table columns remain unresolved, waiting for the table schema
+ * to appear.
+ * <p>
+ * In an advanced, experimental feature, schema persistence sees some
+ * columns transition from resolved to unresolved and back again.
+ * <p>
+ * Having all column nodes derive from this same interface keeps things
+ * tidy.
+ */
+
+public interface ColumnProjection {
+
+  public static final int FRAMEWORK_BASE_ID = 100;
+  public static final int READER_BASE_ID = 200;
+
+  /**
+   * The name of the column as it appears in the output
+   * row (record batch.)
+   *
+   * @return the output column name
+   */
+  String name();
+
+  /**
+   * A node type unique to each node. Nodes defined in this package
+   * use IDs less than {@link #FRAMEWORK_BASE_ID}. Nodes defined by
+   * frameworks (for file metadata columns or for other special
+   * columns) start numbering with {@link #FRAMEWORK_BASE_ID}. Readers
+   * may need their own specialized nodes, which must use IDs starting
+   * with {@link #READER_BASE_ID}.
+   * <p>
+   * This system solves two problems:
+   * <ol>
+   * <li>Provides an efficient way for each mechanism to recognize its
+   * own nodes without using <code>instanceof</code>.</li>
+   * <li>Allows for frameworks and readers to be added without changing
+   * any base enum. This works because every instance of this mechanism
+   * sees only the base nodes, those from a single framework and those
+   * from a single reader; there is no need for a universal ID registry.
+   * Two frameworks can use identical IDs because they never mix.
+   * Similarly, two readers can use the same IDs because Drill does not
+   * allow a single scan operator to use two different kinds of readers.
+   * </li>
+   * </ol>
+   * @return the numeric ID for this node, used for each layer to
+   * recognize its own nodes
+   */
+  int nodeType();
+}
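
As an illustration of the ID scheme described above (not part of this PR), a reader-specific column might be defined roughly as follows; the class name and ID value are hypothetical:

// Hypothetical reader-defined column node; a sketch only, not part of the PR.
public class FileNameColumn implements ColumnProjection {

  // Reader-defined nodes number upward from READER_BASE_ID so they can
  // never collide with the base nodes or framework nodes in this package.
  public static final int ID = ColumnProjection.READER_BASE_ID + 1;

  private final String name;

  public FileNameColumn(String name) { this.name = name; }

  @Override
  public String name() { return name; }

  @Override
  public int nodeType() { return ID; }
}

// The reader then recognizes its own nodes by ID rather than instanceof:
//   if (col.nodeType() == FileNameColumn.ID) { /* handle reader column */ }
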
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ConstantColumnLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ConstantColumnLoader.java
new file mode 100644
index 00000000000..6d2e5d121ba
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ConstantColumnLoader.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+
+/**
+ * Populates metadata columns: either file metadata (AKA "implicit
+ * columns") or directory metadata (AKA "partition columns"). In both
+ * cases the column type is nullable Varchar and the column value
+ * is predefined by the projection planner; this class just copies
+ * that value into each row.
+ * <p>
+ * The values for the columns appear in the column definitions.
+ * This works because the metadata columns are re-resolved for
+ * each file, picking up file-specific values. In some cases,
+ * a column might not even have a value (such as a reference to
+ * a dirN level that isn't defined for a particular file).
+ * <p>
+ * That said, this class is agnostic about the source and meaning
+ * of the columns: it only cares that the columns are of type
+ * nullable Varchar and that the values are in the column nodes.
+ */
+
+public class ConstantColumnLoader extends StaticColumnLoader {
+
+  public interface ConstantColumnSpec {
+    String name();
+    MaterializedField schema();
+    String value();
+  }
+
+  private final String values[];
+  private final List<? extends ConstantColumnSpec> constantCols;
+
+  public ConstantColumnLoader(ResultVectorCache vectorCache,
+      List<? extends ConstantColumnSpec> defns) {
+    super(vectorCache);
+
+    // Populate the loader schema from that provided.
+    // Cache values for faster access.
+    // TODO: Rewrite to specify schema up front
+
+    constantCols = defns;
+    RowSetLoader schema = loader.writer();
+    values = new String[defns.size()];
+    for (int i = 0; i < defns.size(); i++) {
+      ConstantColumnSpec defn  = defns.get(i);
+      values[i] = defn.value();
+      schema.addColumn(defn.schema());
+    }
+  }
+
+  @Override
+  public VectorContainer load(int rowCount) {
+    loader.startBatch();
+    RowSetLoader writer = loader.writer();
+    for (int i = 0; i < rowCount; i++) {
+      writer.start();
+      loadRow(writer);
+      writer.save();
+    }
+    return loader.harvest();
+  }
+
+  /**
+   * Populate one row of the static vectors with the defined
+   * static values.
+   *
+   * @param writer writer used to set each column in the current row
+   */
+
+  private void loadRow(TupleWriter writer) {
+    for (int i = 0; i < values.length; i++) {
+
+      // Set the column (of any type) to null if the string value
+      // is null.
+
+      if (values[i] == null) {
+        writer.scalar(i).setNull();
+      } else {
+        // Else, set the static (string) value.
+
+        writer.scalar(i).setString(values[i]);
+      }
+    }
+  }
+
+  public List<? extends ConstantColumnSpec> columns() { return constantCols; }
+}
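
To illustrate how the loader above is meant to be used (a sketch, not part of this PR): a framework supplies one ConstantColumnSpec per metadata column, then asks the loader to produce a batch of the required size. The spec class and values below are hypothetical:

// Hypothetical spec for a nullable-Varchar "filename" metadata column.
class FileNameSpec implements ConstantColumnLoader.ConstantColumnSpec {
  private final String value;

  FileNameSpec(String value) { this.value = value; }

  @Override public String name() { return "filename"; }

  @Override public MaterializedField schema() {
    // Constant metadata columns are nullable Varchar, per the class comment.
    return MaterializedField.create("filename",
        Types.optional(MinorType.VARCHAR));
  }

  @Override public String value() { return value; }
}

// Assuming a ResultVectorCache named vectorCache and a per-batch rowCount:
//   ConstantColumnLoader constants = new ConstantColumnLoader(
//       vectorCache, Collections.singletonList(new FileNameSpec("a.csv")));
//   VectorContainer batch = constants.load(rowCount);
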
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
new file mode 100644
index 00000000000..41cc59582c1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
+/**
+ * Perform a schema projection for the case of an explicit list of
+ * projected columns. Example: SELECT a, b, c.
+ * <p>
+ * An explicit projection starts with the requested set of columns,
+ * then looks in the table schema to find matches. That is, it is
+ * driven by the query itself.
+ * <p>
+ * An explicit projection may include columns that do not exist in
+ * the source schema. In this case, we fill in null columns for
+ * unmatched projections.
+ */
+
+public class ExplicitSchemaProjection extends SchemaLevelProjection {
+
+  public ExplicitSchemaProjection(ScanLevelProjection scanProj,
+      TupleMetadata tableSchema,
+      ResolvedTuple rootTuple,
+      List<SchemaProjectionResolver> resolvers) {
+    super(resolvers);
+    resolveRootTuple(scanProj, rootTuple, tableSchema);
+  }
+
+  private void resolveRootTuple(ScanLevelProjection scanProj,
+      ResolvedTuple rootTuple,
+      TupleMetadata tableSchema) {
+    for (ColumnProjection col : scanProj.columns()) {
+      if (col.nodeType() == UnresolvedColumn.UNRESOLVED) {
+        resolveColumn(rootTuple, ((UnresolvedColumn) col).element(), tableSchema);
+      } else {
+        resolveSpecial(rootTuple, col, tableSchema);
+      }
+    }
+  }
+
+  private void resolveColumn(ResolvedTuple outputTuple,
+      RequestedColumn inputCol, TupleMetadata tableSchema) {
+    int tableColIndex = tableSchema.index(inputCol.name());
+    if (tableColIndex == -1) {
+      resolveNullColumn(outputTuple, inputCol);
+    } else {
+      resolveTableColumn(outputTuple, inputCol,
+          tableSchema.metadata(tableColIndex),
+          tableColIndex);
+    }
+  }
+
+  private void resolveTableColumn(ResolvedTuple outputTuple,
+      RequestedColumn requestedCol,
+      ColumnMetadata column, int sourceIndex) {
+
+    // Is the requested column implied to be a map?
+    // A requested column is a map if the user requests x.y and we
+    // are resolving column x. The presence of y as a member implies
+    // that x is a map.
+
+    if (requestedCol.isTuple()) {
+      resolveMap(outputTuple, requestedCol, column, sourceIndex);
+    }
+
+    // Is the requested column implied to be an array?
+    // This occurs when the projection list contains at least one
+    // array index reference such as x[10].
+
+    else if (requestedCol.isArray()) {
+      resolveArray(outputTuple, requestedCol, column, sourceIndex);
+    }
+
+    // A plain old column. Might be an array or a map, but if
+    // so, the request list just mentions it by name without implying
+    // the column type. That is, the project list just contains x
+    // by itself.
+
+    else {
+      projectTableColumn(outputTuple, requestedCol, column, sourceIndex);
+    }
+  }
+
+  private void resolveMap(ResolvedTuple outputTuple,
+      RequestedColumn requestedCol, ColumnMetadata column,
+      int sourceIndex) {
+
+    // If the actual column isn't a map, then the request is invalid.
+
+    if (! column.isMap()) {
+      throw UserException
+        .validationError()
+        .message("Project list implies a map column, but actual column is not a map")
+        .addContext("Projected column", requestedCol.fullName())
+        .addContext("Actual type", column.type().name())
+        .build(logger);
+    }
+
+    // The requested column is implied to be a map because it lists
+    // members to project. Project these.
+
+    ResolvedMapColumn mapCol = new ResolvedMapColumn(outputTuple,
+        column.schema(), sourceIndex);
+    resolveTuple(mapCol.members(), requestedCol.mapProjection(),
+        column.mapSchema());
+
+    // If the projection is simple, then just project the map column
+    // as is. A projection is simple if all map columns from the table
+    // are projected, and no null columns are needed. The simple case
+    // occurs more often than one might expect because the result set
+    // loader only projected those columns that were needed, so the only
+    // issue we have to handle is null columns.
+    //
+    // In the simple case, we discard the map tuple just created
+    // since we ended up not needing it.
+
+    if (mapCol.members().isSimpleProjection()) {
+      outputTuple.removeChild(mapCol.members());
+      projectTableColumn(outputTuple, requestedCol, column, sourceIndex);
+    }
+
+    // The resolved tuple may have a subset of table columns
+    // and/or null columns. Project a new map that will be created
+    // to hold the projected map elements.
+
+    else {
+      outputTuple.add(mapCol);
+    }
+  }
+
+  private void resolveTuple(ResolvedTuple mapTuple,
+      RequestedTuple requestedTuple, TupleMetadata mapSchema) {
+    for (RequestedColumn col : requestedTuple.projections()) {
+      resolveColumn(mapTuple, col, mapSchema);
+    }
+  }
+
+  private void resolveArray(ResolvedTuple outputTuple,
+      RequestedColumn requestedCol, ColumnMetadata column,
+      int sourceIndex) {
+
+    // If the actual column isn't an array or a list,
+    // then the request is invalid.
+
+    if (column.type() != MinorType.LIST && ! column.isArray()) {
+      throw UserException
+        .validationError()
+        .message("Project list implies an array, but actual column is not an array")
+        .addContext("Projected column", requestedCol.fullName())
+        .addContext("Actual cardinality", column.mode().name())
+        .build(logger);
+    }
+
+    // The project operator will do the actual array element
+    // projection.
+
+    projectTableColumn(outputTuple, requestedCol, column, sourceIndex);
+  }
+
+  /**
+   * Project a column to the specified output tuple. The name comes from the
+   * project list. (If the actual column name is `X` (upper case), but the
+   * project list requests `x` (lower case), project the column using the
+   * lower-case name.) The column type comes from the table column. The source
+   * index is the location in the table map or row.
+   *
+   * @param outputTuple
+   *          projected tuple being built
+   * @param requestedCol
+   *          column as requested in the project list
+   * @param column
+   *          metadata for the actual table column
+   * @param sourceIndex
+   *          index of the column within the table tuple (implies the location
+   *          of the table vector to be projected)
+   */
+
+  private void projectTableColumn(ResolvedTuple outputTuple,
+      RequestedColumn requestedCol,
+      ColumnMetadata column, int sourceIndex) {
+    outputTuple.add(
+        new ResolvedTableColumn(requestedCol.name(),
+            MaterializedField.create(requestedCol.name(),
+                column.majorType()),
+            outputTuple, sourceIndex));
+  }
+
+  /**
+   * Resolve a null column. This is a projected column which does not match
+   * an implicit or table column. We consider two cases: a simple top-level
+   * column reference ("a", say) and an implied map reference ("a.b", say.)
+   * If the column appears to be a map, determine the set of children,
+   * which may appear at any depth, that were requested.
+   */
+
+  private void resolveNullColumn(ResolvedTuple outputTuple,
+      RequestedColumn requestedCol) {
+    ResolvedColumn nullCol;
+    if (requestedCol.isTuple()) {
+      nullCol = resolveMapMembers(outputTuple, requestedCol);
+    } else {
+      nullCol = outputTuple.nullBuilder.add(requestedCol.name());
+    }
+    outputTuple.add(nullCol);
+  }
+
+  /**
+   * Resolve a projected map column that has no matching table column.
+   * Recurse to determine the full set of null child columns.
+   *
+   * @param col the projected map column which was not matched in the table
+   * @return a resolved map column holding null markers for the requested children
+   */
+
+  private ResolvedColumn resolveMapMembers(ResolvedTuple outputTuple, RequestedColumn col) {
+    ResolvedMapColumn mapCol = new ResolvedMapColumn(outputTuple, col.name());
+    ResolvedTuple members = mapCol.members();
+    for (RequestedColumn child : col.mapProjection().projections()) {
+      if (child.isTuple()) {
+        members.add(resolveMapMembers(members, child));
+      } else {
+        members.add(outputTuple.nullBuilder.add(child.name()));
+      }
+    }
+    return mapCol;
+  }
+}
\ No newline at end of file
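
For reference, a worked example of the resolution rules implemented above (column names are illustrative only):

// Project list:  SELECT a, m.x, missing
// Table schema:  (a INT, m MAP<x INT, y INT>)
//
//   a        -> found at table index 0: resolved as a ResolvedTableColumn
//   m.x      -> `m` is a map in the table: resolved as a ResolvedMapColumn
//               whose members hold only the requested child `x`
//   missing  -> not in the table schema: resolved as a ResolvedNullColumn,
//               to be filled with nulls by the null-column mechanism
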
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/MetadataManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/MetadataManager.java
new file mode 100644
index 00000000000..fe1e0240ae3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/MetadataManager.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+
+/**
+ * Queries can contain a wildcard (*), table columns, or special
+ * system-defined columns (the file metadata columns AKA implicit
+ * columns, the `columns` column of CSV, etc.).
+ * <p>
+ * This class provides a generalized way of handling such extended
+ * columns. That is, this handles metadata for columns defined by
+ * the scan or file; columns defined by the table (the actual
+ * data metadata) is handled elsewhere.
+ * <p>
+ * Objects of this interface are driven by the projection processing
+ * framework which provides a vector cache from which to obtain
+ * materialized columns. The implementation must provide a projection
+ * parser to pick out the columns which this object handles.
+ * <p>
+ * A better name might be ImplicitMetadataManager to signify that
+ * this is about metadata other than table columns.
+ */
+public interface MetadataManager {
+
+  void bind(ResultVectorCache vectorCache);
+
+  ScanProjectionParser projectionParser();
+
+  SchemaProjectionResolver resolver();
+
+  /**
+   * Define (materialize) the columns which this manager
+   * represents.
+   */
+  void define();
+
+  /**
+   * Load data into the custom columns, if needed (such as for
+   * null or implicit columns.)
+   *
+   * @param rowCount number of rows read into a batch.
+   */
+  void load(int rowCount);
+
+  /**
+   * Event indicating the end of a file (or other data source.)
+   */
+  void endFile();
+
+  /**
+   * Event indicating the end of a scan.
+   */
+  void close();
+}
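
A sketch of the call order implied by the method documentation above (inferred, not normative):

// Approximate per-scan lifecycle of a MetadataManager:
//   bind(vectorCache);       // once, when the scan framework starts
//   projectionParser();      // during scan-level projection parsing
//   resolver();              // during per-file schema resolution
//   define();                // materialize the metadata columns
//   load(rowCount);          // once per batch returned by the reader
//   endFile();               // at the end of each file (or other source)
//   close();                 // once, at the end of the scan
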
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NoOpMetadataManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NoOpMetadataManager.java
new file mode 100644
index 00000000000..ec95de8a7c5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NoOpMetadataManager.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+
+/**
+ * Do-nothing implementation of the metadata manager. Allows the
+ * metadata manager to be optional without needing an if-statement
+ * on every access.
+ */
+
+public class NoOpMetadataManager implements MetadataManager {
+
+  @Override
+  public void bind(ResultVectorCache vectorCache) { }
+
+  @Override
+  public ScanProjectionParser projectionParser() { return null; }
+
+  @Override
+  public SchemaProjectionResolver resolver() {
+    // The resolver is requested only for user-defined metadata
+    // managers, not for this default, no-op version. If this
+    // method is called, something is amiss with the default
+    // setup.
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public void define() { }
+
+  @Override
+  public void load(int rowCount) { }
+
+  @Override
+  public void endFile() { }
+
+  @Override
+  public void close() { }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnBuilder.java
new file mode 100644
index 00000000000..7e9d2fdd410
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnBuilder.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnLoader.NullColumnSpec;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Manages null columns by creating a null column loader for each
+ * set of non-empty null columns. This class acts as a scan-wide
+ * facade around the per-schema null column loader.
+ */
+
+public class NullColumnBuilder implements VectorSource {
+
+  /**
+   * Creates null columns if needed.
+   */
+
+  protected final List<NullColumnSpec> nullCols = new ArrayList<>();
+  private NullColumnLoader nullColumnLoader;
+  private VectorContainer outputContainer;
+
+  /**
+   * The reader-specified null type if other than the default.
+   */
+
+  private final MajorType nullType;
+  private final boolean allowRequiredNullColumns;
+
+  public NullColumnBuilder(
+      MajorType nullType, boolean allowRequiredNullColumns) {
+    this.nullType = nullType;
+    this.allowRequiredNullColumns = allowRequiredNullColumns;
+  }
+
+  public NullColumnBuilder newChild() {
+    return new NullColumnBuilder(nullType, allowRequiredNullColumns);
+  }
+
+  public ResolvedNullColumn add(String name) {
+    return add(name, null);
+  }
+
+  public ResolvedNullColumn add(String name, MajorType type) {
+    final ResolvedNullColumn col = new ResolvedNullColumn(name, type, this, nullCols.size());
+    nullCols.add(col);
+    return col;
+  }
+
+  public void build(ResultVectorCache vectorCache) {
+    close();
+
+    // If no null columns for this schema, no need to create
+    // the loader.
+
+    if (hasColumns()) {
+      nullColumnLoader = new NullColumnLoader(vectorCache, nullCols, nullType, allowRequiredNullColumns);
+      outputContainer = nullColumnLoader.output();
+    }
+  }
+
+  public boolean hasColumns() {
+    return nullCols != null && ! nullCols.isEmpty();
+  }
+
+  public void load(int rowCount) {
+    if (nullColumnLoader != null) {
+      final VectorContainer output = nullColumnLoader.load(rowCount);
+      assert output == outputContainer;
+    }
+  }
+
+  @Override
+  public ValueVector vector(int index) {
+    return outputContainer.getValueVector(index).getValueVector();
+  }
+
+  @VisibleForTesting
+  public VectorContainer output() { return outputContainer; }
+
+  public void close() {
+    if (nullColumnLoader != null) {
+      nullColumnLoader.close();
+      nullColumnLoader = null;
+    }
+    if (outputContainer != null) {
+      outputContainer.clear();
+      outputContainer = null;
+    }
+  }
+}
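
A minimal usage sketch for the builder above (not part of this PR); it assumes a ResultVectorCache named vectorCache and a per-batch row count named rowCount:

// Illustrative only: drive the null-column mechanism for one schema.
void buildNullColumns(ResultVectorCache vectorCache, int rowCount) {
  // Default null type (nullable INT); required null columns not allowed.
  NullColumnBuilder nullBuilder = new NullColumnBuilder(null, false);

  // Columns that were projected but are missing from the table schema.
  nullBuilder.add("x");                                      // default type
  nullBuilder.add("y", Types.optional(MinorType.VARCHAR));   // explicit type

  // Once per schema: create (or reuse, via the cache) the null vectors.
  nullBuilder.build(vectorCache);

  // Once per batch: fill the null vectors to match the reader's row count.
  nullBuilder.load(rowCount);

  // On schema change or at the end of the scan: release the vectors.
  nullBuilder.close();
}
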
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnLoader.java
new file mode 100644
index 00000000000..0c36cb99d46
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnLoader.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+
+/**
+ * Create and populate null columns for the case in which a SELECT statement
+ * refers to columns that do not exist in the actual table. Nullable and array
+ * types are suitable for null columns. (Drill defines an empty array as the
+ * same as a null array: not true, but the best we have at present.) Required
+ * types cannot be used as we don't know what value to set into the column
+ * values.
+ * <p>
+ * Seeks to preserve "vector continuity" by reusing vectors when possible.
+ * Cases:
+ * <ul>
+ * <li>A column was available in a prior reader (or batch), but is no longer
+ * available, and is thus null. Reuses the type and vector of the prior reader
+ * (or batch) to prevent trivial schema changes.</li>
+ * <li>A column has an implied type (specified in the metadata about the
+ * column provided by the reader.) That type information is used instead of
+ * the defined null column type.</li>
+ * <li>A column has no type information. The type becomes the null column type
+ * defined by the reader (or nullable int by default).</li>
+ * <li>Required columns are not suitable. If any of the above found a required
+ * type, convert the type to nullable.</li>
+ * <li>The resulting column and type, whatever it turned out to be, is placed
+ * into the vector cache so that it can be reused by the next reader or batch,
+ * to again preserve vector continuity.</li>
+ * </ul>
+ * The above rules eliminate "trivial" schema changes, but can still result in
+ * "hard" schema changes if a required type is replaced by a nullable type.
+ */
+
+public class NullColumnLoader extends StaticColumnLoader {
+
+  public interface NullColumnSpec {
+    String name();
+    MajorType type();
+    void setType(MajorType type);
+  }
+
+  public static final MajorType DEFAULT_NULL_TYPE = MajorType.newBuilder()
+      .setMinorType(MinorType.INT)
+      .setMode(DataMode.OPTIONAL)
+      .build();
+
+  private final MajorType nullType;
+  private final boolean allowRequired;
+  private final boolean isArray[];
+
+  public NullColumnLoader(ResultVectorCache vectorCache, List<? extends NullColumnSpec> defns,
+      MajorType nullType, boolean allowRequired) {
+    super(vectorCache);
+
+    // Normally, null columns must be optional or arrays. However
+    // we allow required columns either if the client requests it,
+    // or if the client's requested null type is itself required.
+    // (The generated "null column" vectors will go into the vector
+    // cache and be pulled back out; we must preserve the required
+    // mode in this case.)
+
+    this.allowRequired = allowRequired ||
+        (nullType != null && nullType.getMode() == DataMode.REQUIRED);
+
+    // Use the provided null type, else the standard nullable int.
+
+    if (nullType == null) {
+      this.nullType = DEFAULT_NULL_TYPE;
+    } else {
+      this.nullType = nullType;
+    }
+
+    // Populate the loader schema from that provided
+
+    RowSetLoader schema = loader.writer();
+    isArray = new boolean[defns.size()];
+    for (int i = 0; i < defns.size(); i++) {
+      NullColumnSpec defn = defns.get(i);
+      MaterializedField colSchema = selectType(defn);
+      isArray[i] = colSchema.getDataMode() == DataMode.REPEATED;
+      schema.addColumn(colSchema);
+    }
+  }
+
+  /**
+   * Implements the type mapping algorithm; preferring the best fit
+   * to preserve the schema, else resorting to changes when needed.
+   * @param defn output column definition
+   * @return type of the empty column that implements the definition
+   */
+
+  private MaterializedField selectType(NullColumnSpec defn) {
+
+    // Prefer the type of any previous occurrence of
+    // this column.
+
+    MajorType type = vectorCache.getType(defn.name());
+
+    // Else, use the type defined in the projection, if any.
+
+    if (type == null) {
+      type = defn.type();
+    }
+    if (type != null && ! allowRequired && type.getMode() == DataMode.REQUIRED) {
+
+      // Type was found in the vector cache and the type is required.
+      // The client determines whether to map required types to optional.
+      // The text readers use required Varchar columns for missing columns,
+      // and so no required-to-optional mapping is desired. Other readers
+      // want to use nulls for missing columns, and so need the
+      // required-to-nullable mapping.
+
+      type = MajorType.newBuilder()
+            .setMinorType(type.getMinorType())
+            .setMode(DataMode.OPTIONAL)
+            .build();
+    }
+
+    // Else, use the specified null type.
+
+    if (type == null) {
+      type = nullType;
+    }
+
+    // If the schema had the special NULL type, replace it with the
+    // null column type.
+
+    if (type.getMinorType() == MinorType.NULL) {
+      type = nullType;
+    }
+    defn.setType(type);
+    return MaterializedField.create(defn.name(), type);
+  }
+
+  public VectorContainer output() {
+    return loader.outputContainer();
+  }
+
+  @Override
+  public VectorContainer load(int rowCount) {
+    loader.startBatch();
+    loader.skipRows(rowCount);
+    return loader.harvest();
+  }
+}
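
Restating the type-selection precedence implemented in selectType() above (a summary, not new behavior):

// Precedence when choosing the type of a null column:
//   1. The type cached for this column name in the ResultVectorCache,
//      which preserves vector continuity across readers and batches.
//   2. Otherwise, the type carried by the NullColumnSpec, if any.
//   3. If the chosen type is REQUIRED and required null columns are not
//      allowed, rewrite it as OPTIONAL with the same minor type.
//   4. If no type was found, fall back to the configured null type
//      (nullable INT by default).
//   5. A type with the special NULL minor type is also replaced by the
//      configured null type.
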
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedColumn.java
new file mode 100644
index 00000000000..a658629f24c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedColumn.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.exec.record.MaterializedField;
+
+/**
+ * A resolved column has a name, and a specification for how to project
+ * data from a source vector to a vector in the final output container.
+ * Describes the projection of a single column from
+ * an input to an output batch.
+ * <p>
+ * Although the table schema mechanism uses the newer "metadata"
+ * mechanism, resolved columns revert back to the original
+ * {@link MajorType} and {@link MaterializedField} mechanism used
+ * by the rest of Drill. Doing so loses a bit of additional
+ * information, but at present there is no way to export that information
+ * along with a serialized record batch; each operator must rediscover
+ * it after deserialization.
+ */
+
+public abstract class ResolvedColumn implements ColumnProjection {
+
+  public final VectorSource source;
+  public final int sourceIndex;
+
+  public ResolvedColumn(VectorSource source, int sourceIndex) {
+    this.source = source;
+    this.sourceIndex = sourceIndex;
+  }
+
+  public VectorSource source() { return source; }
+
+  public int sourceIndex() { return sourceIndex; }
+
+  /**
+   * Return the type of this column. Used primarily by the schema smoothing
+   * mechanism.
+   *
+   * @return the MaterializedField representation of this column
+   */
+
+  public abstract MaterializedField schema();
+
+  public void project(ResolvedTuple dest) {
+    dest.addVector(source.vector(sourceIndex));
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedMapColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedMapColumn.java
new file mode 100644
index 00000000000..4a158f7b074
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedMapColumn.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.common.types.Types;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedMap;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedMapArray;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedSingleMap;
+import org.apache.drill.exec.record.MaterializedField;
+
+/**
+ * Represents a column which is implicitly a map (because it has children
+ * in the project list). If the column does not match any column in the table,
+ * it gives rise to a map of null columns in the output; if it matches a table
+ * map, a new output map is built holding the resolved (projected or null) members.
+ */
+
+public class ResolvedMapColumn extends ResolvedColumn {
+
+  public static final int ID = 17;
+
+  private final MaterializedField schema;
+  private final ResolvedTuple parent;
+  private final ResolvedMap members;
+
+  public ResolvedMapColumn(ResolvedTuple parent, String name) {
+    super(parent, -1);
+    schema = MaterializedField.create(name,
+        Types.required(MinorType.MAP));
+    this.parent = parent;
+    members = new ResolvedSingleMap(this);
+    parent.addChild(members);
+  }
+
+  public ResolvedMapColumn(ResolvedTuple parent,
+      MaterializedField schema, int sourceIndex) {
+    super(parent, sourceIndex);
+    this.schema = schema;
+    this.parent = parent;
+
+    // This column corresponds to an input map.
+    // We may have to create a matching new output
+    // map. Determine whether it is a single map or
+    // an array.
+
+    assert schema.getType().getMinorType() == MinorType.MAP;
+    if (schema.getType().getMode() == DataMode.REPEATED) {
+      members = new ResolvedMapArray(this);
+    } else {
+      members = new ResolvedSingleMap(this);
+    }
+    parent.addChild(members);
+  }
+
+  public ResolvedTuple parent() { return parent; }
+
+  @Override
+  public String name() { return schema.getName(); }
+
+  @Override
+  public int nodeType() { return ID; }
+
+  public ResolvedTuple members() { return members; }
+
+  @Override
+  public void project(ResolvedTuple dest) {
+    dest.addVector(members.buildMap());
+  }
+
+  @Override
+  public MaterializedField schema() { return schema; }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedNullColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedNullColumn.java
new file mode 100644
index 00000000000..7694b7e713b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedNullColumn.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnLoader.NullColumnSpec;
+import org.apache.drill.exec.record.MaterializedField;
+
+/**
+ * Projected column that serves as both a resolved column (provides projection
+ * mapping) and a null column spec (provides the information needed to create
+ * the required null vectors.)
+ */
+
+public class ResolvedNullColumn extends ResolvedColumn implements NullColumnSpec {
+
+  public static final int ID = 4;
+
+  private final String name;
+  private MajorType type;
+
+  public ResolvedNullColumn(String name, MajorType type, VectorSource source, int sourceIndex) {
+    super(source, sourceIndex);
+    this.name = name;
+    this.type = type;
+  }
+
+  @Override
+  public String name() { return name; }
+
+  @Override
+  public MajorType type() { return type; }
+
+  @Override
+  public int nodeType() { return ID; }
+
+  @Override
+  public void setType(MajorType type) {
+
+    // Update the actual type based on what the null-column
+    // mechanism chose for this column.
+
+    this.type = type;
+  }
+
+  @Override
+  public MaterializedField schema() {
+    return MaterializedField.create(name, type);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTableColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTableColumn.java
new file mode 100644
index 00000000000..f17802a6c08
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTableColumn.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.exec.record.MaterializedField;
+
+/**
+ * Column that matches one provided by the table. Provides the data type
+ * of that column and information to project from the result set loader
+ * output container to the scan output container. (Note that the result
+ * set loader container is, itself, a projection from the actual table
+ * schema to the desired set of columns; but in the order specified
+ * by the table.)
+ */
+
+public class ResolvedTableColumn extends ResolvedColumn {
+
+  public static final int ID = 3;
+
+  public final String projectedName;
+  public final MaterializedField schema;
+
+  public ResolvedTableColumn(String projectedName,
+      MaterializedField schema,
+      VectorSource source, int sourceIndex) {
+    super(source, sourceIndex);
+    this.projectedName = projectedName;
+    this.schema = schema;
+  }
+
+  @Override
+  public String name() { return projectedName; }
+
+  @Override
+  public MaterializedField schema() { return schema; }
+
+  @Override
+  public int nodeType() { return ID; }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf
+      .append("[")
+      .append(getClass().getSimpleName())
+      .append(" name=")
+      .append(name())
+      .append("]");
+    return buf.toString();
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
new file mode 100644
index 00000000000..c6e15106b96
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Drill rows are made up of a tree of tuples, with the row being the root
+ * tuple. Each tuple contains columns, some of which may be maps. This
+ * class represents each row or map in the output projection.
+ * <p>
+ * Output columns within the tuple can be projected from the data source,
+ * might be null (requested columns that don't match a data source column)
+ * or might be a constant (such as an implicit column.) This class
+ * orchestrates assembling an output tuple from a collection of these
+ * three column types. (Though implicit columns appear only in the root
+ * tuple.)
+ *
+ * <h4>Null Handling</h4>
+ *
+ * The project list might reference a "missing" map if the project list
+ * includes, say, <tt>SELECT a.b.c</tt> but <tt>`a`</tt> does not exist
+ * in the data source. In this case, the column a is implied to be a map,
+ * so the projection mechanism will create a null map for <tt>`a`</tt>
+ * and <tt>`b`</tt>, and will create a null column for <tt>`c`</tt>.
+ * <p>
+ * To accomplish this recursive null processing, each tuple is associated
+ * with a null builder. (The null builder can be null if projection is
+ * implicit with a wildcard; in such a case no null columns can occur.
+ * But, even here, with schema persistence, a <tt>SELECT *</tt> query
+ * may need null columns if a second file does not contain a column
+ * that appeared in a first file.)
+ * <p>
+ * The null builder is bound to each tuple to allow vector persistence
+ * via the result vector cache. If we must create a null column
+ * <tt>`x`</tt> in two different readers, then the rules of Drill
+ * require that the same vector be used for both (or else a schema
+ * change is signaled.) The vector cache works by name (and type).
+ * Since maps may contain columns with the same names as other maps,
+ * the vector cache must be associated with each tuple. And, by extension,
+ * the null builder must also be associated with each tuple.
+ *
+ * <h4>Lifecycle</h4>
+ *
+ * The lifecycle of a resolved tuple is:
+ * <ul>
+ * <li>The projection mechanism creates the output tuple, and its columns,
+ * by comparing the project list against the table schema. The result is
+ * a set of table, null, or constant columns.</li>
+ * <li>Once per schema change, the resolved tuple creates the output
+ * tuple by linking to vectors in their original locations. As it turns out,
+ * we can simply share the vectors; we don't need to transfer the buffers.</li>
+ * <li>To prepare for the transfer, the tuple asks the null column builder
+ * (if present) to build the required null columns.</li>
+ * <li>Once the output tuple is built, it can be used for any number of
+ * batches without further work. (The same vectors appear in the various inputs
+ * and the output, eliminating the need for any transfers.)</li>
+ * <li>Once per batch, the client must set the row count. This is needed for the
+ * output container, and for any "null" maps that the project may have created.</li>
+ * </ul>
+ *
+ * <h4>Projection Mapping</h4>
+ *
+ * Each column is mapped into the output tuple (vector container or map) in
+ * the order that the columns are defined here. (That order follows the project
+ * list for explicit projection, or the table schema for implicit projection.)
+ * The source, however, may be in any order (at least for the table schema.)
+ * A projection mechanism identifies the {@link VectorSource} that supplies the
+ * vector for the column, along with the vector's index within that source.
+ * The resolved tuple is bound to an output tuple. The projection mechanism
+ * grabs the input vector from the vector source at the indicated position, and
+ * links it into the output tuple, represented by this projected tuple, at the
+ * position of the resolved column in the child list.
+ *
+ * <h4>Caveats</h4>
+ *
+ * The project mechanism handles nested "missing" columns as mentioned
+ * above. This works to create null columns within maps that are defined by the
+ * data source. However, the mechanism does not currently handle creating null
+ * columns within repeated maps or lists. Doing so is possible, but requires
+ * adding a level of cardinality computation to create the proper number of
+ * "inner" values.
+ */
+
+public abstract class ResolvedTuple implements VectorSource {
+
+  /**
+   * Represents the top-level tuple which is projected to a
+   * vector container.
+   */
+
+  public static class ResolvedRow extends ResolvedTuple {
+
+    private VectorContainer input;
+    private VectorContainer output;
+
+    public ResolvedRow(NullColumnBuilder nullBuilder) {
+      super(nullBuilder);
+    }
+
+    public void project(VectorContainer input, VectorContainer output) {
+      this.input = input;
+      this.output = output;
+      output.removeAll();
+      buildColumns();
+      output.buildSchema(SelectionVectorMode.NONE);
+    }
+
+    @Override
+    public ValueVector vector(int index) {
+      return input.getValueVector(index).getValueVector();
+    }
+
+    @Override
+    public void addVector(ValueVector vector) {
+      output.add(vector);
+    }
+
+    @Override
+    public void setRowCount(int rowCount) {
+      output.setRecordCount(rowCount);
+      cascadeRowCount(rowCount);
+    }
+
+    @Override
+    public BufferAllocator allocator() {
+      return output.getAllocator();
+    }
+
+    @Override
+    public String name() {
+
+      // Never used in production, just for debugging.
+
+      return "$root$";
+    }
+
+    public VectorContainer output() { return output; }
+
+    @Override
+    public int innerCardinality(int rowCount) { return rowCount; }
+  }
+
+  /**
+   * Represents a map implied by the project list, whether or not the map
+   * actually appears in the table schema.
+   * The column is implied to be a map because it contains
+   * children. This implementation builds the map and its children.
+   */
+
+  public static abstract class ResolvedMap extends ResolvedTuple {
+
+    protected final ResolvedMapColumn parentColumn;
+    protected AbstractMapVector inputMap;
+    protected AbstractMapVector outputMap;
+
+    public ResolvedMap(ResolvedMapColumn parentColumn) {
+      super(parentColumn.parent().nullBuilder == null ? null : parentColumn.parent().nullBuilder.newChild());
+      this.parentColumn = parentColumn;
+    }
+
+    @Override
+    public void addVector(ValueVector vector) {
+      outputMap.putChild(vector.getField().getName(), vector);
+    }
+
+    @Override
+    public ValueVector vector(int index) {
+      assert inputMap != null;
+      return inputMap.getChildByOrdinal(index);
+    }
+
+    public AbstractMapVector buildMap() {
+      if (parentColumn.sourceIndex() != -1) {
+        final ResolvedTuple parentTuple = parentColumn.parent();
+        inputMap = (AbstractMapVector) parentTuple.vector(parentColumn.sourceIndex());
+      }
+      final MaterializedField colSchema = parentColumn.schema();
+      outputMap = createMap(inputMap,
+          MaterializedField.create(
+              colSchema.getName(), colSchema.getType()),
+          parentColumn.parent().allocator());
+      buildColumns();
+      return outputMap;
+    }
+
+    protected abstract AbstractMapVector createMap(AbstractMapVector inputMap,
+        MaterializedField create, BufferAllocator allocator);
+
+    @Override
+    public BufferAllocator allocator() {
+      return outputMap.getAllocator();
+    }
+
+    @Override
+    public String name() { return parentColumn.name(); }
+  }
+
+  public static class ResolvedSingleMap extends ResolvedMap {
+
+    public ResolvedSingleMap(ResolvedMapColumn parentColumn) {
+      super(parentColumn);
+    }
+
+    @Override
+    protected AbstractMapVector createMap(AbstractMapVector inputMap,
+        MaterializedField schema, BufferAllocator allocator) {
+      return new MapVector(schema,
+          allocator, null);
+    }
+
+    @Override
+    public void setRowCount(int rowCount) {
+      ((MapVector) outputMap).setMapValueCount(rowCount);
+      cascadeRowCount(rowCount);
+    }
+
+    @Override
+    public int innerCardinality(int outerCardinality) {
+      return outerCardinality;
+    }
+  }
+
+  /**
+   * Represents a map tuple (not the map column, rather the value of the
+   * map column.) When projecting, we create a new repeated map vector,
+   * but share the offsets vector from input to output. The size of the
+   * offset vector reveals the number of elements in the "inner" array,
+   * which is the number of null values to create if null columns are
+   * added.
+   */
+
+  public static class ResolvedMapArray extends ResolvedMap {
+
+    private int valueCount;
+
+    public ResolvedMapArray(ResolvedMapColumn parentColumn) {
+      super(parentColumn);
+    }
+
+    @Override
+    protected AbstractMapVector createMap(AbstractMapVector inputMap,
+        MaterializedField schema, BufferAllocator allocator) {
+
+      // Create a new map array, reusing the offset vector from
+      // the original input map.
+
+      final RepeatedMapVector source = (RepeatedMapVector) inputMap;
+      final UInt4Vector offsets = source.getOffsetVector();
+      valueCount = offsets.getAccessor().getValueCount();
+      return new RepeatedMapVector(schema,
+          offsets, null);
+    }
+
+    @Override
+    public int innerCardinality(int outerCardinality) {
+      return valueCount;
+    }
+
+    @Override
+    public void setRowCount(int rowCount) {
+      cascadeRowCount(valueCount);
+    }
+  }
+
+  protected final List<ResolvedColumn> members = new ArrayList<>();
+  protected final NullColumnBuilder nullBuilder;
+  protected List<ResolvedTuple> children;
+  protected VectorSource binding;
+
+  public ResolvedTuple(NullColumnBuilder nullBuilder) {
+    this.nullBuilder = nullBuilder;
+  }
+
+  public NullColumnBuilder nullBuilder() {
+    return nullBuilder;
+  }
+
+  public void add(ResolvedColumn col) {
+    members.add(col);
+  }
+
+  public void addChild(ResolvedTuple child) {
+    if (children == null) {
+      children = new ArrayList<>();
+    }
+    children.add(child);
+  }
+
+  public void removeChild(ResolvedTuple child) {
+    assert ! children.isEmpty() && children.get(children.size()-1) == child;
+    children.remove(children.size()-1);
+  }
+
+  public boolean isSimpleProjection() {
+    if (children != null && ! children.isEmpty()) {
+      return false;
+    }
+    for (int i = 0; i < members.size(); i++) {
+      if (members.get(i).nodeType() == ResolvedNullColumn.ID) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @VisibleForTesting
+  public List<ResolvedColumn> columns() { return members; }
+
+  public void buildNulls(ResultVectorCache vectorCache) {
+    if (nullBuilder != null) {
+      nullBuilder.build(vectorCache);
+    }
+    if (children != null) {
+      for (final ResolvedTuple child : children) {
+        child.buildNulls(vectorCache.childCache(child.name()));
+      }
+    }
+  }
+
+  public void loadNulls(int rowCount) {
+    if (nullBuilder != null) {
+      nullBuilder.load(rowCount);
+    }
+    if (children != null) {
+      for (final ResolvedTuple child : children) {
+        child.loadNulls(innerCardinality(rowCount));
+      }
+    }
+  }
+
+  public abstract int innerCardinality(int outerCardinality);
+
+  /**
+   * Merge two or more <i>partial batches</i> to produce a final output batch.
+   * A partial batch is a vertical slice of a batch, such as the set of null
+   * columns or the set of data columns.
+   * <p>
+   * For example, consider
+   * two partial batches:<pre><code>
+   * (a, d, e)
+   * (c, b)</code></pre>
+   * We may wish to merge them by projecting columns into an output batch
+   * of the form:<pre><code>
+   * (a, b, c, d)</code></pre>
+   * It is not necessary to project all columns from the inputs, but all
+   * columns in the output must have a projection.
+   * <p>
+   * The merger is created once per schema, then can be reused for any
+   * number of batches. The only restriction is that the partial batches must
+   * have the same row count so that the final output batch record
+   * count makes sense.
+   * <p>
+   * Merging is done by discarding any data in the output, then exchanging
+   * the buffers from the input columns to the output, leaving projected
+   * columns empty. Note that unprojected columns must be cleared by the
+   * caller. The caller will have figured out which columns to project and
+   * which not to project.
+   */
+
+  protected void buildColumns() {
+    for (int i = 0; i < members.size(); i++) {
+      members.get(i).project(this);
+    }
+  }
+
+  public abstract void addVector(ValueVector vector);
+
+  public abstract void setRowCount(int rowCount);
+
+  protected void cascadeRowCount(int rowCount) {
+    if (children == null) {
+      return;
+    }
+    for (final ResolvedTuple child : children) {
+      child.setRowCount(rowCount);
+    }
+  }
+
+  public abstract BufferAllocator allocator();
+
+  public abstract String name();
+
+  /**
+   * During planning, discard a partial plan to allow reusing the same (root) tuple
+   * for multiple projection plans.
+   */
+
+  public void reset() {
+    members.clear();
+    children = null;
+  }
+
+  public void close() {
+    if (nullBuilder != null) {
+      nullBuilder.close();
+    }
+    if (children != null) {
+      for (final ResolvedTuple child : children) {
+        child.close();
+      }
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
new file mode 100644
index 00000000000..e20724521e7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTupleImpl;
+
+/**
+ * Parses and analyzes the projection list passed to the scanner. The
+ * projection list is per scan, independent of any tables that the
+ * scanner might scan. The projection list is then used as input to the
+ * per-table projection planning.
+ * <p>
+ * In most query engines, this kind of projection analysis is done at
+ * plan time. But, since Drill is schema-on-read, we don't know the
+ * available columns, or their types, until we start scanning a table.
+ * The table may provide the schema up-front, or may discover it as
+ * the read proceeds. Hence, the job here is to make sense of the
+ * project list based on static a-priori information, then to create
+ * a list that can be further resolved against a table schema when it
+ * appears. This gives us two steps:
+ * <ul>
+ * <li>Scan-level projection: this class, that handles schema for the
+ * entire scan operator.</li>
+ * <li>Table-level projection: defined elsewhere, that merges the
+ * table and scan-level projections.</li>
+ * </ul>
+ * <p>
+ * Accepts the inputs needed to
+ * plan a projection, builds the mappings, and constructs the projection
+ * mapping object.
+ * <p>
+ * Builds the per-scan projection plan given a set of projected columns.
+ * Determines the output schema, which columns to project from the data
+ * source, which are metadata, and so on.
+ * <p>
+ * An annoying aspect of SQL is that the projection list (the list of
+ * columns to appear in the output) is specified after the SELECT keyword.
+ * In relational theory, projection is about columns; selection is about
+ * rows...
+ * <p>
+ * Mappings can be based on four primary use cases:
+ * <ul>
+ * <li><tt>SELECT *</tt>: Project all data source columns, whatever they happen
+ * to be. Create columns using names from the data source. The data source
+ * also determines the order of columns within the row.</li>
+ * <li><tt>SELECT columns</tt>: Similar to SELECT * in that it projects all columns
+ * from the data source, in data source order. But, rather than creating
+ * individual output columns for each data source column, creates a single
+ * column which is an array of Varchars which holds the (text form) of
+ * each column as an array element.</li>
+ * <li><tt>SELECT a, b, c, ...</tt>: Project a specific set of columns, identified by
+ * case-insensitive name. The output row uses the names from the SELECT list,
+ * but types from the data source. Columns appear in the row in the order
+ * specified by the SELECT.</li>
+ * <li><tt>SELECT ...</tt>: SELECT nothing, occurs in <tt>SELECT COUNT(*)</tt>
+ * type queries. The provided projection list contains no (table) columns, though
+ * it may contain metadata columns.</li>
+ * </ul>
+ * Names in the SELECT list can reference any of five distinct types of output
+ * columns:
+ * <ul>
+ * <li>Wildcard ("*") column: indicates the place in the projection list to insert
+ * the table columns once found in the table projection plan.</li>
+ * <li>Data source columns: columns from the underlying table. The table
+ * projection planner will determine if the column exists, or must be filled
+ * in with a null column.</li>
+ * <li>The generic data source columns array: <tt>columns</tt>, or optionally
+ * specific members of the <tt>columns</tt> array such as <tt>columns[1]</tt>.</li>
+ * <li>Implicit columns: <tt>fqn</tt>, <tt>filename</tt>, <tt>filepath</tt>
+ * and <tt>suffix</tt>. These reference
+ * parts of the name of the file being scanned.</li>
+ * <li>Partition columns: <tt>dir0</tt>, <tt>dir1</tt>, ...: These reference
+ * parts of the path name of the file.</li>
+ * </ul>
+ *
+ * @see {@link ImplicitColumnExplorer}, the class from which this class
+ * evolved
+ */
+
+public class ScanLevelProjection {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanLevelProjection.class);
+
+  /**
+   * Interface for add-on parsers, avoids the need to create
+   * a single, tightly-coupled parser for all types of columns.
+   * The main parser handles wildcards and assumes the rest of
+   * the columns are table columns. The add-on parser can tag
+   * columns as special, such as to hold metadata.
+   */
+
+  public interface ScanProjectionParser {
+    void bind(ScanLevelProjection builder);
+    boolean parse(RequestedColumn inCol);
+    void validate();
+    void validateColumn(ColumnProjection col);
+    void build();
+  }
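+
+  // Hypothetical add-on parser sketch (illustration only, not part of this
+  // patch). It claims one special column name and defers everything else to
+  // the framework; `MySpecialColumn` is an assumed ColumnProjection
+  // implementation, and RequestedColumn is assumed to expose name().
+  //
+  //   public class MySpecialColumnParser implements ScanProjectionParser {
+  //     private ScanLevelProjection builder;
+  //     @Override public void bind(ScanLevelProjection builder) { this.builder = builder; }
+  //     @Override public boolean parse(RequestedColumn inCol) {
+  //       if (! inCol.name().equalsIgnoreCase("my_special_col")) { return false; }
+  //       builder.addMetadataColumn(new MySpecialColumn(inCol));
+  //       return true;  // column consumed; framework skips it
+  //     }
+  //     @Override public void validate() { }
+  //     @Override public void validateColumn(ColumnProjection col) { }
+  //     @Override public void build() { }
+  //   }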
+
+  // Input
+
+  protected final List<SchemaPath> projectionList;
+
+  // Configuration
+
+  protected List<ScanProjectionParser> parsers;
+  private final boolean v1_12MetadataLocation;
+
+  // Internal state
+
+  protected boolean sawWildcard;
+
+  // Output
+
+  protected List<ColumnProjection> outputCols = new ArrayList<>();
+  protected RequestedTuple outputProjection;
+  protected boolean hasWildcard;
+  protected boolean emptyProjection = true;
+
+  /**
+   * Specify the set of columns in the SELECT list. Since the column list
+   * comes from the query planner, this class assumes that the planner has
+   * checked the list for syntax and uniqueness.
+   *
+   * @param projectionList list of columns in the SELECT list, in SELECT
+   * list order
+   */
+  public ScanLevelProjection(List<SchemaPath> projectionList,
+      List<ScanProjectionParser> parsers,
+      boolean v1_12MetadataLocation) {
+    this.projectionList = projectionList;
+    this.parsers = parsers;
+    this.v1_12MetadataLocation = v1_12MetadataLocation;
+    doParse();
+  }
+
+  private void doParse() {
+    outputProjection = RequestedTupleImpl.parse(projectionList);
+
+    for (ScanProjectionParser parser : parsers) {
+      parser.bind(this);
+    }
+    for (RequestedColumn inCol : outputProjection.projections()) {
+      if (inCol.isWildcard()) {
+        mapWildcard(inCol);
+      } else {
+        mapColumn(inCol);
+      }
+    }
+    verify();
+    for (ScanProjectionParser parser : parsers) {
+      parser.build();
+    }
+  }
+
+  public ScanLevelProjection(List<SchemaPath> projectionList,
+      List<ScanProjectionParser> parsers) {
+    this(projectionList, parsers, false);
+  }
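+
+  // Illustrative construction (assumed, not taken from this patch's tests):
+  // parse an explicit projection list with no add-on parsers.
+  //
+  //   List<SchemaPath> cols = Arrays.asList(
+  //       SchemaPath.getSimplePath("a"), SchemaPath.getSimplePath("b"));
+  //   ScanLevelProjection scanProj =
+  //       new ScanLevelProjection(cols, new ArrayList<>());
+  //   // scanProj.projectAll() == false, scanProj.columns().size() == 2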
+
+  /**
+   * Wildcard is special: add it, then let parsers add any custom
+   * columns that are needed. The order is important: we want custom
+   * columns to follow table columns.
+   */
+
+  private void mapWildcard(RequestedColumn inCol) {
+
+    // Wildcard column: this is a SELECT * query.
+
+    if (sawWildcard) {
+      throw new IllegalArgumentException("Duplicate * entry in project list");
+    }
+
+    // Remember the wildcard position, if we need to insert it.
+    // Ensures that the main wildcard expansion occurs before add-on
+    // columns.
+
+    int wildcardPosn = outputCols.size();
+
+    // Parsers can consume the wildcard. But, all parsers must
+    // have visibility to the wildcard column.
+
+    for (ScanProjectionParser parser : parsers) {
+      if (parser.parse(inCol)) {
+        wildcardPosn = -1;
+      }
+    }
+
+    // Set this flag only after the parser checks.
+
+    sawWildcard = true;
+
+    // If not consumed, put the wildcard column into the projection list as a
+    // placeholder to be filled in later with actual table columns.
+
+    if (wildcardPosn != -1) {
+
+      // Drill 1.1 - 1.11 and Drill 1.13 or later put metadata columns after
+      // data columns. Drill 1.12 moved them before data columns. For testing
+      // and compatibility, the client can request to use the Drill 1.12 position,
+      // though the after-data position is the default.
+      //
+      // Note that the after-data location is much more convenient for the dirx
+      // partition columns since these vary in number across scans within the same query.
+      // By putting them at the end, the index of all other columns remains
+      // constant. Drill 1.12 broke that behavior, but Drill 1.13 restored it.
+      //
+      // This option can be removed in Drill 1.14 after things settle down.
+
+      UnresolvedColumn wildcardCol = new UnresolvedColumn(inCol, UnresolvedColumn.WILDCARD);
+      if (v1_12MetadataLocation) {
+        outputCols.add(wildcardCol);
+      } else {
+        outputCols.add(wildcardPosn, wildcardCol);
+      }
+      hasWildcard = true;
+      emptyProjection = false;
+    }
+  }
+
+  /**
+   * Map the column into one of five categories.
+   * <ol>
+   * <li>Star column (to designate SELECT *)</li>
+   * <li>Partition file column (dir0, dir1, etc.)</li>
+   * <li>Implicit column (fqn, filepath, filename, suffix)</li>
+   * <li>Special <tt>columns</tt> column which holds all columns as
+   * an array.</li>
+   * <li>Table column. The actual match against the table schema
+   * is done later.</li>
+   * </ol>
+   *
+   * Actual mapping is done by parser extensions for all but the
+   * basic cases.
+   *
+   * @param inCol the SELECT column
+   */
+
+  private void mapColumn(RequestedColumn inCol) {
+
+    // Give the extensions first crack at each column.
+    // Some may want to "sniff" a column, even if they
+    // don't fully handle it.
+
+    for (ScanProjectionParser parser : parsers) {
+      if (parser.parse(inCol)) {
+        return;
+      }
+    }
+
+    // This is a desired table column.
+
+    addTableColumn(
+        new UnresolvedColumn(inCol, UnresolvedColumn.UNRESOLVED));
+  }
+
+  public void addTableColumn(ColumnProjection outCol) {
+    outputCols.add(outCol);
+    emptyProjection = false;
+  }
+
+  public void addMetadataColumn(ColumnProjection outCol) {
+    outputCols.add(outCol);
+  }
+
+  /**
+   * Once all columns are identified, perform a final pass
+   * over the set of columns to do overall validation. Each
+   * add-on parser is given an opportunity to do its own
+   * validation.
+   */
+
+  private void verify() {
+
+    // Let parsers do overall validation.
+
+    for (ScanProjectionParser parser : parsers) {
+      parser.validate();
+    }
+
+    // Validate column-by-column.
+
+    for (ColumnProjection outCol : outputCols) {
+      for (ScanProjectionParser parser : parsers) {
+        parser.validateColumn(outCol);
+      }
+      switch (outCol.nodeType()) {
+      case UnresolvedColumn.UNRESOLVED:
+        if (hasWildcard()) {
+          throw new IllegalArgumentException("Cannot select table columns and * together");
+        }
+        break;
+      default:
+        break;
+      }
+    }
+  }
+
+  /**
+   * Return the set of columns from the SELECT list
+   * @return the SELECT list columns, in SELECT list order
+   */
+
+  public List<SchemaPath> requestedCols() { return projectionList; }
+
+  /**
+   * The entire set of output columns, in output order. Output order is
+   * that specified in the SELECT (for an explicit list of columns) or
+   * table order (for SELECT * queries).
+   * @return the set of output columns in output order
+   */
+
+  public List<ColumnProjection> columns() { return outputCols; }
+
+  public boolean hasWildcard() { return hasWildcard; }
+
+  /**
+   * Return whether this is a SELECT * query
+   * @return true if this is a SELECT * query
+   */
+
+  public boolean projectAll() { return hasWildcard; }
+
+  /**
+   * Returns true if the projection list is empty. This usually
+   * indicates a <tt>SELECT COUNT(*)</tt> query (though the scan
+   * operator does not have the context to know that an empty
+   * list does, in fact, imply a count-only query...)
+   *
+   * @return true if no table columns are projected, false
+   * if at least one column is projected (or the query contained
+   * the wildcard)
+   */
+
+  public boolean projectNone() { return emptyProjection; }
+
+  public RequestedTuple rootProjection() { return outputProjection; }
+
+  @Override
+  public String toString() {
+    return new StringBuilder()
+        .append("[")
+        .append(getClass().getSimpleName())
+        .append(" projection=")
+        .append(outputCols.toString())
+        .append("]")
+        .toString();
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
new file mode 100644
index 00000000000..c2b0262fd1a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.OptionBuilder;
+import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl;
+import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Performs projection of a record reader, along with a set of static
+ * columns, to produce the final "public" result set (record batch)
+ * for the scan operator. Primarily solves the "vector permanence"
+ * problem: that the scan operator must present the same set of vectors
+ * to downstream operators despite the fact that the scan operator hosts
+ * a series of readers, each of which builds its own result set.
+ * <p>
+ * Provides the option to continue a schema from one batch to the next.
+ * This can reduce spurious schema changes in formats, such as JSON, with
+ * varying fields. It is not, however, a complete solution as the outcome
+ * still depends on the order of file scans and the division of files across
+ * readers.
+ * <p>
+ * Provides the option to infer the schema from the first batch. The "quick path"
+ * to obtain the schema will read one batch, then use that schema as the returned
+ * schema, returning the full batch in the next call to <tt>next()</tt>.
+ *
+ * <h4>Publishing the Final Result Set</h4>
+ *
+ * This class "publishes" a vector container that has the final, projected
+ * form of a scan. The projected schema includes:
+ * <ul>
+ * <li>Columns from the reader.</li>
+ * <li>Static columns, such as implicit or partition columns.</li>
+ * <li>Null columns for items in the select list, but not found in either
+ * of the above two categories.</li>
+ * </ul>
+ * The order of columns is that set by the select list (or, by the reader, for
+ * a <tt>SELECT *</tt> query).
+ *
+ * <h4>Schema Handling</h4>
+ *
+ * The mapping handles a variety of cases:
+ * <ul>
+ * <li>An early-schema table (one in which we know the schema and
+ * the schema remains constant for the whole table).</li>
+ * <li>A late schema table (one in which we discover the schema as
+ * we read the table, and where the schema can change as the read
+ * progresses.)<ul>
+ * <li>Late schema table with SELECT * (we want all columns, whatever
+ * they happen to be.)</li>
+ * <li>Late schema with explicit select list (we want only certain
+ * columns when they happen to appear in the input.)</li></ul></li>
+ * </ul>
+ *
+ * <h4>Implementation Overview</h4>
+ *
+ * Major tasks of this class include:
+ * <ul>
+ * <li>Project table columns (change position and/or name).</li>
+ * <li>Insert static and null columns.</li>
+ * <li>Schema smoothing. That is, if table A produces columns (a, b), but
+ * table B produces only (a), use the type of the first table's b column for the
+ * null created for the missing b in table B.</li>
+ * <li>Vector persistence: use the same set of vectors across readers as long
+ * as the reader schema does not cause a "hard" schema change (change in type,
+ * introduction of a new column).</li>
+ * <li>Detection of schema changes (change of type, introduction of a new column
+ * for a <tt>SELECT *</tt> query), changing the projected schema, and reporting
+ * the change downstream.</li>
+ * </ul>
+ * A projection is needed to:
+ * <ul>
+ * <li>Reorder table columns</li>
+ * <li>Select a subset of table columns</li>
+ * <li>Fill in missing select columns</li>
+ * <li>Fill in implicit or partition columns</li>
+ * </ul>
+ * Creates and returns the batch merger that does the projection.
+ *
+ * <h4>Projection</h4>
+ *
+ * To visualize this, assume we have numbered table columns, lettered
+ * implicit, null or partition columns:<pre><code>
+ * [ 1 | 2 | 3 | 4 ]    Table columns in table order
+ * [ A | B | C ]        Static columns
+ * </code></pre>
+ * Now, we wish to project them into select order.
+ * Let's say that the SELECT clause looked like this, with "t"
+ * indicating table columns:<pre><code>
+ * SELECT t2, t3, C, B, t1, A, t2 ...
+ * </code></pre>
+ * Then the projection looks like this:<pre><code>
+ * [ 2 | 3 | C | B | 1 | A | 2 ]
+ * </code></pre>
+ * Often, not all table columns are projected. In this case, the
+ * result set loader presents the full table schema to the reader,
+ * but actually writes only the projected columns. Suppose we
+ * have:<pre><code>
+ * SELECT t3, C, B, t1, A ...
+ * </code></pre>
+ * Then the abbreviated table schema looks like this:<pre><code>
+ * [ 1 | 3 ]</code></pre>
+ * Note that table columns retain their table ordering.
+ * The projection looks like this:<pre><code>
+ * [ 2 | C | B | 1 | A ]
+ * </code></pre>
+ * <p>
+ * The projector is created once per schema, then can be reused for any
+ * number of batches.
+ * <p>
+ * Merging is done in one of two ways, depending on the input source:
+ * <ul>
+ * <li>For the table loader, the merger discards any data in the output,
+ * then exchanges the buffers from the input columns to the output,
+ * leaving projected columns empty. Note that unprojected columns must
+ * be cleared by the caller.</li>
+ * <li>For implicit and null columns, the output vector is identical
+ * to the input vector.</li>
+ * </ul>
+ */
+
+public class ScanSchemaOrchestrator {
+
+  public static final int MIN_BATCH_BYTE_SIZE = 256 * 1024;
+  public static final int MAX_BATCH_BYTE_SIZE = Integer.MAX_VALUE;
+  public static final int DEFAULT_BATCH_ROW_COUNT = 4096;
+  public static final int DEFAULT_BATCH_BYTE_COUNT = ValueVector.MAX_BUFFER_SIZE;
+  public static final int MAX_BATCH_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
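+
+  // Illustrative lifecycle (assumed, not normative): one scan, one reader.
+  // Names such as `allocator`, `projection` and `tableSchema` are
+  // placeholders supplied by the scan operator.
+  //
+  //   ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(allocator);
+  //   scanner.build(projection);
+  //   ReaderSchemaOrchestrator reader = scanner.startReader();
+  //   ResultSetLoader loader = reader.makeTableLoader(tableSchema);
+  //   reader.startBatch();
+  //   // ... reader writes rows via the loader ...
+  //   reader.endBatch();
+  //   VectorContainer downstream = scanner.output();
+  //   scanner.closeReader();
+  //   scanner.close();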
+
+  /**
+   * Orchestrates projection tasks for a single reader with the set that the
+   * scan operator manages. Vectors are reused across readers, but via a vector
+   * cache. All other state is distinct between readers.
+   */
+
+  public class ReaderSchemaOrchestrator implements VectorSource {
+
+    private int readerBatchSize;
+    private ResultSetLoaderImpl tableLoader;
+    private int prevTableSchemaVersion = -1;
+
+    /**
+     * Assembles the table, metadata and null columns into the final output
+     * batch to be sent downstream. The key goal of this class is to "smooth"
+     * schema changes in this output batch by absorbing trivial schema changes
+     * that occur across readers.
+     */
+
+    private ResolvedRow rootTuple;
+    private VectorContainer tableContainer;
+
+    public ReaderSchemaOrchestrator() {
+      readerBatchSize = scanBatchRecordLimit;
+    }
+
+    public void setBatchSize(int size) {
+      if (size > 0) {
+        readerBatchSize = Math.min(size, scanBatchRecordLimit);
+      }
+    }
+
+    public ResultSetLoader makeTableLoader(TupleMetadata tableSchema) {
+      OptionBuilder options = new OptionBuilder();
+      options.setRowCountLimit(readerBatchSize);
+      options.setVectorCache(vectorCache);
+      options.setBatchSizeLimit(scanBatchByteLimit);
+
+      // Set up a selection list, if available, that is a subset of the
+      // table columns. (Only needed for non-wildcard queries.)
+      // The projection list includes all candidate table columns
+      // whether or not they exist in the up-front schema. Handles
+      // the odd case where the reader claims a fixed schema, but
+      // adds a column later.
+
+      if (! scanProj.projectAll()) {
+        options.setProjectionSet(scanProj.rootProjection());
+      }
+      options.setSchema(tableSchema);
+
+      // Create the table loader
+
+      tableLoader = new ResultSetLoaderImpl(allocator, options.build());
+
+      // If a schema is given, create a zero-row batch to announce the
+      // schema downstream in the form of an empty batch.
+
+      if (tableSchema != null) {
+        tableLoader.startEmptyBatch();
+        endBatch();
+      }
+
+      return tableLoader;
+    }
+
+    public boolean hasSchema() {
+      return prevTableSchemaVersion >= 0;
+    }
+
+    public void startBatch() {
+      tableLoader.startBatch();
+    }
+
+    /**
+     * Build the final output batch by projecting columns from the three input sources
+     * to the output batch. First, build the metadata and/or null columns for the
+     * table row count. Then, merge the sources.
+     */
+
+    public void endBatch() {
+
+      // Get the batch results in a container.
+
+      tableContainer = tableLoader.harvest();
+
+      // If the schema changed, set up the final projection based on
+      // the new (or first) schema.
+
+      if (prevTableSchemaVersion < tableLoader.schemaVersion()) {
+        reviseOutputProjection();
+      } else {
+
+        // Fill in the null and metadata columns.
+
+        populateNonDataColumns();
+      }
+      rootTuple.setRowCount(tableContainer.getRecordCount());
+    }
+
+    private void populateNonDataColumns() {
+      int rowCount = tableContainer.getRecordCount();
+      metadataManager.load(rowCount);
+      rootTuple.loadNulls(rowCount);
+    }
+
+    /**
+     * Create the list of null columns by comparing the SELECT list against the
+     * columns available in the batch schema. Create null columns for those that
+     * are missing. This is done for the first batch, and any time the schema
+     * changes. (For early-schema, the projection occurs once as the schema is set
+     * up-front and does not change.) For a SELECT *, the null column check
+     * only needs to be done if null columns were created when mapping from a prior
+     * schema.
+     */
+
+    private void reviseOutputProjection() {
+
+      // Do the table-schema level projection; the final matching
+      // of projected columns to available columns.
+
+      TupleMetadata tableSchema = tableLoader.harvestSchema();
+      if (schemaSmoother != null) {
+        doSmoothedProjection(tableSchema);
+      } else if (scanProj.hasWildcard()) {
+        doWildcardProjection(tableSchema);
+      } else {
+        doExplicitProjection(tableSchema);
+      }
+
+      // Combine metadata, nulls and batch data to form the final
+      // output container. Columns are created by the metadata and null
+      // loaders only in response to a batch, so create the first batch.
+
+      rootTuple.buildNulls(vectorCache);
+      metadataManager.define();
+      populateNonDataColumns();
+      rootTuple.project(tableContainer, outputContainer);
+      prevTableSchemaVersion = tableLoader.schemaVersion();
+    }
+
+    private void doSmoothedProjection(TupleMetadata tableSchema) {
+      rootTuple = new ResolvedRow(
+          new NullColumnBuilder(nullType, allowRequiredNullColumns));
+      schemaSmoother.resolve(tableSchema, rootTuple);
+    }
+
+    /**
+     * Query contains a wildcard. The schema-level projection includes
+     * all columns provided by the reader.
+     */
+
+    private void doWildcardProjection(TupleMetadata tableSchema) {
+      rootTuple = new ResolvedRow(null);
+      new WildcardSchemaProjection(scanProj,
+          tableSchema, rootTuple, schemaResolvers);
+    }
+
+    /**
+     * Explicit projection: include only those columns actually
+     * requested by the query, which may mean filling in null
+     * columns for projected columns that don't actually exist
+     * in the table.
+     *
+     * @param tableSchema newly arrived schema
+     */
+
+    private void doExplicitProjection(TupleMetadata tableSchema) {
+      rootTuple = new ResolvedRow(
+          new NullColumnBuilder(nullType, allowRequiredNullColumns));
+      new ExplicitSchemaProjection(scanProj,
+              tableSchema, rootTuple,
+              schemaResolvers);
+    }
+
+    @Override
+    public ValueVector vector(int index) {
+      return tableContainer.getValueVector(index).getValueVector();
+    }
+
+    public void close() {
+      RuntimeException ex = null;
+      try {
+        if (tableLoader != null) {
+          tableLoader.close();
+          tableLoader = null;
+        }
+      }
+      catch (RuntimeException e) {
+        ex = e;
+      }
+      try {
+        if (rootTuple != null) {
+          rootTuple.close();
+          rootTuple = null;
+        }
+      }
+      catch (RuntimeException e) {
+        ex = ex == null ? e : ex;
+      }
+      metadataManager.endFile();
+      if (ex != null) {
+        throw ex;
+      }
+    }
+  }
+
+  // Configuration
+
+  /**
+   * Custom null type, if provided by the operator. If
+   * not set, the null type is the Drill default.
+   */
+
+  private MajorType nullType;
+
+  /**
+   * Creates the metadata (file and directory) columns, if needed.
+   */
+
+  private MetadataManager metadataManager;
+  private final BufferAllocator allocator;
+  private int scanBatchRecordLimit = DEFAULT_BATCH_ROW_COUNT;
+  private int scanBatchByteLimit = DEFAULT_BATCH_BYTE_COUNT;
+  private boolean v1_12MetadataLocation;
+  private final List<ScanProjectionParser> parsers = new ArrayList<>();
+
+  /**
+   * List of resolvers used to resolve projection columns for each
+   * new schema. Allows operators to introduce custom functionality
+   * as a plug-in rather than by copying code or subclassing this
+   * mechanism.
+   */
+
+  List<SchemaProjectionResolver> schemaResolvers = new ArrayList<>();
+
+  private boolean useSchemaSmoothing;
+  private boolean allowRequiredNullColumns;
+
+  // Internal state
+
+  /**
+   * Cache used to preserve the same vectors from one output batch to the
+   * next to keep the Project operator happy (which depends on exactly the
+   * same vectors).
+   * <p>
+   * If the Project operator ever changes so that it depends on looking up
+   * vectors rather than vector instances, this cache can be deprecated.
+   */
+
+  private ResultVectorCacheImpl vectorCache;
+  private ScanLevelProjection scanProj;
+  private ReaderSchemaOrchestrator currentReader;
+  private SchemaSmoother schemaSmoother;
+
+  // Output
+
+  private VectorContainer outputContainer;
+
+  public ScanSchemaOrchestrator(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Specify an optional metadata manager. Metadata is a set of constant
+   * columns with per-reader values. For file-based sources, this is usually
+   * the implicit and partition columns; but it could be other items for other
+   * data sources.
+   *
+   * @param metadataMgr the application-specific metadata manager to use
+   * for this scan
+   */
+
+  public void withMetadata(MetadataManager metadataMgr) {
+    metadataManager = metadataMgr;
+    schemaResolvers.add(metadataManager.resolver());
+  }
+
+  /**
+   * Specify a custom batch record count. This is the maximum number of records
+   * per batch for this scan. Readers can adjust this, but the adjustment is capped
+   * at the value specified here.
+   *
+   * @param batchRecordLimit maximum records per batch
+   */
+
+  public void setBatchRecordLimit(int batchRecordLimit) {
+    scanBatchRecordLimit = Math.max(1,
+        Math.min(batchRecordLimit, ValueVector.MAX_ROW_COUNT));
+  }
+
+  public void setBatchByteLimit(int byteLimit) {
+    scanBatchByteLimit = Math.max(MIN_BATCH_BYTE_SIZE,
+        Math.min(byteLimit, MAX_BATCH_BYTE_SIZE));
+  }
+
+  /**
+   * Specify the type to use for null columns in place of the standard
+   * nullable int. This type is used for all missing columns. (Readers
+   * that need per-column control need a different mechanism.)
+   *
+   * @param nullType the type to use for null columns in place of the
+   * standard nullable int
+   */
+
+  public void setNullType(MajorType nullType) {
+    this.nullType = nullType;
+  }
+
+  /**
+   * Enable schema smoothing: introduces an additional level of schema
+   * resolution each time a schema changes from a reader.
+   *
+   * @param flag true to enable schema smoothing, false to disable
+   */
+
+  public void enableSchemaSmoothing(boolean flag) {
+    useSchemaSmoothing = flag;
+  }
+
+  public void allowRequiredNullColumns(boolean flag) {
+    allowRequiredNullColumns = flag;
+  }
+
+  public void useDrill1_12MetadataPosition(boolean flag) {
+    v1_12MetadataLocation = flag;
+  }
+
+  public void build(List<SchemaPath> projection) {
+    vectorCache = new ResultVectorCacheImpl(allocator, useSchemaSmoothing);
+
+    // If no metadata manager was provided, create a mock
+    // version just to keep code simple.
+
+    if (metadataManager == null) {
+      metadataManager = new NoOpMetadataManager();
+    }
+    metadataManager.bind(vectorCache);
+
+    // Bind metadata manager parser to scan projector.
+    // A "real" (non-mock) metadata manager will provide
+    // a projection parser. Use this to tell us that this
+    // setup supports metadata.
+
+    ScanProjectionParser parser = metadataManager.projectionParser();
+    if (parser != null) {
+
+      // For compatibility with Drill 1.12, insert the file metadata
+      // parser before others so that, in a wildcard query, metadata
+      // columns appear before others (such as the `columns` column.)
+      // This is temporary and should be removed once the test framework
+      // is restored to Drill 1.11 functionality.
+
+      if (v1_12MetadataLocation) {
+        parsers.add(0, parser);
+      } else {
+        parsers.add(parser);
+      }
+    }
+
+    // Parse the projection list.
+
+    scanProj = new ScanLevelProjection(projection, parsers, v1_12MetadataLocation);
+
+    if (scanProj.hasWildcard() && useSchemaSmoothing) {
+      schemaSmoother = new SchemaSmoother(scanProj, schemaResolvers);
+    }
+
+    // Build the output container.
+
+    outputContainer = new VectorContainer(allocator);
+  }
+
+  public void addParser(ScanProjectionParser parser) {
+    parsers.add(parser);
+  }
+
+  public void addResolver(SchemaProjectionResolver resolver) {
+    schemaResolvers.add(resolver);
+  }
+
+  public ReaderSchemaOrchestrator startReader() {
+    closeReader();
+    currentReader = new ReaderSchemaOrchestrator();
+    return currentReader;
+  }
+
+  public boolean isProjectNone() {
+    return scanProj.projectNone();
+  }
+
+  public boolean hasSchema() {
+    return currentReader != null && currentReader.hasSchema();
+  }
+
+  public VectorContainer output() {
+    return outputContainer;
+  }
+
+  public void closeReader() {
+    if (currentReader != null) {
+      currentReader.close();
+      currentReader = null;
+    }
+  }
+
+  public void close() {
+    closeReader();
+    if (outputContainer != null) {
+      outputContainer.clear();
+      outputContainer = null;
+    }
+    vectorCache.close();
+    metadataManager.close();
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java
new file mode 100644
index 00000000000..c7bae278eec
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
+/**
+ * Computes the full output schema given a table (or batch)
+ * schema. Takes the original, unresolved output list from the projection
+ * definition, merges it with the file, directory and table schema information,
+ * and produces a partially or fully resolved output list.
+ * <p>
+ * A "resolved" projection list is a list of concrete columns: table
+ * columns, nulls, file metadata or partition metadata. An unresolved list
+ * contains table column names that are not yet matched against the table
+ * schema, or a wildcard column.
+ * <p>
+ * The idea is that the projection list moves through stages of resolution
+ * depending on which information is available. An "early schema" table
+ * provides schema information up front, and so allows fully resolving
+ * the projection list on table open. A "late schema" table allows only a
+ * partially resolved projection list, with the remainder of resolution
+ * happening on the first (or perhaps every) batch.
+ * <p>
+ * Data source (table) schema can be of two forms:
+ * <ul>
+ * <li>Early schema: the schema is known before reading data. A JDBC data
+ * source is an example, as is a CSV reader for a file with headers.</li>
+ * <li>Late schema: the schema is not known until data is read, and is
+ * discovered on the fly. Example: JSON, which declares values as maps
+ * without an up-front schema.</li>
+ * </ul>
+ * These two forms give rise to distinct ways of planning the projection.
+ * <p>
+ * The final result of the projection is a set of "output" columns: a set
+ * of columns that, taken together, defines the row (bundle of vectors) that
+ * the scan operator produces. Columns are ordered: the order specified here
+ * must match the order that columns appear in the result set loader and the
+ * vector container so that code can access columns by index as well as name.
+ *
+ * @see {@link ImplicitColumnExplorer}, the class from which this class
+ * evolved
+ */
+
+public class SchemaLevelProjection {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaLevelProjection.class);
+
+  /**
+   * Schema-level projection is customizable. Implement this interface, and
+   * add an instance to the scan orchestrator, to perform custom mappings
+   * from unresolved columns (perhaps of an extension-specified type) to
+   * final projected columns. The metadata manager, for example, implements
+   * this interface to map metadata columns.
+   */
+
+  public interface SchemaProjectionResolver {
+    void startResolution();
+    boolean resolveColumn(ColumnProjection col, ResolvedTuple tuple,
+        TupleMetadata tableSchema);
+  }
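+
+  // Hypothetical resolver sketch (illustration only): handles a single
+  // custom column node type and declines all others. `MyColumn` and its
+  // node ID are assumptions, not part of this patch.
+  //
+  //   public class MyResolver implements SchemaProjectionResolver {
+  //     @Override public void startResolution() { }
+  //     @Override public boolean resolveColumn(ColumnProjection col,
+  //         ResolvedTuple tuple, TupleMetadata tableSchema) {
+  //       if (col.nodeType() != MyColumn.ID) { return false; }
+  //       tuple.add(((MyColumn) col).resolve(tuple));
+  //       return true;
+  //     }
+  //   }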
+
+  protected final List<SchemaProjectionResolver> resolvers;
+
+  protected SchemaLevelProjection(
+        List<SchemaProjectionResolver> resolvers) {
+    if (resolvers == null) {
+      resolvers = new ArrayList<>();
+    }
+    this.resolvers = resolvers;
+    for (SchemaProjectionResolver resolver : resolvers) {
+      resolver.startResolution();
+    }
+  }
+
+  protected void resolveSpecial(ResolvedTuple rootOutputTuple, ColumnProjection col,
+      TupleMetadata tableSchema) {
+    for (SchemaProjectionResolver resolver : resolvers) {
+      if (resolver.resolveColumn(col, rootOutputTuple, tableSchema)) {
+        return;
+      }
+    }
+    throw new IllegalStateException("No resolver for column: " + col.nodeType());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaSmoother.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaSmoother.java
new file mode 100644
index 00000000000..b15fe9a2a13
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaSmoother.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
+/**
+ * Implements a "schema smoothing" algorithm.
+ * Schema persistence for the wildcard selection (i.e. SELECT *)
+ * <p>
+ * Constraints:
+ * <ul>
+ * <li>Adding columns causes a hard schema change.</li>
+ * <li>Removing columns is allowed, uses type from previous
+ * schema, as long as previous mode was nullable or repeated.</li>
+ * <li>Changing type or mode causes a hard schema change.</li>
+ * <li>Changing column order is fine; use order from previous
+ * schema.</li>
+ * </ul>
+ * This can all be boiled down to a simpler rule:
+ * <ul>
+ * <li>Schema persistence is possible if the output schema
+ * from a prior schema can be reused for the current schema.</li>
+ * <li>Else, a hard schema change occurs and a new output
+ * schema is derived from the new table schema.</li>
+ * </ul>
+ * The core idea here is to "unresolve" a fully-resolved table schema
+ * to produce a new projection list that is the equivalent of using that
+ * prior projection list in the SELECT. Then, keep that projection list only
+ * if it is compatible with the next table schema, else throw it away and
+ * start over from the actual scan projection list.
+ * <p>
+ * Algorithm:
+ * <ul>
+ * <li>If partitions are included in the wildcard, and the new
+ * file needs more than the current one, create a new schema.</li>
+ * <li>Else, treat partitions as select, fill in missing with
+ * nulls.</li>
+ * <li>From an output schema, construct a new select list
+ * specification as though the columns in the current schema were
+ * explicitly specified in the SELECT clause.</li>
+ * <li>For each new schema column, verify that the column exists
+ * in the generated SELECT clause and is of the same type.
+ * If not, create a new schema.</li>
+ * <li>Use the generated schema to plan a new projection from
+ * the new schema to the prior schema.</li>
+ * </ul>
+ */
+
+public class SchemaSmoother {
+
+  /**
+   * Exception thrown if the prior schema is not compatible with the
+   * new table schema.
+   */
+
+  @SuppressWarnings("serial")
+  public static class IncompatibleSchemaException extends Exception { }
+
+  private final ScanLevelProjection scanProj;
+  private final List<SchemaProjectionResolver> resolvers;
+  private ResolvedTuple priorSchema;
+  private int schemaVersion = 0;
+
+  public SchemaSmoother(ScanLevelProjection scanProj,
+      List<SchemaProjectionResolver> resolvers) {
+    this.scanProj = scanProj;
+    this.resolvers = resolvers;
+  }
+
+  public SchemaLevelProjection resolve(
+      TupleMetadata tableSchema,
+      ResolvedTuple outputTuple) {
+
+    // If a prior schema exists, try resolving the new table using the
+    // prior schema. If this works, use the projection. Else, start
+    // over with the scan projection.
+
+    if (priorSchema != null) {
+      try {
+        SmoothingProjection smoother = new SmoothingProjection(scanProj, tableSchema,
+            priorSchema, outputTuple, resolvers);
+        priorSchema = outputTuple;
+        return smoother;
+      } catch (IncompatibleSchemaException e) {
+        outputTuple.reset();
+        // Fall through
+      }
+    }
+
+    // Can't use the prior schema. Start over with the original scan projection.
+    // Type smoothing is provided by the vector cache; but a hard schema change
+    // will occur because either a type has changed or a new column has appeared.
+    // (Or, this is the first schema.)
+
+    SchemaLevelProjection schemaProj = new WildcardSchemaProjection(scanProj,
+        tableSchema, outputTuple, resolvers);
+    priorSchema = outputTuple;
+    schemaVersion++;
+    return schemaProj;
+  }
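+
+  // Illustrative per-schema use (assumed): the reader orchestrator calls
+  // resolve() whenever the reader's schema version changes; `vectorCache`
+  // and `tableSchema` are placeholders.
+  //
+  //   SchemaSmoother smoother = new SchemaSmoother(scanProj, resolvers);
+  //   ResolvedRow rootTuple = new ResolvedRow(new NullColumnBuilder(null, false));
+  //   smoother.resolve(tableSchema, rootTuple);
+  //   rootTuple.buildNulls(vectorCache);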
+
+  public int schemaVersion() { return schemaVersion; }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SmoothingProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SmoothingProjection.java
new file mode 100644
index 00000000000..da199b8c5fc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SmoothingProjection.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother.IncompatibleSchemaException;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
+/**
+ * Resolve a table schema against the prior schema. This works only if the
+ * types match and if all columns in the table schema already appear in the
+ * prior schema.
+ * <p>
+ * Consider this an experimental mechanism. The hope was that, with clever
+ * techniques, we could "smooth over" some of the issues that cause schema
+ * change events in Drill. As it turned out, however, creating this mechanism
+ * revealed that it is not possible, even in theory, to handle most schema
+ * changes because of the time dimension:
+ * <ul>
+ * <li>An event in a later batch may provide information that would have
+ * caused us to make a different decision in an earlier batch. For example,
+ * we are asked for column `foo`, did not see such a column in the first
+ * batch, block or file, guessed some type, and later saw that the column
+ * was of a different type. We can't "time travel" to tell our earlier
+ * selves, nor, when we make the initial type decision, can we jump to
+ * the future to see what type we'll discover.</li>
+ * <li>Readers in this fragment may see column `foo` but readers in
+ * another fragment read files/blocks that don't have that column. The
+ * two readers cannot communicate to agree on a type.</li>
+ * </ul>
+ * <p>
+ * What this mechanism can do is make decisions based on history: when a
+ * column appears, we can adjust its type a bit to try to avoid an
+ * unnecessary change. For example, if a prior file in this scan saw
+ * `foo` as nullable Varchar, but the present file has the column as
+ * requied Varchar, we can use the more general nullable form. But,
+ * again, the "can't predict the future" bites us: we can handle a
+ * nullable-to-required column change, but not visa-versa.
+ * <p>
+ * What this mechanism will tell the careful reader is that the only
+ * general solution to the schema-change problem is to know the full
+ * schema up front: for the planner to be told the schema and to
+ * communicate that schema to all readers so that all readers agree
+ * on the final schema.
+ * <p>
+ * When that is done, the techniques shown here can be used to adjust
+ * any per-file variation of schema to match the up-front schema.
+ */
+
+public class SmoothingProjection extends SchemaLevelProjection {
+
+  protected final List<MaterializedField> rewrittenFields = new ArrayList<>();
+
+  public SmoothingProjection(ScanLevelProjection scanProj,
+      TupleMetadata tableSchema,
+      ResolvedTuple priorSchema,
+      ResolvedTuple outputTuple,
+      List<SchemaProjectionResolver> resolvers) throws IncompatibleSchemaException {
+
+    super(resolvers);
+
+    for (ResolvedColumn priorCol : priorSchema.columns()) {
+      switch (priorCol.nodeType()) {
+        case ResolvedTableColumn.ID:
+        case ResolvedNullColumn.ID:
+          // This is a regular column known to this base framework.
+          resolveColumn(outputTuple, priorCol, tableSchema);
+          break;
+        default:
+          // The column is one known to an add-on mechanism.
+          resolveSpecial(outputTuple, priorCol, tableSchema);
+      }
+    }
+
+    // Check if all table columns were matched. Since names are unique,
+    // each column can be matched at most once. If the number of matches is
+    // less than the total number of columns, then some columns were not
+    // matched and we must start over.
+
+    if (rewrittenFields.size() < tableSchema.size()) {
+      throw new IncompatibleSchemaException();
+    }
+  }
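+
+  // Illustrative outcome (assumed example, not from this patch): given a
+  // prior schema (a nullable VARCHAR, b INT) and a new table schema (b INT),
+  // the loop above resolves `b` to the table column and `a` to a null column
+  // of the prior nullable type. Had `a` been REQUIRED in the prior schema,
+  // resolveNullColumn() would throw IncompatibleSchemaException and the
+  // smoother would fall back to a fresh wildcard projection.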
+
+  /**
+   * Resolve a prior column against the current table schema. Resolves to
+   * a table column, a null column, or throws an exception if the
+   * schemas are incompatible.
+   *
+   * @param priorCol a column from the prior schema
+   * @throws IncompatibleSchemaException if the prior column exists in
+   * the current table schema, but with an incompatible type
+   */
+
+  private void resolveColumn(ResolvedTuple outputTuple,
+      ResolvedColumn priorCol, TupleMetadata tableSchema) throws IncompatibleSchemaException {
+    int tableColIndex = tableSchema.index(priorCol.name());
+    if (tableColIndex == -1) {
+      resolveNullColumn(outputTuple, priorCol);
+      return;
+    }
+    MaterializedField tableCol = tableSchema.column(tableColIndex);
+    MaterializedField priorField = priorCol.schema();
+    if (! tableCol.isPromotableTo(priorField, false)) {
+      throw new IncompatibleSchemaException();
+    }
+    outputTuple.add(
+        new ResolvedTableColumn(priorCol.name(), priorField, outputTuple, tableColIndex));
+    rewrittenFields.add(priorField);
+  }
+
+  /**
+   * A prior schema column does not exist in the present table column schema.
+   * Create a null column with the same type as the prior column, as long as
+   * the prior column was not required.
+   *
+   * @param priorCol the prior column to project to a null column
+   * @throws IncompatibleSchemaException if the prior column was required
+   * and thus cannot be null-filled
+   */
+
+  private void resolveNullColumn(ResolvedTuple outputTuple,
+      ResolvedColumn priorCol) throws IncompatibleSchemaException {
+    if (priorCol.schema().getType().getMode() == DataMode.REQUIRED) {
+      throw new IncompatibleSchemaException();
+    }
+    outputTuple.add(outputTuple.nullBuilder().add(priorCol.name(),
+        priorCol.schema().getType()));
+  }
+
+  public List<MaterializedField> revisedTableSchema() { return rewrittenFields; }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/StaticColumnLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/StaticColumnLoader.java
new file mode 100644
index 00000000000..bfd3614b598
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/StaticColumnLoader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.impl.OptionBuilder;
+import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl;
+import org.apache.drill.exec.record.VectorContainer;
+
+/**
+ * Base class for columns that take values based on the
+ * reader, not individual rows. E.g. null columns (values
+ * are all null) or file metadata (AKA "implicit") columns
+ * that take values based on the file.
+ */
+
+public abstract class StaticColumnLoader {
+  protected final ResultSetLoader loader;
+  protected final ResultVectorCache vectorCache;
+
+  public StaticColumnLoader(ResultVectorCache vectorCache) {
+
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+          .setVectorCache(vectorCache)
+          .build();
+    loader = new ResultSetLoaderImpl(vectorCache.allocator(), options);
+    this.vectorCache = vectorCache;
+  }
+
+  /**
+   * Populate static vectors with the defined static values.
+   *
+   * @param rowCount number of rows to generate. Must match the
+   * row count in the batch returned by the reader
+   * @return a vector container holding the static columns, each
+   * populated with the requested number of rows
+   */
+
+  public abstract VectorContainer load(int rowCount);
+
+  public void close() {
+    loader.close();
+  }
+}
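
A concrete loader implements load() by driving the embedded result set loader: declare the static columns once, then write the same values on every row of each batch. Below is a hypothetical sketch of such a subclass; the ResultSetLoader/RowSetLoader calls (writer(), addColumn(), startBatch(), harvest()) are assumptions about that API, and the real implementations in this patch are ConstantColumnLoader and NullColumnLoader.

import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
import org.apache.drill.exec.physical.rowSet.RowSetLoader;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorContainer;

// Hypothetical example: fill a single VARCHAR column with one value,
// repeated on every row, much like a file-metadata column.
public class SingleValueLoader extends StaticColumnLoader {

  private final String value;

  public SingleValueLoader(ResultVectorCache vectorCache, String name, String value) {
    super(vectorCache);
    this.value = value;

    // Declare the column up front; the loader creates the backing vector.
    loader.writer().addColumn(
        MaterializedField.create(name, Types.required(MinorType.VARCHAR)));
  }

  @Override
  public VectorContainer load(int rowCount) {
    loader.startBatch();
    RowSetLoader writer = loader.writer();
    for (int i = 0; i < rowCount; i++) {
      writer.start();
      writer.scalar(0).setString(value);  // same value on every row
      writer.save();
    }
    return loader.harvest();
  }
}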
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/UnresolvedColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/UnresolvedColumn.java
new file mode 100644
index 00000000000..2dfa9c44d0a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/UnresolvedColumn.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
+
+/**
+ * Represents a projected column that has not yet been bound to a
+ * table column, a special column, or a null column. Once bound, this
+ * column projection is replaced with the detailed binding.
+ */
+public class UnresolvedColumn implements ColumnProjection {
+
+  public static final int WILDCARD = 1;
+  public static final int UNRESOLVED = 2;
+
+  /**
+   * The original physical plan column to which this output column
+   * maps. In some cases, multiple output columns map to the
+   * same "input" (to the projection process) column.
+   */
+
+  protected final RequestedColumn inCol;
+  private final int id;
+
+  public UnresolvedColumn(RequestedColumn inCol, int id) {
+    this.inCol = inCol;
+    this.id = id;
+  }
+
+  @Override
+  public int nodeType() { return id; }
+
+  @Override
+  public String name() { return inCol.name(); }
+
+  public RequestedColumn element() { return inCol; }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf
+      .append("[")
+      .append(getClass().getSimpleName())
+      .append(" type=")
+      .append(id == WILDCARD ? "wildcard" : "column");
+    if (inCol != null) {
+      buf
+        .append(", incol=")
+        .append(inCol.toString());
+    }
+    return buf.append("]").toString();
+  }
+}
\ No newline at end of file
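
For illustration, a scan-level parser would typically wrap each parsed projection entry in one of these nodes, tagging the wildcard specially so later phases can expand it. A hypothetical helper is sketched below; it assumes RequestedColumn exposes an isWildcard() test, and if it does not, the appropriate name check would be substituted.

// Hypothetical sketch: tag a parsed projection entry for later resolution.
static ColumnProjection toScanColumn(RequestedColumn inCol) {
  int type = inCol.isWildcard()
      ? UnresolvedColumn.WILDCARD
      : UnresolvedColumn.UNRESOLVED;
  return new UnresolvedColumn(inCol, type);
}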
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/VectorSource.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/VectorSource.java
new file mode 100644
index 00000000000..625ee73de2c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/VectorSource.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Generic mechanism for retrieving vectors from a source tuple when
+ * projecting columns to the output tuple. Works around the fact that
+ * vector containers and maps are both tuples, but have very different
+ * interfaces. Also allows other classes to act as "proxies" for a
+ * source tuple.
+ */
+
+public interface VectorSource {
+  ValueVector vector(int index);
+}
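
For illustration, a minimal implementation backed by a vector container might look like the sketch below; it uses the same unwrapping pattern as the RowSetSource helper in the tests later in this patch.

import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.vector.ValueVector;

// Minimal sketch: serve vectors straight out of a container by position.
public class ContainerVectorSource implements VectorSource {

  private final VectorContainer container;

  public ContainerVectorSource(VectorContainer container) {
    this.container = container;
  }

  @Override
  public ValueVector vector(int index) {
    // The container wraps each vector; unwrap to the underlying ValueVector.
    return container.getValueVector(index).getValueVector();
  }
}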
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/WildcardSchemaProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/WildcardSchemaProjection.java
new file mode 100644
index 00000000000..d1cfbe171fe
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/WildcardSchemaProjection.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.List;
+
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
+/**
+ * Perform a wildcard projection. In this case, the query wants all
+ * columns in the source table, so the table drives the final projection.
+ * Since we include only the columns actually present in the table, there is no need
+ * to create null columns. Example: SELECT *
+ */
+
+public class WildcardSchemaProjection extends SchemaLevelProjection {
+
+  public WildcardSchemaProjection(ScanLevelProjection scanProj,
+      TupleMetadata tableSchema,
+      ResolvedTuple rootTuple,
+      List<SchemaProjectionResolver> resolvers) {
+    super(resolvers);
+    for (ColumnProjection col : scanProj.columns()) {
+      if (col.nodeType() == UnresolvedColumn.WILDCARD) {
+        projectAllColumns(rootTuple, tableSchema);
+      } else {
+        resolveSpecial(rootTuple, col, tableSchema);
+      }
+    }
+  }
+
+  /**
+   * Project all columns from table schema to the output, in table
+   * schema order. Since we accept any map columns as-is, no need
+   * to do recursive projection.
+   *
+   * @param rootTuple the resolved row to which the columns are added
+   * @param tableSchema the table (or batch) schema whose columns are projected
+   */
+
+  private void projectAllColumns(ResolvedTuple rootTuple, TupleMetadata tableSchema) {
+    for (int i = 0; i < tableSchema.size(); i++) {
+      MaterializedField colSchema = tableSchema.column(i);
+      rootTuple.add(
+          new ResolvedTableColumn(colSchema.getName(),
+              colSchema, rootTuple, i));
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java
new file mode 100644
index 00000000000..224c69ad14d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Provides run-time semantic analysis of the projection list for the
+ * scan operator. The project list can include table columns and a
+ * variety of special columns. Requested columns can exist in the table,
+ * or may be "missing" with null values applied. The code here prepares
+ * a run-time projection plan based on the actual table schema.
+ * <p>
+ * The core concept is one of successive refinement of the project
+ * list through a set of rewrites:
+ * <ul>
+ * <li>Scan-level rewrite: convert {@link SchemaPath} entries into
+ * internal column nodes, tagging the nodes with the column type:
+ * wildcard, unresolved table column, or special columns (such as
+ * file metadata.) The scan-level rewrite is done once per scan
+ * operator.</li>
+ * <li>Reader-level rewrite: convert the internal column nodes into
+ * other internal nodes, leaving table column nodes unresolved. The
+ * typical use is to fill in metadata columns with information about a
+ * specific file.</li>
+ * <li>Schema-level rewrite: given the actual schema of a record batch,
+ * rewrite the reader-level projection to describe the final projection
+ * from incoming data to output container. This step fills in missing
+ * columns, expands wildcards, etc.</li>
+ * </ul>
+ * The following outlines the steps from scan plan to per-file data
+ * loading to producing the output batch. The center path is the
+ * projection metadata which turns into an actual output batch.
+ * <pre>
+ *                   Scan Plan
+ *                       |
+ *                       v
+ *               +--------------+
+ *               | Project List |
+ *               |    Parser    |
+ *               +--------------+
+ *                       |
+ *                       v
+ *                +------------+
+ *                | Scan Level |
+ *                | Projection | -----------+
+ *                +------------+            |
+ *                       |                  |
+ *                       v                  v
+ *  +------+      +------------+     +------------+      +-----------+
+ *  | File | ---> | File Level |     | Result Set | ---> | Data File |
+ *  | Data |      | Projection |     |   Loader   | <--- |  Reader   |
+ *  +------+      +------------+     +------------+      +-----------+
+ *                       |                  |
+ *                       v                  |
+ *               +--------------+   Table   |
+ *               | Schema Level |   Schema  |
+ *               |  Projection  | <---------+
+ *               +--------------+           |
+ *                       |                  |
+ *                       v                  |
+ *                  +--------+   Loaded     |
+ *                  | Output |   Vectors    |
+ *                  | Mapper | <------------+
+ *                  +--------+
+ *                       |
+ *                       v
+ *                 Output Batch
+ * </pre>
+ * <p>
+ * The output mapper includes mechanisms to populate implicit columns, create
+ * null columns, and to merge implicit, null and data columns, omitting
+ * unprojected data columns.
+ * <p>
+ * In all cases, projection must handle maps, which are a recursive structure
+ * much like a row. That is, a Drill row consists of nested tuples (the row and its maps),
+ * each of which contains columns that can themselves be maps. Thus, there is a set of
+ * alternating layers of tuples, columns, tuples, and so on until we get to leaf
+ * (non-map) columns. As a result, most of the above structures are in the form
+ * of tuple trees, requiring recursive algorithms to apply rules down through the
+ * nested layers of tuples.
+ * <p>
+ * The above mechanism is done at runtime, in each scan fragment. Since Drill is
+ * schema-on-read, and has no plan-time schema concept, run-time projection is
+ * required. On the other hand, if Drill were ever to support the "classic"
+ * plan-time schema resolution, then much of this work could be done at plan
+ * time rather than (redundantly) at runtime. The main change would be to do
+ * the work abstractly, working with column and row descriptions, rather than
+ * concretely with vectors as is done here. Then, that abstract description
+ * would feed directly into these mechanisms with the "final answer" about
+ * projection, batch layout, and so on. The parts of this mechanism that
+ * create and populate vectors would remain.
+ */
+
+package org.apache.drill.exec.physical.impl.scan.project;
\ No newline at end of file
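
To make the center path of the diagram concrete, the sketch below pieces together the schema-level rewrite and output mapping for the wildcard case using the classes in this package, in the same way the unit tests in this patch drive them. The scanProj, tableSchema, readerContainer, allocator and rowCount parameters are assumed to be supplied by the earlier stages; this is an illustrative sketch, not code from the patch.

import java.util.Collections;

import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
import org.apache.drill.exec.physical.impl.scan.project.WildcardSchemaProjection;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.TupleMetadata;

public class WildcardPipelineSketch {

  // Schema-level projection plus output mapping for SELECT *.
  static VectorContainer mapOutput(ScanLevelProjection scanProj,
      TupleMetadata tableSchema, VectorContainer readerContainer,
      BufferAllocator allocator, int rowCount) {

    // Schema-level rewrite: expand the wildcard against the table schema.
    // No add-on resolvers, and no null columns arise in the wildcard case.
    NullColumnBuilder nullBuilder = new NullColumnBuilder(null, false);
    ResolvedRow rootTuple = new ResolvedRow(nullBuilder);
    new WildcardSchemaProjection(scanProj, tableSchema, rootTuple,
        Collections.emptyList());

    // Output mapping: project the reader's loaded vectors into the
    // scan operator's output container.
    VectorContainer output = new VectorContainer(allocator);
    rootTuple.project(readerContainer, output);
    output.setRecordCount(rowCount);
    return output;
  }
}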
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java
index 0dde5979bb2..3488e7bb757 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java
@@ -23,7 +23,7 @@
 
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
-import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.FixedWidthVectorState;
+import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.IsSetVectorState;
 import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.OffsetVectorState;
 import org.apache.drill.exec.physical.rowSet.impl.UnionState.UnionVectorState;
 import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
@@ -111,14 +111,14 @@
 
     private final ColumnMetadata schema;
     private final ListVector vector;
-    private final FixedWidthVectorState bitsVectorState;
-    private final OffsetVectorState offsetVectorState;
+    private final VectorState bitsVectorState;
+    private final VectorState offsetVectorState;
     private VectorState memberVectorState;
 
     public ListVectorState(UnionWriterImpl writer, ListVector vector) {
       this.schema = writer.schema();
       this.vector = vector;
-      bitsVectorState = new FixedWidthVectorState(writer, vector.getBitsVector());
+      bitsVectorState = new IsSetVectorState(writer, vector.getBitsVector());
       offsetVectorState = new OffsetVectorState(writer, vector.getOffsetVector(), writer.elementPosition());
       memberVectorState = new NullVectorState();
     }
@@ -126,7 +126,7 @@ public ListVectorState(UnionWriterImpl writer, ListVector vector) {
     public ListVectorState(ListWriterImpl writer, WriterPosition elementWriter, ListVector vector) {
       this.schema = writer.schema();
       this.vector = vector;
-      bitsVectorState = new FixedWidthVectorState(writer, vector.getBitsVector());
+      bitsVectorState = new IsSetVectorState(writer, vector.getBitsVector());
       offsetVectorState = new OffsetVectorState(writer, vector.getOffsetVector(), elementWriter);
       memberVectorState = new NullVectorState();
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java
index f9c2bff2eba..b01292074c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.physical.rowSet.impl;
 
-import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.FixedWidthVectorState;
+import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.IsSetVectorState;
 import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.SimpleVectorState;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.NullableVector;
@@ -31,15 +31,15 @@
   private final ColumnMetadata schema;
   private final NullableScalarWriter writer;
   private final NullableVector vector;
-  private final SimpleVectorState bitsState;
-  private final SimpleVectorState valuesState;
+  private final VectorState bitsState;
+  private final VectorState valuesState;
 
   public NullableVectorState(AbstractObjectWriter writer, NullableVector vector) {
     this.schema = writer.schema();
     this.vector = vector;
 
     this.writer = (NullableScalarWriter) writer.scalar();
-    bitsState = new FixedWidthVectorState(this.writer.bitsWriter(), vector.getBitsVector());
+    bitsState = new IsSetVectorState(this.writer.bitsWriter(), vector.getBitsVector());
     valuesState = SimpleVectorState.vectorState(this.writer.schema(),
         this.writer.baseWriter(), vector.getValuesVector());
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java
index cfb7130cece..e57373caa7e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java
@@ -85,6 +85,23 @@ public int allocateVector(ValueVector vector, int cardinality) {
     }
   }
 
+  public static class IsSetVectorState extends FixedWidthVectorState {
+
+    public IsSetVectorState(WriterPosition writer, ValueVector mainVector) {
+      super(writer, mainVector);
+    }
+
+    @Override
+    public int allocateVector(ValueVector vector, int cardinality) {
+      int size = super.allocateVector(vector, cardinality);
+
+      // IsSet ("bit") vectors rely on values being initialized to zero (unset.)
+
+      ((FixedWidthVector) vector).zeroVector();
+      return size;
+    }
+  }
+
   /**
    * State for a scalar value vector. The vector might be for a simple (non-array)
    * vector, or might be the payload part of a scalar array (repeated scalar)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
index 45a704c070c..cd782c76655 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
@@ -44,13 +44,14 @@
  * <p>
  * Examples:<br>
  * <code>m</code><br>
- * If m turns out to be a map, project all members of m.<br>
+ * If <code>m</code> turns out to be a map, project all members of
+ * <code>m</code>.<br>
  * <code>m.a</code><br>
- * Column m must be a map. Project only column a.<br>
+ * Column <code>m</code> must be a map. Project only column <code>a</code>.<br>
  * <code>m, m.a</code><br>
  * Tricky case. We interpret this as projecting only the "a" element of map m.
  * <p>
- * The projection set is build from a list of columns, represented as
+ * The projection set is built from a list of columns, represented as
  * {@link SchemaPath} objects, provided by the physical plan. The structure of
  * <tt>SchemaPath</tt> is a bit awkward:
  * <p>
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java
new file mode 100644
index 00000000000..c69357a719c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan;
+
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+public class ScanTestUtils {
+
+  /**
+   * Type-safe way to define a list of parsers.
+   * @param parsers as a varArgs list convenient for testing
+   * @return parsers as a Java List for input to the scan
+   * projection framework
+   */
+
+  public static List<ScanProjectionParser> parsers(ScanProjectionParser... parsers) {
+    return ImmutableList.copyOf(parsers);
+  }
+
+  public static List<SchemaProjectionResolver> resolvers(SchemaProjectionResolver... resolvers) {
+    return ImmutableList.copyOf(resolvers);
+  }
+
+  public static TupleMetadata schema(ResolvedTuple output) {
+    final TupleMetadata schema = new TupleSchema();
+    for (final ResolvedColumn col : output.columns()) {
+      MaterializedField field = col.schema();
+      if (field.getType() == null) {
+
+        // Convert from internal format of null columns (unset type)
+        // to a usable form (explicit minor type of NULL.)
+
+        field = MaterializedField.create(field.getName(),
+            Types.optional(MinorType.NULL));
+      }
+      schema.add(field);
+    }
+    return schema;
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java
new file mode 100644
index 00000000000..c524577d976
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.project.ConstantColumnLoader.ConstantColumnSpec;
+import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Drill allows file metadata columns (also called "implicit" columns.)
+ * These are columns that contain long repeated sequences of the same
+ * value. The ConstantColumnLoader builds and populates these columns.
+ */
+
+public class TestConstantColumnLoader extends SubOperatorTest {
+
+  private static class DummyColumn implements ConstantColumnSpec {
+
+    private final String name;
+    private final MaterializedField schema;
+    private final String value;
+
+    public DummyColumn(String name, MajorType type, String value) {
+      this.name = name;
+      this.schema = MaterializedField.create(name, type);
+      this.value = value;
+    }
+
+    @Override
+    public String name() { return name; }
+
+    @Override
+    public MaterializedField schema() { return schema; }
+
+    @Override
+    public String value() { return value; }
+  }
+
+  /**
+   * Test the constant column loader with one required and one optional
+   * VARCHAR column. Both columns are populated with their constant
+   * values on every row of the generated batch.
+   */
+
+  @Test
+  public void testConstantColumnLoader() {
+
+    final MajorType aType = MajorType.newBuilder()
+        .setMinorType(MinorType.VARCHAR)
+        .setMode(DataMode.REQUIRED)
+        .build();
+    final MajorType bType = MajorType.newBuilder()
+        .setMinorType(MinorType.VARCHAR)
+        .setMode(DataMode.OPTIONAL)
+        .build();
+
+    final List<ConstantColumnSpec> defns = new ArrayList<>();
+    defns.add(
+        new DummyColumn("a", aType, "a-value" ));
+    defns.add(
+        new DummyColumn("b", bType, "b-value" ));
+
+    final ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
+    final ConstantColumnLoader staticLoader = new ConstantColumnLoader(cache, defns);
+
+    // Create a batch
+
+    staticLoader.load(2);
+
+    // Verify
+
+    final BatchSchema expectedSchema = new SchemaBuilder()
+        .add("a", aType)
+        .add("b", bType)
+        .build();
+    final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow("a-value", "b-value")
+        .addRow("a-value", "b-value")
+        .build();
+
+    new RowSetComparison(expected)
+        .verifyAndClearAll(fixture.wrap(staticLoader.load(2)));
+    staticLoader.close();
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestNullColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestNullColumnLoader.java
new file mode 100644
index 00000000000..4bf5a6ad98a
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestNullColumnLoader.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import static org.junit.Assert.assertSame;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnLoader;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedNullColumn;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.impl.NullResultVectorCacheImpl;
+import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.Test;
+
+/**
+ * Test the mechanism that handles all-null columns during projection.
+ * An all-null column is one projected in the query, but which does
+ * not actually exist in the underlying data source (or input
+ * operator.)
+ * <p>
+ * In anticipation of having type information, this mechanism
+ * can create the classic nullable Int null column, or one of
+ * any other type and mode.
+ */
+
+public class TestNullColumnLoader extends SubOperatorTest {
+
+  private ResolvedNullColumn makeNullCol(String name, MajorType nullType) {
+
+    // For this test, we don't need the projection, so just
+    // set it to null.
+
+    return new ResolvedNullColumn(name, nullType, null, 0);
+  }
+
+  private ResolvedNullColumn makeNullCol(String name) {
+    return makeNullCol(name, null);
+  }
+
+  /**
+   * Test the simplest case: default null type, nothing in the vector
+   * cache. Specify no column type, the special NULL type, or a
+   * predefined type. Output types should be set accordingly.
+   */
+
+  @Test
+  public void testBasics() {
+
+    final List<ResolvedNullColumn> defns = new ArrayList<>();
+    defns.add(makeNullCol("unspecified", null));
+    defns.add(makeNullCol("nullType", Types.optional(MinorType.NULL)));
+    defns.add(makeNullCol("specifiedOpt", Types.optional(MinorType.VARCHAR)));
+    defns.add(makeNullCol("specifiedReq", Types.required(MinorType.VARCHAR)));
+    defns.add(makeNullCol("specifiedArray", Types.repeated(MinorType.VARCHAR)));
+
+    final ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+    final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, null, false);
+
+    // Create a batch
+
+    final VectorContainer output = staticLoader.load(2);
+
+    // Verify values and types
+
+    final BatchSchema expectedSchema = new SchemaBuilder()
+        .add("unspecified", NullColumnLoader.DEFAULT_NULL_TYPE)
+        .add("nullType", NullColumnLoader.DEFAULT_NULL_TYPE)
+        .addNullable("specifiedOpt", MinorType.VARCHAR)
+        .addNullable("specifiedReq", MinorType.VARCHAR)
+        .addArray("specifiedArray", MinorType.VARCHAR)
+        .build();
+    final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(null, null, null, null, new String[] {})
+        .addRow(null, null, null, null, new String[] {})
+        .build();
+
+    new RowSetComparison(expected)
+        .verifyAndClearAll(fixture.wrap(output));
+    staticLoader.close();
+  }
+
+  /**
+   * Test the ability to use a type other than nullable INT for null
+   * columns. This occurs, for example, in the CSV reader where no
+   * column is ever INT (nullable or otherwise) and we want our null
+   * columns to be (non-nullable) VARCHAR.
+   */
+
+  @Test
+  public void testCustomNullType() {
+
+    final List<ResolvedNullColumn> defns = new ArrayList<>();
+    defns.add(makeNullCol("unspecified", null));
+    defns.add(makeNullCol("nullType", MajorType.newBuilder()
+        .setMinorType(MinorType.NULL)
+        .setMode(DataMode.OPTIONAL)
+        .build()));
+
+    // Null required is an oxymoron, so is not tested.
+    // Null type array does not make sense, so is not tested.
+
+    final ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+    final MajorType nullType = MajorType.newBuilder()
+        .setMinorType(MinorType.VARCHAR)
+        .setMode(DataMode.OPTIONAL)
+        .build();
+    final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, false);
+
+    // Create a batch
+
+    final VectorContainer output = staticLoader.load(2);
+
+    // Verify values and types
+
+    final BatchSchema expectedSchema = new SchemaBuilder()
+        .add("unspecified", nullType)
+        .add("nullType", nullType)
+        .build();
+    final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(null, null)
+        .addRow(null, null)
+        .build();
+
+    new RowSetComparison(expected)
+        .verifyAndClearAll(fixture.wrap(output));
+    staticLoader.close();
+  }
+
+  /**
+   * Drill requires "schema persistence": if a scan operator
+   * reads two files, F1 and F2, then the scan operator must
+   * provide the same vectors from both readers. Not just the
+   * same types, the same value vector instances (but, of course,
+   * populated with different data.)
+   * <p>
+   * Test the case in which the reader for F1 found columns
+   * (a, b, c) but, F2 found only (a, b), requiring that we
+   * fill in column c, filled with nulls, but of the same type that it
+   * was in file F1. We use a vector cache to pull off this trick.
+   * This test ensures that the null column mechanism looks in that
+   * vector cache when asked to create a nullable column.
+   */
+
+  @Test
+  public void testCachedTypesMapToNullable() {
+
+    final List<ResolvedNullColumn> defns = new ArrayList<>();
+    defns.add(makeNullCol("req"));
+    defns.add(makeNullCol("opt"));
+    defns.add(makeNullCol("rep"));
+    defns.add(makeNullCol("unk"));
+
+    // Populate the cache with a column of each mode.
+
+    final ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
+    cache.addOrGet(SchemaBuilder.columnSchema("req", MinorType.FLOAT8, DataMode.REQUIRED));
+    final ValueVector opt = cache.addOrGet(SchemaBuilder.columnSchema("opt", MinorType.FLOAT8, DataMode.OPTIONAL));
+    final ValueVector rep = cache.addOrGet(SchemaBuilder.columnSchema("rep", MinorType.FLOAT8, DataMode.REPEATED));
+
+    // Use nullable Varchar for unknown null columns.
+
+    final MajorType nullType = MajorType.newBuilder()
+        .setMinorType(MinorType.VARCHAR)
+        .setMode(DataMode.OPTIONAL)
+        .build();
+    final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, false);
+
+    // Create a batch
+
+    final VectorContainer output = staticLoader.load(2);
+
+    // Verify vectors are reused
+
+    assertSame(opt, output.getValueVector(1).getValueVector());
+    assertSame(rep, output.getValueVector(2).getValueVector());
+
+    // Verify values and types
+
+    final BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("req", MinorType.FLOAT8)
+        .addNullable("opt", MinorType.FLOAT8)
+        .addArray("rep", MinorType.FLOAT8)
+        .addNullable("unk", MinorType.VARCHAR)
+        .build();
+    final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(null, null, new int[] { }, null)
+        .addRow(null, null, new int[] { }, null)
+        .build();
+
+    new RowSetComparison(expected)
+        .verifyAndClearAll(fixture.wrap(output));
+    staticLoader.close();
+  }
+
+  /**
+   * Suppose, in the previous test, that one of the columns that
+   * goes missing is a required column. The null-column mechanism can
+   * create the "null" column as a required column, then fill it with
+   * empty values (zero or "") -- if the scan operator feels doing so would
+   * be helpful.
+   */
+
+  @Test
+  public void testCachedTypesAllowRequired() {
+
+    final List<ResolvedNullColumn> defns = new ArrayList<>();
+    defns.add(makeNullCol("req"));
+    defns.add(makeNullCol("opt"));
+    defns.add(makeNullCol("rep"));
+    defns.add(makeNullCol("unk"));
+
+    // Populate the cache with a column of each mode.
+
+    final ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
+    cache.addOrGet(SchemaBuilder.columnSchema("req", MinorType.FLOAT8, DataMode.REQUIRED));
+    final ValueVector opt = cache.addOrGet(SchemaBuilder.columnSchema("opt", MinorType.FLOAT8, DataMode.OPTIONAL));
+    final ValueVector rep = cache.addOrGet(SchemaBuilder.columnSchema("rep", MinorType.FLOAT8, DataMode.REPEATED));
+
+    // Use nullable Varchar for unknown null columns.
+
+    final MajorType nullType = MajorType.newBuilder()
+        .setMinorType(MinorType.VARCHAR)
+        .setMode(DataMode.OPTIONAL)
+        .build();
+    final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, true);
+
+    // Create a batch
+
+    final VectorContainer output = staticLoader.load(2);
+
+    // Verify vectors are reused
+
+    assertSame(opt, output.getValueVector(1).getValueVector());
+    assertSame(rep, output.getValueVector(2).getValueVector());
+
+    // Verify values and types
+
+    final BatchSchema expectedSchema = new SchemaBuilder()
+        .add("req", MinorType.FLOAT8)
+        .addNullable("opt", MinorType.FLOAT8)
+        .addArray("rep", MinorType.FLOAT8)
+        .addNullable("unk", MinorType.VARCHAR)
+        .build();
+    final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(0.0, null, new int[] { }, null)
+        .addRow(0.0, null, new int[] { }, null)
+        .build();
+
+    new RowSetComparison(expected)
+        .verifyAndClearAll(fixture.wrap(output));
+    staticLoader.close();
+  }
+
+  /**
+   * Test the shim class that adapts between the null column loader
+   * and the projection mechanism. The projection mechanism uses this
+   * to pull in the null columns which the null column loader has
+   * created.
+   */
+
+  @Test
+  public void testNullColumnBuilder() {
+
+    final ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+
+    builder.add("unspecified");
+    builder.add("nullType", Types.optional(MinorType.NULL));
+    builder.add("specifiedOpt", Types.optional(MinorType.VARCHAR));
+    builder.add("specifiedReq", Types.required(MinorType.VARCHAR));
+    builder.add("specifiedArray", Types.repeated(MinorType.VARCHAR));
+    builder.build(cache);
+
+    // Create a batch
+
+    builder.load(2);
+
+    // Verify values and types
+
+    final BatchSchema expectedSchema = new SchemaBuilder()
+        .add("unspecified", NullColumnLoader.DEFAULT_NULL_TYPE)
+        .add("nullType", NullColumnLoader.DEFAULT_NULL_TYPE)
+        .addNullable("specifiedOpt", MinorType.VARCHAR)
+        .addNullable("specifiedReq", MinorType.VARCHAR)
+        .addArray("specifiedArray", MinorType.VARCHAR)
+        .build();
+    final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(null, null, null, null, new String[] {})
+        .addRow(null, null, null, null, new String[] {})
+        .build();
+
+    new RowSetComparison(expected)
+        .verifyAndClearAll(fixture.wrap(builder.output()));
+    builder.close();
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java
new file mode 100644
index 00000000000..cb0365ca90e
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedMapColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
+import org.apache.drill.exec.physical.impl.scan.project.VectorSource;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.impl.NullResultVectorCacheImpl;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.DrillBuf;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
+import static org.apache.drill.test.rowSet.RowSetUtilities.singleMap;
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
+
+
+/**
+ * Test the row batch merger by merging two batches. Tests both the
+ * "direct" and "exchange" cases. Direct means that the output container
+ * contains the source vector directly: they are the same vectors.
+ * Exchange means we have two vectors, but we swap the underlying
+ * Drillbufs to effectively shift data from source to destination
+ * vector.
+ */
+
+public class TestRowBatchMerger extends SubOperatorTest {
+
+  public static class RowSetSource implements VectorSource {
+
+    private SingleRowSet rowSet;
+
+    public RowSetSource(SingleRowSet rowSet) {
+      this.rowSet = rowSet;
+    }
+
+    public RowSet rowSet() { return rowSet; }
+
+    public void clear() {
+      rowSet.clear();
+    }
+
+    @Override
+    public ValueVector vector(int index) {
+      return rowSet.container().getValueVector(index).getValueVector();
+    }
+  }
+
+  public static final int SLAB_SIZE = 16 * 1024 * 1024;
+
+  @BeforeClass
+  public static void setup() {
+    // Party on 10 16MB blocks of memory to detect vector issues
+    DrillBuf bufs[] = new DrillBuf[10];
+    for (int i = 0; i < bufs.length; i++) {
+      bufs[i] = fixture.allocator().buffer(SLAB_SIZE);
+      for (int j = 0; j < SLAB_SIZE / 4; j++) {
+        bufs[i].setInt(j * 4, 0xDEADBEEF);
+      }
+    }
+    for (int i = 0; i < bufs.length; i++) {
+      bufs[i].release();
+    }
+  }
+
+  private RowSetSource makeFirst() {
+    BatchSchema firstSchema = new SchemaBuilder()
+        .add("d", MinorType.VARCHAR)
+        .add("a", MinorType.INT)
+        .build();
+    return new RowSetSource(
+        fixture.rowSetBuilder(firstSchema)
+          .addRow("barney", 10)
+          .addRow("wilma", 20)
+          .build());
+  }
+
+  private RowSetSource makeSecond() {
+    BatchSchema secondSchema = new SchemaBuilder()
+        .add("b", MinorType.INT)
+        .add("c", MinorType.VARCHAR)
+        .build();
+    return new RowSetSource(
+        fixture.rowSetBuilder(secondSchema)
+          .addRow(1, "foo.csv")
+          .addRow(2, "foo.csv")
+          .build());
+  }
+
+  public static class TestProjection extends ResolvedColumn {
+
+    public TestProjection(VectorSource source, int sourceIndex) {
+      super(source, sourceIndex);
+    }
+
+    @Override
+    public String name() { return null; }
+
+    @Override
+    public int nodeType() { return -1; }
+
+    @Override
+    public MaterializedField schema() { return null; }
+  }
+
+  @Test
+  public void testSimpleFlat() {
+
+    // Create the first batch
+
+    RowSetSource first = makeFirst();
+
+    // Create the second batch
+
+    RowSetSource second = makeSecond();
+
+    ResolvedRow resolvedTuple = new ResolvedRow(null);
+    resolvedTuple.add(new TestProjection(first, 1));
+    resolvedTuple.add(new TestProjection(second, 0));
+    resolvedTuple.add(new TestProjection(second, 1));
+    resolvedTuple.add(new TestProjection(first, 0));
+
+    // Do the merge
+
+    VectorContainer output = new VectorContainer(fixture.allocator());
+    resolvedTuple.project(null, output);
+    output.setRecordCount(first.rowSet().rowCount());
+    RowSet result = fixture.wrap(output);
+
+    // Verify
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.INT)
+        .add("c", MinorType.VARCHAR)
+        .add("d", MinorType.VARCHAR)
+        .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(10, 1, "foo.csv", "barney")
+        .addRow(20, 2, "foo.csv", "wilma")
+        .build();
+
+    new RowSetComparison(expected)
+      .verifyAndClearAll(result);
+  }
+
+  @Test
+  public void testImplicitFlat() {
+
+    // Create the first batch
+
+    RowSetSource first = makeFirst();
+
+    // Create the second batch
+
+    RowSetSource second = makeSecond();
+
+    ResolvedRow resolvedTuple = new ResolvedRow(null);
+    resolvedTuple.add(new TestProjection(resolvedTuple, 1));
+    resolvedTuple.add(new TestProjection(second, 0));
+    resolvedTuple.add(new TestProjection(second, 1));
+    resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+
+    // Do the merge
+
+    VectorContainer output = new VectorContainer(fixture.allocator());
+    resolvedTuple.project(first.rowSet().container(), output);
+    output.setRecordCount(first.rowSet().rowCount());
+    RowSet result = fixture.wrap(output);
+
+    // Verify
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.INT)
+        .add("c", MinorType.VARCHAR)
+        .add("d", MinorType.VARCHAR)
+        .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(10, 1, "foo.csv", "barney")
+        .addRow(20, 2, "foo.csv", "wilma")
+        .build();
+
+    new RowSetComparison(expected)
+      .verifyAndClearAll(result);
+  }
+
+  @Test
+  public void testFlatWithNulls() {
+
+    // Create the first batch
+
+    RowSetSource first = makeFirst();
+
+    // Create null columns
+
+    NullColumnBuilder builder = new NullColumnBuilder(null, false);
+
+    ResolvedRow resolvedTuple = new ResolvedRow(builder);
+    resolvedTuple.add(new TestProjection(resolvedTuple, 1));
+    resolvedTuple.add(resolvedTuple.nullBuilder().add("null1"));
+    resolvedTuple.add(resolvedTuple.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR)));
+    resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+
+    // Build the null values
+
+    ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+    builder.build(cache);
+    builder.load(first.rowSet().rowCount());
+
+    // Do the merge
+
+    VectorContainer output = new VectorContainer(fixture.allocator());
+    resolvedTuple.project(first.rowSet().container(), output);
+    output.setRecordCount(first.rowSet().rowCount());
+    RowSet result = fixture.wrap(output);
+
+    // Verify
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("null1", MinorType.INT)
+        .addNullable("null2", MinorType.VARCHAR)
+        .add("d", MinorType.VARCHAR)
+        .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(10, null, null, "barney")
+        .addRow(20, null, null, "wilma")
+        .build();
+
+    new RowSetComparison(expected)
+      .verifyAndClearAll(result);
+    builder.close();
+  }
+
+  /**
+   * Test the ability to create maps from whole cloth if requested in
+   * the projection list, and the map is not available from the data
+   * source.
+   */
+
+  @Test
+  public void testNullMaps() {
+
+    // Create the first batch
+
+    RowSetSource first = makeFirst();
+
+    // Create null columns
+
+    NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    ResolvedRow resolvedTuple = new ResolvedRow(builder);
+    resolvedTuple.add(new TestProjection(resolvedTuple, 1));
+
+    ResolvedMapColumn nullMapCol = new ResolvedMapColumn(resolvedTuple, "map1");
+    ResolvedTuple nullMap = nullMapCol.members();
+    nullMap.add(nullMap.nullBuilder().add("null1"));
+    nullMap.add(nullMap.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR)));
+
+    ResolvedMapColumn nullMapCol2 = new ResolvedMapColumn(nullMap, "map2");
+    ResolvedTuple nullMap2 = nullMapCol2.members();
+    nullMap2.add(nullMap2.nullBuilder().add("null3"));
+    nullMap.add(nullMapCol2);
+
+    resolvedTuple.add(nullMapCol);
+    resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+
+    // Build the null values
+
+    ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+    resolvedTuple.buildNulls(cache);
+
+    // LoadNulls
+
+    resolvedTuple.loadNulls(first.rowSet().rowCount());
+
+    // Do the merge
+
+    VectorContainer output = new VectorContainer(fixture.allocator());
+    resolvedTuple.project(first.rowSet().container(), output);
+    resolvedTuple.setRowCount(first.rowSet().rowCount());
+    RowSet result = fixture.wrap(output);
+
+    // Verify
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMap("map1")
+          .addNullable("null1", MinorType.INT)
+          .addNullable("null2", MinorType.VARCHAR)
+          .addMap("map2")
+            .addNullable("null3", MinorType.INT)
+            .resumeMap()
+          .resumeSchema()
+        .add("d", MinorType.VARCHAR)
+        .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(10, mapValue(null, null, singleMap(null)), "barney")
+        .addRow(20, mapValue(null, null, singleMap(null)), "wilma")
+        .build();
+
+    new RowSetComparison(expected)
+      .verifyAndClearAll(result);
+    resolvedTuple.close();
+  }
+
+  /**
+   * Test that the merger mechanism can rewrite a map to include
+   * projected null columns.
+   */
+
+  @Test
+  public void testMapRevision() {
+
+    // Create the first batch
+
+    BatchSchema inputSchema = new SchemaBuilder()
+        .add("b", MinorType.VARCHAR)
+        .addMap("a")
+          .add("c", MinorType.INT)
+          .resumeSchema()
+        .build();
+    RowSetSource input = new RowSetSource(
+        fixture.rowSetBuilder(inputSchema)
+          .addRow("barney", singleMap(10))
+          .addRow("wilma", singleMap(20))
+          .build());
+
+    // Create mappings
+
+    NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    ResolvedRow resolvedTuple = new ResolvedRow(builder);
+
+    resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+    ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple,
+        inputSchema.getColumn(1), 1);
+    resolvedTuple.add(mapCol);
+    ResolvedTuple map = mapCol.members();
+    map.add(new TestProjection(map, 0));
+    map.add(map.nullBuilder().add("null1"));
+
+    // Build the null values
+
+    ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+    resolvedTuple.buildNulls(cache);
+
+    // LoadNulls
+
+    resolvedTuple.loadNulls(input.rowSet().rowCount());
+
+    // Do the merge
+
+    VectorContainer output = new VectorContainer(fixture.allocator());
+    resolvedTuple.project(input.rowSet().container(), output);
+    output.setRecordCount(input.rowSet().rowCount());
+    RowSet result = fixture.wrap(output);
+
+    // Verify
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("b", MinorType.VARCHAR)
+        .addMap("a")
+          .add("c", MinorType.INT)
+          .addNullable("null1", MinorType.INT)
+          .resumeSchema()
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow("barney", mapValue(10, null))
+        .addRow("wilma", mapValue(20, null))
+        .build();
+
+    new RowSetComparison(expected)
+      .verifyAndClearAll(result);
+  }
+
+  /**
+   * Test that the merger mechanism can rewrite a map array to include
+   * projected null columns.
+   */
+
+  @Test
+  public void testMapArrayRevision() {
+
+    // Create the first batch
+
+    BatchSchema inputSchema = new SchemaBuilder()
+        .add("b", MinorType.VARCHAR)
+        .addMapArray("a")
+          .add("c", MinorType.INT)
+          .resumeSchema()
+        .build();
+    RowSetSource input = new RowSetSource(
+        fixture.rowSetBuilder(inputSchema)
+          .addRow("barney", mapArray(singleMap(10), singleMap(11), singleMap(12)))
+          .addRow("wilma", mapArray(singleMap(20), singleMap(21)))
+          .build());
+
+    // Create mappings
+
+    NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    ResolvedRow resolvedTuple = new ResolvedRow(builder);
+
+    resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+    ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple,
+        inputSchema.getColumn(1), 1);
+    resolvedTuple.add(mapCol);
+    ResolvedTuple map = mapCol.members();
+    map.add(new TestProjection(map, 0));
+    map.add(map.nullBuilder().add("null1"));
+
+    // Build the null values
+
+    ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+    resolvedTuple.buildNulls(cache);
+
+    // Load the null columns
+
+    resolvedTuple.loadNulls(input.rowSet().rowCount());
+
+    // Do the merge
+
+    VectorContainer output = new VectorContainer(fixture.allocator());
+    resolvedTuple.project(input.rowSet().container(), output);
+    output.setRecordCount(input.rowSet().rowCount());
+    RowSet result = fixture.wrap(output);
+
+    // Verify
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("b", MinorType.VARCHAR)
+        .addMapArray("a")
+          .add("c", MinorType.INT)
+          .addNullable("null1", MinorType.INT)
+          .resumeSchema()
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow("barney", mapArray(
+            mapValue(10, null), mapValue(11, null), mapValue(12, null)))
+        .addRow("wilma", mapArray(
+            mapValue(20, null), mapValue(21, null)))
+        .build();
+
+    new RowSetComparison(expected)
+      .verifyAndClearAll(result);
+  }
+
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
new file mode 100644
index 00000000000..e07ad9a6252
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
+import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn;
+import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.record.metadata.ProjectionType;
+import org.apache.drill.test.SubOperatorTest;
+import org.junit.Test;
+
+/**
+ * Test the projection done at the level of the scan as a whole,
+ * before knowledge of table "implicit" columns or the specific table schema.
+ */
+
+public class TestScanLevelProjection extends SubOperatorTest {
+
+  /**
+   * Basic test: select a set of columns (a, b, c) at the scan level.
+   * Since the table schema is not yet known at this point, all three
+   * columns remain unresolved.
+   */
+
+  @Test
+  public void testBasics() {
+
+    // Simulate SELECT a, b, c ...
+    // Build the projection plan and verify
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("a", "b", "c"),
+        ScanTestUtils.parsers());
+    assertFalse(scanProj.projectAll());
+    assertFalse(scanProj.projectNone());
+
+    assertEquals(3, scanProj.requestedCols().size());
+    assertEquals("a", scanProj.requestedCols().get(0).rootName());
+    assertEquals("b", scanProj.requestedCols().get(1).rootName());
+    assertEquals("c", scanProj.requestedCols().get(2).rootName());
+
+    assertEquals(3, scanProj.columns().size());
+    assertEquals("a", scanProj.columns().get(0).name());
+    assertEquals("b", scanProj.columns().get(1).name());
+    assertEquals("c", scanProj.columns().get(2).name());
+
+    // Verify column type
+
+    assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
+  }
+
+  /**
+   * Map projection occurs when a query contains project-list items with
+   * a dot, such as "a.b". We may not know the type of "b", but have
+   * just learned that "a" must be a map.
+   */
+
+  @Test
+  public void testMap() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("a.x", "b.x", "a.y", "b.y", "c"),
+        ScanTestUtils.parsers());
+    assertFalse(scanProj.projectAll());
+    assertFalse(scanProj.projectNone());
+
+    assertEquals(3, scanProj.columns().size());
+    assertEquals("a", scanProj.columns().get(0).name());
+    assertEquals("b", scanProj.columns().get(1).name());
+    assertEquals("c", scanProj.columns().get(2).name());
+
+    // Verify column type
+
+    assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
+
+    // Map structure
+
+    final RequestedColumn a = ((UnresolvedColumn) scanProj.columns().get(0)).element();
+    assertTrue(a.isTuple());
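+
+    // Members named in the project list ("x", "y") have an unspecified
+    // projection type (no type constraint yet); "z" was not requested,
+    // so it is unprojected.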
+    assertEquals(ProjectionType.UNSPECIFIED, a.mapProjection().projectionType("x"));
+    assertEquals(ProjectionType.UNSPECIFIED, a.mapProjection().projectionType("y"));
+    assertEquals(ProjectionType.UNPROJECTED, a.mapProjection().projectionType("z"));
+
+    final RequestedColumn c = ((UnresolvedColumn) scanProj.columns().get(2)).element();
+    assertTrue(c.isSimple());
+  }
+
+  /**
+   * Similar to maps, if the project list contains "a[1]" then we've learned that
+   * a is an array, but we don't know what type.
+   */
+
+  @Test
+  public void testArray() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("a[1]", "a[3]"),
+        ScanTestUtils.parsers());
+    assertFalse(scanProj.projectAll());
+    assertFalse(scanProj.projectNone());
+
+    assertEquals(1, scanProj.columns().size());
+    assertEquals("a", scanProj.columns().get(0).name());
+
+    // Verify column type
+
+    assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
+
+    // Array structure
+
+    final RequestedColumn a = ((UnresolvedColumn) scanProj.columns().get(0)).element();
+    assertTrue(a.isArray());
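+
+    // Only the indexes mentioned in the project list (1 and 3) are
+    // marked as projected.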
+    assertFalse(a.hasIndex(0));
+    assertTrue(a.hasIndex(1));
+    assertFalse(a.hasIndex(2));
+    assertTrue(a.hasIndex(3));
+  }
+
+  /**
+   * Simulate a SELECT * query by passing "*" as a column name.
+   */
+
+  @Test
+  public void testWildcard() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    assertTrue(scanProj.projectAll());
+    assertFalse(scanProj.projectNone());
+    assertEquals(1, scanProj.requestedCols().size());
+    assertTrue(scanProj.requestedCols().get(0).isDynamicStar());
+
+    assertEquals(1, scanProj.columns().size());
+    assertEquals(SchemaPath.DYNAMIC_STAR, scanProj.columns().get(0).name());
+
+    // Verify bindings
+
+    assertEquals(scanProj.columns().get(0).name(), scanProj.requestedCols().get(0).rootName());
+
+    // Verify column type
+
+    assertEquals(UnresolvedColumn.WILDCARD, scanProj.columns().get(0).nodeType());
+  }
+
+  /**
+   * Test an empty projection which occurs in a
+   * SELECT COUNT(*) query.
+   */
+
+  @Test
+  public void testEmptyProjection() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList(),
+        ScanTestUtils.parsers());
+
+    assertFalse(scanProj.projectAll());
+    assertTrue(scanProj.projectNone());
+    assertEquals(0, scanProj.requestedCols().size());
+  }
+
+  /**
+   * Can't include both a wildcard and a column name.
+   */
+
+  @Test
+  public void testErrorWildcardAndColumns() {
+    try {
+      new ScanLevelProjection(
+          RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, "a"),
+          ScanTestUtils.parsers());
+      fail();
+    } catch (final IllegalArgumentException e) {
+      // Expected
+    }
+  }
+
+  /**
+   * Can't include both a column name and a wildcard.
+   */
+
+  @Test
+  public void testErrorColumnAndWildcard() {
+    try {
+      new ScanLevelProjection(
+          RowSetTestUtils.projectList("a", SchemaPath.DYNAMIC_STAR),
+          ScanTestUtils.parsers());
+      fail();
+    } catch (final IllegalArgumentException e) {
+      // Expected
+    }
+  }
+
+  /**
+   * Can't include a wildcard twice.
+   * <p>
+   * Note: Drill actually allows this, but the work should be done
+   * in the project operator; scan should see at most one wildcard.
+   */
+
+  @Test
+  public void testErrorTwoWildcards() {
+    try {
+      new ScanLevelProjection(
+          RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, SchemaPath.DYNAMIC_STAR),
+          ScanTestUtils.parsers());
+      fail();
+    } catch (final UserException e) {
+      // Expected
+    }
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaLevelProjection.java
new file mode 100644
index 00000000000..839e2caa4c4
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaLevelProjection.java
@@ -0,0 +1,588 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedMapColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
+import org.apache.drill.exec.physical.impl.scan.project.ExplicitSchemaProjection;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedNullColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTableColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
+import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.VectorSource;
+import org.apache.drill.exec.physical.impl.scan.project.WildcardSchemaProjection;
+import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * "Schema level projection" describes one side of the projection
+ * mechanism. When we project, we have the set of columns the user
+ * wants (the scan-level projection list) and the set of columns on
+ * offer from the data source (the table schema). The projection mechanism
+ * combines these to map out the actual projection.
+ */
+
+public class TestSchemaLevelProjection extends SubOperatorTest {
+
+  /**
+   * Test wildcard projection: take all columns on offer from
+   * the data source, in the order that the data source specifies.
+   */
+
+  @Test
+  public void testWildcard() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+    assertEquals(1, scanProj.columns().size());
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .addNullable("c", MinorType.INT)
+        .addArray("d", MinorType.FLOAT8)
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    new WildcardSchemaProjection(
+        scanProj, tableSchema, rootTuple,
+        ScanTestUtils.resolvers());
+
+    final List<ResolvedColumn> columns = rootTuple.columns();
+    assertEquals(3, columns.size());
+
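+    // With a wildcard, the output columns mirror the table schema, in
+    // table order, all sourced from the root (table) tuple.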
+    assertEquals("a", columns.get(0).name());
+    assertEquals(0, columns.get(0).sourceIndex());
+    assertSame(rootTuple, columns.get(0).source());
+    assertEquals("c", columns.get(1).name());
+    assertEquals(1, columns.get(1).sourceIndex());
+    assertSame(rootTuple, columns.get(1).source());
+    assertEquals("d", columns.get(2).name());
+    assertEquals(2, columns.get(2).sourceIndex());
+    assertSame(rootTuple, columns.get(2).source());
+  }
+
+  /**
+   * Test a SELECT list with columns defined in a different order and
+   * with name case different from that of the early-schema table.
+   */
+
+  @Test
+  public void testFullList() {
+
+    // Simulate SELECT c, b, a ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("c", "b", "a"),
+        ScanTestUtils.parsers());
+    assertEquals(3, scanProj.columns().size());
+
+    // Simulate a data source, with early schema, of (a, b, c)
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("A", MinorType.VARCHAR)
+        .add("B", MinorType.VARCHAR)
+        .add("C", MinorType.VARCHAR)
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    new ExplicitSchemaProjection(
+        scanProj, tableSchema, rootTuple,
+        ScanTestUtils.resolvers());
+
+    final List<ResolvedColumn> columns = rootTuple.columns();
+    assertEquals(3, columns.size());
+
+    assertEquals("c", columns.get(0).name());
+    assertEquals(2, columns.get(0).sourceIndex());
+    assertSame(rootTuple, columns.get(0).source());
+
+    assertEquals("b", columns.get(1).name());
+    assertEquals(1, columns.get(1).sourceIndex());
+    assertSame(rootTuple, columns.get(1).source());
+
+    assertEquals("a", columns.get(2).name());
+    assertEquals(0, columns.get(2).sourceIndex());
+    assertSame(rootTuple, columns.get(2).source());
+  }
+
+  /**
+   * Test SELECT list with columns missing from the table schema.
+   */
+
+  @Test
+  public void testMissing() {
+
+    // Simulate SELECT c, v, b, w ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("c", "v", "b", "w"),
+        ScanTestUtils.parsers());
+    assertEquals(4, scanProj.columns().size());
+
+    // Simulate a data source, with early schema, of (a, b, c)
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("A", MinorType.VARCHAR)
+        .add("B", MinorType.VARCHAR)
+        .add("C", MinorType.VARCHAR)
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    new ExplicitSchemaProjection(
+        scanProj, tableSchema, rootTuple,
+        ScanTestUtils.resolvers());
+
+    final List<ResolvedColumn> columns = rootTuple.columns();
+    assertEquals(4, columns.size());
+    final VectorSource nullBuilder = rootTuple.nullBuilder();
+
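+    // "c" and "b" resolve to table columns; "v" and "w" are not in the
+    // table, so they resolve to null columns sourced from the
+    // null-column builder (with indexes local to that builder).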
+    assertEquals("c", columns.get(0).name());
+    assertEquals(2, columns.get(0).sourceIndex());
+    assertSame(rootTuple, columns.get(0).source());
+
+    assertEquals("v", columns.get(1).name());
+    assertEquals(0, columns.get(1).sourceIndex());
+    assertSame(nullBuilder, columns.get(1).source());
+
+    assertEquals("b", columns.get(2).name());
+    assertEquals(1, columns.get(2).sourceIndex());
+    assertSame(rootTuple, columns.get(2).source());
+
+    assertEquals("w", columns.get(3).name());
+    assertEquals(1, columns.get(3).sourceIndex());
+    assertSame(nullBuilder, columns.get(3).source());
+  }
+
+  /**
+   * Test an explicit projection (providing columns) in which the
+   * names in the project list differ in case from those in the data
+   * source, the order of columns differs, and we ask for only a
+   * subset of the data source columns.
+   */
+  @Test
+  public void testSubset() {
+
+    // Simulate SELECT c, a ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("c", "a"),
+        ScanTestUtils.parsers());
+    assertEquals(2, scanProj.columns().size());
+
+    // Simulate a data source, with early schema, of (a, b, c)
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("A", MinorType.VARCHAR)
+        .add("B", MinorType.VARCHAR)
+        .add("C", MinorType.VARCHAR)
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    new ExplicitSchemaProjection(
+        scanProj, tableSchema, rootTuple,
+        ScanTestUtils.resolvers());
+
+    final List<ResolvedColumn> columns = rootTuple.columns();
+    assertEquals(2, columns.size());
+
+    assertEquals("c", columns.get(0).name());
+    assertEquals(2, columns.get(0).sourceIndex());
+    assertSame(rootTuple, columns.get(0).source());
+
+    assertEquals("a", columns.get(1).name());
+    assertEquals(0, columns.get(1).sourceIndex());
+    assertSame(rootTuple, columns.get(1).source());
+  }
+
+  /**
+   * Drill is unique in that we can select (a, b) from a data source
+   * that only offers (c, d). We get null columns as a result.
+   */
+
+  @Test
+  public void testDisjoint() {
+
+    // Simulate SELECT b ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("b"),
+        ScanTestUtils.parsers());
+    assertEquals(1, scanProj.columns().size());
+
+    // Simulate a data source, with early schema, of (a)
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("A", MinorType.VARCHAR)
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    new ExplicitSchemaProjection(
+        scanProj, tableSchema, rootTuple,
+        ScanTestUtils.resolvers());
+
+    final List<ResolvedColumn> columns = rootTuple.columns();
+    assertEquals(1, columns.size());
+    final VectorSource nullBuilder = rootTuple.nullBuilder();
+
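+    // "b" is not in the table, so it resolves to a null column
+    // sourced from the null-column builder.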
+    assertEquals("b", columns.get(0).name());
+    assertEquals(0, columns.get(0).sourceIndex());
+    assertSame(nullBuilder, columns.get(0).source());
+  }
+
+  /**
+   * Test the obscure case in which the project list asks for a member
+   * of a map ("b.c.d"), but the data source omits the map entirely.
+   * The output is a map (and nested map) containing only null versions
+   * of the requested members.
+   */
+
+  @Test
+  public void testOmittedMap() {
+
+    // Simulate SELECT a, b.c.d ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("a", "b.c.d"),
+        ScanTestUtils.parsers());
+    assertEquals(2, scanProj.columns().size());
+    {
+      assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(1).nodeType());
+      final UnresolvedColumn bCol = (UnresolvedColumn) (scanProj.columns().get(1));
+      assertTrue(bCol.element().isTuple());
+    }
+
+    // Simulate a data source, with early schema, of (a)
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    new ExplicitSchemaProjection(
+        scanProj, tableSchema, rootTuple,
+        ScanTestUtils.resolvers());
+
+    final List<ResolvedColumn> columns = rootTuple.columns();
+    assertEquals(2, columns.size());
+
+    // Should have resolved a to a table column, b to a missing map.
+
+    // A is projected
+
+    final ResolvedColumn aCol = columns.get(0);
+    assertEquals("a", aCol.name());
+    assertEquals(ResolvedTableColumn.ID, aCol.nodeType());
+
+    // B is not projected, is implicitly a map
+
+    final ResolvedColumn bCol = columns.get(1);
+    assertEquals("b", bCol.name());
+    assertEquals(ResolvedMapColumn.ID, bCol.nodeType());
+
+    final ResolvedMapColumn bMap = (ResolvedMapColumn) bCol;
+    final ResolvedTuple bMembers = bMap.members();
+    assertNotNull(bMembers);
+    assertEquals(1, bMembers.columns().size());
+
+    // C is a map within b
+
+    final ResolvedColumn cCol = bMembers.columns().get(0);
+    assertEquals(ResolvedMapColumn.ID, cCol.nodeType());
+
+    final ResolvedMapColumn cMap = (ResolvedMapColumn) cCol;
+    final ResolvedTuple cMembers = cMap.members();
+    assertNotNull(cMembers);
+    assertEquals(1, cMembers.columns().size());
+
+    // D is an unknown column type (not a map)
+
+    final ResolvedColumn dCol = cMembers.columns().get(0);
+    assertEquals(ResolvedNullColumn.ID, dCol.nodeType());
+  }
+
+  /**
+   * Test of a map with missing members.
+   * Table of (x, y, a{b, c}); project x, a.c, a.d, a.e.f, y.
+   */
+
+  @Test
+  public void testOmittedMapMembers() {
+
+    // Simulate SELECT x, a.c, a.d, a.e.f, y ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("x", "a.c", "a.d", "a.e.f", "y"),
+        ScanTestUtils.parsers());
+    assertEquals(3, scanProj.columns().size());
+
+    // Simulate a data source, with early schema, of (x, y, a{b, c})
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("x", MinorType.VARCHAR)
+        .add("y", MinorType.INT)
+        .addMap("a")
+          .add("b", MinorType.BIGINT)
+          .add("c", MinorType.FLOAT8)
+          .resumeSchema()
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    new ExplicitSchemaProjection(
+        scanProj, tableSchema, rootTuple,
+        ScanTestUtils.resolvers());
+
+    final List<ResolvedColumn> columns = rootTuple.columns();
+    assertEquals(3, columns.size());
+
+    // Should have resolved a.c to a column within the map,
+    // a.d to a null column within the map, and a.e.f to a null
+    // member of a missing nested map
+
+    // X is projected
+
+    final ResolvedColumn xCol = columns.get(0);
+    assertEquals("x", xCol.name());
+    assertEquals(ResolvedTableColumn.ID, xCol.nodeType());
+    assertSame(rootTuple, ((ResolvedTableColumn) (xCol)).source());
+    assertEquals(0, ((ResolvedTableColumn) (xCol)).sourceIndex());
+
+    // Y is projected
+
+    final ResolvedColumn yCol = columns.get(2);
+    assertEquals("y", yCol.name());
+    assertEquals(ResolvedTableColumn.ID, yCol.nodeType());
+    assertSame(rootTuple, ((ResolvedTableColumn) (yCol)).source());
+    assertEquals(1, ((ResolvedTableColumn) (yCol)).sourceIndex());
+
+    // A is projected
+
+    final ResolvedColumn aCol = columns.get(1);
+    assertEquals("a", aCol.name());
+    assertEquals(ResolvedMapColumn.ID, aCol.nodeType());
+
+    final ResolvedMapColumn aMap = (ResolvedMapColumn) aCol;
+    final ResolvedTuple aMembers = aMap.members();
+    assertFalse(aMembers.isSimpleProjection());
+    assertNotNull(aMembers);
+    assertEquals(3, aMembers.columns().size());
+
+    // a.c is projected
+
+    final ResolvedColumn acCol = aMembers.columns().get(0);
+    assertEquals("c", acCol.name());
+    assertEquals(ResolvedTableColumn.ID, acCol.nodeType());
+    assertEquals(1, ((ResolvedTableColumn) (acCol)).sourceIndex());
+
+    // a.d is not in the table, is null
+
+    final ResolvedColumn adCol = aMembers.columns().get(1);
+    assertEquals("d", adCol.name());
+    assertEquals(ResolvedNullColumn.ID, adCol.nodeType());
+
+    // a.e is not in the table, is implicitly a map
+
+    final ResolvedColumn aeCol = aMembers.columns().get(2);
+    assertEquals("e", aeCol.name());
+    assertEquals(ResolvedMapColumn.ID, aeCol.nodeType());
+
+    final ResolvedMapColumn aeMap = (ResolvedMapColumn) aeCol;
+    final ResolvedTuple aeMembers = aeMap.members();
+    assertFalse(aeMembers.isSimpleProjection());
+    assertNotNull(aeMembers);
+    assertEquals(1, aeMembers.columns().size());
+
+    // a.e.f is a null column
+
+    final ResolvedColumn aefCol = aeMembers.columns().get(0);
+    assertEquals("f", aefCol.name());
+    assertEquals(ResolvedNullColumn.ID, aefCol.nodeType());
+  }
+
+  /**
+   * Simple map project. This is an internal case in which the
+   * query asks for a set of columns inside a map, and the table
+   * loader produces exactly that set. No special projection is
+   * needed, the map is projected as a whole.
+   */
+
+  @Test
+  public void testSimpleMapProject() {
+
+    // Simulate SELECT a.b, a.c ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("a.b", "a.c"),
+        ScanTestUtils.parsers());
+    assertEquals(1, scanProj.columns().size());
+
+    // Simulate a data source, with early schema, of (a{b, c})
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .addMap("a")
+          .add("b", MinorType.BIGINT)
+          .add("c", MinorType.FLOAT8)
+          .resumeSchema()
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    new ExplicitSchemaProjection(
+        scanProj, tableSchema, rootTuple,
+        ScanTestUtils.resolvers());
+
+    final List<ResolvedColumn> columns = rootTuple.columns();
+    assertEquals(1, columns.size());
+
+    // Since the table provides exactly the requested members,
+    // the map is projected as a single, whole column.
+
+    // a is projected as a vector, not as a structured map
+
+    final ResolvedColumn aCol = columns.get(0);
+    assertEquals("a", aCol.name());
+    assertEquals(ResolvedTableColumn.ID, aCol.nodeType());
+    assertSame(rootTuple, ((ResolvedTableColumn) (aCol)).source());
+    assertEquals(0, ((ResolvedTableColumn) (aCol)).sourceIndex());
+  }
+
+  /**
+   * Project of a non-map as a map
+   */
+
+  @Test
+  public void testMapMismatch() {
+
+    // Simulate SELECT a.b ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("a.b"),
+        ScanTestUtils.parsers());
+
+    // Simulate a data source, with early schema, of (a)
+    // where a is not a map.
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    try {
+      new ExplicitSchemaProjection(
+          scanProj, tableSchema, rootTuple,
+          ScanTestUtils.resolvers());
+      fail();
+    } catch (final UserException e) {
+      // Expected
+    }
+  }
+
+  /**
+   * Test project of an array. At the scan level, we just verify
+   * that the requested column is, indeed, an array.
+   */
+
+  @Test
+  public void testArrayProject() {
+
+    // Simulate SELECT a[0] ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("a[0]"),
+        ScanTestUtils.parsers());
+
+    // Simulate a data source, with early schema, of (a)
+    // where a is an array.
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .addArray("a", MinorType.VARCHAR)
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    new ExplicitSchemaProjection(
+          scanProj, tableSchema, rootTuple,
+          ScanTestUtils.resolvers());
+
+    final List<ResolvedColumn> columns = rootTuple.columns();
+    assertEquals(1, columns.size());
+
+    final ResolvedColumn aCol = columns.get(0);
+    assertEquals("a", aCol.name());
+    assertEquals(ResolvedTableColumn.ID, aCol.nodeType());
+    assertSame(rootTuple, ((ResolvedTableColumn) (aCol)).source());
+    assertEquals(0, ((ResolvedTableColumn) (aCol)).sourceIndex());
+  }
+
+  /**
+   * Project of a non-array as an array
+   */
+
+  @Test
+  public void testArrayMismatch() {
+
+    // Simulate SELECT a[0] ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("a[0]"),
+        ScanTestUtils.parsers());
+
+    // Simulate a data source, with early schema, of (a)
+    // where a is not an array.
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    try {
+      new ExplicitSchemaProjection(
+          scanProj, tableSchema, rootTuple,
+          ScanTestUtils.resolvers());
+      fail();
+    } catch (final UserException e) {
+      // Expected
+    }
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
new file mode 100644
index 00000000000..977cd216b70
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
@@ -0,0 +1,691 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.SchemaTracker;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
+import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
+import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother.IncompatibleSchemaException;
+import org.apache.drill.exec.physical.impl.scan.project.SmoothingProjection;
+import org.apache.drill.exec.physical.impl.scan.project.WildcardSchemaProjection;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Tests schema smoothing at the schema projection level.
+ * This level handles reusing prior types when filling null
+ * values. But, because no actual vectors are involved, it
+ * does not handle the schema chosen for a table ahead of
+ * time, only the schema as it is merged with prior schema to
+ * detect missing columns.
+ * <p>
+ * Focuses on the <tt>SmoothingProjection</tt> class itself.
+ * <p>
+ * Note that, at present, schema smoothing does not work for entire
+ * maps. That is, if file 1 has, say <tt>{a: {b: 10, c: "foo"}}</tt>
+ * and file 2 has, say, <tt>{a: null}</tt>, then schema smoothing does
+ * not currently know how to recreate the map. The same is true of
+ * lists and unions. Handling such cases is complex and is probably
+ * better handled via a system that allows the user to specify their
+ * intent by providing a schema to apply to the two files.
+ * <p>
+ * Note that schema smoothing itself is an experimental work-around
+ * to a fundamental limitation in Drill:
+ * <ul>
+ * <li>Drill cannot predict the future: each file (or batch)
+ * may have a different schema.</li>
+ * <li>Drill does not know about these differences until they
+ * occur.</li>
+ * <li>The scan operator is obligated to return the same schema
+ * (and same vectors) from one file to the next, else a "hard"
+ * schema change is sent down stream.</li>
+ * </ul>
+ *
+ * The problem is actually intractable. The schema smoother handles the
+ * cases that can be handled, such as required --> nullable, a column
+ * disappearing, etc. This whole mechanism should be scrapped if/when
+ * Drill can work with schemas. Or, keep this to handle, as best we can,
+ * odd schemas, but insist on a schema to resolve issues that this
+ * mechanism cannot handle (and that, indeed, no algorithm could handle
+ * because such an algorithm would require time-travel: looking into
+ * the future to know what data will be scanned.)
+ */
+
+public class TestSchemaSmoothing extends SubOperatorTest {
+
+  /**
+   * Low-level test of the smoothing projection, including the exceptions
+   * it throws when things are not going its way.
+   */
+
+  @Test
+  public void testSmoothingProjection() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    // Table 1: (a: nullable bigint, b: nullable varchar, c: float8)
+
+    final TupleMetadata schema1 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addNullable("b", MinorType.VARCHAR)
+        .add("c", MinorType.FLOAT8)
+        .buildSchema();
+    ResolvedRow priorSchema;
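+
+    // Resolve table 1 with a wildcard projection; the result becomes
+    // the prior schema against which later tables are smoothed.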
+    {
+      final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      final ResolvedRow rootTuple = new ResolvedRow(builder);
+      new WildcardSchemaProjection(
+          scanProj, schema1, rootTuple,
+          ScanTestUtils.resolvers());
+      priorSchema = rootTuple;
+    }
+
+    // Table 2: (a: nullable bigint, c), column omitted, original schema preserved
+
+    final TupleMetadata schema2 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .add("c", MinorType.FLOAT8)
+        .buildSchema();
+    try {
+      final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      final ResolvedRow rootTuple = new ResolvedRow(builder);
+      new SmoothingProjection(
+          scanProj, schema2, priorSchema, rootTuple,
+          ScanTestUtils.resolvers());
+      assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
+      priorSchema = rootTuple;
+    } catch (final IncompatibleSchemaException e) {
+      fail();
+    }
+
+    // Table 3: (a, b, c, d), column added, must replan schema
+
+    final TupleMetadata schema3 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addNullable("b", MinorType.VARCHAR)
+        .add("c", MinorType.FLOAT8)
+        .add("d", MinorType.INT)
+        .buildSchema();
+    try {
+      final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      final ResolvedRow rootTuple = new ResolvedRow(builder);
+      new SmoothingProjection(
+          scanProj, schema3, priorSchema, rootTuple,
+          ScanTestUtils.resolvers());
+      fail();
+    } catch (final IncompatibleSchemaException e) {
+      // Expected
+    }
+
+    // Table 4: (a: double, b, c), column type changed, must replan schema
+
+    final TupleMetadata schema4 = new SchemaBuilder()
+        .addNullable("a", MinorType.FLOAT8)
+        .addNullable("b", MinorType.VARCHAR)
+        .add("c", MinorType.FLOAT8)
+        .buildSchema();
+    try {
+      final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      final ResolvedRow rootTuple = new ResolvedRow(builder);
+      new SmoothingProjection(
+          scanProj, schema4, priorSchema, rootTuple,
+          ScanTestUtils.resolvers());
+      fail();
+    } catch (final IncompatibleSchemaException e) {
+      // Expected
+    }
+
+    // Table 5: Drop a non-nullable column, must replan
+
+    final TupleMetadata schema6 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addNullable("b", MinorType.VARCHAR)
+        .buildSchema();
+    try {
+      final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      final ResolvedRow rootTuple = new ResolvedRow(builder);
+      new SmoothingProjection(
+          scanProj, schema6, priorSchema, rootTuple,
+          ScanTestUtils.resolvers());
+      fail();
+    } catch (final IncompatibleSchemaException e) {
+      // Expected
+    }
+  }
+
+  /**
+   * Case in which the table schema is a superset of the prior
+   * schema. Discard prior schema. Turn off auto expansion of
+   * metadata for a simpler test.
+   */
+
+  @Test
+  public void testSmaller() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    {
+      final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      final ResolvedRow rootTuple = new ResolvedRow(builder);
+      smoother.resolve(priorSchema, rootTuple);
+      assertEquals(1, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
+    }
+    {
+      final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      final ResolvedRow rootTuple = new ResolvedRow(builder);
+      smoother.resolve(tableSchema, rootTuple);
+      assertEquals(2, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
+    }
+  }
+
+  /**
+   * Case in which the table schema and prior are disjoint
+   * sets. Discard the prior schema.
+   */
+
+  @Test
+  public void testDisjoint() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    {
+      doResolve(smoother, priorSchema);
+    }
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(2, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
+    }
+  }
+
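+  // Helper: resolve a table schema against the smoother using a fresh
+  // resolved row, returning the result for inspection.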
+  private ResolvedRow doResolve(SchemaSmoother smoother, TupleMetadata schema) {
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    smoother.resolve(schema, rootTuple);
+    return rootTuple;
+  }
+
+  /**
+   * Column names match, but types differ. Discard the prior schema.
+   */
+
+  @Test
+  public void testDifferentTypes() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    {
+      doResolve(smoother, priorSchema);
+    }
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(2, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
+    }
+  }
+
+  /**
+   * The prior and table schemas are identical. Preserve the prior
+   * schema (though, the output is no different than if we discarded
+   * the prior schema...)
+   */
+
+  @Test
+  public void testSameSchemas() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    {
+      doResolve(smoother, priorSchema);
+    }
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(1, smoother.schemaVersion());
+      final TupleMetadata actualSchema = ScanTestUtils.schema(rootTuple);
+      assertTrue(actualSchema.isEquivalent(tableSchema));
+      assertTrue(actualSchema.isEquivalent(priorSchema));
+    }
+  }
+
+  /**
+   * The prior and table schemas are identical, but the cases of names differ.
+   * Preserve the case of the first schema.
+   */
+
+  @Test
+  public void testDifferentCase() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("A", MinorType.INT)
+        .add("B", MinorType.VARCHAR)
+        .buildSchema();
+
+    {
+      doResolve(smoother, priorSchema);
+    }
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(1, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
+      final List<ResolvedColumn> columns = rootTuple.columns();
+      assertEquals("a", columns.get(0).name());
+    }
+  }
+
+  /**
+   * Can't preserve the prior schema if it had required columns
+   * that the new schema omits.
+   */
+
+  @Test
+  public void testRequired() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("b", MinorType.VARCHAR)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .addNullable("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    {
+      doResolve(smoother, priorSchema);
+    }
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(2, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
+    }
+  }
+
+  /**
+   * Preserve the prior schema if table is a subset and missing columns
+   * are nullable or repeated.
+   */
+
+  @Test
+  public void testMissingNullableColumns() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .addArray("c", MinorType.BIGINT)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    {
+      doResolve(smoother, priorSchema);
+    }
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(1, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
+    }
+  }
+
+  /**
+   * Preserve the prior schema if table is a subset. Map the table
+   * columns to the output using the prior schema ordering.
+   */
+
+  @Test
+  public void testReordering() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .addArray("c", MinorType.BIGINT)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("b", MinorType.VARCHAR)
+        .addNullable("a", MinorType.INT)
+        .buildSchema();
+
+    {
+      doResolve(smoother, priorSchema);
+    }
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(1, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
+    }
+  }
+
+  /**
+   * Integrated test across multiple schemas at the batch level.
+   */
+
+  @Test
+  public void testSmoothableSchemaBatches() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    // Table 1: (a: nullable bigint, b: nullable varchar, c: float8)
+
+    final TupleMetadata schema1 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addNullable("b", MinorType.VARCHAR)
+        .add("c", MinorType.FLOAT8)
+        .buildSchema();
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, schema1);
+
+      // Just use the original schema.
+
+      assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
+      assertEquals(1, smoother.schemaVersion());
+    }
+
+    // Table 2: (a: nullable bigint, c), column omitted, original schema preserved
+
+    final TupleMetadata schema2 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .add("c", MinorType.FLOAT8)
+        .buildSchema();
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, schema2);
+      assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
+      assertEquals(1, smoother.schemaVersion());
+    }
+
+    // Table 3: (a, b, c, d), column added, must replan schema
+
+    final TupleMetadata schema3 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addNullable("b", MinorType.VARCHAR)
+        .add("c", MinorType.FLOAT8)
+        .add("d", MinorType.INT)
+        .buildSchema();
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, schema3);
+      assertTrue(schema3.isEquivalent(ScanTestUtils.schema(rootTuple)));
+      assertEquals(2, smoother.schemaVersion());
+    }
+
+    // Table 4: Drop a non-nullable column, must replan
+
+    final TupleMetadata schema4 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addNullable("b", MinorType.VARCHAR)
+        .buildSchema();
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, schema4);
+      assertTrue(schema4.isEquivalent(ScanTestUtils.schema(rootTuple)));
+      assertEquals(3, smoother.schemaVersion());
+    }
+
+    // Table 5: (a: double, b), column type changed, must replan schema
+
+    final TupleMetadata schema5 = new SchemaBuilder()
+        .addNullable("a", MinorType.FLOAT8)
+        .addNullable("b", MinorType.VARCHAR)
+        .buildSchema();
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, schema5);
+      assertTrue(schema5.isEquivalent(ScanTestUtils.schema(rootTuple)));
+      assertEquals(4, smoother.schemaVersion());
+    }
+  }
+
+  /**
+   * A SELECT * query uses the schema of the table as the output schema.
+   * This is trivial when the scanner has one table. But, if two or more
+   * tables occur, then things get interesting. The first table sets the
+   * schema. The second table then has:
+   * <ul>
+   * <li>The same schema, trivial case.</li>
+   * <li>A subset of the first table. The type of the "missing" column
+   * from the first table is used for a null column in the second table.</li>
+   * <li>A superset or disjoint set of the first schema. This triggers a hard schema
+   * change.</li>
+   * </ul>
+   * <p>
+   * It is an open question whether previous columns should be preserved on
+   * a hard reset. For now, the code implements, and this test verifies, that a
+   * hard reset clears the "memory" of prior schemas.
+   */
+
+  @Test
+  public void testWildcardSmoothing() {
+    final ScanSchemaOrchestrator projector = new ScanSchemaOrchestrator(fixture.allocator());
+    projector.enableSchemaSmoothing(true);
+    projector.build(RowSetTestUtils.projectAll());
+
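+    // SELECT * with smoothing enabled: the orchestrator tries to keep
+    // a single output schema across successive readers where possible.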
+    final TupleMetadata firstSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("b", MinorType.VARCHAR, 10)
+        .addNullable("c", MinorType.BIGINT)
+        .buildSchema();
+    final TupleMetadata subsetSchema = new SchemaBuilder()
+        .addNullable("b", MinorType.VARCHAR, 10)
+        .add("a", MinorType.INT)
+        .buildSchema();
+    final TupleMetadata disjointSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("b", MinorType.VARCHAR, 10)
+        .add("d", MinorType.VARCHAR)
+        .buildSchema();
+
+    final SchemaTracker tracker = new SchemaTracker();
+    int schemaVersion;
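+
+    // The tracker's schema version should change only when the output
+    // schema actually changes (a "hard" schema change).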
+    {
+      // First table, establishes the baseline
+      // ... FROM table 1
+
+      final ReaderSchemaOrchestrator reader = projector.startReader();
+      final ResultSetLoader loader = reader.makeTableLoader(firstSchema);
+
+      reader.startBatch();
+      loader.writer()
+          .addRow(10, "fred", 110L)
+          .addRow(20, "wilma", 110L);
+      reader.endBatch();
+
+      tracker.trackSchema(projector.output());
+      schemaVersion = tracker.schemaVersion();
+
+      final SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
+          .addRow(10, "fred", 110L)
+          .addRow(20, "wilma", 110L)
+          .build();
+      new RowSetComparison(expected)
+        .verifyAndClearAll(fixture.wrap(projector.output()));
+    }
+    {
+      // Second table, same schema, the trivial case
+      // ... FROM table 2
+
+      final ReaderSchemaOrchestrator reader = projector.startReader();
+      final ResultSetLoader loader = reader.makeTableLoader(firstSchema);
+
+      reader.startBatch();
+      loader.writer()
+          .addRow(70, "pebbles", 770L)
+          .addRow(80, "hoppy", 880L);
+      reader.endBatch();
+
+      tracker.trackSchema(projector.output());
+      assertEquals(schemaVersion, tracker.schemaVersion());
+
+      final SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
+          .addRow(70, "pebbles", 770L)
+          .addRow(80, "hoppy", 880L)
+          .build();
+      new RowSetComparison(expected)
+        .verifyAndClearAll(fixture.wrap(projector.output()));
+    }
+    {
+      // Third table: subset schema of first two
+      // ... FROM table 3
+
+      final ReaderSchemaOrchestrator reader = projector.startReader();
+      final ResultSetLoader loader = reader.makeTableLoader(subsetSchema);
+
+      reader.startBatch();
+      loader.writer()
+          .addRow("bambam", 30)
+          .addRow("betty", 40);
+      reader.endBatch();
+
+      tracker.trackSchema(projector.output());
+      assertEquals(schemaVersion, tracker.schemaVersion());
+
+      final SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
+          .addRow(30, "bambam", null)
+          .addRow(40, "betty", null)
+          .build();
+      new RowSetComparison(expected)
+        .verifyAndClearAll(fixture.wrap(projector.output()));
+    }
+    {
+      // Fourth table: disjoint schema, causes a schema reset
+      // ... FROM table 4
+
+      final ReaderSchemaOrchestrator reader = projector.startReader();
+      final ResultSetLoader loader = reader.makeTableLoader(disjointSchema);
+
+      reader.startBatch();
+      loader.writer()
+          .addRow(50, "dino", "supporting")
+          .addRow(60, "barney", "main");
+      reader.endBatch();
+
+      tracker.trackSchema(projector.output());
+      assertNotEquals(schemaVersion, tracker.schemaVersion());
+
+      final SingleRowSet expected = fixture.rowSetBuilder(disjointSchema)
+          .addRow(50, "dino", "supporting")
+          .addRow(60, "barney", "main")
+          .build();
+      new RowSetComparison(expected)
+        .verifyAndClearAll(fixture.wrap(projector.output()));
+    }
+
+    projector.close();
+  }
+
+  // TODO: Test schema smoothing with repeated
+  // TODO: Test hard schema change
+  // TODO: Typed null column tests (resurrect)
+  // TODO: Test maps and arrays of maps
+}


 
