Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/08/03 18:25:12 UTC

[GitHub] ilooner closed pull request #1244: DRILL-6373: Refactor Result Set Loader for Union, List support

URL: https://github.com/apache/drill/pull/1244

This is a PR merged from a forked repository. As GitHub hides the
original diff on merge, it is reproduced below for the sake of
provenance:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java
index a63d9ad8ca6..43f255ebe49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java
@@ -112,6 +112,16 @@
    */
 
   RowSetLoader writer();
+
+  /**
+   * Reports whether the loader is in a writable state. The writable state
+   * occurs only when a batch has been started, and before that batch
+   * becomes full.
+   *
+   * @return true if the client can add a row to the loader, false if
+   * not
+   */
+
   boolean writeable();
 
   /**
@@ -135,28 +145,52 @@
   ResultSetLoader setRow(Object...values);
 
   /**
-   * Return the output container, primarily to obtain the schema
-   * and set of vectors. Depending on when this is called, the
-   * data may or may not be populated: call
-   * {@link #harvest()} to obtain the container for a batch.
+   * Requests to skip the given number of rows. Returns the number of rows
+   * actually skipped (which is limited by the batch's row limit.)
    * <p>
-   * This method is useful when the schema is known and fixed.
-   * After declaring the schema, call this method to get the container
-   * that holds the vectors for use in planning projection, etc.
+   * Used in <tt>SELECT COUNT(*)</tt> style queries when the downstream
+   * operators want just the record count, but no actual rows.
    * <p>
-   * If the result set schema changes, then a call to this method will
-   * return the latest schema. But, if the schema changes during the
-   * overflow row, then this method will not see those changes until
-   * after harvesting the current batch. (This avoid the appearance
-   * of phantom columns in the output since the new column won't
-   * appear until the next batch.)
+   * Also used to fill in a batch of only null values (such as filling
+   * in a set of null vectors for unprojected columns.)
+   *
+   * @param requestedCount
+   *          the number of rows to skip
+   * @return the actual number of rows skipped, which may be less than the
+   *         requested amount. If less, the client should call this method for
+   *         multiple batches until the requested count is reached
+   */
+
+  int skipRows(int requestedCount);
+
+  /**
+   * Reports if this is an empty projection such as occurs in a
+   * <tt>SELECT COUNT(*)</tt> query. If the projection is empty, then
+   * the downstream needs only the row count set in each batch, but no
+   * actual vectors will be created. In this case, the client can do
+   * the work to populate rows (the data will be discarded), or can call
+   * {@link #skipRows(int)} to skip over the number of rows that would
+   * have been read if any data had been projected.
    * <p>
-   * Never count on the data in the container; it may be empty, half
-   * written, or inconsistent. Always call
-   * {@link #harvest()} to obtain the container for a batch.
+   * Note that the empty schema case can also occur if the project list
+   * from the <tt>SELECT</tt> clause is disjoint from the table schema.
+   * For example, <tt>SELECT a, b</tt> from a table with schema
+   * <tt>(c, d)</tt>.
    *
-   * @return the output container including schema and value
-   * vectors
+   * @return true if no columns are actually projected, false if at
+   * least one column is projected
+   */
+
+  boolean isProjectionEmpty();
+
+  /**
+   * Returns the output container which holds (or will hold) batches
+   * from this loader. For use when the container is needed prior
+   * to "harvesting" a batch. The data is not valid until
+   * {@link #harvest()} is called, and is no longer valid once
+   * {@link #startBatch()} is called.
+   *
+   * @return container used to publish results from this loader
    */
 
   VectorContainer outputContainer();
@@ -193,6 +227,15 @@
 
   TupleMetadata harvestSchema();
 
+  /**
+   * Peek at the internal vector cache for readers that need a bit of help
+   * resolving types based on what was previously seen.
+   *
+   * @return real or dummy vector cache
+   */
+
+  ResultVectorCache vectorCache();
+
   /**
    * Called after all rows are returned, whether because no more data is
    * available, or the caller wishes to cancel the current row batch
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultVectorCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultVectorCache.java
index 6e32b5d3d7c..441d3995e23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultVectorCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultVectorCache.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.rowSet;
 
+import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.vector.ValueVector;
@@ -30,4 +31,7 @@
 public interface ResultVectorCache {
   BufferAllocator allocator();
   ValueVector addOrGet(MaterializedField colSchema);
+  MajorType getType(String name);
+  boolean isPermissive();
+  ResultVectorCache childCache(String colName);
 }
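
For illustration, a minimal sketch (not part of the PR) of how a reader
might use the extended cache: reuse the type recorded for a column in a
prior batch when available, otherwise fall back to a default. The helper
name and the VARCHAR fallback are assumptions for the example.

    import org.apache.drill.common.types.TypeProtos.MajorType;
    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.common.types.Types;
    import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
    import org.apache.drill.exec.record.MaterializedField;
    import org.apache.drill.exec.vector.ValueVector;

    public class CacheUsageSketch {
      // Resolve a column against the cache: reuse the previously seen
      // type if any; otherwise create the vector with a default type.
      static ValueVector resolveColumn(ResultVectorCache cache, String name) {
        MajorType priorType = cache.getType(name);  // null if never seen
        MajorType type = (priorType != null)
            ? priorType
            : Types.optional(MinorType.VARCHAR);    // assumed default
        return cache.addOrGet(MaterializedField.create(name, type));
      }
    }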
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetLoader.java
index 070e9a9180a..588dd702053 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetLoader.java
@@ -76,10 +76,22 @@
    *
    * @param values
    *          variable-length argument list of column values
+   * @return this writer
    */
 
   RowSetLoader addRow(Object... values);
 
+  /**
+   * Similar to {@link #addRow(Object...)}, but for the odd case in which a
+   * row consists of a single column that is an object array (such as for
+   * a list or map) and so is ambiguous.
+   *
+   * @param value value of the one and only column
+   * @return this writer
+   */
+
+  RowSetLoader addSingleCol(Object value);
+
   /**
    * Indicates that no more rows fit into the current row batch and that the row
    * batch should be harvested and sent downstream. Any overflow row is
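
As a hedged usage sketch (not from the PR): addRow() spreads its varargs
across the row's columns, while the new addSingleCol() handles the case
its Javadoc describes, where the lone column's value is itself an object
array. The loaders and schemas below are assumed.

    RowSetLoader writer = rsLoader.writer();   // schema: (a INT, b VARCHAR)
    rsLoader.startBatch();
    writer.addRow(10, "fred");                 // varargs map to columns
    writer.addRow(20, "barney");

    // For a one-column row whose value is an array, addRow() would
    // mistake the array for the whole row; addSingleCol() is unambiguous.
    listWriter.addSingleCol(new int[] {1, 2, 3});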
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/BuildFromSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/BuildFromSchema.java
new file mode 100644
index 00000000000..df2abf171f0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/BuildFromSchema.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ObjectWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+
+/**
+ * Build the set of writers from a defined schema. Uses the same
+ * mechanism as dynamic schema: walks the schema tree adding each
+ * column, then recursively adding the contents of maps and variants.
+ * <p>
+ * Recursion is much easier if we can go bottom-up, but writers
+ * require top-down construction.
+ */
+
+public class BuildFromSchema {
+
+  private interface ParentShim {
+    ObjectWriter add(ColumnMetadata colSchema);
+  }
+
+  private static class TupleShim implements ParentShim {
+    private final TupleWriter writer;
+
+    public TupleShim(TupleWriter writer) {
+      this.writer = writer;
+    }
+
+    @Override
+    public ObjectWriter add(ColumnMetadata colSchema) {
+      int index = writer.addColumn(colSchema);
+      return writer.column(index);
+    }
+  }
+
+  /**
+   * When creating a schema up front, provide the schema of the desired tuple,
+   * then build vectors and writers to match. Allows up-front schema definition
+   * in addition to on-the-fly schema creation handled elsewhere.
+   *
+   * @param writer tuple writer to receive the new columns
+   * @param schema desired tuple schema to be materialized
+   */
+
+  public void buildTuple(TupleWriter writer, TupleMetadata schema) {
+    ParentShim tupleShim = new TupleShim(writer);
+    for (int i = 0; i < schema.size(); i++) {
+      ColumnMetadata colSchema = schema.metadata(i);
+      buildColumn(tupleShim, colSchema);
+    }
+  }
+
+  private void buildColumn(ParentShim parent, ColumnMetadata colSchema) {
+
+    if (colSchema.isMap()) {
+      buildMap(parent, colSchema);
+    } else {
+      buildPrimitive(parent, colSchema);
+    }
+  }
+
+  private boolean isSingleList(ColumnMetadata colSchema) {
+    return colSchema.isVariant() && colSchema.isArray() && colSchema.variantSchema().isSingleType();
+  }
+
+  private void buildPrimitive(ParentShim parent, ColumnMetadata colSchema) {
+    parent.add(colSchema);
+  }
+
+  private void buildMap(ParentShim parent, ColumnMetadata colSchema) {
+    ObjectWriter colWriter = parent.add(colSchema.cloneEmpty());
+    expandMap(colWriter, colSchema);
+  }
+
+  private void expandMap(ObjectWriter colWriter, ColumnMetadata colSchema) {
+    if (colSchema.isArray()) {
+      buildTuple(colWriter.array().tuple(), colSchema.mapSchema());
+    } else {
+      buildTuple(colWriter.tuple(), colSchema.mapSchema());
+    }
+  }
+}
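
A sketch of how this class might be driven from a declared schema. The
fluent SchemaBuilder calls follow Drill's schema-building test utilities
and may differ slightly by version; the loader variable is assumed.

    TupleMetadata schema = new SchemaBuilder()
        .add("id", MinorType.INT)
        .addMap("name")
          .add("first", MinorType.VARCHAR)
          .add("last", MinorType.VARCHAR)
          .resumeSchema()
        .buildSchema();

    // Walk the schema top-down: add each column, recursing into the map.
    new BuildFromSchema().buildTuple(rsLoader.writer(), schema);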
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java
new file mode 100644
index 00000000000..6c39ea2a20b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import java.util.ArrayList;
+
+import org.apache.drill.exec.physical.rowSet.impl.ColumnState.PrimitiveColumnState;
+import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.OffsetVectorState;
+import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.SimpleVectorState;
+import org.apache.drill.exec.physical.rowSet.impl.TupleState.MapArrayState;
+import org.apache.drill.exec.physical.rowSet.impl.TupleState.MapColumnState;
+import org.apache.drill.exec.physical.rowSet.impl.TupleState.MapVectorState;
+import org.apache.drill.exec.physical.rowSet.impl.TupleState.SingleMapState;
+import org.apache.drill.exec.record.metadata.AbstractColumnMetadata;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.PrimitiveColumnMetadata;
+import org.apache.drill.exec.record.metadata.ProjectionType;
+import org.apache.drill.exec.vector.NullableVector;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter.TupleObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.ColumnWriterFactory;
+import org.apache.drill.exec.vector.accessor.writer.MapWriter;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+
+/**
+ * Algorithms for building a column given a metadata description of the column and
+ * the parent context that will hold the column.
+ * <p>
+ * Does not support recursive column creation. For the most part, composite columns
+ * (maps, map arrays, unions and lists) must start empty. Build the composite first,
+ * then add its members using the writer for the column. This ensures a uniform API
+ * for adding columns whether done dynamically at read time or statically at create
+ * time.
+ */
+
+public class ColumnBuilder {
+
+  private ColumnBuilder() { }
+
+  /**
+   * Implementation of the work to add a new column to this tuple given a
+   * schema description of the column.
+   *
+   * @param parent the container (tuple or map) that will hold the column
+   * @param columnSchema schema of the column
+   * @return column state for the new column
+   */
+
+  public static ColumnState buildColumn(ContainerState parent, ColumnMetadata columnSchema) {
+
+    // Indicate projection in the metadata.
+
+    ((AbstractColumnMetadata) columnSchema).setProjected(
+        parent.projectionType(columnSchema.name()) != ProjectionType.UNPROJECTED);
+
+    // Build the column
+
+    switch (columnSchema.structureType()) {
+    case TUPLE:
+      return buildMap(parent, columnSchema);
+    default:
+      return buildPrimitive(parent, columnSchema);
+    }
+  }
+
+  /**
+   * Build a primitive column. Check if the column is projected. If not,
+   * allocate a dummy writer for the column. If projected, then allocate
+   * a vector, a writer, and the column state which binds the two together
+   * and manages the column.
+   *
+   * @param parent the container that will hold the column
+   * @param columnSchema schema of the new primitive column
+   * @return column state for the new column
+   */
+
+  @SuppressWarnings("resource")
+  private static ColumnState buildPrimitive(ContainerState parent, ColumnMetadata columnSchema) {
+    ValueVector vector;
+    if (columnSchema.isProjected()) {
+
+      // Create the vector for the column.
+
+      vector = parent.vectorCache().addOrGet(columnSchema.schema());
+
+      // In permissive mode, the mode or precision of the vector may differ
+      // from that requested. Update the schema to match.
+
+      if (parent.vectorCache().isPermissive() && ! vector.getField().isEquivalent(columnSchema.schema())) {
+        columnSchema = ((PrimitiveColumnMetadata) columnSchema).mergeWith(vector.getField());
+      }
+    } else {
+
+      // Column is not projected. No materialized backing for the column.
+
+      vector = null;
+    }
+
+    // Create the writer.
+
+    AbstractObjectWriter colWriter = ColumnWriterFactory.buildColumnWriter(columnSchema, vector);
+
+    // Build the vector state which manages the vector.
+
+    VectorState vectorState;
+    if (vector == null) {
+      vectorState = new NullVectorState();
+    } else if (columnSchema.isArray()) {
+      vectorState = new RepeatedVectorState(colWriter.array(), (RepeatedValueVector) vector);
+    } else if (columnSchema.isNullable()) {
+      vectorState = new NullableVectorState(
+          colWriter,
+          (NullableVector) vector);
+    } else {
+      vectorState = SimpleVectorState.vectorState(columnSchema,
+            colWriter.scalar(), vector);
+    }
+
+    // Create the column state which binds the vector and writer together.
+
+    return new PrimitiveColumnState(parent.loader(), colWriter,
+        vectorState);
+  }
+
+  /**
+   * Build a new map (single or repeated) column. Except for maps nested
+   * inside unions, no map vector is created here; instead, we create a
+   * tuple state to hold the columns and defer the map vector (or vector
+   * container) until harvest time.
+   *
+   * @param parent the container that will hold the map column
+   * @param columnSchema description of the map column
+   * @return column state for the map column
+   */
+
+  private static ColumnState buildMap(ContainerState parent, ColumnMetadata columnSchema) {
+
+    // When dynamically adding columns, must add the (empty)
+    // map by itself, then add columns to the map via separate
+    // calls.
+
+    assert columnSchema.isMap();
+    assert columnSchema.mapSchema().size() == 0;
+
+    // Create the vector, vector state and writer.
+
+    if (columnSchema.isArray()) {
+      return buildMapArray(parent, columnSchema);
+    } else {
+      return buildSingleMap(parent, columnSchema);
+    }
+  }
+
+  @SuppressWarnings("resource")
+  private static ColumnState buildSingleMap(ContainerState parent, ColumnMetadata columnSchema) {
+    MapVector vector;
+    VectorState vectorState;
+    if (columnSchema.isProjected()) {
+
+      // Don't get the map vector from the vector cache. Map vectors may
+      // have content that varies from batch to batch. Only the leaf
+      // vectors can be cached.
+
+      assert columnSchema.mapSchema().isEmpty();
+      vector = new MapVector(columnSchema.schema(), parent.loader().allocator(), null);
+      vectorState = new MapVectorState(vector, new NullVectorState());
+    } else {
+      vector = null;
+      vectorState = new NullVectorState();
+    }
+    TupleObjectWriter mapWriter = MapWriter.buildMap(columnSchema,
+        vector, new ArrayList<AbstractObjectWriter>());
+    SingleMapState mapState = new SingleMapState(parent.loader(),
+        parent.vectorCache().childCache(columnSchema.name()),
+        parent.projectionSet().mapProjection(columnSchema.name()));
+    return new MapColumnState(mapState, mapWriter, vectorState, parent.isVersioned());
+  }
+
+  @SuppressWarnings("resource")
+  private static ColumnState buildMapArray(ContainerState parent, ColumnMetadata columnSchema) {
+
+    // Create the map's offset vector.
+
+    RepeatedMapVector mapVector;
+    UInt4Vector offsetVector;
+    if (columnSchema.isProjected()) {
+
+      // Creating the map vector will create its contained vectors if we
+      // give it a materialized field with children. So, instead pass a clone
+      // without children so we can add them.
+
+      ColumnMetadata mapColSchema = columnSchema.cloneEmpty();
+
+      // Don't get the map vector from the vector cache. Map vectors may
+      // have content that varies from batch to batch. Only the leaf
+      // vectors can be cached.
+
+      assert columnSchema.mapSchema().isEmpty();
+      mapVector = new RepeatedMapVector(mapColSchema.schema(),
+          parent.loader().allocator(), null);
+      offsetVector = mapVector.getOffsetVector();
+    } else {
+      mapVector = null;
+      offsetVector = null;
+    }
+
+    // Create the writer using the offset vector
+
+    AbstractObjectWriter writer = MapWriter.buildMapArray(
+        columnSchema, mapVector,
+        new ArrayList<AbstractObjectWriter>());
+
+    // Wrap the offset vector in a vector state
+
+    VectorState offsetVectorState;
+    if (columnSchema.isProjected()) {
+      offsetVectorState = new OffsetVectorState(
+          (((AbstractArrayWriter) writer.array()).offsetWriter()),
+          offsetVector,
+          writer.array().entry());
+    } else {
+      offsetVectorState = new NullVectorState();
+    }
+    VectorState mapVectorState = new MapVectorState(mapVector, offsetVectorState);
+
+    // Assemble it all into the column state.
+
+    MapArrayState mapState = new MapArrayState(parent.loader(),
+        parent.vectorCache().childCache(columnSchema.name()),
+        parent.projectionSet().mapProjection(columnSchema.name()));
+    return new MapColumnState(mapState, writer, mapVectorState, parent.isVersioned());
+  }
+}
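
The practical effect of the projection check in buildPrimitive() is that
clients can write every declared column without testing projection:
unprojected columns get a dummy writer and their values are quietly
dropped. A sketch under assumed setup (the projection list contains only
column "a"):

    RowSetLoader writer = rsLoader.writer();
    writer.addColumn(MaterializedField.create("a",
        Types.required(MinorType.INT)));
    writer.addColumn(MaterializedField.create("b",
        Types.required(MinorType.INT)));  // unprojected: dummy writer

    rsLoader.startBatch();
    writer.addRow(1, 2);  // the 2 is discarded; no vector backs column b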
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java
index 7fae5ada580..acc9556f84f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java
@@ -17,20 +17,15 @@
  */
 package org.apache.drill.exec.physical.rowSet.impl;
 
-import java.util.ArrayList;
-
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.OffsetVectorState;
-import org.apache.drill.exec.physical.rowSet.impl.TupleState.MapState;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
-import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
-import org.apache.drill.exec.vector.accessor.writer.MapWriter;
-import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector;
-import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter;
 
 /**
  * Represents the write-time state for a column including the writer and the (optional)
@@ -42,114 +37,94 @@
  * <p>
  * Different columns need different kinds of vectors: a data vector, possibly an offset
  * vector, or even a non-existent vector. The {@link VectorState} class abstracts out
- * these diffrences.
+ * these differences.
  */
 
 public abstract class ColumnState {
 
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnState.class);
 
-  public static abstract class BaseMapColumnState extends ColumnState {
-    protected final MapState mapState;
+  /**
+   * Primitive (non-map) column state. Handles all three cardinalities.
+   * Column metadata is hosted on the writer.
+   */
 
-    public BaseMapColumnState(ResultSetLoaderImpl resultSetLoader,
-         AbstractObjectWriter writer, VectorState vectorState,
-         ProjectionSet projectionSet) {
-      super(resultSetLoader, writer, vectorState);
-      mapState = new MapState(resultSetLoader, this, projectionSet);
+  public static class PrimitiveColumnState extends ColumnState implements ColumnWriterListener {
+
+    public PrimitiveColumnState(LoaderInternals loader,
+        AbstractObjectWriter colWriter,
+        VectorState vectorState) {
+      super(loader, colWriter, vectorState);
+      ScalarWriter scalarWriter;
+      if (colWriter.type() == ObjectType.ARRAY) {
+        scalarWriter = writer.array().scalar();
+      } else {
+        scalarWriter = writer.scalar();
+      }
+      ((AbstractScalarWriter) scalarWriter).bindListener(this);
     }
 
     @Override
-    public void rollover() {
-      super.rollover();
-      mapState.rollover();
+    public boolean canExpand(ScalarWriter writer, int delta) {
+      return loader.canExpand(delta);
     }
 
     @Override
-    public void startBatch() {
-      super.startBatch();
-      mapState.startBatch();
+    public void overflowed(ScalarWriter writer) {
+      loader.overflowed();
     }
 
+    /**
+     * Get the output schema. For a primitive (non-structured) column,
+     * the output schema is the same as the internal schema.
+     */
+
     @Override
-    public void harvestWithLookAhead() {
-      super.harvestWithLookAhead();
-      mapState.harvestWithLookAhead();
-    }
+    public ColumnMetadata outputSchema() { return schema(); }
 
     @Override
-    public void close() {
-      super.close();
-      mapState.close();
+    public void dump(HierarchicalFormatter format) {
+      // TODO Auto-generated method stub
     }
-
-    public MapState mapState() { return mapState; }
   }
 
-  public static class MapColumnState extends BaseMapColumnState {
+  public static abstract class BaseContainerColumnState extends ColumnState {
 
-    public MapColumnState(ResultSetLoaderImpl resultSetLoader,
-        ColumnMetadata columnSchema, MapVector mapVector,
-        ProjectionSet projectionSet) {
-      super(resultSetLoader,
-          MapWriter.buildMap(columnSchema, mapVector,
-              new ArrayList<AbstractObjectWriter>()),
-          new NullVectorState(),
-          projectionSet);
+    public BaseContainerColumnState(LoaderInternals loader,
+        AbstractObjectWriter writer, VectorState vectorState) {
+      super(loader, writer, vectorState);
     }
 
+    public abstract ContainerState container();
+
     @Override
     public void updateCardinality(int cardinality) {
       super.updateCardinality(cardinality);
-      mapState.updateCardinality(cardinality);
+      container().updateCardinality();
     }
-  }
-
-  public static class MapArrayColumnState extends BaseMapColumnState {
 
-    public MapArrayColumnState(ResultSetLoaderImpl resultSetLoader,
-        AbstractObjectWriter writer,
-        VectorState vectorState,
-        ProjectionSet projectionSet) {
-      super(resultSetLoader, writer,
-          vectorState,
-          projectionSet);
+    @Override
+    public void startBatch(boolean schemaOnly) {
+      super.startBatch(schemaOnly);
+      container().startBatch(schemaOnly);
     }
 
-    public static MapArrayColumnState build(ResultSetLoaderImpl resultSetLoader,
-        ColumnMetadata columnSchema,
-        ProjectionSet projectionSet) {
-
-      // Create the map's offset vector.
-
-      UInt4Vector offsetVector = new UInt4Vector(
-          BaseRepeatedValueVector.OFFSETS_FIELD,
-          resultSetLoader.allocator());
-
-      // Create the writer using the offset vector
-
-      AbstractObjectWriter writer = MapWriter.buildMapArray(
-          columnSchema, offsetVector,
-          new ArrayList<AbstractObjectWriter>());
-
-      // Wrap the offset vector in a vector state
-
-      VectorState vectorState = new OffsetVectorState(
-            ((AbstractArrayWriter) writer.array()).offsetWriter(),
-            offsetVector,
-            (AbstractObjectWriter) writer.array().entry());
-
-      // Assemble it all into the column state.
+    @Override
+    public void rollover() {
+      super.rollover();
+      container().rollover();
+    }
 
-      return new MapArrayColumnState(resultSetLoader,
-                  writer, vectorState, projectionSet);
+    @Override
+    public void harvestWithLookAhead() {
+      super.harvestWithLookAhead();
+      container().harvestWithLookAhead();
     }
 
     @Override
-    public void updateCardinality(int cardinality) {
-      super.updateCardinality(cardinality);
-      int childCardinality = cardinality * schema().expectedElementCount();
-      mapState.updateCardinality(childCardinality);
+    public void close() {
+      super.close();
+      container().close();
     }
   }
 
@@ -195,7 +170,7 @@ public void updateCardinality(int cardinality) {
     NEW_LOOK_AHEAD
   }
 
-  protected final ResultSetLoaderImpl resultSetLoader;
+  protected final LoaderInternals loader;
   protected final int addVersion;
   protected final VectorState vectorState;
   protected State state;
@@ -210,27 +185,28 @@ public void updateCardinality(int cardinality) {
    * vector.
    */
 
-  protected int outerCardinality;
+  protected int cardinality;
+  protected int outputIndex = -1;
 
-  public ColumnState(ResultSetLoaderImpl resultSetLoader,
+  public ColumnState(LoaderInternals loader,
       AbstractObjectWriter writer, VectorState vectorState) {
-    this.resultSetLoader = resultSetLoader;
+    this.loader = loader;
     this.vectorState = vectorState;
-    this.addVersion = resultSetLoader.bumpVersion();
-    state = resultSetLoader.hasOverflow() ?
+    this.addVersion = loader.bumpVersion();
+    state = loader.hasOverflow() ?
         State.NEW_LOOK_AHEAD : State.NORMAL;
     this.writer = writer;
   }
 
   public AbstractObjectWriter writer() { return writer; }
   public ColumnMetadata schema() { return writer.schema(); }
+  public VectorState vectorState() { return vectorState; }
 
-  public ValueVector vector() { return vectorState.vector(); }
+  public <T extends ValueVector> T vector() { return vectorState.vector(); }
 
   public void allocateVectors() {
-    assert outerCardinality != 0;
-    resultSetLoader.tallyAllocations(
-        vectorState.allocate(outerCardinality));
+    assert cardinality != 0;
+    loader.tallyAllocations(vectorState.allocate(cardinality));
   }
 
   /**
@@ -239,10 +215,12 @@ public void allocateVectors() {
    * active vector so we start writing where we left off.
    */
 
-  public void startBatch() {
+  public void startBatch(boolean schemaOnly) {
     switch (state) {
     case NORMAL:
-      resultSetLoader.tallyAllocations(vectorState.allocate(outerCardinality));
+      if (! schemaOnly) {
+        allocateVectors();
+      }
       break;
 
     case NEW_LOOK_AHEAD:
@@ -287,7 +265,7 @@ public void rollover() {
     // of thought to get right -- and, of course, completely defeats
     // the purpose of limiting vector size to avoid memory fragmentation...
 
-    if (resultSetLoader.writerIndex().vectorIndex() == 0) {
+    if (loader.rowIndex() == 0) {
       throw UserException
         .memoryError("A single column value is larger than the maximum allowed size of 16 MB")
         .build(logger);
@@ -295,7 +273,7 @@ public void rollover() {
 
     // Otherwise, do the roll-over to a look-ahead vector.
 
-    vectorState.rollover(outerCardinality);
+    vectorState.rollover(cardinality);
 
     // Remember that we did this overflow processing.
 
@@ -336,12 +314,31 @@ public void harvestWithLookAhead() {
     }
   }
 
-  public void close() {
-    vectorState.reset();
+  public boolean isProjected() {
+    return vectorState.isProjected();
   }
 
   public void updateCardinality(int cardinality) {
-    outerCardinality = cardinality;
+    this.cardinality = cardinality;
+  }
+
+  public int outerCardinality() { return cardinality; }
+
+  public int innerCardinality() {
+    ColumnMetadata schema = schema();
+    return schema.isArray()
+        ? cardinality * schema.expectedElementCount()
+        : cardinality;
+  }
+
+  public void buildOutput(TupleState tupleState) {
+    outputIndex = tupleState.addOutputColumn(vector(), outputSchema());
+  }
+
+  public abstract ColumnMetadata outputSchema();
+
+  public void close() {
+    vectorState.close();
   }
 
   public void dump(HierarchicalFormatter format) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ContainerState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ContainerState.java
new file mode 100644
index 00000000000..fb4bb325519
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ContainerState.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import java.util.Collection;
+
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.ProjectionType;
+
+/**
+ * Abstract representation of a container of vectors: a row, a map, a
+ * repeated map, a list or a union.
+ * <p>
+ * The container is responsible for creating new columns in response
+ * to a writer listener event. Column creation requires a set of
+ * four items:
+ * <ul>
+ * <li>The value vector (which may be null if the column is not
+ * projected).</li>
+ * <li>The writer for the column.</li>
+ * <li>A vector state that manages allocation, overflow, cleanup
+ * and other vector-specific tasks.</li>
+ * <li>A column state which orchestrates the above three items.</li>
+ * </ul>
+ */
+
+public abstract class ContainerState {
+
+  protected final LoaderInternals loader;
+  protected final RequestedTuple projectionSet;
+  protected ColumnState parentColumn;
+
+  /**
+   * Vector cache for this loader.
+   * @see {@link OptionBuilder#setVectorCache()}.
+   */
+
+  protected final ResultVectorCache vectorCache;
+
+  public ContainerState(LoaderInternals loader, ResultVectorCache vectorCache, RequestedTuple projectionSet) {
+    this.loader = loader;
+    this.vectorCache = vectorCache;
+    this.projectionSet = projectionSet;
+  }
+
+  public void bindColumnState(ColumnState parentState) {
+    this.parentColumn = parentState;
+  }
+
+  public abstract int innerCardinality();
+  protected abstract void addColumn(ColumnState colState);
+  protected abstract Collection<ColumnState> columnStates();
+
+  /**
+   * Reports whether this container is subject to version management. Version
+   * management adds columns to the output container at harvest time based on
+   * whether they should appear in the output batch.
+   *
+   * @return <tt>true</tt> if versioned
+   */
+
+  protected abstract boolean isVersioned();
+
+  protected LoaderInternals loader() { return loader; }
+  public ResultVectorCache vectorCache() { return vectorCache; }
+  public RequestedTuple projectionSet() { return projectionSet; }
+
+  public ProjectionType projectionType(String columnName) {
+    return projectionSet.projectionType(columnName);
+  }
+
+  public ColumnState addColumn(ColumnMetadata columnSchema) {
+
+    // Create the vector, writer and column state
+
+    ColumnState colState = ColumnBuilder.buildColumn(this, columnSchema);
+
+    // Add the column to this container
+
+    addColumn(colState);
+
+    // Set initial cardinality
+
+    colState.updateCardinality(innerCardinality());
+
+    // Allocate vectors if a batch is in progress.
+
+    if (loader().writeable()) {
+      colState.allocateVectors();
+    }
+    return colState;
+  }
+
+  /**
+   * In order to allocate the correct-sized vectors, the container must know
+   * its member cardinality: the number of elements in each row. This
+   * is 1 for a single map or union, but may be any number for a map array
+   * or a list. This value is then recursively pushed downward to compute
+   * the cardinality of nested structures: lists of maps that contain
+   * lists of maps, and so on.
+   */
+
+  public void updateCardinality() {
+    int innerCardinality = innerCardinality();
+    assert innerCardinality > 0;
+    for (ColumnState colState : columnStates()) {
+      colState.updateCardinality(innerCardinality);
+    }
+  }
+
+  /**
+   * Start a new batch by shifting the overflow buffers back into the main
+   * write vectors and updating the writers.
+   */
+
+  public void startBatch(boolean schemaOnly) {
+    for (ColumnState colState : columnStates()) {
+      colState.startBatch(schemaOnly);
+    }
+  }
+
+  /**
+   * A column within the row batch overflowed. Prepare to absorb the rest of the
+   * in-flight row by rolling values over to a new vector, saving the complete
+   * vector for later. This column could have a value for the overflow row, or
+   * for some previous row, depending on exactly when and where the overflow
+   * occurs.
+   */
+
+  public void rollover() {
+    for (ColumnState colState : columnStates()) {
+      colState.rollover();
+    }
+  }
+
+  /**
+   * Writing of a row batch is complete, and an overflow occurred. Prepare the
+   * vector for harvesting to send downstream. Set aside the look-ahead vector
+   * and put the full vector buffer back into the active vector.
+   */
+
+  public void harvestWithLookAhead() {
+    for (ColumnState colState : columnStates()) {
+      colState.harvestWithLookAhead();
+    }
+  }
+
+  /**
+   * Clean up state (such as backup vectors) associated with the state
+   * for each vector.
+   */
+
+  public void close() {
+    for (ColumnState colState : columnStates()) {
+      colState.close();
+    }
+  }
+}
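
A worked example of the cardinality push-down that updateCardinality()
describes; all numbers are assumptions for illustration.

    // A batch targets 4096 rows. A repeated map expecting 8 elements per
    // row sizes its member vectors for 4096 * 8 slots; a repeated map
    // nested inside it expecting 4 elements sizes for 4096 * 8 * 4.
    int rowCount = 4096;
    int mapArrayCardinality = rowCount * 8;              // 32,768
    int nestedMapCardinality = mapArrayCardinality * 4;  // 131,072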
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/LoaderInternals.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/LoaderInternals.java
new file mode 100644
index 00000000000..5baab212c51
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/LoaderInternals.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+
+/**
+ * The internal view of the result set loader. Provides operations to manage the batch
+ * and batch schema.
+ */
+
+interface LoaderInternals {
+
+  /**
+   * Allocator to use when allocating buffers for vectors
+   * @return buffer allocator
+   */
+
+  BufferAllocator allocator();
+
+  /**
+   * Increments the schema version when adding a new column anywhere in
+   * the writer hierarchy.
+   *
+   * @return the new schema version
+   */
+
+  int bumpVersion();
+
+  /**
+   * Accumulate the initial vector allocation sizes.
+   *
+   * @param allocationBytes number of bytes allocated to a vector
+   * in the batch setup step
+   */
+
+  void tallyAllocations(int allocationBytes);
+
+  /**
+   * Reports whether the loader is in the overflow state. The overflow
+   * state occurs when a vector has become full, but before the batch
+   * is harvested.
+   *
+   * @return <tt>true</tt> if an overflow has occurred in the present
+   * row
+   */
+
+  boolean hasOverflow();
+
+  /**
+   * Return whether a vector within the current batch can expand. Limits
+   * are enforced only if a limit was provided in the options.
+   *
+   * @param delta increase in vector size
+   * @return true if the vector can expand, false if an overflow
+   * event should occur
+   */
+
+  boolean canExpand(int delta);
+
+  /**
+   * Indicates that an overflow has just occurred. Triggers the overflow
+   * mechanism. Upon return, the writer that triggered the overflow will
+   * find that it is now working with a new vector, and a new write
+   * position, that should allow saving of the in-flight value (unless
+   * that one value is larger than the maximum vector size.)
+   */
+
+  void overflowed();
+
+  /**
+   * Current writer row index.
+   *
+   * @return the current write row index
+   */
+
+  int rowIndex();
+
+  /**
+   * The desired number of rows in the output batch.
+   *
+   * @return the target row count
+   */
+
+  int targetRowCount();
+
+  /**
+   * Indicates if the loader is in a writable state. A writable state
+   * occurs when a batch has been started, before the batch overflows
+   * or is harvested.
+   *
+   * @return <tt>true</tt> if values can be written to vectors
+   */
+
+  boolean writeable();
+}
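
To show how these callbacks cooperate, a sketch of the write path as a
column writer might drive it. The listener wiring appears in
PrimitiveColumnState above; the delta variable here is illustrative.

    // A writer needs `delta` more bytes for the value being written:
    if (!loader.canExpand(delta)) {
      // Batch hit its size limit: trigger overflow. On return the writer
      // is bound to a fresh look-ahead vector and can retry the write.
      loader.overflowed();
    }
    // The write proceeds at the (possibly reset) loader.rowIndex().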
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullProjectionSet.java
deleted file mode 100644
index 2fcc813d7a7..00000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullProjectionSet.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.rowSet.impl;
-
-/**
- * Represents a wildcard: SELECT * when used at the root tuple.
- * When used with maps, means selection of all map columns, either
- * implicitly, or because the map itself is selected.
- */
-
-public class NullProjectionSet implements ProjectionSet {
-
-  private boolean allProjected;
-
-  public NullProjectionSet(boolean allProjected) {
-    this.allProjected = allProjected;
-  }
-
-  @Override
-  public boolean isProjected(String colName) { return allProjected; }
-
-  @Override
-  public ProjectionSet mapProjection(String colName) {
-    return new NullProjectionSet(allProjected);
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullResultVectorCacheImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullResultVectorCacheImpl.java
index 930dc302f35..d56d7604b6e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullResultVectorCacheImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullResultVectorCacheImpl.java
@@ -17,12 +17,17 @@
  */
 package org.apache.drill.exec.physical.rowSet.impl;
 
+import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.vector.ValueVector;
 
+/**
+ * A vector cache implementation which does not actually cache.
+ */
+
 public class NullResultVectorCacheImpl implements ResultVectorCache {
 
   private final BufferAllocator allocator;
@@ -38,4 +43,15 @@ public NullResultVectorCacheImpl(BufferAllocator allocator) {
   public ValueVector addOrGet(MaterializedField colSchema) {
     return TypeHelper.getNewVector(colSchema, allocator, null);
   }
+
+  @Override
+  public MajorType getType(String name) { return null; }
+
+  @Override
+  public boolean isPermissive() { return false; }
+
+  @Override
+  public ResultVectorCache childCache(String colName) {
+    return new NullResultVectorCacheImpl(allocator);
+  }
 }
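
Restating the non-caching contract as a sketch (assertions illustrative):

    ResultVectorCache cache = new NullResultVectorCacheImpl(allocator);

    assert cache.getType("a") == null;  // no type history is kept
    assert !cache.isPermissive();

    MaterializedField field = MaterializedField.create("a",
        Types.optional(MinorType.INT));
    ValueVector v1 = cache.addOrGet(field);
    ValueVector v2 = cache.addOrGet(field);
    // Unlike a real cache, addOrGet() creates a new vector on every call,
    // so v1 and v2 are distinct instances.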
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullVectorState.java
index 83727589240..c26da0eaead 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullVectorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullVectorState.java
@@ -27,12 +27,11 @@
 
 public class NullVectorState implements VectorState {
 
-  @Override public int allocate(int cardinality) { return 0; }
-  @Override public void rollover(int cardinality) { }
-  @Override public void harvestWithLookAhead() { }
-  @Override public void startBatchWithLookAhead() { }
-  @Override public void reset() { }
-  @Override public ValueVector vector() { return null; }
+  /**
+   * Near-do-nothing state for a vector that requires no work to
+   * allocate or roll-over, but where we do want to at least track
+   * the vector itself. (Used for map and union pseudo-vectors.)
+   */
 
   public static class UnmanagedVectorState extends NullVectorState {
     ValueVector vector;
@@ -41,10 +40,19 @@ public UnmanagedVectorState(ValueVector vector) {
       this.vector = vector;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public ValueVector vector() { return vector; }
+    public <T extends ValueVector> T vector() { return (T) vector; }
   }
 
+  @Override public int allocate(int cardinality) { return 0; }
+  @Override public void rollover(int cardinality) { }
+  @Override public void harvestWithLookAhead() { }
+  @Override public void startBatchWithLookAhead() { }
+  @Override public void close() { }
+  @Override public <T extends ValueVector> T vector() { return null; }
+  @Override public boolean isProjected() { return false; }
+
   @Override
   public void dump(HierarchicalFormatter format) {
     format.startObject(this).endObject();
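
The generified vector() accessor lets callers bind the result to the
concrete vector type without a cast at the call site; variable names in
this sketch are assumed.

    // Before: ValueVector v = state.vector(); followed by an explicit cast.
    // After: the cast is inferred from the assignment target.
    UInt4Vector offsets = offsetVectorState.vector();
    MapVector map = mapVectorState.vector();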
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java
index 7e8080fd81e..f9c2bff2eba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java
@@ -17,44 +17,31 @@
  */
 package org.apache.drill.exec.physical.rowSet.impl;
 
-import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.ValuesVectorState;
+import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.FixedWidthVectorState;
+import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.SimpleVectorState;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.NullableVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
-import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter;
 import org.apache.drill.exec.vector.accessor.writer.NullableScalarWriter;
 
 public class NullableVectorState implements VectorState {
 
-  public static class BitsVectorState extends ValuesVectorState {
-
-    public BitsVectorState(ColumnMetadata schema, AbstractScalarWriter writer, ValueVector mainVector) {
-      super(schema, writer, mainVector);
-    }
-
-    @Override
-    public int allocateVector(ValueVector vector, int cardinality) {
-      ((FixedWidthVector) vector).allocateNew(cardinality);
-      return vector.getBufferSize();
-    }
-  }
-
   private final ColumnMetadata schema;
   private final NullableScalarWriter writer;
   private final NullableVector vector;
-  private final ValuesVectorState bitsState;
-  private final ValuesVectorState valuesState;
+  private final SimpleVectorState bitsState;
+  private final SimpleVectorState valuesState;
 
   public NullableVectorState(AbstractObjectWriter writer, NullableVector vector) {
     this.schema = writer.schema();
     this.vector = vector;
 
     this.writer = (NullableScalarWriter) writer.scalar();
-    bitsState = new BitsVectorState(schema, this.writer.bitsWriter(), vector.getBitsVector());
-    valuesState = new ValuesVectorState(schema, this.writer.baseWriter(), vector.getValuesVector());
+    bitsState = new FixedWidthVectorState(this.writer.bitsWriter(), vector.getBitsVector());
+    valuesState = SimpleVectorState.vectorState(this.writer.schema(),
+        this.writer.baseWriter(), vector.getValuesVector());
   }
 
   @Override
@@ -82,13 +69,17 @@ public void startBatchWithLookAhead() {
   }
 
   @Override
-  public void reset() {
-    bitsState.reset();
-    valuesState.reset();
+  public void close() {
+    bitsState.close();
+    valuesState.close();
   }
 
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T extends ValueVector> T vector() { return (T) vector; }
+
   @Override
-  public ValueVector vector() { return vector; }
+  public boolean isProjected() { return true; }
 
   @Override
   public void dump(HierarchicalFormatter format) {
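
Conceptually, a nullable vector is two vectors managed in tandem, which
is what the paired bits/values states above track. A model of the layout
with assumed contents:

    // Nullable INT column holding [10, null, 30]:
    byte[] bits = {1, 0, 1};      // 1 = value present, 0 = null
    int[] values = {10, 0, 30};   // slot 1 is undefined when bits[1] == 0
    boolean secondIsNull = (bits[1] == 0);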
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java
index 06b82a3212e..12f5f9e9d46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java
@@ -22,6 +22,7 @@
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
 import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetOptions;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -35,6 +36,7 @@
   protected int vectorSizeLimit;
   protected int rowCountLimit;
   protected Collection<SchemaPath> projection;
+  protected RequestedTuple projectionSet;
   protected ResultVectorCache vectorCache;
   protected TupleMetadata schema;
   protected long maxBatchSize;
@@ -83,13 +85,16 @@ public OptionBuilder setBatchSizeLimit(int bytes) {
    * @return this builder
    */
 
-  // TODO: Use SchemaPath in place of strings.
-
   public OptionBuilder setProjection(Collection<SchemaPath> projection) {
     this.projection = projection;
     return this;
   }
 
+  public OptionBuilder setProjectionSet(RequestedTuple projectionSet) {
+    this.projectionSet = projectionSet;
+    return this;
+  }
+
   /**
    * Downstream operators require "vector persistence": the same vector
    * must represent the same column in every batch. For the scan operator,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/PrimitiveColumnState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/PrimitiveColumnState.java
deleted file mode 100644
index 55ccb748d5f..00000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/PrimitiveColumnState.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.rowSet.impl;
-
-import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.ValuesVectorState;
-import org.apache.drill.exec.vector.NullableVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.accessor.ObjectType;
-import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener;
-import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
-import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
-import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter;
-import org.apache.drill.exec.vector.complex.RepeatedValueVector;
-
-/**
- * Primitive (non-map) column state. Handles all three cardinalities.
- * Column metadata is hosted on the writer.
- */
-
-public class PrimitiveColumnState extends ColumnState implements ColumnWriterListener {
-
-  public PrimitiveColumnState(ResultSetLoaderImpl resultSetLoader,
-      AbstractObjectWriter colWriter,
-      VectorState vectorState) {
-    super(resultSetLoader, colWriter, vectorState);
-    ScalarWriter scalarWriter;
-    if (colWriter.type() == ObjectType.ARRAY) {
-      scalarWriter = writer.array().scalar();
-    } else {
-      scalarWriter = writer.scalar();
-    }
-    ((AbstractScalarWriter) scalarWriter).bindListener(this);
-  }
-
-  public static PrimitiveColumnState newPrimitive(
-      ResultSetLoaderImpl resultSetLoader,
-      ValueVector vector,
-      AbstractObjectWriter writer) {
-    VectorState vectorState;
-    if (vector == null) {
-      vectorState = new NullVectorState();
-    } else {
-      vectorState = new ValuesVectorState(
-          writer.schema(),
-          (AbstractScalarWriter) writer.scalar(),
-          vector);
-    }
-    return new PrimitiveColumnState(resultSetLoader, writer,
-        vectorState);
-  }
-
-  public static PrimitiveColumnState newNullablePrimitive(
-      ResultSetLoaderImpl resultSetLoader,
-      ValueVector vector,
-      AbstractObjectWriter writer) {
-    VectorState vectorState;
-    if (vector == null) {
-      vectorState = new NullVectorState();
-    } else {
-      vectorState = new NullableVectorState(
-          writer,
-          (NullableVector) vector);
-    }
-    return new PrimitiveColumnState(resultSetLoader, writer,
-        vectorState);
-  }
-
-  public static PrimitiveColumnState newPrimitiveArray(
-      ResultSetLoaderImpl resultSetLoader,
-      ValueVector vector,
-      AbstractObjectWriter writer) {
-    VectorState vectorState;
-    if (vector == null) {
-      vectorState = new NullVectorState();
-    } else {
-      vectorState = new RepeatedVectorState(writer, (RepeatedValueVector) vector);
-    }
-    return new PrimitiveColumnState(resultSetLoader, writer,
-        vectorState);
-  }
-
-  @Override
-  public void overflowed(ScalarWriter writer) {
-    resultSetLoader.overflowed();
-  }
-
-  @Override
-  public void dump(HierarchicalFormatter format) {
-    // TODO Auto-generated method stub
-  }
-
-  @Override
-  public boolean canExpand(ScalarWriter writer, int delta) {
-    return resultSetLoader.canExpand(delta);
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSet.java
deleted file mode 100644
index 9ea118f277f..00000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSet.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.rowSet.impl;
-
-/**
- * Represents the set of columns projected for a tuple (row or map.)
- * A projected column might itself be a map, so the interface returns a
- * projection set for the members of such columns.
- * <p>
- * Three implementations exist:
- * <ul>
- * <li>Project all ({@link NullProjectionSet}): used for a tuple when
- * all columns are projected. Example: the root tuple (the row) in
- * a <tt>SELECT *</tt> query.</li>
- * <li>Project none (also {@link NullProjectionSet}): used when no
- * columns are projected from a tuple, such as when a map itself is
- * not projected, so none of its member columns are projected.</li>
- * <li>Project some ({@link ProjectionSetImpl}): used in the
- * <tt>SELECT a, c, e</tt> case in which the query identifies which
- * columns to project (implicitly leaving out others, such as b and
- * d in our example.)</li>
- * </ul>
- * <p>
- * The result is that each tuple (row and map) has an associated
- * projection set which the code can query to determine if a newly
- * added column is wanted (and so should have a backing vector) or
- * is unwanted (and can just receive a dummy writer.)
- */
-
-interface ProjectionSet {
-  boolean isProjected(String colName);
-  ProjectionSet mapProjection(String colName);
-}
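
For context, the removed ProjectionSet was consulted each time a client
added a column through the writers. A rough, illustrative sketch of that
use; `proj` and `colName` are assumed, not code from this patch:

    if (proj.isProjected(colName)) {
      // Projected: materialize a vector and create a real writer.
    } else {
      // Unprojected: create a dummy writer; written data is discarded.
    }
    // For a map column, descend to the projection set for its members:
    ProjectionSet memberProj = proj.mapProjection(colName);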
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSetImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSetImpl.java
deleted file mode 100644
index e17f48644f4..00000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSetImpl.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.rowSet.impl;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.drill.common.expression.PathSegment;
-import org.apache.drill.common.expression.PathSegment.NameSegment;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.map.CaseInsensitiveMap;
-
-/**
- * Represents an explicit projection at some tuple level.
- * <p>
- * A column is projected if it is explicitly listed in the selection list.
- * <p>
- * If a column is a map, then the projection for the map's columns is based on
- * two rules:
- * <ol>
- * <li>If the projection list includes at least one explicit mention of a map
- * member, then include only those columns explicitly listed.</li>
- * <li>If the projection at the parent level lists only the map column itself
- * (which the projection can't know is a map), then assume this implies all
- * columns, as if the entry were "map.*".</li>
- * </ol>
- * <p>
- * Examples:<br>
- * <code>m</code><br>
- * If m turns out to be a map, project all members of m.<br>
- * <code>m.a</code><br>
- * Column m must be a map. Project only column a.<br>
- * <code>m, m.a</code><br>
- * Tricky case. We interpret this as projecting only the "a" element of map m.
- * <p>
- * The projection set is built from a list of columns, represented as
- * {@link SchemaPath} objects, provided by the physical plan. The structure of
- * <tt>SchemaPath</tt> is a bit awkward:
- * <p>
- * <ul>
- * <li><tt>SchemaPath</tt> is a wrapper for a column which directly holds the
- * <tt>NameSegment</tt> for the top-level column.</li>
- * <li><tt>NameSegment</tt> holds a name. This can be a top name such as
- * `a`, or parts of a compound name such as `a`.`b`. Each <tt>NameSegment</tt>
- * has a "child" that points to the optional following parts of the name.</li>
- * <li><tt>PathSegment</tt> is the base class for the parts of a name.</li>
- * <li><tt>ArraySegment</tt> is the other kind of name part and represents
- * an array index such as the "[1]" in `columns`[1].</li>
- * </ul>
- * The parser here consumes only names, this mechanism does not consider
- * array indexes. As a result, there may be multiple projected columns that
- * map to the same projection here: `columns`[1] and `columns`[2] both map to
- * the name `columns`, for example.
- */
-
-public class ProjectionSetImpl implements ProjectionSet {
-
-  Set<String> projection = new HashSet<>();
-  Map<String, ProjectionSetImpl> mapProjections = CaseInsensitiveMap
-      .newHashMap();
-
-  @Override
-  public boolean isProjected(String colName) {
-    return projection.contains(colName.toLowerCase());
-  }
-
-  @Override
-  public ProjectionSet mapProjection(String colName) {
-    ProjectionSet mapProj = mapProjections.get(colName.toLowerCase());
-    if (mapProj != null) {
-      return mapProj;
-    }
-
-    // No explicit information for the map. Members inherit the
-    // same projection as the map itself.
-
-    return new NullProjectionSet(isProjected(colName));
-  }
-
-  /**
-   * Parse a projection list. The list should consist of a list of column
-   * names; any wildcards should have been processed by the caller. An
-   * empty or null list means everything is projected (that is, an
-   * empty list here is equivalent to a wildcard in the SELECT
-   * statement.)
-   *
-   * @param projList the list of projected columns; null or empty projects all
-   * @return the parsed projection set
-   */
-  public static ProjectionSet parse(Collection<SchemaPath> projList) {
-    if (projList == null || projList.isEmpty()) {
-      return new NullProjectionSet(true);
-    }
-    ProjectionSetImpl projSet = new ProjectionSetImpl();
-    for (SchemaPath col : projList) {
-      projSet.addSegment(col.getRootSegment());
-    }
-    return projSet;
-  }
-
-  private void addSegment(NameSegment rootSegment) {
-    String rootKey = rootSegment.getPath().toLowerCase();
-    projection.add(rootKey);
-    PathSegment child = rootSegment.getChild();
-    if (child == null) {
-      return;
-    }
-    if (child.isArray()) {
-      // Ignore the [x] array suffix.
-      return;
-    }
-    ProjectionSetImpl map = mapProjections.get(rootKey);
-    if (map == null) {
-      map = new ProjectionSetImpl();
-      mapProjections.put(rootKey, map);
-    }
-    map.addSegment((NameSegment) child);
-  }
-}
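
The parsing rules in the removed javadoc can be summarized with a short,
illustrative example (assuming the usual Drill imports):

    List<SchemaPath> cols = Arrays.asList(
        SchemaPath.getSimplePath("a"),
        SchemaPath.getCompoundPath("m", "b"));
    ProjectionSet proj = ProjectionSetImpl.parse(cols);
    proj.isProjected("A");                    // true: matching is case-insensitive
    proj.mapProjection("m").isProjected("b"); // true: explicitly listed
    proj.mapProjection("m").isProjected("c"); // false: m.b made m's projection explicit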
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java
index afc1caccf5f..8dd4839ce7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java
@@ -18,12 +18,11 @@
 package org.apache.drill.exec.physical.rowSet.impl;
 
 import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.OffsetVectorState;
-import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.ValuesVectorState;
-import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.SimpleVectorState;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter;
-import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 
@@ -33,14 +32,12 @@
  */
 
 public class RepeatedVectorState implements VectorState {
-  private final ColumnMetadata schema;
   private final AbstractArrayWriter arrayWriter;
   private final RepeatedValueVector vector;
   private final OffsetVectorState offsetsState;
-  private final ValuesVectorState valuesState;
+  private final SimpleVectorState valuesState;
 
-  public RepeatedVectorState(AbstractObjectWriter writer, RepeatedValueVector vector) {
-    this.schema = writer.schema();
+  public RepeatedVectorState(ArrayWriter writer, RepeatedValueVector vector) {
 
     // Get the repeated vector
 
@@ -49,9 +46,9 @@ public RepeatedVectorState(AbstractObjectWriter writer, RepeatedValueVector vect
     // Create the values state using the value (data) portion of the repeated
     // vector, and the scalar (value) portion of the array writer.
 
-    arrayWriter = (AbstractArrayWriter) writer.array();
-    AbstractScalarWriter colWriter = (AbstractScalarWriter) arrayWriter.scalar();
-    valuesState = new ValuesVectorState(schema, colWriter, vector.getDataVector());
+    arrayWriter = (AbstractArrayWriter) writer;
+    AbstractScalarWriter colWriter = (AbstractScalarWriter) writer.scalar();
+    valuesState = SimpleVectorState.vectorState(writer.schema(), colWriter, vector.getDataVector());
 
     // Create the offsets state with the offset vector portion of the repeated
     // vector, and the offset writer portion of the array writer.
@@ -59,11 +56,12 @@ public RepeatedVectorState(AbstractObjectWriter writer, RepeatedValueVector vect
     offsetsState = new OffsetVectorState(
         arrayWriter.offsetWriter(),
         vector.getOffsetVector(),
-        (AbstractObjectWriter) arrayWriter.entry());
+        colWriter);
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  public ValueVector vector() { return vector; }
+  public <T extends ValueVector> T vector() { return (T) vector; }
 
   @Override
   public int allocate(int cardinality) {
@@ -72,7 +70,7 @@ public int allocate(int cardinality) {
   }
 
   private int childCardinality(int cardinality) {
-    return cardinality * schema.expectedElementCount();
+    return cardinality * arrayWriter.schema().expectedElementCount();
   }
 
   /**
@@ -146,16 +144,19 @@ public void startBatchWithLookAhead() {
   }
 
   @Override
-  public void reset() {
-    offsetsState.reset();
-    valuesState.reset();
+  public void close() {
+    offsetsState.close();
+    valuesState.close();
   }
 
+  @Override
+  public boolean isProjected() { return true; }
+
   @Override
   public void dump(HierarchicalFormatter format) {
     format
       .startObject(this)
-      .attribute("schema", schema)
+      .attribute("schema", arrayWriter.schema())
       .attributeIdentity("writer", arrayWriter)
       .attributeIdentity("vector", vector)
       .attribute("offsetsState");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java
index c3869799adf..c7c6fdc5d33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java
@@ -17,26 +17,28 @@
  */
 package org.apache.drill.exec.physical.rowSet.impl;
 
-import java.util.Collection;
-
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
 import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
 import org.apache.drill.exec.physical.rowSet.RowSetLoader;
 import org.apache.drill.exec.physical.rowSet.impl.TupleState.RowState;
+import org.apache.drill.exec.physical.rowSet.project.ImpliedTupleRequest;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTupleImpl;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
 
 /**
- * Implementation of the result set loader.
+ * Implementation of the result set loader. Caches vectors
+ * for a row or map.
+ *
  * @see {@link ResultSetLoader}
  */
 
-public class ResultSetLoaderImpl implements ResultSetLoader {
+public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
 
   /**
    * Read-only set of options for the result set loader.
@@ -46,26 +48,35 @@
     public final int vectorSizeLimit;
     public final int rowCountLimit;
     public final ResultVectorCache vectorCache;
-    public final Collection<SchemaPath> projection;
+    public final RequestedTuple projectionSet;
     public final TupleMetadata schema;
     public final long maxBatchSize;
 
     public ResultSetOptions() {
       vectorSizeLimit = ValueVector.MAX_BUFFER_SIZE;
       rowCountLimit = DEFAULT_ROW_COUNT;
-      projection = null;
+      projectionSet = new ImpliedTupleRequest(true);
       vectorCache = null;
       schema = null;
       maxBatchSize = -1;
     }
 
     public ResultSetOptions(OptionBuilder builder) {
-      this.vectorSizeLimit = builder.vectorSizeLimit;
-      this.rowCountLimit = builder.rowCountLimit;
-      this.projection = builder.projection;
-      this.vectorCache = builder.vectorCache;
-      this.schema = builder.schema;
-      this.maxBatchSize = builder.maxBatchSize;
+      vectorSizeLimit = builder.vectorSizeLimit;
+      rowCountLimit = builder.rowCountLimit;
+      vectorCache = builder.vectorCache;
+      schema = builder.schema;
+      maxBatchSize = builder.maxBatchSize;
+
+      // If a projection list is provided, build the projection map
+      // from it. The caller might have already built the map. If so,
+      // use it.
+
+      if (builder.projectionSet != null) {
+        projectionSet = builder.projectionSet;
+      } else {
+        projectionSet = RequestedTupleImpl.parse(builder.projection);
+      }
     }
 
     public void dump(HierarchicalFormatter format) {
@@ -73,12 +84,13 @@ public void dump(HierarchicalFormatter format) {
         .startObject(this)
         .attribute("vectorSizeLimit", vectorSizeLimit)
         .attribute("rowCountLimit", rowCountLimit)
-        .attribute("projection", projection)
+//        .attribute("projection", projection)
         .endObject();
     }
   }
 
   private enum State {
+
     /**
      * Before the first batch.
      */
@@ -191,13 +203,6 @@ public void dump(HierarchicalFormatter format) {
 
   private final RowSetLoaderImpl rootWriter;
 
-  /**
-   * Vector cache for this loader.
-   * @see {@link OptionBuilder#setVectorCache()}.
-   */
-
-  private final ResultVectorCache vectorCache;
-
   /**
    * Tracks the state of the row set loader. Handling vector overflow requires
    * careful stepping through a variety of states as the write proceeds.
@@ -225,14 +230,6 @@ public void dump(HierarchicalFormatter format) {
 
   private int harvestSchemaVersion;
 
-  /**
-   * Builds the harvest vector container that includes only the columns that
-   * are included in the harvest schema version. That is, it excludes columns
-   * added while writing the overflow row.
-   */
-
-  private VectorContainerBuilder containerBuilder;
-
   /**
    * Counts the batches harvested (sent downstream) from this loader. Does
    * not include the current, in-flight batch.
@@ -271,7 +268,7 @@ public void dump(HierarchicalFormatter format) {
 
   protected int accumulatedBatchSize;
 
-  protected final ProjectionSet projectionSet;
+  protected final RequestedTuple projectionSet;
 
   public ResultSetLoaderImpl(BufferAllocator allocator, ResultSetOptions options) {
     this.allocator = allocator;
@@ -279,19 +276,22 @@ public ResultSetLoaderImpl(BufferAllocator allocator, ResultSetOptions options)
     targetRowCount = options.rowCountLimit;
     writerIndex = new WriterIndexImpl(this);
 
+    // Set the projections
+
+    projectionSet = options.projectionSet;
+
+    // Determine the root vector cache
+
+    ResultVectorCache vectorCache;
     if (options.vectorCache == null) {
       vectorCache = new NullResultVectorCacheImpl(allocator);
     } else {
       vectorCache = options.vectorCache;
     }
 
-    // If projection, build the projection map.
-
-    projectionSet = ProjectionSetImpl.parse(options.projection);
-
     // Build the row set model depending on whether a schema is provided.
 
-    rootState = new RowState(this);
+    rootState = new RowState(this, vectorCache);
     rootWriter = rootState.rootWriter();
 
     // If no schema, columns will be added incrementally as they
@@ -304,21 +304,23 @@ public ResultSetLoaderImpl(BufferAllocator allocator, ResultSetOptions options)
       // won't be if known up front.
 
       logger.debug("Schema: " + options.schema.toString());
-      rootState.buildSchema(options.schema);
+      new BuildFromSchema().buildTuple(rootWriter, options.schema);
     }
   }
 
   private void updateCardinality() {
-    rootState.updateCardinality(targetRowCount());
+    rootState.updateCardinality();
   }
 
   public ResultSetLoaderImpl(BufferAllocator allocator) {
     this(allocator, new ResultSetOptions());
   }
 
+  @Override
   public BufferAllocator allocator() { return allocator; }
 
-  protected int bumpVersion() {
+  @Override
+  public int bumpVersion() {
 
     // Update the active schema version. We cannot update the published
     // schema version at this point because a column later in this same
@@ -341,17 +343,49 @@ protected int bumpVersion() {
   }
 
   @Override
-  public int schemaVersion() { return harvestSchemaVersion; }
+  public int schemaVersion() {
+    switch (state) {
+    case ACTIVE:
+    case IN_OVERFLOW:
+    case OVERFLOW:
+    case FULL_BATCH:
+
+      // Write in progress: use current writer schema
+
+      return activeSchemaVersion;
+    case HARVESTED:
+    case LOOK_AHEAD:
+    case START:
+
+      // Batch is published. Use harvest schema.
+
+      return harvestSchemaVersion;
+    default:
+
+      // Not really in a position to give a schema
+      // version.
+
+      throw new IllegalStateException("Unexpected state: " + state);
+    }
+  }
 
   @Override
   public void startBatch() {
+    startBatch(false);
+  }
+
+  public void startEmptyBatch() {
+    startBatch(true);
+  }
+
+  public void startBatch(boolean schemaOnly) {
     switch (state) {
     case HARVESTED:
     case START:
       logger.trace("Start batch");
       accumulatedBatchSize = 0;
       updateCardinality();
-      rootState.startBatch();
+      rootState.startBatch(schemaOnly);
       checkInitialAllocation();
 
       // The previous batch ended without overflow, so start
@@ -369,7 +403,7 @@ public void startBatch() {
       // a column-by-column basis, which is done by the visitor.
 
       logger.trace("Start batch after overflow");
-      rootState.startBatch();
+      rootState.startBatch(schemaOnly);
 
       // Note: no need to do anything with the writers; they were left
       // pointing to the correct positions in the look-ahead batch.
@@ -496,7 +530,7 @@ public boolean writeable() {
 
   private boolean isBatchActive() {
     return state == State.ACTIVE || state == State.OVERFLOW ||
-           state == State.FULL_BATCH ;
+           state == State.FULL_BATCH;
   }
 
   /**
@@ -531,7 +565,27 @@ public void setTargetRowCount(int rowCount) {
   @Override
   public int targetVectorSize() { return options.vectorSizeLimit; }
 
-  protected void overflowed() {
+  @Override
+  public int skipRows(int requestedCount) {
+
+    // Can only skip rows when a batch is active.
+
+    if (state != State.ACTIVE) {
+      throw new IllegalStateException("No batch is active.");
+    }
+
+    // Skip as many rows as the vector limit allows.
+
+    return writerIndex.skipRows(requestedCount);
+  }
+
+  @Override
+  public boolean isProjectionEmpty() {
+    return ! rootState.hasProjections();
+  }
+
+  @Override
+  public void overflowed() {
     logger.trace("Vector overflow");
 
     // If we see overflow when we are already handling overflow, it means
@@ -569,8 +623,6 @@ protected void overflowed() {
 
     updateCardinality();
 
-//    rootWriter.dump(new HierarchicalPrinter());
-
     // Wrap up the completed rows into a batch. Sets
     // vector value counts. The rollover data still exists so
     // it can be moved, but it is now past the recorded
@@ -585,7 +637,7 @@ protected void overflowed() {
     rootState.rollover();
 
     // Adjust writer state to match the new vector values. This is
-    // surprisingly easy if we not that the current row is shifted to
+    // surprisingly easy if we note that the current row is shifted to
     // the 0 position in the new vector, so we just shift all offsets
     // downward by the current row position at each repeat level.
 
@@ -612,7 +664,13 @@ protected void overflowed() {
     state = State.OVERFLOW;
   }
 
-  protected boolean hasOverflow() { return state == State.OVERFLOW; }
+  @Override
+  public boolean hasOverflow() { return state == State.OVERFLOW; }
+
+  @Override
+  public VectorContainer outputContainer() {
+    return rootState.outputContainer();
+  }
 
   @Override
   public VectorContainer harvest() {
@@ -631,9 +689,8 @@ public VectorContainer harvest() {
       throw new IllegalStateException("Unexpected state: " + state);
     }
 
-    // Build the output container
-
-    VectorContainer container = outputContainer();
+    rootState.updateOutput(harvestSchemaVersion);
+    VectorContainer container = rootState.outputContainer();
     container.setRecordCount(rowCount);
 
     // Finalize: update counts, set state.
@@ -659,20 +716,9 @@ private int harvestOverflowBatch() {
     return pendingRowCount;
   }
 
-  @Override
-  public VectorContainer outputContainer() {
-    // Build the output container.
-
-    if (containerBuilder == null) {
-      containerBuilder = new VectorContainerBuilder(this);
-    }
-    containerBuilder.update(harvestSchemaVersion);
-    return containerBuilder.container();
-  }
-
   @Override
   public TupleMetadata harvestSchema() {
-    return containerBuilder.schema();
+    return rootState.outputSchema();
   }
 
   @Override
@@ -702,18 +748,9 @@ public int totalRowCount() {
     return total;
   }
 
-  public ResultVectorCache vectorCache() { return vectorCache; }
   public RowState rootState() { return rootState; }
 
-  /**
-   * Return whether a vector within the current batch can expand. Limits
-   * are enforce only if a limit was provided in the options.
-   *
-   * @param delta increase in vector size
-   * @return true if the vector can expand, false if an overflow
-   * event should occur
-   */
-
+  @Override
   public boolean canExpand(int delta) {
     accumulatedBatchSize += delta;
     return state == State.IN_OVERFLOW ||
@@ -721,13 +758,7 @@ public boolean canExpand(int delta) {
            accumulatedBatchSize <= options.maxBatchSize;
   }
 
-  /**
-   * Accumulate the initial vector allocation sizes.
-   *
-   * @param allocationBytes number of bytes allocated to a vector
-   * in the batch setup step
-   */
-
+  @Override
   public void tallyAllocations(int allocationBytes) {
     accumulatedBatchSize += allocationBytes;
   }
@@ -772,4 +803,14 @@ public void dump(HierarchicalFormatter format) {
     rootWriter.dump(format);
     format.endObject();
   }
+
+  @Override
+  public ResultVectorCache vectorCache() {
+    return rootState.vectorCache();
+  }
+
+  @Override
+  public int rowIndex() {
+    return writerIndex().vectorIndex();
+  }
 }
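
The new skipRows() and isProjectionEmpty() methods combine for the
SELECT COUNT(*) path. A sketch of the intended use; `loader` is assumed
to have been built with an empty projection, and `totalRows` is a
hypothetical reader-supplied count:

    if (loader.isProjectionEmpty()) {
      int remaining = totalRows;
      while (remaining > 0) {
        loader.startBatch();
        remaining -= loader.skipRows(remaining); // capped at the batch row limit
        VectorContainer batch = loader.harvest(); // row count set, no data vectors
        // ... send the batch downstream ...
      }
    }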
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultVectorCacheImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultVectorCacheImpl.java
index c7288b241b5..1cd0545fb37 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultVectorCacheImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultVectorCacheImpl.java
@@ -22,7 +22,9 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
@@ -88,20 +90,45 @@ public VectorState(String name) {
       this.name = name;
     }
 
-    public boolean satisfies(MaterializedField colSchema) {
+    public boolean satisfies(MaterializedField colSchema, boolean permissive) {
       if (vector == null) {
         return false;
       }
       MaterializedField vectorSchema = vector.getField();
-      return vectorSchema.getType().equals(colSchema.getType());
+      if (permissive) {
+        return colSchema.isPromotableTo(vectorSchema, true);
+      } else {
+        return Types.isEquivalent(vectorSchema.getType(),
+            colSchema.getType());
+      }
     }
   }
 
   private final BufferAllocator allocator;
-  private final Map<String, VectorState> vectors = new HashMap<>();
+
+  /**
+   * Permissive mode loosens the rules for finding a match.
+   * <ul>
+   * <li>A request for a non-nullable vector matches a nullable
+   * vector in the cache.</li>
+   * <li>A request for a smaller precision Varchar matches a
+   * larger precision Varchar in the cache.</li>
+   * </ul>
+   * When not in permissive mode, an exact match is required.
+   */
+
+  private final boolean permissiveMode;
+  private final Map<String, VectorState> vectors = CaseInsensitiveMap.newHashMap();
+  private Map<String, ResultVectorCacheImpl> children;
 
   public ResultVectorCacheImpl(BufferAllocator allocator) {
     this.allocator = allocator;
+    permissiveMode = false;
+  }
+
+  public ResultVectorCacheImpl(BufferAllocator allocator, boolean permissiveMode) {
+    this.allocator = allocator;
+    this.permissiveMode = permissiveMode;
   }
 
   @Override
@@ -146,7 +173,7 @@ public ValueVector addOrGet(MaterializedField colSchema) {
 
     // If the vector is found, and is of the right type, reuse it.
 
-    if (vs != null && vs.satisfies(colSchema)) {
+    if (vs != null && vs.satisfies(colSchema, permissiveMode)) {
       return vs.vector;
     }
 
@@ -169,6 +196,7 @@ public ValueVector addOrGet(MaterializedField colSchema) {
     return vs.vector;
   }
 
+  @Override
   public MajorType getType(String name) {
     VectorState vs = vectors.get(name);
     if (vs == null || vs.vector == null) {
@@ -182,5 +210,28 @@ public void close() {
       vs.vector.close();
     }
     vectors.clear();
+    if (children != null) {
+      for (ResultVectorCacheImpl child : children.values()) {
+        child.close();
+      }
+      children = null;
+    }
+  }
+
+  @Override
+  public boolean isPermissive() { return permissiveMode; }
+
+  @Override
+  public ResultVectorCache childCache(String colName) {
+    if (children == null) {
+      children = new HashMap<>();
+    }
+    String key = colName.toLowerCase();
+    ResultVectorCacheImpl child = children.get(key);
+    if (child == null) {
+      child = new ResultVectorCacheImpl(allocator);
+      children.put(key, child);
+    }
+    return child;
   }
 }
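
An illustrative example of the permissive mode described above (not a
test from this patch; `allocator` is assumed):

    ResultVectorCache cache = new ResultVectorCacheImpl(allocator, true);
    ValueVector v1 = cache.addOrGet(
        MaterializedField.create("a", Types.optional(MinorType.VARCHAR)));
    // A later request for a required VARCHAR of the same name reuses the
    // cached nullable vector: REQUIRED is promotable to OPTIONAL.
    ValueVector v2 = cache.addOrGet(
        MaterializedField.create("a", Types.required(MinorType.VARCHAR)));
    assert v1 == v2;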
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RowSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RowSetLoaderImpl.java
index 68edbee7a2b..41884c26b8e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RowSetLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RowSetLoaderImpl.java
@@ -56,6 +56,16 @@ public RowSetLoader addRow(Object...values) {
     return this;
   }
 
+  @Override
+  public RowSetLoader addSingleCol(Object value) {
+    if (! start()) {
+      throw new IllegalStateException("Batch is full.");
+    }
+    set(0, value);
+    save();
+    return this;
+  }
+
   @Override
   public int rowIndex() { return rsLoader.writerIndex().vectorIndex(); }
 
@@ -92,7 +102,7 @@ public void endBatch() {
   }
 
   @Override
-  public boolean isFull( ) { return rsLoader.isFull(); }
+  public boolean isFull() { return rsLoader.isFull(); }
 
   @Override
   public int rowCount() { return rsLoader.rowCount(); }
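
The new addSingleCol() is shorthand for the start/set/save cycle on a
one-column row. Illustrative use, assuming a loader with a single
VARCHAR column:

    loader.startBatch();
    RowSetLoader writer = loader.writer();
    writer.addSingleCol("first");  // same as: start(); set(0, "first"); save();
    writer.addSingleCol("second");
    VectorContainer batch = loader.harvest();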
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java
index 0efb6f1d4c1..cfb7130cece 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java
@@ -17,16 +17,17 @@
  */
 package org.apache.drill.exec.physical.rowSet.impl;
 
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.NullableVector;
 import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 import org.apache.drill.exec.vector.accessor.WriterPosition;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
-import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
-import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter;
 import org.apache.drill.exec.vector.accessor.writer.OffsetVectorWriter;
 
 /**
@@ -38,33 +39,11 @@
 
 public abstract class SingleVectorState implements VectorState {
 
-  /**
-   * State for a scalar value vector. The vector might be for a simple (non-array)
-   * vector, or might be the payload part of a scalar array (repeated scalar)
-   * vector.
-   */
+  public abstract static class SimpleVectorState extends SingleVectorState {
 
-  public static class ValuesVectorState extends SingleVectorState {
-
-    private final ColumnMetadata schema;
-
-    public ValuesVectorState(ColumnMetadata schema, AbstractScalarWriter writer, ValueVector mainVector) {
+    public SimpleVectorState(WriterPosition writer,
+        ValueVector mainVector) {
       super(writer, mainVector);
-      this.schema = schema;
-    }
-
-    @Override
-    public int allocateVector(ValueVector vector, int cardinality) {
-      if (schema.isVariableWidth()) {
-
-        // Cap the allocated size to the maximum.
-
-        int size = (int) Math.min(ValueVector.MAX_BUFFER_SIZE, (long) cardinality * schema.expectedWidth());
-        ((VariableWidthVector) vector).allocateNew(size, cardinality);
-      } else {
-        ((FixedWidthVector) vector).allocateNew(cardinality);
-      }
-      return vector.getBufferSize();
     }
 
     @Override
@@ -87,6 +66,51 @@ protected void copyOverflow(int sourceStartIndex, int sourceEndIndex) {
     }
   }
 
+  /**
+   * State for a fixed-width scalar vector. The vector might be a simple
+   * (non-array) vector, or the payload part of a scalar array (repeated
+   * scalar) vector.
+   */
+
+  public static class FixedWidthVectorState extends SimpleVectorState {
+
+    public FixedWidthVectorState(WriterPosition writer, ValueVector mainVector) {
+      super(writer, mainVector);
+    }
+
+    @Override
+    public int allocateVector(ValueVector vector, int cardinality) {
+      ((FixedWidthVector) vector).allocateNew(cardinality);
+      return vector.getAllocatedSize();
+    }
+  }
+
+  /**
+   * State for a variable-width scalar vector. The vector might be a simple
+   * (non-array) vector, or the payload part of a scalar array (repeated
+   * scalar) vector.
+   */
+
+  public static class VariableWidthVectorState extends SimpleVectorState {
+
+    private final ColumnMetadata schema;
+
+    public VariableWidthVectorState(ColumnMetadata schema, WriterPosition writer, ValueVector mainVector) {
+      super(writer, mainVector);
+      this.schema = schema;
+    }
+
+    @Override
+    public int allocateVector(ValueVector vector, int cardinality) {
+
+      // Cap the allocated size to the maximum.
+
+      final int size = (int) Math.min(ValueVector.MAX_BUFFER_SIZE, (long) cardinality * schema.expectedWidth());
+      ((VariableWidthVector) vector).allocateNew(size, cardinality);
+      return vector.getAllocatedSize();
+    }
+  }
+
   /**
    * Special case for an offset vector. Offset vectors are managed like any other
    * vector with respect to overflow and allocation. This means that the loader
@@ -97,14 +121,25 @@ protected void copyOverflow(int sourceStartIndex, int sourceEndIndex) {
 
   public static class OffsetVectorState extends SingleVectorState {
 
-    private final AbstractObjectWriter childWriter;
+    /**
+     * The child writer used to determine positions on overflow.
+     * The repeated list vector defers creating the child until the
+     * child type is known, so this field cannot be final. It will,
+     * however, change value only once: from null to a valid writer.
+     */
+
+    private WriterPosition childWriter;
 
     public OffsetVectorState(WriterPosition writer, ValueVector mainVector,
-        AbstractObjectWriter childWriter) {
+        WriterPosition childWriter) {
       super(writer, mainVector);
       this.childWriter = childWriter;
     }
 
+    public void setChildWriter(WriterPosition childWriter) {
+      this.childWriter = childWriter;
+    }
+
     @Override
     public int allocateVector(ValueVector toAlloc, int cardinality) {
       ((UInt4Vector) toAlloc).allocateNew(cardinality);
@@ -122,6 +157,8 @@ protected void copyOverflow(int sourceStartIndex, int sourceEndIndex) {
         return;
       }
 
+      assert childWriter != null;
+
       // This is an offset vector. The data to copy is one greater
       // than the row index.
 
@@ -144,9 +181,9 @@ protected void copyOverflow(int sourceStartIndex, int sourceEndIndex) {
       // for the current row. We must subtract that offset from each copied
       // value to adjust the offset for the destination.
 
-      UInt4Vector.Accessor sourceAccessor = ((UInt4Vector) backupVector).getAccessor();
-      UInt4Vector.Mutator destMutator = ((UInt4Vector) mainVector).getMutator();
-      int offset = childWriter.rowStartIndex();
+      final UInt4Vector.Accessor sourceAccessor = ((UInt4Vector) backupVector).getAccessor();
+      final UInt4Vector.Mutator destMutator = ((UInt4Vector) mainVector).getMutator();
+      final int offset = childWriter.rowStartIndex();
       int newIndex = 1;
       ResultSetLoaderImpl.logger.trace("Offset vector: copy {} values from {} to {} with offset {}",
           Math.max(0, sourceEndIndex - sourceStartIndex + 1),
@@ -159,6 +196,12 @@ protected void copyOverflow(int sourceStartIndex, int sourceEndIndex) {
       for (int src = sourceStartIndex; src <= sourceEndIndex; src++, newIndex++) {
         destMutator.set(newIndex, sourceAccessor.get(src) - offset);
       }
+
+      // Getting offsets right was a pain. If you modify this code,
+      // you'll likely relive that experience. Enabling the next two
+      // lines will help reveal some of the mystery around offsets and their
+      // confusing off-by-one design.
+
 //      VectorPrinter.printOffsets((UInt4Vector) backupVector, sourceStartIndex - 1, sourceEndIndex - sourceStartIndex + 3);
 //      VectorPrinter.printOffsets((UInt4Vector) mainVector, 0, newIndex);
     }
@@ -173,8 +216,9 @@ public SingleVectorState(WriterPosition writer, ValueVector mainVector) {
     this.mainVector = mainVector;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  public ValueVector vector() { return mainVector; }
+  public <T extends ValueVector> T vector() { return (T) mainVector; }
 
   @Override
   public int allocate(int cardinality) {
@@ -199,20 +243,21 @@ public int allocate(int cardinality) {
   @Override
   public void rollover(int cardinality) {
 
-    int sourceStartIndex = writer.rowStartIndex();
+    final int sourceStartIndex = writer.rowStartIndex();
 
     // Remember the last write index for the original vector.
     // This tells us the end of the set of values to move, while the
     // sourceStartIndex above tells us the start.
 
-    int sourceEndIndex = writer.lastWriteIndex();
+    final int sourceEndIndex = writer.lastWriteIndex();
 
     // Switch buffers between the backup vector and the writer's output
     // vector. Done this way because writers are bound to vectors and
     // we wish to keep the binding.
 
     if (backupVector == null) {
-      backupVector = TypeHelper.getNewVector(mainVector.getField(), mainVector.getAllocator(), null);
+      backupVector = TypeHelper.getNewVector(mainVector.getField(),
+          parseVectorType(mainVector), mainVector.getAllocator(), null);
     }
     assert cardinality > 0;
     allocateVector(backupVector, cardinality);
@@ -229,6 +274,37 @@ public void rollover(int cardinality) {
     // the retained values.
   }
 
+  /**
+   * The vector mechanism here relies on the vector metadata. However, if the
+   * main vector is nullable, it will contain a <code>values</code> vector which
+   * is required. But the <code>values</code> vector will carry metadata that
+   * declares it to be nullable. While this is clearly a bug, it is a bug that has
+   * become a "feature" and cannot be changed. This code works around this feature
+   * by parsing out the actual type of the vector.
+   *
+   * @param vector the vector to clone, the type of which may not match the
+   * metadata declared within that vector
+   * @return the actual major type of the vector
+   */
+
+  protected static MajorType parseVectorType(ValueVector vector) {
+    final MajorType purportedType = vector.getField().getType();
+    if (purportedType.getMode() != DataMode.OPTIONAL) {
+      return purportedType;
+    }
+
+    // For nullable vectors, the purported type can be wrong. The "outer"
+    // vector is nullable, but the internal "values" vector is required, though
+    // it carries a nullable type -- that is, the metadata lies.
+
+    if (vector instanceof NullableVector) {
+      return purportedType;
+    }
+    return purportedType.toBuilder()
+        .setMode(DataMode.REQUIRED)
+        .build();
+  }
+
   protected abstract void copyOverflow(int sourceStartIndex, int sourceEndIndex);
 
   /**
@@ -256,13 +332,24 @@ public void startBatchWithLookAhead() {
   }
 
   @Override
-  public void reset() {
+  public void close() {
     mainVector.clear();
     if (backupVector != null) {
       backupVector.clear();
     }
   }
 
+  @Override
+  public boolean isProjected() { return true; }
+
+  public static SimpleVectorState vectorState(ColumnMetadata schema, WriterPosition writer, ValueVector mainVector) {
+    if (schema.isVariableWidth()) {
+      return new VariableWidthVectorState(schema, writer, mainVector);
+    } else {
+      return new FixedWidthVectorState(writer, mainVector);
+    }
+  }
+
   @Override
   public void dump(HierarchicalFormatter format) {
     format
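
The quirk that parseVectorType() works around can be seen directly on a
nullable vector: the inner values vector is required, yet its metadata
still claims OPTIONAL. A sketch, assuming `allocator` and package-level
access to the helper:

    NullableIntVector outer = new NullableIntVector(
        MaterializedField.create("a", Types.optional(MinorType.INT)),
        allocator);
    ValueVector values = outer.getValuesVector();
    values.getField().getType().getMode();   // OPTIONAL -- the metadata "lies"
    parseVectorType(values).getMode();       // REQUIRED -- the actual mode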
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java
index bdf303ca3d0..ec87e38f6df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java
@@ -18,28 +18,28 @@
 package org.apache.drill.exec.physical.rowSet.impl;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
-import org.apache.drill.exec.physical.rowSet.impl.ColumnState.BaseMapColumnState;
-import org.apache.drill.exec.physical.rowSet.impl.ColumnState.MapArrayColumnState;
-import org.apache.drill.exec.physical.rowSet.impl.ColumnState.MapColumnState;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.impl.ColumnState.BaseContainerColumnState;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.metadata.AbstractColumnMetadata;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.MetadataUtils;
-import org.apache.drill.exec.record.metadata.ProjectionType;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.accessor.ObjectType;
 import org.apache.drill.exec.vector.accessor.ObjectWriter;
 import org.apache.drill.exec.vector.accessor.TupleWriter;
 import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter;
-import org.apache.drill.exec.vector.accessor.writer.ColumnWriterFactory;
-import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
 
 /**
  * Represents the loader state for a tuple: a row or a map. This is "state" in
@@ -53,9 +53,188 @@
  * a variety of values. So, the "tuple" here is structural, not a specific
  * set of values, but rather the collection of vectors that hold tuple
  * values.
+ *
+ * Drill vector containers and maps are both tuples, but they irritatingly
+ * have completely different APIs for working with their child vectors.
+ * These classes are a proxy that wraps the two APIs to provide a common
+ * view for use by the result set loader and its internals.
+ *
+ * <h4>Output Container</h4>
+ *
+ * Builds the harvest vector container that includes only the columns that
+ * are included in the harvest schema version. That is, it excludes columns
+ * added while writing an overflow row.
+ * <p>
+ * Because a Drill row is actually a hierarchy, this mechanism walks the
+ * internal hierarchy and builds a corresponding output hierarchy.
+ * <ul>
+ * <li>The root node is the row itself (vector container),</li>
+ * <li>Internal nodes are maps (structures),</li>
+ * <li>Leaf nodes are primitive vectors (which may be arrays).</li>
+ * </ul>
+ * The basic algorithm is to identify the version of the output schema,
+ * then add any new columns added up to that version. This object maintains
+ * the output container across batches, meaning that updates are incremental:
+ * we need only add columns that are new since the last update. And, those new
+ * columns will always appear directly after all existing columns in the row
+ * or in a map.
+ * <p>
+ * A special case occurs when columns are added in the overflow row. These
+ * columns <i>do not</i> appear in the output container for the main part
+ * of the batch; instead they appear in the <i>next</i> output container
+ * that includes the overflow row.
+ * <p>
+ * Since the container here may contain a subset of the internal columns, an
+ * interesting case occurs for maps. The maps in the output container are
+ * <b>not</b> the same as those used internally. Since the internal and output
+ * versions of a map can each hold a different list of columns, the two map
+ * vectors must differ. The set of child vectors (except for child maps) is shared.
  */
 
-public abstract class TupleState implements TupleWriterListener {
+public abstract class TupleState extends ContainerState
+  implements TupleWriterListener {
+
+  /**
+   * Represents a map column (either single or repeated). Includes maps that
+   * are top-level, nested within other maps, or nested inside a union.
+   * Schema management is a bit complex:
+   * <table border=1>
+   * <tr><th rowspan=2>Condition</th><th colspan=2>Action</th></tr>
+   * <tr><th>Outside of Union</th><th>Inside of Union</th></tr>
+   * <tr><td>Unprojected</td><td>N/A</td><td>Omitted from output</td></tr>
+   * <tr><td>Added in prior batch</td><td colspan=2>Included in output</td></tr>
+   * <tr><td>Added in present batch, before overflow</td>
+   *     <td colspan=2>Included in output</td></tr>
+   * <tr><td>Added in present batch, after overflow</td>
+   *     <td>Omitted from output this batch (added next batch)</td>
+   *     <td>Included in output</td></tr>
+   * </table>
+   * <p>
+   * The above rules say that, for maps in a union, the output schema
+   * is identical to the internal writer schema. But, for maps outside
+   * of union, the output schema is a subset of the internal schema with
+   * two types of omissions:
+   * <ul>
+   * <li>Unprojected columns</li>
+   * <li>Columns added after overflow</li>
+   * </ul>
+   * <p>
+   * New columns can be added at any time for data readers that discover
+   * their schema as data is read (such as JSON). In this case, new columns
+   * always appear at the end of the map (remember, in Drill, a "map" is actually
+   * a structure: an ordered, named list of columns.) When looking for newly
+   * added columns, they will always be at the end.
+   */
+
+  public static class MapColumnState extends BaseContainerColumnState {
+    protected final MapState mapState;
+    protected boolean isVersioned;
+    protected final ColumnMetadata outputSchema;
+
+    public MapColumnState(MapState mapState,
+        AbstractObjectWriter writer,
+        VectorState vectorState,
+        boolean isVersioned) {
+      super(mapState.loader(), writer, vectorState);
+      this.mapState = mapState;
+      mapState.bindColumnState(this);
+      this.isVersioned = isVersioned;
+      if (isVersioned) {
+        outputSchema = schema().cloneEmpty();
+      } else {
+        outputSchema = schema();
+      }
+      mapState.bindOutputSchema(outputSchema.mapSchema());
+    }
+
+    public MapState mapState() { return mapState; }
+
+    @Override
+    public ContainerState container() { return mapState; }
+
+    @Override
+    public boolean isProjected() {
+      return mapState.hasProjections();
+    }
+
+    /**
+     * Indicate if this map is versioned. A versionable map has three attributes:
+     * <ol>
+     * <li>Columns can be unprojected. (Columns appear as writers for the client
+     * of the result set loader, but are not materialized and do not appear in
+     * the projected output container.)</li>
+     * <li>Columns appear in the output only if added before the overflow row.</li>
+     * <li>As a result, the output schema is a subset of the internal input
+     * schema.</li>
+     * </ol>
+     * @return <tt>true</tt> if this map is versioned as described above
+     */
+
+    public boolean isVersioned() { return isVersioned; }
+
+    @Override
+    public ColumnMetadata outputSchema() { return outputSchema; }
+  }
+
+  /**
+   * State for a map vector. If the map is repeated, it will have an offset
+   * vector. The map vector itself is a pseudo-vector that is simply a
+   * container for other vectors, and so needs no management itself.
+   */
+
+  public static class MapVectorState implements VectorState {
+
+    private final AbstractMapVector mapVector;
+    private final VectorState offsets;
+
+    public MapVectorState(AbstractMapVector mapVector, VectorState offsets) {
+      this.mapVector = mapVector;
+      this.offsets = offsets;
+    }
+
+    @Override
+    public int allocate(int cardinality) {
+      // The mapVector is a pseudo-vector; nothing to allocate.
+
+      return offsets.allocate(cardinality);
+    }
+
+    @Override
+    public void rollover(int cardinality) {
+      offsets.rollover(cardinality);
+    }
+
+    @Override
+    public void harvestWithLookAhead() {
+      offsets.harvestWithLookAhead();
+    }
+
+    @Override
+    public void startBatchWithLookAhead() {
+      offsets.startBatchWithLookAhead();
+    }
+
+    @Override
+    public void close() {
+      offsets.close();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public AbstractMapVector vector() { return mapVector; }
+
+    public VectorState offsetVectorState() { return offsets; }
+
+    @Override
+    public boolean isProjected() {
+      return offsets.isProjected();
+    }
+
+    @Override
+    public void dump(HierarchicalFormatter format) {
+      // TODO Auto-generated method stub
+    }
+  }
 
   /**
    * Handles the details of the top-level tuple, the data row itself.
@@ -72,10 +251,21 @@
 
     private final RowSetLoaderImpl writer;
 
-    public RowState(ResultSetLoaderImpl rsLoader) {
-      super(rsLoader, rsLoader.projectionSet);
+    /**
+     * The projected set of columns presented to the consumer of the
+     * row set loader. Excludes non-projected columns presented to the
+     * consumer of the writers. Also excludes columns if added during
+     * an overflow row.
+     */
+
+    private final VectorContainer outputContainer;
+
+    public RowState(ResultSetLoaderImpl rsLoader, ResultVectorCache vectorCache) {
+      super(rsLoader, vectorCache, rsLoader.projectionSet);
       writer = new RowSetLoaderImpl(rsLoader, schema);
       writer.bindListener(this);
+      outputContainer = new VectorContainer(rsLoader.allocator());
+      outputSchema = new TupleSchema();
     }
 
     public RowSetLoaderImpl rootWriter() { return writer; }
@@ -84,7 +274,32 @@ public RowState(ResultSetLoaderImpl rsLoader) {
     public AbstractTupleWriter writer() { return writer; }
 
     @Override
-    public int innerCardinality() { return resultSetLoader.targetRowCount();}
+    public int innerCardinality() { return loader.targetRowCount();}
+
+    /**
+     * The row as a whole is versioned.
+     *
+     * @return <tt>true</tt>
+     */
+
+    @Override
+    protected boolean isVersioned() { return true; }
+
+    @Override
+    protected void updateOutput(int curSchemaVersion) {
+      super.updateOutput(curSchemaVersion);
+      outputContainer.buildSchema(SelectionVectorMode.NONE);
+    }
+
+    @Override
+    public int addOutputColumn(ValueVector vector, ColumnMetadata colSchema) {
+      outputContainer.add(vector);
+      int index = outputSchema.addColumn(colSchema);
+      assert outputContainer.getNumberOfColumns() == outputSchema.size();
+      return index;
+    }
+
+    public VectorContainer outputContainer() { return outputContainer; }
   }
 
   /**
@@ -93,83 +308,167 @@ public RowState(ResultSetLoaderImpl rsLoader) {
    * a batch. This design supports the obscure case in which a new column
    * is added during an overflow row, so exists within this abstraction,
    * but is not published to the map that makes up the output.
+   * <p>
+   * The map state is associated with a map vector. This vector is built
+   * either during harvest time (normal maps) or on the fly (union maps.)
    */
 
-  public static class MapState extends TupleState {
+  public static abstract class MapState extends TupleState {
 
-    protected final BaseMapColumnState mapColumnState;
-    protected int outerCardinality;
+    public MapState(LoaderInternals events,
+        ResultVectorCache vectorCache,
+        RequestedTuple projectionSet) {
+      super(events, vectorCache, projectionSet);
+    }
 
-    public MapState(ResultSetLoaderImpl rsLoader,
-        BaseMapColumnState mapColumnState,
-        ProjectionSet projectionSet) {
-      super(rsLoader, projectionSet);
-      this.mapColumnState = mapColumnState;
-      if (mapColumnState.schema().isArray()) {
-        mapColumnState.writer().array().tuple().bindListener(this);
-      } else {
-        mapColumnState.writer().tuple().bindListener(this);
-      }
+    public void bindColumnState(MapColumnState colState) {
+      super.bindColumnState(colState);
+      writer().bindListener(this);
     }
 
-    /**
-     * Return the tuple writer for the map. If this is a single
-     * map, then it is the writer itself. If this is a map array,
-     * then the tuple is nested inside the array.
-     */
+    @SuppressWarnings("resource")
+    @Override
+    public int addOutputColumn(ValueVector vector, ColumnMetadata colSchema) {
+      AbstractMapVector mapVector = parentColumn.vector();
+      if (isVersioned()) {
+        mapVector.putChild(colSchema.name(), vector);
+      }
+      int index = outputSchema.addColumn(colSchema);
+      assert mapVector.size() == outputSchema.size();
+      assert mapVector.getField().getChildren().size() == outputSchema.size();
+      return index;
+    }
 
     @Override
-    public AbstractTupleWriter writer() {
-      AbstractObjectWriter objWriter = mapColumnState.writer();
-      TupleWriter tupleWriter;
-      if (objWriter.type() == ObjectType.ARRAY) {
-        tupleWriter = objWriter.array().tuple();
-      } else {
-        tupleWriter = objWriter.tuple();
+    protected void addColumn(ColumnState colState) {
+      super.addColumn(colState);
+
+      // If the map is materialized (because it is nested inside a union)
+      // then add the new vector to the map at add time. But, for top-level
+      // maps, or those nested inside other maps (but not a union or
+      // repeated list), defer
+      // adding the column until harvest time, to allow for the case that
+      // new columns are added in the overflow row. Such columns may be
+      // required, and not allow back-filling. But, inside unions, all
+      // columns must be nullable, so back-filling of nulls is possible.
+
+      if (! isVersioned()) {
+        @SuppressWarnings("resource")
+        AbstractMapVector mapVector = parentColumn.vector();
+        mapVector.putChild(colState.schema().name(), colState.vector());
       }
-      return (AbstractTupleWriter) tupleWriter;
     }
 
     /**
-     * In order to allocate the correct-sized vectors, the map must know
-     * its member cardinality: the number of elements in each row. This
-     * is 1 for a single map, but may be any number for a map array. Then,
-     * this value is recursively pushed downward to compute the cardinality
-     * of lists of maps that contains lists of maps, and so on.
+     * A map is within a union if the map vector has been materialized.
+     * Top-level maps are built at harvest time. But, due to the complexity
+     * of unions, maps within unions are materialized. This method ensures
+     * that maps are materialized regardless of nesting depth within
+     * a union.
      */
 
     @Override
-    public void updateCardinality(int outerCardinality) {
-      this.outerCardinality = outerCardinality;
-      super.updateCardinality(outerCardinality);
+    protected boolean isVersioned() {
+      return ((MapColumnState) parentColumn).isVersioned();
     }
 
     @Override
     public int innerCardinality() {
-      return outerCardinality * mapColumnState.schema().expectedElementCount();
+      return parentColumn.innerCardinality();
     }
 
     @Override
     public void dump(HierarchicalFormatter format) {
       format
         .startObject(this)
-        .attribute("column", mapColumnState.schema().name())
-        .attribute("cardinality", outerCardinality)
+        .attribute("column", parentColumn.schema().name())
+        .attribute("cardinality", innerCardinality())
         .endObject();
     }
   }
 
-  protected final ResultSetLoaderImpl resultSetLoader;
+  public static class SingleMapState extends MapState {
+
+    public SingleMapState(LoaderInternals events,
+        ResultVectorCache vectorCache,
+        RequestedTuple projectionSet) {
+      super(events, vectorCache, projectionSet);
+    }
+
+    /**
+     * Return the tuple writer for a single (non-repeated) map: the
+     * map column's writer is itself the tuple writer.
+     */
+
+    @Override
+    public AbstractTupleWriter writer() {
+      return (AbstractTupleWriter) parentColumn.writer().tuple();
+    }
+  }
+
+  public static class MapArrayState extends MapState {
+
+    public MapArrayState(LoaderInternals events,
+        ResultVectorCache vectorCache,
+        RequestedTuple projectionSet) {
+      super(events, vectorCache, projectionSet);
+    }
+
+    /**
+     * Return the tuple writer for a map array: the tuple writer is
+     * nested inside the column's array writer.
+     */
+
+    @Override
+    public AbstractTupleWriter writer() {
+      return (AbstractTupleWriter) parentColumn.writer().array().tuple();
+    }
+  }
+
+  /**
+   * The set of columns added via the writers: includes both projected
+   * and unprojected columns. (The writer is free to add columns that the
+   * query does not project; the result set loader creates a dummy column
+   * and dummy writer, then does not project the column to the output.)
+   */
+
   protected final List<ColumnState> columns = new ArrayList<>();
+
+  /**
+   * Internal writer schema that matches the column list.
+   */
+
   protected final TupleSchema schema = new TupleSchema();
-  protected final ProjectionSet projectionSet;
 
-  protected TupleState(ResultSetLoaderImpl rsLoader, ProjectionSet projectionSet) {
-    this.resultSetLoader = rsLoader;
-    this.projectionSet = projectionSet;
+  /**
+   * Metadata description of the output container (for the row) or of
+   * the map vector (for a map or repeated map).
+   * <p>
+   * Rows and maps have an output schema which may differ from the internal schema.
+   * The output schema excludes unprojected columns. It also excludes
+   * columns added in an overflow row.
+   * <p>
+   * The output schema is built slightly differently for maps inside a
+   * union vs. normal top-level (or nested) maps. Maps inside a union do
+   * not defer columns because of the muddy semantics (and infrequent use)
+   * of unions.
+   */
+
+  protected TupleMetadata outputSchema;
+
+  private int prevHarvestIndex = -1;
+
+  protected TupleState(LoaderInternals events,
+      ResultVectorCache vectorCache,
+      RequestedTuple projectionSet) {
+    super(events, vectorCache, projectionSet);
   }
 
-  public abstract int innerCardinality();
+  protected void bindOutputSchema(TupleMetadata outputSchema) {
+    this.outputSchema = outputSchema;
+  }
 
   /**
    * Returns an ordered set of the columns which make up the tuple.
@@ -200,198 +499,78 @@ public ObjectWriter addColumn(TupleWriter tupleWriter, ColumnMetadata columnSche
     TupleMetadata tupleSchema = schema();
     String colName = columnSchema.name();
     if (tupleSchema.column(colName) != null) {
-      throw new IllegalArgumentException("Duplicate column: " + colName);
+      throw UserException
+        .validationError()
+        .message("Duplicate column name: %s", colName)
+        .build(ResultSetLoaderImpl.logger);
     }
 
-    return addColumn(columnSchema);
+    return addColumn(columnSchema).writer();
   }
 
   @Override
-  public ProjectionType projectionType(String columnName) {
-    return projectionSet.isProjected(columnName) ?
-        ProjectionType.TUPLE :
-        ProjectionType.UNPROJECTED;
-  }
-
-  /**
-   * Implementation of the work to add a new column to this tuple given a
-   * schema description of the column.
-   *
-   * @param columnSchema schema of the column
-   * @return writer for the new column
-   */
-
-  private AbstractObjectWriter addColumn(ColumnMetadata columnSchema) {
-
-    // Indicate projection in the metadata.
-
-    ((AbstractColumnMetadata) columnSchema).setProjected(
-        projectionSet.isProjected(columnSchema.name()));
-
-    // Build the column
-
-    ColumnState colState;
-    if (columnSchema.isMap()) {
-      colState = buildMap(columnSchema);
-    } else {
-      colState = buildPrimitive(columnSchema);
-    }
+  protected void addColumn(ColumnState colState) {
     columns.add(colState);
-    colState.updateCardinality(innerCardinality());
-    colState.allocateVectors();
-    return colState.writer();
   }
 
-  /**
-   * Build a primitive column. Check if the column is projected. If not,
-   * allocate a dummy writer for the column. If projected, then allocate
-   * a vector, a writer, and the column state which binds the two together
-   * and manages the column.
-   *
-   * @param columnSchema schema of the new primitive column
-   * @return column state for the new column
-   */
-
-  @SuppressWarnings("resource")
-  private ColumnState buildPrimitive(ColumnMetadata columnSchema) {
-    ValueVector vector;
-    if (columnSchema.isProjected()) {
-
-      // Create the vector for the column.
-
-      vector = resultSetLoader.vectorCache().addOrGet(columnSchema.schema());
-    } else {
-
-      // Column is not projected. No materialized backing for the column.
-
-      vector = null;
-    }
-
-    // Create the writer. Will be returned to the tuple writer.
-
-    AbstractObjectWriter colWriter = ColumnWriterFactory.buildColumnWriter(columnSchema, vector);
-
-    if (columnSchema.isArray()) {
-      return PrimitiveColumnState.newPrimitiveArray(resultSetLoader, vector, colWriter);
-    } else {
-      return PrimitiveColumnState.newPrimitive(resultSetLoader, vector, colWriter);
+  public boolean hasProjections() {
+    for (ColumnState colState : columns) {
+      if (colState.isProjected()) {
+        return true;
+      }
     }
+    return false;
   }
 
-  /**
-   * Build a new map (single or repeated) column. No map vector is created
-   * here, instead we create a tuple state to hold the columns, and defer the
-   * map vector (or vector container) until harvest time.
-   *
-   * @param columnSchema description of the map column
-   * @return column state for the map column
-   */
+  @Override
+  protected Collection<ColumnState> columnStates() {
+    return columns;
+  }
 
-  private ColumnState buildMap(ColumnMetadata columnSchema) {
+  protected void updateOutput(int curSchemaVersion) {
 
-    // When dynamically adding columns, must add the (empty)
-    // map by itself, then add columns to the map via separate
-    // calls.
+    // Scan all columns
 
-    assert columnSchema.isMap();
-    assert columnSchema.mapSchema().size() == 0;
+    for (int i = 0; i < columns.size(); i++) {
+      ColumnState colState = columns.get(i);
 
-    // Create the writer. Will be returned to the tuple writer.
+      // Ignore unprojected columns
 
-    ProjectionSet childProjection = projectionSet.mapProjection(columnSchema.name());
-    if (columnSchema.isArray()) {
-      return MapArrayColumnState.build(resultSetLoader,
-          columnSchema,
-          childProjection);
-    } else {
-      MapVector vector;
-      if (columnSchema.isProjected()) {
-        vector = new MapVector(columnSchema.schema(), resultSetLoader.allocator(), null);
-      } else {
-        vector = null;
+      if (! colState.schema().isProjected()) {
+        continue;
       }
-      return new MapColumnState(resultSetLoader,
-          columnSchema, vector,
-          childProjection);
-    }
-  }
-
-  /**
-   * When creating a schema up front, provide the schema of the desired tuple,
-   * then build vectors and writers to match. Allows up-front schema definition
-   * in addition to on-the-fly schema creation handled elsewhere.
-   *
-   * @param schema desired tuple schema to be materialized
-   */
 
-  public void buildSchema(TupleMetadata schema) {
-    for (int i = 0; i < schema.size(); i++) {
-      ColumnMetadata colSchema = schema.metadata(i);
-      AbstractObjectWriter colWriter;
-      if (colSchema.isMap()) {
-        colWriter = addColumn(colSchema.cloneEmpty());
-        BaseMapColumnState mapColState = (BaseMapColumnState) columns.get(columns.size() - 1);
-        mapColState.mapState().buildSchema(colSchema.mapSchema());
-      } else {
-        colWriter = addColumn(colSchema);
+      // If this is a new column added since the last output, then we
+      // may have to add the column to this output. For the row itself,
+      // and for maps outside of unions, skip the column for now if it
+      // was added after the output schema version cutoff. But, if this
+      // tuple is within a union, then we always add all columns, because
+      // union semantics are too muddy to play the deferred-column game.
+      // Further, all columns in a map within a union must be nullable,
+      // so we know we can fill the column with nulls. (Something that
+      // is not true for normal maps.)
+
+      if (i > prevHarvestIndex && (! isVersioned() || colState.addVersion <= curSchemaVersion)) {
+        colState.buildOutput(this);
+        prevHarvestIndex = i;
       }
-      writer().addColumnWriter(colWriter);
-    }
-  }
 
-  public void updateCardinality(int cardinality) {
-    for (ColumnState colState : columns) {
-      colState.updateCardinality(cardinality);
-    }
-  }
+      // If the column is a map, then we have to recurse into the map
+      // itself. If the map is inside a union, then the map's vectors
+      // already appear in the map vector, but we still must update the
+      // output schema.
 
-  /**
-   * A column within the row batch overflowed. Prepare to absorb the rest of the
-   * in-flight row by rolling values over to a new vector, saving the complete
-   * vector for later. This column could have a value for the overflow row, or
-   * for some previous row, depending on exactly when and where the overflow
-   * occurs.
-   */
-
-  public void rollover() {
-    for (ColumnState colState : columns) {
-      colState.rollover();
-    }
-  }
-
-  /**
-   * Writing of a row batch is complete, and an overflow occurred. Prepare the
-   * vector for harvesting to send downstream. Set aside the look-ahead vector
-   * and put the full vector buffer back into the active vector.
-   */
-
-  public void harvestWithLookAhead() {
-    for (ColumnState colState : columns) {
-      colState.harvestWithLookAhead();
-    }
-  }
-
-  /**
-   * Start a new batch by shifting the overflow buffers back into the main
-   * write vectors and updating the writers.
-   */
-
-  public void startBatch() {
-    for (ColumnState colState : columns) {
-      colState.startBatch();
+      if (colState.schema().isMap()) {
+        MapState childMap = ((MapColumnState) colState).mapState();
+        childMap.updateOutput(curSchemaVersion);
+      }
     }
   }
 
-  /**
-   * Clean up state (such as backup vectors) associated with the state
-   * for each vector.
-   */
+  public abstract int addOutputColumn(ValueVector vector, ColumnMetadata colSchema);
 
-  public void close() {
-    for (ColumnState colState : columns) {
-      colState.close();
-    }
-  }
+  public TupleMetadata outputSchema() { return outputSchema; }
 
   public void dump(HierarchicalFormatter format) {
     format
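
For reference, the duplicate-column check above now surfaces as a
user-facing validation error rather than an IllegalArgumentException.
A minimal sketch of what a client sees (illustrative; assumes the
standard TupleWriter.addColumn(ColumnMetadata) entry point):

    RowSetLoader writer = rsLoader.writer();
    ColumnMetadata col = MetadataUtils.fromField(
        MaterializedField.create("a", Types.optional(MinorType.INT)));
    writer.addColumn(col);
    try {
      writer.addColumn(col);     // Same name again
    } catch (UserException e) {
      // Validation error: "Duplicate column name: a"
    }
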
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorContainerBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorContainerBuilder.java
deleted file mode 100644
index 741117633e7..00000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorContainerBuilder.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.rowSet.impl;
-
-import java.util.List;
-
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.rowSet.impl.ColumnState.BaseMapColumnState;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.record.metadata.TupleSchema;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.vector.UInt4Vector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.complex.AbstractMapVector;
-import org.apache.drill.exec.vector.complex.MapVector;
-import org.apache.drill.exec.vector.complex.RepeatedMapVector;
-
-/**
- * Builds the harvest vector container that includes only the columns that
- * are included in the harvest schema version. That is, it excludes columns
- * added while writing an overflow row.
- * <p>
- * Because a Drill row is actually a hierarchy, walks the internal hierarchy
- * and builds a corresponding output hierarchy.
- * <ul>
- * <li>The root node is the row itself (vector container),</li>
- * <li>Internal nodes are maps (structures),</li>
- * <li>Leaf notes are primitive vectors (which may be arrays).</li>
- * </ul>
- * The basic algorithm is to identify the version of the output schema,
- * then add any new columns added up to that version. This object maintains
- * the output container across batches, meaning that updates are incremental:
- * we need only add columns that are new since the last update. And, those new
- * columns will always appear directly after all existing columns in the row
- * or in a map.
- * <p>
- * As special case occurs when columns are added in the overflow row. These
- * columns <i>do not</i> appear in the output container for the main part
- * of the batch; instead they appear in the <i>next</i> output container
- * that includes the overflow row.
- * <p>
- * Since the container here may contain a subset of the internal columns, an
- * interesting case occurs for maps. The maps in the output container are
- * <b>not</b> the same as those used internally. Since a map column can contain
- * either one list of columns or another, the internal and external maps must
- * differ. The set of child vectors (except for child maps) are shared.
- */
-
-public class VectorContainerBuilder {
-
-  /**
-   * Drill vector containers and maps are both tuples, but they irritatingly
-   * have completely different APIs for working with their child vectors.
-   * This class acts as a proxy to wrap the two APIs to provide a common
-   * view for the use of the container builder.
-   */
-
-  public static abstract class TupleProxy {
-    protected TupleMetadata schema;
-
-    public TupleProxy(TupleMetadata schema) {
-      this.schema = schema;
-    }
-
-    protected abstract int size();
-    protected abstract ValueVector vector(int index);
-    protected abstract void add(ValueVector vector);
-
-    protected TupleProxy mapProxy(int index) {
-      return new MapProxy(
-          schema.metadata(index).mapSchema(),
-          (AbstractMapVector) vector(index));
-    }
-  }
-
-  /**
-   * Proxy wrapper class for a vector container.
-   */
-
-  protected static class ContainerProxy extends TupleProxy {
-
-    private VectorContainer container;
-
-    protected ContainerProxy(TupleMetadata schema, VectorContainer container) {
-      super(schema);
-      this.container = container;
-    }
-
-    @Override
-    protected int size() {
-      return container.getNumberOfColumns();
-    }
-
-    @Override
-    protected ValueVector vector(int index) {
-      return container.getValueVector(index).getValueVector();
-    }
-
-    @Override
-    protected void add(ValueVector vector) {
-      container.add(vector);
-    }
-  }
-
-  /**
-   * Proxy wrapper for a map container.
-   */
-
-  protected static class MapProxy extends TupleProxy {
-
-    private AbstractMapVector mapVector;
-
-    protected MapProxy(TupleMetadata schema, AbstractMapVector mapVector) {
-      super(schema);
-      this.mapVector = mapVector;
-    }
-
-    @Override
-    protected int size() {
-      return mapVector.size();
-    }
-
-    @Override
-    protected ValueVector vector(int index) {
-      return mapVector.getChildByOrdinal(index);
-    }
-
-    @Override
-    protected void add(ValueVector vector) {
-      mapVector.putChild(vector.getField().getName(), vector);
-    }
-  }
-
-  private final ResultSetLoaderImpl resultSetLoader;
-  private int outputSchemaVersion = -1;
-  private TupleMetadata schema;
-  private VectorContainer container;
-
-  public VectorContainerBuilder(ResultSetLoaderImpl rsLoader) {
-    this.resultSetLoader = rsLoader;
-    container = new VectorContainer(rsLoader.allocator);
-    schema = new TupleSchema();
-  }
-
-  public void update(int targetVersion) {
-    if (outputSchemaVersion >= targetVersion) {
-      return;
-    }
-    outputSchemaVersion = targetVersion;
-    updateTuple(resultSetLoader.rootState(), new ContainerProxy(schema, container));
-    container.buildSchema(SelectionVectorMode.NONE);
-  }
-
-  public VectorContainer container() { return container; }
-
-  public int outputSchemaVersion() { return outputSchemaVersion; }
-
-  public BufferAllocator allocator() {
-     return resultSetLoader.allocator();
-  }
-
-  private void updateTuple(TupleState sourceModel, TupleProxy destProxy) {
-    int prevCount = destProxy.size();
-    List<ColumnState> cols = sourceModel.columns();
-    int currentCount = cols.size();
-
-    // Scan any existing maps for column additions
-
-    for (int i = 0; i < prevCount; i++) {
-      ColumnState colState = cols.get(i);
-      if (! colState.schema().isProjected()) {
-        continue;
-      }
-      if (colState.schema().isMap()) {
-        updateTuple((TupleState) ((BaseMapColumnState) colState).mapState(), destProxy.mapProxy(i));
-      }
-    }
-
-    // Add new columns, which may be maps
-
-    for (int i = prevCount; i < currentCount; i++) {
-      ColumnState colState = cols.get(i);
-      if (! colState.schema().isProjected()) {
-        continue;
-      }
-
-      // If the column was added after the output schema version cutoff,
-      // skip that column for now.
-
-      if (colState.addVersion > outputSchemaVersion) {
-        break;
-      }
-      if (colState.schema().isMap()) {
-        buildMap(destProxy, (BaseMapColumnState) colState);
-      } else {
-        destProxy.add(colState.vector());
-        destProxy.schema.addColumn(colState.schema());
-        assert destProxy.size() == destProxy.schema.size();
-      }
-    }
-  }
-
-  @SuppressWarnings("resource")
-  private void buildMap(TupleProxy parentTuple, BaseMapColumnState colModel) {
-
-    // Creating the map vector will create its contained vectors if we
-    // give it a materialized field with children. So, instead pass a clone
-    // without children so we can add them.
-
-    ColumnMetadata mapColSchema = colModel.schema().cloneEmpty();
-
-    // Don't get the map vector from the vector cache. Map vectors may
-    // have content that varies from batch to batch. Only the leaf
-    // vectors can be cached.
-
-    AbstractMapVector mapVector;
-    if (mapColSchema.isArray()) {
-
-      // A repeated map shares an offset vector with the internal
-      // repeated map.
-
-      UInt4Vector offsets = (UInt4Vector) colModel.vector();
-      mapVector = new RepeatedMapVector(mapColSchema.schema(), offsets, null);
-    } else {
-      mapVector = new MapVector(mapColSchema.schema(), allocator(), null);
-    }
-
-    // Add the map vector and schema to the parent tuple
-
-    parentTuple.add(mapVector);
-    int index = parentTuple.schema.addColumn(mapColSchema);
-    assert parentTuple.size() == parentTuple.size();
-
-    // Update the tuple, which will add the new columns in the map
-
-    updateTuple(colModel.mapState(), parentTuple.mapProxy(index));
-  }
-
-  public TupleMetadata schema() { return schema; }
-}
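
The deleted builder above implemented the deferred-column rule that
TupleState.updateOutput() now handles. A minimal sketch of the
externally visible behavior (illustrative setup; assumes the standard
ResultSetLoader batch lifecycle):

    rsLoader.startBatch();
    RowSetLoader writer = rsLoader.writer();
    while (! writer.isFull()) {          // Fill until overflow triggers
      writer.start();
      writer.scalar("a").setInt(1);
      writer.save();
    }
    // Had a column "b" been added while writing the overflow row, the
    // container harvested here would exclude it; "b" first appears in
    // the next batch's container, along with the overflow row itself.
    VectorContainer batch = rsLoader.harvest();
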
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorState.java
index 4a1c6982671..134855b1ed2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorState.java
@@ -89,14 +89,24 @@
    * Clear the vector(s) associated with this state.
    */
 
-  void reset();
+  void close();
 
   /**
    * Underlying vector: the one presented to the consumer of the
    * result set loader.
    */
 
-  ValueVector vector();
+  <T extends ValueVector> T vector();
+
+  /**
+   * Report whether this column is projected (has materialized vectors),
+   * or is unprojected (has no materialized backing.)
+   *
+   * @return true if the column is projected to the output, false if
+   * not
+   */
+
+  boolean isProjected();
 
   void dump(HierarchicalFormatter format);
 }
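
The generified vector() accessor lets call sites bind the vector type
without a cast, and isProjected() identifies dummy (unprojected)
states. A hypothetical call site (a sketch; allocateNew(int) is the
standard fixed-width vector allocation):

    void allocateOffsets(VectorState state, int cardinality) {
      if (! state.isProjected()) {
        return;                          // Dummy state: nothing to allocate
      }
      UInt4Vector offsets = state.vector();    // Type inferred, no cast
      offsets.allocateNew(cardinality);
    }
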
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java
index 2158dd12846..9fb3e4ea292 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.rowSet.impl;
 
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 
 /**
@@ -60,6 +61,26 @@ public boolean next() {
     }
   }
 
+  public int skipRows(int requestedCount) {
+
+    // Determine the number of rows that can be skipped. Note that since the
+    // batch must have no columns, there is no need to observe the normal
+    // batch size limit.
+
+    int skipped = Math.min(requestedCount,
+        ValueVector.MAX_ROW_COUNT - rowIndex);
+
+    // Update the row index with the skip count.
+
+    rowIndex += skipped;
+
+    // Tell the client how many rows were actually skipped.
+    // The client can make more requests in later batches to get
+    // the full amount.
+
+    return skipped;
+  }
+
   public int size() {
 
     // The index always points to the next slot past the
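
Because a single call is capped at ValueVector.MAX_ROW_COUNT less the
current row index, a caller must loop across batches to skip a large
count, as the skipRows() contract describes. A sketch (illustrative):

    int remaining = 5_000_000;
    while (remaining > 0) {
      rsLoader.startBatch();
      remaining -= rsLoader.skipRows(remaining);
      rsLoader.harvest().clear();    // Downstream needs only the count
    }
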
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/MetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/MetadataProvider.java
index 27e3c3d4e0d..5212afb0814 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/MetadataProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/MetadataProvider.java
@@ -18,9 +18,14 @@
 package org.apache.drill.exec.physical.rowSet.model;
 
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.AbstractColumnMetadata;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.RepeatedListColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.record.metadata.VariantMetadata;
+import org.apache.drill.exec.record.metadata.VariantSchema;
 
 /**
  * Interface for retrieving and/or creating metadata given
@@ -31,15 +36,24 @@
   ColumnMetadata metadata(int index, MaterializedField field);
   MetadataProvider childProvider(ColumnMetadata colMetadata);
   TupleMetadata tuple();
+  VariantMetadata variant();
 
   public static class VectorDescrip {
     public final MetadataProvider parent;
     public final ColumnMetadata metadata;
 
+    public VectorDescrip(MetadataProvider provider, ColumnMetadata metadata) {
+      parent = provider;
+      this.metadata = metadata;
+    }
+
     public VectorDescrip(MetadataProvider provider, int index,
         MaterializedField field) {
-      parent = provider;
-      metadata = provider.metadata(index, field);
+      this(provider, provider.metadata(index, field));
+    }
+
+    public MetadataProvider childProvider() {
+      return parent.childProvider(metadata);
     }
   }
 
@@ -62,11 +76,91 @@ public ColumnMetadata metadata(int index, MaterializedField field) {
 
     @Override
     public MetadataProvider childProvider(ColumnMetadata colMetadata) {
-      return new MetadataCreator((TupleSchema) colMetadata.mapSchema());
+      return makeProvider(colMetadata);
+    }
+
+    public static MetadataProvider makeProvider(ColumnMetadata colMetadata) {
+      switch (colMetadata.structureType()) {
+      case MULTI_ARRAY:
+        return new ArraySchemaCreator((RepeatedListColumnMetadata) colMetadata);
+      case VARIANT:
+        return new VariantSchemaCreator((VariantSchema) colMetadata.variantSchema());
+      case TUPLE:
+        return new MetadataCreator((TupleSchema) colMetadata.mapSchema());
+      default:
+        throw new UnsupportedOperationException();
+      }
     }
 
     @Override
     public TupleMetadata tuple() { return tuple; }
+
+    @Override
+    public VariantMetadata variant() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  public static class VariantSchemaCreator implements MetadataProvider {
+
+    private final VariantSchema variantSchema;
+
+    public VariantSchemaCreator(VariantSchema variantSchema) {
+      this.variantSchema = variantSchema;
+    }
+
+    @Override
+    public ColumnMetadata metadata(int index, MaterializedField field) {
+      return variantSchema.addType(field);
+    }
+
+    @Override
+    public MetadataProvider childProvider(ColumnMetadata colMetadata) {
+      return MetadataCreator.makeProvider(colMetadata);
+    }
+
+    @Override
+    public TupleMetadata tuple() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public VariantMetadata variant() {
+      return variantSchema;
+    }
+  }
+
+  public static class ArraySchemaCreator implements MetadataProvider {
+
+    private final RepeatedListColumnMetadata arraySchema;
+
+    public ArraySchemaCreator(RepeatedListColumnMetadata arraySchema) {
+      this.arraySchema = arraySchema;
+    }
+
+    @Override
+    public ColumnMetadata metadata(int index, MaterializedField field) {
+      assert index == 0;
+      assert arraySchema.childSchema() == null;
+      AbstractColumnMetadata childSchema = MetadataUtils.fromField(field.cloneEmpty());
+      arraySchema.childSchema(childSchema);
+      return childSchema;
+    }
+
+    @Override
+    public MetadataProvider childProvider(ColumnMetadata colMetadata) {
+      return MetadataCreator.makeProvider(colMetadata);
+    }
+
+    @Override
+    public TupleMetadata tuple() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public VariantMetadata variant() {
+      throw new UnsupportedOperationException();
+    }
   }
 
   public static class MetadataRetrieval implements MetadataProvider {
@@ -84,10 +178,87 @@ public ColumnMetadata metadata(int index, MaterializedField field) {
 
     @Override
     public MetadataProvider childProvider(ColumnMetadata colMetadata) {
-      return new MetadataRetrieval((TupleSchema) colMetadata.mapSchema());
+      return makeProvider(colMetadata);
+    }
+
+    public static MetadataProvider makeProvider(ColumnMetadata colMetadata) {
+      switch (colMetadata.structureType()) {
+      case MULTI_ARRAY:
+        return new ArraySchemaRetrieval(colMetadata);
+      case VARIANT:
+        return new VariantSchemaRetrieval((VariantSchema) colMetadata.variantSchema());
+      case TUPLE:
+        return new MetadataRetrieval(colMetadata.mapSchema());
+      default:
+        throw new UnsupportedOperationException();
+      }
     }
 
     @Override
     public TupleMetadata tuple() { return tuple; }
+
+    @Override
+    public VariantMetadata variant() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  public static class VariantSchemaRetrieval implements MetadataProvider {
+
+    private final VariantSchema variantSchema;
+
+    public VariantSchemaRetrieval(VariantSchema variantSchema) {
+      this.variantSchema = variantSchema;
+    }
+
+    @Override
+    public ColumnMetadata metadata(int index, MaterializedField field) {
+      return variantSchema.member(field.getType().getMinorType());
+    }
+
+    @Override
+    public MetadataProvider childProvider(ColumnMetadata colMetadata) {
+      return MetadataRetrieval.makeProvider(colMetadata);
+    }
+
+    @Override
+    public TupleMetadata tuple() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public VariantMetadata variant() {
+      return variantSchema;
+    }
+  }
+
+  public static class ArraySchemaRetrieval implements MetadataProvider {
+
+    private final ColumnMetadata arraySchema;
+
+    public ArraySchemaRetrieval(ColumnMetadata arraySchema) {
+      this.arraySchema = arraySchema;
+    }
+
+    @Override
+    public ColumnMetadata metadata(int index, MaterializedField field) {
+      assert index == 0;
+      return arraySchema.childSchema();
+    }
+
+    @Override
+    public MetadataProvider childProvider(ColumnMetadata colMetadata) {
+      return MetadataRetrieval.makeProvider(colMetadata);
+    }
+
+    @Override
+    public TupleMetadata tuple() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public VariantMetadata variant() {
+      throw new UnsupportedOperationException();
+    }
   }
 }
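
The providers now dispatch on ColumnMetadata.structureType(), so each
child gets a provider matched to its structure. A sketch of walking one
level with VectorDescrip (names are hypothetical; only APIs shown in
this file are used):

    VectorDescrip describeMember(MetadataProvider provider,
        MaterializedField mapField, MaterializedField memberField) {
      VectorDescrip mapDescrip = new VectorDescrip(provider, 0, mapField);
      // childProvider() dispatches on structure type: TUPLE yields a
      // tuple provider, VARIANT a variant provider, MULTI_ARRAY an
      // array provider; anything else throws.
      return new VectorDescrip(mapDescrip.childProvider(), 0, memberField);
    }
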
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BuildVectorsFromMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BuildVectorsFromMetadata.java
index 50568db2099..24583091662 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BuildVectorsFromMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BuildVectorsFromMetadata.java
@@ -17,13 +17,13 @@
  */
 package org.apache.drill.exec.physical.rowSet.model.single;
 
+import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractMapVector;
 
@@ -54,9 +54,10 @@ public VectorContainer build(TupleMetadata schema) {
   }
 
   private ValueVector buildVector(ColumnMetadata metadata) {
-    if (metadata.isMap()) {
+    switch (metadata.structureType()) {
+    case TUPLE:
       return buildMap(metadata);
-    } else {
+    default:
       return TypeHelper.getNewVector(metadata.schema(), allocator, null);
     }
   }
@@ -73,25 +74,31 @@ private AbstractMapVector buildMap(ColumnMetadata schema) {
 
     // Creating the map vector will create its contained vectors if we
     // give it a materialized field with children. So, instead pass a clone
-    // without children so we can add them.
-
-    MaterializedField mapField = schema.schema();
-    MaterializedField emptyClone = MaterializedField.create(mapField.getName(), mapField.getType());
-
-    // Don't get the map vector from the vector cache. Map vectors may
-    // have content that varies from batch to batch. Only the leaf
-    // vectors can be cached.
+    // without children so we can add the children as we add vectors.
 
-    AbstractMapVector mapVector = (AbstractMapVector) TypeHelper.getNewVector(emptyClone, allocator, null);
+    AbstractMapVector mapVector = (AbstractMapVector) TypeHelper.getNewVector(schema.emptySchema(), allocator, null);
+    populateMap(mapVector, schema.mapSchema(), false);
+    return mapVector;
+  }
 
-    // Create the contents building the model as we go.
+  /**
+   * Create the contents building the model as we go.
+   *
+   * @param mapVector the vector to populate
+   * @param mapSchema description of the map
+   */
 
-    TupleMetadata mapSchema = schema.mapSchema();
+  private void populateMap(AbstractMapVector mapVector,
+      TupleMetadata mapSchema, boolean inUnion) {
     for (int i = 0; i < mapSchema.size(); i++) {
       ColumnMetadata childSchema = mapSchema.metadata(i);
+
+      // Check for union-compatible types: non-map members must be
+      // nullable. (Maps themselves are exempt; map vectors are always
+      // in REQUIRED mode.)
+
+      if (inUnion && ! childSchema.isMap() && childSchema.mode() == DataMode.REQUIRED) {
+        throw new IllegalArgumentException("Map members in a list or union must be nullable");
+      }
       mapVector.putChild(childSchema.name(), buildVector(childSchema));
     }
-
-    return mapVector;
   }
 }
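
Restated as a standalone predicate, the rule that populateMap()
enforces is (a sketch, not part of the patch):

    static boolean legalUnionMember(ColumnMetadata col) {
      // Maps keep REQUIRED mode; every other member of a map inside
      // a union or list must be nullable (or repeated).
      return col.isMap() || col.mode() != DataMode.REQUIRED;
    }
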
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java
new file mode 100644
index 00000000000..c1e383eb676
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.exec.record.metadata.ProjectionType;
+
+/**
+ * Represents an implied projection for a tuple: either all members are
+ * projected (as for SELECT * at the root tuple, or for all columns of a
+ * map that is itself selected), or no members are projected (as for the
+ * members of an unprojected map).
+ */
+
+public class ImpliedTupleRequest implements RequestedTuple {
+
+  public static final RequestedTuple ALL_MEMBERS =
+      new ImpliedTupleRequest(true);
+  public static final RequestedTuple NO_MEMBERS =
+      new ImpliedTupleRequest(false);
+  public static final List<RequestedColumn> EMPTY_COLS = new ArrayList<>();
+
+  private boolean allProjected;
+
+  public ImpliedTupleRequest(boolean allProjected) {
+    this.allProjected = allProjected;
+  }
+
+  @Override
+  public ProjectionType projectionType(String colName) {
+    return allProjected
+      ? ProjectionType.UNSPECIFIED
+      : ProjectionType.UNPROJECTED;
+  }
+
+  @Override
+  public RequestedTuple mapProjection(String colName) {
+    return allProjected ? ALL_MEMBERS : NO_MEMBERS;
+  }
+
+  @Override
+  public void parseSegment(PathSegment child) { }
+
+  @Override
+  public RequestedColumn get(String colName) { return null; }
+
+  @Override
+  public List<RequestedColumn> projections() { return EMPTY_COLS; }
+
+  @Override
+  public void buildName(StringBuilder buf) { }
+}
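
The two shared instances behave as follows (illustrative assertions,
derived from the methods above):

    RequestedTuple all = ImpliedTupleRequest.ALL_MEMBERS;
    assert all.projectionType("anything") == ProjectionType.UNSPECIFIED;
    assert all.mapProjection("m") == ImpliedTupleRequest.ALL_MEMBERS;

    RequestedTuple none = ImpliedTupleRequest.NO_MEMBERS;
    assert none.projectionType("anything") == ProjectionType.UNPROJECTED;
    assert none.projections().isEmpty();
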
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedColumnImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedColumnImpl.java
new file mode 100644
index 00000000000..b76f267343e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedColumnImpl.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.project;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.record.metadata.ProjectionType;
+
+/**
+ * Represents one name element. Like a {@link NameSegment}, except that this
+ * version is an aggregate. If the projection list contains `a.b` and `a.c`,
+ * then one name segment exists for a, and contains segments for both b and c.
+ */
+
+public class RequestedColumnImpl implements RequestedColumn {
+
+  /**
+   * Special marker to indicate that a) the item is an
+   * array, and b) all indexes are to be projected.
+   * Used when seeing both a and a[x].
+   */
+
+  private static final Set<Integer> ALL_INDEXES = new HashSet<>();
+
+  private final RequestedTuple parent;
+  private final String name;
+  private RequestedTuple members;
+  private Set<Integer> indexes;
+  private ProjectionType type;
+
+  public RequestedColumnImpl(RequestedTuple parent, String name) {
+    this.parent = parent;
+    this.name = name;
+    setType();
+  }
+
+  @Override
+  public String name() { return name; }
+  @Override
+  public ProjectionType type() { return type; }
+  @Override
+  public boolean isWildcard() { return type == ProjectionType.WILDCARD; }
+  @Override
+  public boolean isSimple() { return type == ProjectionType.UNSPECIFIED; }
+
+  @Override
+  public boolean isArray() {
+    return type == ProjectionType.ARRAY || type == ProjectionType.TUPLE_ARRAY;
+  }
+
+  @Override
+  public boolean isTuple() {
+    return type == ProjectionType.TUPLE || type == ProjectionType.TUPLE_ARRAY;
+  }
+
+  public RequestedTuple asTuple() {
+    if (members == null) {
+      members = new RequestedTupleImpl(this);
+      setType();
+    }
+    return members;
+  }
+
+  public RequestedTuple projectAllMembers(boolean projectAll) {
+    members = projectAll ? ImpliedTupleRequest.ALL_MEMBERS : ImpliedTupleRequest.NO_MEMBERS;
+    setType();
+    return members;
+  }
+
+  public void addIndex(int index) {
+    if (indexes == null) {
+      indexes = new HashSet<>();
+    }
+    if (indexes != ALL_INDEXES) {
+      indexes.add(index);
+    }
+    setType();
+  }
+
+  public void projectAllElements() {
+    indexes = ALL_INDEXES;
+    setType();
+  }
+
+  @Override
+  public boolean hasIndexes() {
+    return indexes != null && indexes != ALL_INDEXES;
+  }
+
+  @Override
+  public boolean hasIndex(int index) {
+    return hasIndexes() ? indexes.contains(index) : false;
+  }
+
+  @Override
+  public int maxIndex() {
+    if (! hasIndexes()) {
+      return 0;
+    }
+    int max = 0;
+    for (Integer index : indexes) {
+      max = Math.max(max, index);
+    }
+    return max;
+  }
+
+  @Override
+  public boolean[] indexes() {
+    if (! hasIndexes()) {
+      return null;
+    }
+    int max = maxIndex();
+    boolean map[] = new boolean[max+1];
+    for (Integer index : indexes) {
+      map[index] = true;
+    }
+    return map;
+  }
+
+  @Override
+  public String fullName() {
+    StringBuilder buf = new StringBuilder();
+    buildName(buf);
+    return buf.toString();
+  }
+
+  public boolean isRoot() { return parent == null; }
+
+  private void setType() {
+    if (name.equals(SchemaPath.DYNAMIC_STAR)) {
+      type = ProjectionType.WILDCARD;
+    } else if (indexes != null && members != null) {
+      type = ProjectionType.TUPLE_ARRAY;
+    } else if (indexes != null) {
+      type = ProjectionType.ARRAY;
+    } else if (members != null) {
+      type = ProjectionType.TUPLE;
+    } else {
+      type = ProjectionType.UNSPECIFIED;
+    }
+  }
+
+  protected void buildName(StringBuilder buf) {
+    parent.buildName(buf);
+    buf.append('`')
+       .append(name)
+       .append('`');
+  }
+
+  @Override
+  public String summary() {
+    switch (type) {
+    case ARRAY:
+      return "array column";
+    case TUPLE:
+      return "map column";
+    case TUPLE_ARRAY:
+      return "repeated map";
+    case WILDCARD:
+      return "wildcard";
+    default:
+      return "column";
+    }
+  }
+
+  @Override
+  public boolean nameEquals(String target) {
+    return name.equalsIgnoreCase(target);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf
+      .append("[")
+      .append(getClass().getSimpleName())
+      .append(" name=")
+      .append(name())
+      .append(", type=")
+      .append(summary());
+    if (isArray()) {
+      buf
+        .append(", array=")
+        .append(indexes);
+    }
+    if (isTuple()) {
+      buf
+        .append(", tuple=")
+        .append(members);
+    }
+    buf.append("]");
+    return buf.toString();
+  }
+
+  @Override
+  public RequestedTuple mapProjection() { return members; }
+}
\ No newline at end of file
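
For a project list containing, say, columns[1] and columns[4], the
consolidated column reports its indexes as a boolean map (a sketch;
"col" stands for the parsed RequestedColumn):

    assert col.isArray() && col.hasIndexes();
    assert col.maxIndex() == 4;
    boolean[] projected = col.indexes();  // {false, true, false, false, true}
    for (int i = 0; i < projected.length; i++) {
      if (projected[i]) {
        // Element i was requested
      }
    }
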
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTuple.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTuple.java
new file mode 100644
index 00000000000..fee0892949a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTuple.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.project;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.exec.record.metadata.ProjectionType;
+
+/**
+ * Represents the set of columns projected for a tuple (row or map.)
+ * The projected columns might themselves be tuples (maps), in which
+ * case a nested projection set describes their members. Represents
+ * the set of requested
+ * columns and tuples as expressed in the physical plan.
+ * <p>
+ * Three variations exist:
+ * <ul>
+ * <li>Project all ({@link ImpliedTupleRequest#ALL_MEMBERS}): used for a tuple when
+ * all columns are projected. Example: the root tuple (the row) in
+ * a <tt>SELECT *</tt> query.</li>
+ * <li>Project none  (also {@link ImpliedTupleRequest#NO_MEMBERS}): used when no
+ * columns are projected from a tuple, such as when a map itself is
+ * not projected, so none of its member columns are projected.</li>
+ * <li>Project some ({@link RequestedTupleImpl}): used in the
+ * <tt>SELECT a, c, e</tt> case in which the query identifies which
+ * columns to project (implicitly leaving out others, such as b and
+ * d in our example.)</li>
+ * </ul>
+ * <p>
+ * The result is that each tuple (row and map) has an associated
+ * projection set which the code can query to determine if a newly
+ * added column is wanted (and so should have a backing vector) or
+ * is unwanted (and can just receive a dummy writer.)
+ */
+
+public interface RequestedTuple {
+
+  /**
+   * Plan-time properties of a requested column. Represents
+   * a consolidated view of the set of references to a column.
+   * For example, the project list might contain:<br>
+   * <tt>SELECT columns[4], columns[8]</tt><br>
+   * <tt>SELECT a.b, a.c</tt><br>
+   * <tt>SELECT columns, columns[1]</tt><br>
+   * <tt>SELECT a, a.b</tt><br>
+   * In each case, the same column is referenced in different
+   * forms which are consolidated into this abstraction.
+   * <p>
+   * Depending on the syntax, we can infer whether a column must
+   * be an array or a map. The inference is one-directional: while
+   * columns referenced in the forms above must be an array or a map,
+   * a simple column reference tells us nothing; such a column might
+   * still turn out to be an array or map.
+   */
+
+  public interface RequestedColumn {
+
+    String name();
+    ProjectionType type();
+    boolean isWildcard();
+    boolean isSimple();
+    boolean isArray();
+    boolean isTuple();
+    String fullName();
+    RequestedTuple mapProjection();
+    boolean nameEquals(String target);
+    int maxIndex();
+    boolean[] indexes();
+    boolean hasIndexes();
+    boolean hasIndex(int index);
+    String summary();
+  }
+
+  void parseSegment(PathSegment child);
+  RequestedColumn get(String colName);
+  ProjectionType projectionType(String colName);
+  RequestedTuple mapProjection(String colName);
+  List<RequestedColumn> projections();
+  void buildName(StringBuilder buf);
+}
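
As the Javadoc notes, inference from syntax is one-way. A few
illustrative cases (hypothetical; "proj" is a parsed RequestedTuple):

    RequestedColumn a = proj.get("a");
    // SELECT a.b   ->  a.isTuple() == true: a must be a map
    // SELECT a[2]  ->  a.isArray() == true: a must be an array
    // SELECT a     ->  a.isSimple() == true, yet a may still turn
    //                  out to be a map or an array at read time
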
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
new file mode 100644
index 00000000000..45a704c070c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.project;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.PathSegment.ArraySegment;
+import org.apache.drill.common.expression.PathSegment.NameSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.record.metadata.ProjectionType;
+import org.apache.drill.exec.record.metadata.TupleNameSpace;
+
+/**
+ * Represents an explicit projection at some tuple level.
+ * <p>
+ * A column is projected if it is explicitly listed in the selection list.
+ * <p>
+ * If a column is a map, then the projection for the map's columns is based on
+ * two rules:
+ * <ol>
+ * <li>If the projection list includes at least one explicit mention of a map
+ * member, then include only those columns explicitly listed.</li>
+ * <li>If the projection at the parent level lists only the map column itself
+ * (which the projection can't know is a map), then assume this implies all
+ * columns, as if the entry were "map.*".</li>
+ * </ol>
+ * <p>
+ * Examples:<br>
+ * <code>m</code><br>
+ * If m turns out to be a map, project all members of m.<br>
+ * <code>m.a</code><br>
+ * Column m must be a map. Project only column a.<br>
+ * <code>m, m.a</code><br>
+ * Tricky case. Since m alone already projects the whole map, the two
+ * entries are consolidated and all of map m is projected.
+ * <p>
+ * The projection set is build from a list of columns, represented as
+ * {@link SchemaPath} objects, provided by the physical plan. The structure of
+ * <tt>SchemaPath</tt> is a bit awkward:
+ * <p>
+ * <ul>
+ * <li><tt>SchemaPath</tt> is a wrapper for a column which directly holds the
+ * <tt>NameSegment</tt> for the top-level column.</li>
+ * <li><tt>NameSegment</tt> holds a name. This can be a top name such as
+ * `a`, or parts of a compound name such as `a`.`b`. Each <tt>NameSegment</tt>
+ * has a "child" that points to the parts of the name that follow it.</li>
+ * <li><tt>PathSegment</tt> is the base class for the parts of a name.</li>
+ * <li><tt>ArraySegment</tt> is the other kind of name part and represents
+ * an array index such as the "[1]" in `columns`[1].</li>
+ * </ul>
+ * The parser here consumes only names, this mechanism does not consider
+ * array indexes. As a result, there may be multiple projected columns that
+ * map to the same projection here: `columns`[1] and `columns`[2] both map to
+ * the name `columns`, for example.
+ */
+
+public class RequestedTupleImpl implements RequestedTuple {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RequestedTupleImpl.class);
+
+  private final RequestedColumnImpl parent;
+  private final TupleNameSpace<RequestedColumn> projection = new TupleNameSpace<>();
+
+  public RequestedTupleImpl() {
+    parent = null;
+  }
+
+  public RequestedTupleImpl(RequestedColumnImpl parent) {
+    this.parent = parent;
+  }
+
+  @Override
+  public RequestedColumn get(String colName) {
+    return projection.get(colName.toLowerCase());
+  }
+
+  private RequestedColumnImpl getImpl(String colName) {
+    return (RequestedColumnImpl) get(colName);
+  }
+
+  @Override
+  public ProjectionType projectionType(String colName) {
+    RequestedColumn col = get(colName);
+    return col == null ? ProjectionType.UNPROJECTED : col.type();
+  }
+
+  @Override
+  public RequestedTuple mapProjection(String colName) {
+    RequestedColumnImpl col = getImpl(colName);
+    RequestedTuple mapProj = (col == null) ? null : col.mapProjection();
+    if (mapProj != null) {
+      return mapProj;
+    }
+
+    // No explicit information for the map. Members inherit the
+    // same projection as the map itself.
+
+    if (col != null) {
+      return col.projectAllMembers(true);
+    }
+    return ImpliedTupleRequest.NO_MEMBERS;
+  }
+
+  /**
+   * Parse a projection list. The list should consist of a list of column names;
+   * any wildcards should have been processed by the caller. An empty list means
+   * nothing is projected. A null list means everything is projected (that is, a
+   * null list here is equivalent to a wildcard in the SELECT statement.)
+   *
+   * @param projList
+   *          the list of projected columns, or null if no projection is to be
+   *          done
+   * @return a projection set that implements the specified projection
+   */
+
+  public static RequestedTuple parse(Collection<SchemaPath> projList) {
+    if (projList == null) {
+      return new ImpliedTupleRequest(true);
+    }
+    if (projList.isEmpty()) {
+      return new ImpliedTupleRequest(false);
+    }
+    RequestedTupleImpl projSet = new RequestedTupleImpl();
+    for (SchemaPath col : projList) {
+      projSet.parseSegment(col.getRootSegment());
+    }
+    return projSet;
+  }
+
+  @Override
+  public void parseSegment(PathSegment pathSeg) {
+    if (pathSeg.isLastPath()) {
+      parseLeaf((NameSegment) pathSeg);
+    } else if (pathSeg.getChild().isArray()) {
+      parseArray((NameSegment) pathSeg);
+    } else {
+      parseInternal((NameSegment) pathSeg);
+    }
+  }
+
+  private void parseLeaf(NameSegment nameSeg) {
+    String name = nameSeg.getPath();
+    RequestedColumnImpl member = getImpl(name);
+    if (member == null) {
+      projection.add(name, new RequestedColumnImpl(this, name));
+      return;
+    }
+    if (member.isSimple() || member.isWildcard()) {
+      throw UserException
+        .validationError()
+        .message("Duplicate column in project list: %s",
+            member.fullName())
+        .build(logger);
+    }
+    if (member.isArray()) {
+
+      // Saw both a and a[x]. Occurs in project list.
+      // Project all elements.
+
+      member.projectAllElements();
+      return;
+    }
+
+    // Else the column is a known map.
+
+    assert member.isTuple();
+
+    // Allow both a.b (existing) and a (this column)
+    // Since we know a is a map, and we've projected the
+    // whole map, modify the projection of the column to
+    // project the entire map.
+
+    member.projectAllMembers(true);
+  }
+
+  private void parseInternal(NameSegment nameSeg) {
+    String name = nameSeg.getPath();
+    RequestedColumnImpl member = getImpl(name);
+    RequestedTuple map;
+    if (member == null) {
+      // New member. Since this is internal, this new member
+      // must be a map.
+
+      member = new RequestedColumnImpl(this, name);
+      projection.add(name, member);
+      map = member.asTuple();
+    } else if (member.isTuple()) {
+
+      // Known map. Add to it.
+
+      map = member.asTuple();
+    } else {
+
+      // Member was previously projected by itself. We now
+      // know it is a map. So, project entire map. (Earlier
+      // we saw `a`. Now we see `a`.`b`.)
+
+      map = member.projectAllMembers(true);
+    }
+    map.parseSegment(nameSeg.getChild());
+  }
+
+  private void parseArray(NameSegment arraySeg) {
+    String name = arraySeg.getPath();
+    int index = ((ArraySegment) arraySeg.getChild()).getIndex();
+    RequestedColumnImpl member = getImpl(name);
+    if (member == null) {
+      member = new RequestedColumnImpl(this, name);
+      projection.add(name, member);
+    } else if (member.isSimple()) {
+
+      // Saw both a and a[x]. Occurs in project list.
+      // Project all elements.
+
+      member.projectAllElements();
+      return;
+    } else if (member.hasIndex(index)) {
+      throw UserException
+        .validationError()
+        .message("Duplicate array index in project list: %s[%d]",
+            member.fullName(), index)
+        .build(logger);
+    }
+    member.addIndex(index);
+  }
+
+  @Override
+  public List<RequestedColumn> projections() {
+    return projection.entries();
+  }
+
+  @Override
+  public void buildName(StringBuilder buf) {
+    if (parent != null) {
+      parent.buildName(buf);
+    }
+  }
+}
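
Pulling the pieces together, parse() maps the three projection
variations onto the classes above (illustrative; assumes the standard
SchemaPath factory methods):

    RequestedTuple all = RequestedTupleImpl.parse(null);          // SELECT *
    RequestedTuple none = RequestedTupleImpl.parse(
        new ArrayList<SchemaPath>());                             // COUNT(*)
    RequestedTuple some = RequestedTupleImpl.parse(Arrays.asList(
        SchemaPath.getSimplePath("a"),
        SchemaPath.getCompoundPath("m", "x")));
    // some.projectionType("a") == ProjectionType.UNSPECIFIED
    // some.mapProjection("m").projectionType("x") == ProjectionType.UNSPECIFIED
    // some.projectionType("b") == ProjectionType.UNPROJECTED
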
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestProjectedTuple.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestProjectedTuple.java
new file mode 100644
index 00000000000..69d56a1bc6b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestProjectedTuple.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.rowSet.project.ImpliedTupleRequest;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTupleImpl;
+import org.apache.drill.exec.record.metadata.ProjectionType;
+import org.junit.Ignore;
+import org.junit.Test;
+
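+/**
+ * Tests the projection-list parser in RequestedTupleImpl. The cases
+ * below exercise the ProjectionType values UNSPECIFIED (simple
+ * projected column), UNPROJECTED, TUPLE (map), ARRAY (indexed column),
+ * WILDCARD, and TUPLE_ARRAY (map array, a syntax Drill does not yet
+ * support).
+ */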
+public class TestProjectedTuple {
+
+  @Test
+  public void testProjectionAll() {
+
+    // A null projection list means everything is projected
+
+    RequestedTuple projSet = RequestedTupleImpl.parse(null);
+    assertTrue(projSet instanceof ImpliedTupleRequest);
+    assertEquals(ProjectionType.UNSPECIFIED, projSet.projectionType("foo"));
+  }
+
+  /**
+   * Test an empty projection which occurs in a
+   * SELECT COUNT(*) query.
+   */
+
+  @Test
+  public void testProjectionNone() {
+
+    // Empty list means nothing is projected
+
+    RequestedTuple projSet = RequestedTupleImpl.parse(new ArrayList<SchemaPath>());
+    assertTrue(projSet instanceof ImpliedTupleRequest);
+    List<RequestedColumn> cols = projSet.projections();
+    assertEquals(0, cols.size());
+    assertEquals(ProjectionType.UNPROJECTED, projSet.projectionType("foo"));
+  }
+
+  @Test
+  public void testProjectionSimple() {
+
+    // Simple non-map columns
+
+    RequestedTuple projSet = RequestedTupleImpl.parse(
+        RowSetTestUtils.projectList("a", "b", "c"));
+    assertTrue(projSet instanceof RequestedTupleImpl);
+    assertEquals(ProjectionType.UNSPECIFIED, projSet.projectionType("a"));
+    assertEquals(ProjectionType.UNSPECIFIED, projSet.projectionType("b"));
+    assertEquals(ProjectionType.UNPROJECTED, projSet.projectionType("d"));
+
+    List<RequestedColumn> cols = projSet.projections();
+    assertEquals(3, cols.size());
+
+    RequestedColumn a = cols.get(0);
+    assertEquals("a", a.name());
+    assertEquals(ProjectionType.UNSPECIFIED, a.type());
+    assertTrue(a.isSimple());
+    assertFalse(a.isWildcard());
+    assertNull(a.mapProjection());
+    assertNull(a.indexes());
+
+    assertEquals("b", cols.get(1).name());
+    assertEquals(ProjectionType.UNSPECIFIED, cols.get(1).type());
+    assertTrue(cols.get(1).isSimple());
+
+    assertEquals("c", cols.get(2).name());
+    assertEquals(ProjectionType.UNSPECIFIED, cols.get(2).type());
+    assertTrue(cols.get(2).isSimple());
+  }
+
+  @Test
+  public void testProjectionWholeMap() {
+
+    // Whole-map projection (note, fully projected maps are
+    // identical to projected simple columns at this level of
+    // abstraction.)
+
+    List<SchemaPath> projCols = new ArrayList<>();
+    projCols.add(SchemaPath.getSimplePath("map"));
+    RequestedTuple projSet = RequestedTupleImpl.parse(projCols);
+
+    assertTrue(projSet instanceof RequestedTupleImpl);
+    assertEquals(ProjectionType.UNSPECIFIED, projSet.projectionType("map"));
+    assertEquals(ProjectionType.UNPROJECTED, projSet.projectionType("another"));
+    RequestedTuple mapProj = projSet.mapProjection("map");
+    assertNotNull(mapProj);
+    assertTrue(mapProj instanceof ImpliedTupleRequest);
+    assertEquals(ProjectionType.UNSPECIFIED, mapProj.projectionType("foo"));
+    assertNotNull(projSet.mapProjection("another"));
+    assertEquals(ProjectionType.UNPROJECTED, projSet.mapProjection("another").projectionType("anyCol"));
+  }
+
+  @Test
+  public void testProjectionMapSubset() {
+
+    // Selected map projection, multiple levels, full projection
+    // at leaf level.
+
+    List<SchemaPath> projCols = new ArrayList<>();
+    projCols.add(SchemaPath.getCompoundPath("map", "a"));
+    projCols.add(SchemaPath.getCompoundPath("map", "b"));
+    projCols.add(SchemaPath.getCompoundPath("map", "map2", "x"));
+    RequestedTuple projSet = RequestedTupleImpl.parse(projCols);
+    assertTrue(projSet instanceof RequestedTupleImpl);
+    assertEquals(ProjectionType.TUPLE, projSet.projectionType("map"));
+
+    // Map: an explicit map at top level
+
+    RequestedTuple mapProj = projSet.mapProjection("map");
+    assertTrue(mapProj instanceof RequestedTupleImpl);
+    assertEquals(ProjectionType.UNSPECIFIED, mapProj.projectionType("a"));
+    assertEquals(ProjectionType.UNSPECIFIED, mapProj.projectionType("b"));
+    assertEquals(ProjectionType.TUPLE, mapProj.projectionType("map2"));
+    assertEquals(ProjectionType.UNPROJECTED, mapProj.projectionType("bogus"));
+
+    // Map b: an implied nested map
+
+    RequestedTuple bMapProj = mapProj.mapProjection("b");
+    assertNotNull(bMapProj);
+    assertTrue(bMapProj instanceof ImpliedTupleRequest);
+    assertEquals(ProjectionType.UNSPECIFIED, bMapProj.projectionType("foo"));
+
+    // Map2, a nested map, has an explicit projection
+
+    RequestedTuple map2Proj = mapProj.mapProjection("map2");
+    assertNotNull(map2Proj);
+    assertTrue(map2Proj instanceof RequestedTupleImpl);
+    assertEquals(ProjectionType.UNSPECIFIED, map2Proj.projectionType("x"));
+    assertEquals(ProjectionType.UNPROJECTED, map2Proj.projectionType("bogus"));
+  }
+
+  @Test
+  public void testProjectionMapFieldAndMap() {
+
+    // Project both a map member and the entire map.
+
+    {
+      List<SchemaPath> projCols = new ArrayList<>();
+      projCols.add(SchemaPath.getCompoundPath("map", "a"));
+      projCols.add(SchemaPath.getCompoundPath("map"));
+
+      RequestedTuple projSet = RequestedTupleImpl.parse(projCols);
+      assertTrue(projSet instanceof RequestedTupleImpl);
+      assertEquals(ProjectionType.TUPLE, projSet.projectionType("map"));
+
+      RequestedTuple mapProj = projSet.mapProjection("map");
+      assertTrue(mapProj instanceof ImpliedTupleRequest);
+      assertEquals(ProjectionType.UNSPECIFIED, mapProj.projectionType("a"));
+
+      // Didn't ask for b, but did ask for the whole map.
+
+      assertEquals(ProjectionType.UNSPECIFIED, mapProj.projectionType("b"));
+    }
+
+    // Now the other way around.
+
+    {
+      List<SchemaPath> projCols = new ArrayList<>();
+      projCols.add(SchemaPath.getCompoundPath("map"));
+      projCols.add(SchemaPath.getCompoundPath("map", "a"));
+
+      RequestedTuple projSet = RequestedTupleImpl.parse(projCols);
+      assertTrue(projSet instanceof RequestedTupleImpl);
+      assertEquals(ProjectionType.TUPLE, projSet.projectionType("map"));
+
+      RequestedTuple mapProj = projSet.mapProjection("map");
+      assertTrue(mapProj instanceof ImpliedTupleRequest);
+      assertEquals(ProjectionType.UNSPECIFIED, mapProj.projectionType("a"));
+      assertEquals(ProjectionType.UNSPECIFIED, mapProj.projectionType("b"));
+    }
+  }
+
+  @Test
+  public void testMapDetails() {
+    RequestedTuple projSet = RequestedTupleImpl.parse(
+        RowSetTestUtils.projectList("a.b.c", "a.c", "d"));
+    List<RequestedColumn> cols = projSet.projections();
+    assertEquals(2, cols.size());
+
+    RequestedColumn a = cols.get(0);
+    assertEquals("a", a.name());
+    assertFalse(a.isSimple());
+    assertFalse(a.isArray());
+    assertTrue(a.isTuple());
+
+    {
+      assertNotNull(a.mapProjection());
+      List<RequestedColumn> aMembers = a.mapProjection().projections();
+      assertEquals(2, aMembers.size());
+
+      RequestedColumn a_b = aMembers.get(0);
+      assertEquals("b", a_b.name());
+      assertTrue(a_b.isTuple());
+
+      {
+        assertNotNull(a_b.mapProjection());
+        List<RequestedColumn> a_bMembers = a_b.mapProjection().projections();
+        assertEquals(1, a_bMembers.size());
+        assertEquals("c", a_bMembers.get(0).name());
+        assertTrue(a_bMembers.get(0).isSimple());
+      }
+
+      assertEquals("c", aMembers.get(1).name());
+      assertTrue(aMembers.get(1).isSimple());
+    }
+
+    assertEquals("d", cols.get(1).name());
+    assertTrue(cols.get(1).isSimple());
+  }
+
+  @Test
+  public void testMapDups() {
+    try {
+      RequestedTupleImpl.parse(
+          RowSetTestUtils.projectList("a.b", "a.c", "a.b"));
+      fail();
+    } catch (UserException e) {
+      // Expected
+    }
+  }
+
+  /**
+   * When the project list includes references to both the
+   * map as a whole, and members, then the parser is forgiving
+   * of duplicate map members since all members are projected.
+   */
+
+  @Test
+  public void testMapDupsIgnored() {
+    RequestedTuple projSet = RequestedTupleImpl.parse(
+          RowSetTestUtils.projectList("a", "a.b", "a.c", "a.b"));
+    List<RequestedColumn> cols = projSet.projections();
+    assertEquals(1, cols.size());
+  }
+
+  @Test
+  public void testWildcard() {
+    RequestedTuple projSet = RequestedTupleImpl.parse(
+        RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR));
+    List<RequestedColumn> cols = projSet.projections();
+    assertEquals(1, cols.size());
+
+    RequestedColumn wildcard = cols.get(0);
+    assertEquals(ProjectionType.WILDCARD, wildcard.type());
+    assertEquals(SchemaPath.DYNAMIC_STAR, wildcard.name());
+    assertFalse(wildcard.isSimple());
+    assertTrue(wildcard.isWildcard());
+    assertNull(wildcard.mapProjection());
+    assertNull(wildcard.indexes());
+  }
+
+  @Test
+  public void testSimpleDups() {
+    try {
+      RequestedTupleImpl.parse(RowSetTestUtils.projectList("a", "b", "a"));
+      fail();
+    } catch (UserException e) {
+      // Expected
+    }
+  }
+
+  @Test
+  public void testArray() {
+    RequestedTuple projSet = RequestedTupleImpl.parse(
+        RowSetTestUtils.projectList("a[1]", "a[3]"));
+    List<RequestedColumn> cols = projSet.projections();
+    assertEquals(1, cols.size());
+
+    assertEquals(ProjectionType.ARRAY, projSet.projectionType("a"));
+    RequestedColumn a = cols.get(0);
+    assertEquals("a", a.name());
+    assertTrue(a.isArray());
+    assertFalse(a.isSimple());
+    assertFalse(a.isTuple());
+    boolean[] indexes = a.indexes();
+    assertNotNull(indexes);
+    assertEquals(4, indexes.length);
+    assertFalse(indexes[0]);
+    assertTrue(indexes[1]);
+    assertFalse(indexes[2]);
+    assertTrue(indexes[3]);
+  }
+
+  @Test
+  public void testArrayDups() {
+    try {
+      RequestedTupleImpl.parse(
+          RowSetTestUtils.projectList("a[1]", "a[3]", "a[1]"));
+      fail();
+    } catch (UserException e) {
+      // Expected
+    }
+  }
+
+  @Test
+  public void testArrayAndSimple() {
+    RequestedTuple projSet = RequestedTupleImpl.parse(
+        RowSetTestUtils.projectList("a[1]", "a"));
+    List<RequestedColumn> cols = projSet.projections();
+    assertEquals(1, cols.size());
+
+    RequestedColumn a = cols.get(0);
+    assertEquals("a", a.name());
+    assertTrue(a.isArray());
+    assertNull(a.indexes());
+  }
+
+  @Test
+  public void testSimpleAndArray() {
+    RequestedTuple projSet = RequestedTupleImpl.parse(
+        RowSetTestUtils.projectList("a", "a[1]"));
+    List<RequestedColumn> cols = projSet.projections();
+    assertEquals(1, cols.size());
+
+    RequestedColumn a = cols.get(0);
+    assertEquals("a", a.name());
+    assertTrue(a.isArray());
+    assertNull(a.indexes());
+    assertEquals(ProjectionType.ARRAY, projSet.projectionType("a"));
+    assertEquals(ProjectionType.UNPROJECTED, projSet.projectionType("foo"));
+  }
+
+  @Test
+  @Ignore("Drill syntax does not support map arrays")
+  public void testMapArray() {
+    RequestedTuple projSet = RequestedTupleImpl.parse(
+        RowSetTestUtils.projectList("a[1].x"));
+    List<RequestedColumn> cols = projSet.projections();
+    assertEquals(1, cols.size());
+
+    assertEquals(ProjectionType.TUPLE_ARRAY, cols.get(0).type());
+    assertEquals(ProjectionType.TUPLE_ARRAY, projSet.projectionType("a"));
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderEmptyProject.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderEmptyProject.java
new file mode 100644
index 00000000000..f32a7e549b3
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderEmptyProject.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetOptions;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
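+/**
+ * Tests the result set loader when the projection list is empty (or
+ * disjoint from the table schema): the loader creates no vectors and
+ * the client skips rows to produce batches that carry only a row count.
+ */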
+public class TestResultSetLoaderEmptyProject extends SubOperatorTest {
+
+  /**
+   * Verify that empty projection works: allows skipping rows and
+   * reporting those rows as a batch with no vectors but with the
+   * desired row count.
+   */
+
+  @Test
+  public void testEmptyTopSchema() {
+    List<SchemaPath> selection = Lists.newArrayList();
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.INT)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+
+    assertTrue(rsLoader.isProjectionEmpty());
+
+    // Can't skip rows if batch not started.
+
+    int rowCount = 100_000;
+    try {
+      rsLoader.skipRows(10);
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+
+    // Skip 100,000 rows. The per-batch row cap means this takes two batches.
+
+    rsLoader.startBatch();
+    int skipped = rsLoader.skipRows(rowCount);
+    assertEquals(ValueVector.MAX_ROW_COUNT, skipped);
+
+    VectorContainer output = rsLoader.harvest();
+    assertEquals(skipped, output.getRecordCount());
+    assertEquals(0, output.getNumberOfColumns());
+    output.zeroVectors();
+
+    // Second batch
+
+    rowCount -= skipped;
+    rsLoader.startBatch();
+    skipped = rsLoader.skipRows(rowCount);
+    assertEquals(rowCount, skipped);
+
+    output = rsLoader.harvest();
+    assertEquals(skipped, output.getRecordCount());
+    assertEquals(0, output.getNumberOfColumns());
+    output.zeroVectors();
+
+    rsLoader.close();
+  }
+
+  /**
+   * Verify that a disjoint schema (projection does not overlap with
+   * table schema) is treated the same as an empty projection.
+   */
+
+  @Test
+  public void testDisjointSchema() {
+    List<SchemaPath> selection = Lists.newArrayList(
+        SchemaPath.getSimplePath("a"),
+        SchemaPath.getSimplePath("b"));
+    TupleMetadata schema = new SchemaBuilder()
+        .add("c", MinorType.INT)
+        .add("d", MinorType.INT)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+
+    assertTrue(rsLoader.isProjectionEmpty());
+    rsLoader.close();
+  }
+
+  /**
+   * Verify that skipping rows works even if the projection is non-empty.
+   */
+
+  @Test
+  public void testNonEmptySchema() {
+    List<SchemaPath> selection = Lists.newArrayList(
+        SchemaPath.getSimplePath("a"),
+        SchemaPath.getSimplePath("b"));
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.INT)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+
+    assertFalse(rsLoader.isProjectionEmpty());
+
+    // Skip 10 rows. Columns are of required types, so are filled
+    // with zeros.
+
+    rsLoader.startBatch();
+    int rowCount = 10;
+    rsLoader.skipRows(rowCount);
+
+    // Verify
+
+    RowSetBuilder builder = fixture.rowSetBuilder(schema);
+    for (int i = 0; i < rowCount; i++) {
+      builder.addRow(0, 0);
+    }
+    new RowSetComparison(builder.build())
+        .verifyAndClearAll(fixture.wrap(rsLoader.harvest()));
+
+    rsLoader.close();
+  }
+
+  @Test
+  public void testEmptyMapProjection() {
+    List<SchemaPath> selection = Lists.newArrayList();
+    TupleMetadata schema = new SchemaBuilder()
+        .addMap("map")
+          .add("a", MinorType.INT)
+          .add("b", MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+
+    assertTrue(rsLoader.isProjectionEmpty());
+
+    // Sanity test to verify row skipping with maps
+
+    int rowCount = 5000;
+    rsLoader.startBatch();
+    int skipped = rsLoader.skipRows(rowCount);
+    assertEquals(rowCount, skipped);
+
+    VectorContainer output = rsLoader.harvest();
+    assertEquals(rowCount, output.getRecordCount());
+    assertEquals(0, output.getNumberOfColumns());
+    output.zeroVectors();
+
+    rsLoader.close();
+  }
+
+  /**
+   * Test disjoint projection, but with maps. Project top-level columns
+   * a, b, when those columns actually appear in a map which is not
+   * projected.
+   */
+
+  @Test
+  public void testDisjointMapProjection() {
+    List<SchemaPath> selection = Lists.newArrayList(
+        SchemaPath.getSimplePath("a"),
+        SchemaPath.getSimplePath("b"));
+    TupleMetadata schema = new SchemaBuilder()
+        .addMap("map")
+          .add("a", MinorType.INT)
+          .add("b", MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+
+    assertTrue(rsLoader.isProjectionEmpty());
+
+    rsLoader.close();
+  }
+}
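
A sketch of the multi-batch skip loop that testEmptyTopSchema exercises
(illustration only; rsLoader configured as in that test):

  int remaining = 100_000;
  while (remaining > 0) {
    rsLoader.startBatch();
    // skipRows() is capped by the per-batch row limit.
    remaining -= rsLoader.skipRows(remaining);
    VectorContainer batch = rsLoader.harvest();
    // The batch carries a record count but no columns.
    batch.zeroVectors();
  }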
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMapArray.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMapArray.java
index 3ed1dc2cfb2..653137eef47 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMapArray.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMapArray.java
@@ -22,9 +22,11 @@
 import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
+import java.util.Iterator;
 
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -82,6 +84,11 @@ public void testBasics() {
     assertTrue(actualSchema.metadata(1).isMap());
     assertEquals(2, actualSchema.metadata("m").mapSchema().size());
     assertEquals(2, actualSchema.column("m").getChildren().size());
+    TupleWriter mapWriter = rootWriter.array("m").tuple();
+    assertSame(actualSchema.metadata("m").mapSchema(), mapWriter.schema().mapSchema());
+    assertSame(mapWriter.tupleSchema(), mapWriter.schema().mapSchema());
+    assertSame(mapWriter.tupleSchema().metadata(0), mapWriter.scalar(0).schema());
+    assertSame(mapWriter.tupleSchema().metadata(1), mapWriter.scalar(1).schema());
 
     // Write a couple of rows with arrays.
 
@@ -103,6 +110,9 @@ public void testBasics() {
     RepeatedMapVector mapVector = (RepeatedMapVector) actual.container().getValueVector(1).getValueVector();
     MaterializedField mapField = mapVector.getField();
     assertEquals(2, mapField.getChildren().size());
+    Iterator<MaterializedField> iter = mapField.getChildren().iterator();
+    assertTrue(mapWriter.scalar(0).schema().schema().isEquivalent(iter.next()));
+    assertTrue(mapWriter.scalar(1).schema().schema().isEquivalent(iter.next()));
 
     SingleRowSet expected = fixture.rowSetBuilder(schema)
         .addRow(10, mapArray(
@@ -125,7 +135,6 @@ public void testBasics() {
           mapValue(410, "d4.1"),
           mapValue(420, "d4.2")));
 
-    TupleWriter mapWriter = rootWriter.array("m").tuple();
     mapWriter.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.OPTIONAL));
 
     rootWriter
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java
index a6ba11be49a..98ba3ed6b85 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java
@@ -21,11 +21,14 @@
 import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
 import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.Arrays;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
@@ -42,7 +45,6 @@
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.test.rowSet.RowSetComparison;
 import org.apache.drill.test.rowSet.RowSetReader;
 import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.apache.drill.test.rowSet.schema.SchemaBuilder;
@@ -69,6 +71,7 @@ public void testBasics() {
         .setSchema(schema)
         .build();
     ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    assertFalse(rsLoader.isProjectionEmpty());
     RowSetLoader rootWriter = rsLoader.writer();
 
     // Verify structure and schema
@@ -102,7 +105,7 @@ public void testBasics() {
     try {
       mWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.OPTIONAL));
       fail();
-    } catch (IllegalArgumentException e) {
+    } catch (UserException e) {
       // Expected
     }
 
@@ -115,6 +118,9 @@ public void testBasics() {
     RowSet actual = fixture.wrap(rsLoader.harvest());
     assertEquals(5, rsLoader.schemaVersion());
     assertEquals(2, actual.rowCount());
+    @SuppressWarnings("resource")
+    MapVector mapVector = (MapVector) actual.container().getValueVector(1).getValueVector();
+    assertEquals(2, mapVector.getAccessor().getValueCount());
 
     // Validate data
 
@@ -238,6 +244,8 @@ public void testMapAddition() {
     // Ensure metadata was added
 
     assertTrue(mapWriter.tupleSchema().size() == 1);
+    assertSame(mapWriter.tupleSchema(), mapWriter.schema().mapSchema());
+    assertSame(mapWriter.tupleSchema().metadata(colIndex), mapWriter.scalar(colIndex).schema());
 
     rootWriter
       .addRow(20, mapValue("fred"))
@@ -250,6 +258,8 @@ public void testMapAddition() {
     MapVector mapVector = (MapVector) actual.container().getValueVector(1).getValueVector();
     MaterializedField mapField = mapVector.getField();
     assertEquals(1, mapField.getChildren().size());
+    assertTrue(mapWriter.scalar(colIndex).schema().schema().isEquivalent(
+        mapField.getChildren().iterator().next()));
 
     // Validate first batch
 
@@ -584,8 +594,7 @@ public void testMapWithArray() {
         .addRow(70, mapValue(intArray(710), strArray(), strArray("e7.1", "e7.2")))
         .build();
 
-    new RowSetComparison(expected).verifyAndClearAll(actual);
-
+    RowSetUtilities.verify(expected, actual);
     rsLoader.close();
   }
 
@@ -641,6 +650,16 @@ public void testMapWithOverflow() {
 
     RowSet result = fixture.wrap(rsLoader.harvest());
     assertEquals(expectedCount, result.rowCount());
+
+    // Ensure the odd map vector value count variable is set correctly.
+
+    @SuppressWarnings("resource")
+    MapVector m1Vector = (MapVector) result.container().getValueVector(1).getValueVector();
+    assertEquals(expectedCount, m1Vector.getAccessor().getValueCount());
+    @SuppressWarnings("resource")
+    MapVector m2Vector = (MapVector) m1Vector.getChildByOrdinal(1);
+    assertEquals(expectedCount, m2Vector.getAccessor().getValueCount());
+
     result.clear();
 
     // Next batch should start with the overflow row
@@ -837,6 +856,7 @@ public void testNameSpace() {
         .setSchema(schema)
         .build();
     ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    assertFalse(rsLoader.isProjectionEmpty());
     RowSetLoader rootWriter = rsLoader.writer();
 
     rsLoader.startBatch();
@@ -877,7 +897,7 @@ public void testNameSpace() {
         .addRow(31, mapValue(32, mapValue(33)))
         .build();
 
-    new RowSetComparison(expected).verifyAndClearAll(actual);
+    RowSetUtilities.verify(expected, actual);
     rsLoader.close();
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java
index 7372e14c84f..57c1182e9ab 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java
@@ -217,6 +217,52 @@ public void testMapProjection() {
     rsLoader.close();
   }
 
+  @Test
+  public void testMapProjectionMemberAndMap() {
+    List<SchemaPath> selection = RowSetTestUtils.projectList("m1", "m1.b");
+    TupleMetadata schema = new SchemaBuilder()
+        .addMap("m1")
+          .add("a", MinorType.INT)
+          .add("b", MinorType.INT)
+          .resumeSchema()
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Verify the projected columns
+
+    TupleMetadata actualSchema = rootWriter.tupleSchema();
+    ColumnMetadata m1Md = actualSchema.metadata("m1");
+    assertTrue(m1Md.isMap());
+    assertTrue(m1Md.isProjected());
+    assertEquals(2, m1Md.mapSchema().size());
+    assertTrue(m1Md.mapSchema().metadata("a").isProjected());
+    assertTrue(m1Md.mapSchema().metadata("b").isProjected());
+
+    // Write a couple of rows.
+
+    rsLoader.startBatch();
+    rootWriter.start();
+    rootWriter
+      .addSingleCol(mapValue( 1,  2))
+      .addSingleCol(mapValue(11, 12));
+
+    // Verify. The whole map appears in the result set because the
+    // project list included the whole map as well as a map member.
+
+    SingleRowSet expected = fixture.rowSetBuilder(schema)
+      .addSingleCol(mapValue( 1,  2))
+      .addSingleCol(mapValue(11, 12))
+      .build();
+    new RowSetComparison(expected)
+        .verifyAndClearAll(fixture.wrap(rsLoader.harvest()));
+    rsLoader.close();
+  }
+
   /**
    * Test a map array. Use the convenience methods to set values.
    * Only the projected array members should appear in the harvested
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProtocol.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProtocol.java
index 30c20d7bd5e..4323a6b265c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProtocol.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProtocol.java
@@ -28,6 +28,7 @@
 
 import java.util.Arrays;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
@@ -80,6 +81,7 @@ public void testBasics() {
     assertEquals(0, rsLoader.writer().rowCount());
     assertEquals(0, rsLoader.batchCount());
     assertEquals(0, rsLoader.totalRowCount());
+    assertTrue(rsLoader.isProjectionEmpty());
 
     // Failures due to wrong state (Start)
 
@@ -98,6 +100,7 @@ public void testBasics() {
 
     MaterializedField fieldA = SchemaBuilder.columnSchema("a", MinorType.INT, DataMode.REQUIRED);
     rootWriter.addColumn(fieldA);
+    assertFalse(rsLoader.isProjectionEmpty());
 
     assertEquals(1, schema.size());
     assertTrue(fieldA.isEquivalent(schema.column(0)));
@@ -362,14 +365,14 @@ public void testCaseInsensitiveSchema() {
     try {
       rootWriter.addColumn(colSchema);
       fail();
-    } catch(IllegalArgumentException e) {
+    } catch(UserException e) {
       // Expected
     }
     try {
       MaterializedField testCol = SchemaBuilder.columnSchema("A", MinorType.VARCHAR, DataMode.REQUIRED);
       rootWriter.addColumn(testCol);
       fail();
-    } catch (IllegalArgumentException e) {
+    } catch (UserException e) {
       // Expected
       assertTrue(e.getMessage().contains("Duplicate"));
     }
@@ -382,6 +385,7 @@ public void testCaseInsensitiveSchema() {
 
     MaterializedField col2 = SchemaBuilder.columnSchema("b", MinorType.VARCHAR, DataMode.REQUIRED);
     rootWriter.addColumn(col2);
+    assertEquals(2, rsLoader.schemaVersion());
     assertTrue(col2.isEquivalent(schema.column(1)));
     ColumnMetadata col2Metadata = schema.metadata(1);
     assertSame(col2Metadata, schema.metadata("b"));
@@ -401,6 +405,7 @@ public void testCaseInsensitiveSchema() {
 
     MaterializedField col3 = SchemaBuilder.columnSchema("c", MinorType.VARCHAR, DataMode.REQUIRED);
     rootWriter.addColumn(col3);
+    assertEquals(3, rsLoader.schemaVersion());
     assertTrue(col3.isEquivalent(schema.column(2)));
     ColumnMetadata col3Metadata = schema.metadata(2);
     assertSame(col3Metadata, schema.metadata("c"));
@@ -412,6 +417,7 @@ public void testCaseInsensitiveSchema() {
 
     MaterializedField col4 = SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.OPTIONAL);
     rootWriter.addColumn(col4);
+    assertEquals(4, rsLoader.schemaVersion());
     assertTrue(col4.isEquivalent(schema.column(3)));
     ColumnMetadata col4Metadata = schema.metadata(3);
     assertSame(col4Metadata, schema.metadata("d"));
@@ -423,6 +429,7 @@ public void testCaseInsensitiveSchema() {
 
     MaterializedField col5 = SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.REPEATED);
     rootWriter.addColumn(col5);
+    assertEquals(5, rsLoader.schemaVersion());
     assertTrue(col5.isEquivalent(schema.column(4)));
     ColumnMetadata col5Metadata = schema.metadata(4);
     assertSame(col5Metadata, schema.metadata("e"));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultVectorCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultVectorCache.java
new file mode 100644
index 00000000000..6232564270f
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultVectorCache.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.junit.Test;
+
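+/**
+ * Tests the result vector cache. Vectors are cached by name: asking
+ * for the same name and type returns the cached vector, while a type
+ * change replaces it. In permissive mode, a request may instead reuse
+ * an existing vector that the requested type is promotable to.
+ */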
+public class TestResultVectorCache extends SubOperatorTest {
+
+  @Test
+  public void testIsPromotable() {
+
+    MaterializedField required = MaterializedField.create("a",
+        MajorType.newBuilder()
+          .setMinorType(MinorType.INT)
+          .setMode(DataMode.REQUIRED)
+          .build());
+
+    // Type is promotable to itself
+
+    assertTrue(required.isPromotableTo(required, true));
+    assertTrue(required.isPromotableTo(required, false));
+
+    // Required is promotable to null
+
+    MaterializedField nullable = MaterializedField.create("a",
+        MajorType.newBuilder()
+          .setMinorType(MinorType.INT)
+          .setMode(DataMode.OPTIONAL)
+          .build());
+
+    assertTrue(required.isPromotableTo(nullable, true));
+    assertFalse(required.isPromotableTo(nullable, false));
+
+    // Nullable not promotable to required
+
+    assertFalse(nullable.isPromotableTo(required, true));
+
+    // Arrays cannot be promoted to/from other types
+
+    MaterializedField repeated = MaterializedField.create("a",
+        MajorType.newBuilder()
+          .setMinorType(MinorType.INT)
+          .setMode(DataMode.REPEATED)
+          .build());
+
+    assertFalse(required.isPromotableTo(repeated, true));
+    assertFalse(nullable.isPromotableTo(repeated, true));
+    assertFalse(repeated.isPromotableTo(required, true));
+    assertFalse(repeated.isPromotableTo(nullable, true));
+
+    // Narrower precision promotable to wider
+
+    MaterializedField narrow = MaterializedField.create("a",
+        MajorType.newBuilder()
+          .setMinorType(MinorType.VARCHAR)
+          .setMode(DataMode.REQUIRED)
+          .setPrecision(10)
+          .build());
+    MaterializedField wide = MaterializedField.create("a",
+        MajorType.newBuilder()
+          .setMinorType(MinorType.VARCHAR)
+          .setMode(DataMode.REQUIRED)
+          .setPrecision(20)
+          .build());
+    MaterializedField unset = MaterializedField.create("a",
+        MajorType.newBuilder()
+          .setMinorType(MinorType.VARCHAR)
+          .setMode(DataMode.REQUIRED)
+          .build());
+
+    assertTrue(narrow.isPromotableTo(wide, false));
+    assertTrue(unset.isPromotableTo(narrow, false));
+    assertTrue(unset.isPromotableTo(wide, false));
+    assertFalse(wide.isPromotableTo(narrow, false));
+    assertFalse(narrow.isPromotableTo(unset, false));
+  }
+
+  @SuppressWarnings("resource")
+  @Test
+  public void testBasics() {
+    ResultVectorCache cache = new ResultVectorCacheImpl(fixture.allocator());
+
+    // Create a vector
+
+    MaterializedField required = MaterializedField.create("a",
+        MajorType.newBuilder()
+          .setMinorType(MinorType.INT)
+          .setMode(DataMode.REQUIRED)
+          .build());
+    ValueVector vector1 = cache.addOrGet(required);
+    assertTrue(vector1.getField().isEquivalent(required));
+
+    // Request the same schema, should get the same vector.
+
+    ValueVector vector2 = cache.addOrGet(required);
+    assertSame(vector1, vector2);
+
+    // Non-permissive. Change in mode means different vector.
+
+    MaterializedField optional = MaterializedField.create("a",
+        MajorType.newBuilder()
+          .setMinorType(MinorType.INT)
+          .setMode(DataMode.OPTIONAL)
+          .build());
+    ValueVector vector3 = cache.addOrGet(optional);
+    assertTrue(vector3.getField().isEquivalent(optional));
+    assertNotSame(vector1, vector3);
+
+    // Asking for the required type again produces a new vector.
+    // Name is the key, and we can have only one type associated
+    // with each name.
+
+    ValueVector vector4 = cache.addOrGet(required);
+    assertTrue(vector4.getField().isEquivalent(required));
+    assertNotSame(vector3, vector4);
+    assertNotSame(vector1, vector4);
+
+    // Varchar, no precision.
+
+    MaterializedField varchar1 = MaterializedField.create("a",
+        MajorType.newBuilder()
+          .setMinorType(MinorType.VARCHAR)
+          .setMode(DataMode.REQUIRED)
+          .build());
+
+    ValueVector vector5 = cache.addOrGet(varchar1);
+    assertTrue(vector5.getField().isEquivalent(varchar1));
+
+    // Varchar, with precision, no match.
+
+    MaterializedField varchar2 = MaterializedField.create("a",
+        MajorType.newBuilder()
+          .setMinorType(MinorType.VARCHAR)
+          .setMode(DataMode.REQUIRED)
+          .setPrecision(10)
+          .build());
+
+    ValueVector vector6 = cache.addOrGet(varchar2);
+    assertTrue(vector6.getField().isEquivalent(varchar2));
+    assertNotSame(vector5, vector6);
+
+    // Does match if same precision.
+
+    ValueVector vector7 = cache.addOrGet(varchar2);
+    assertTrue(vector7.getField().isEquivalent(varchar2));
+    assertSame(vector6, vector7);
+
+    // Different names are cached independently.
+
+    MaterializedField varchar3 = MaterializedField.create("b",
+        MajorType.newBuilder()
+          .setMinorType(MinorType.VARCHAR)
+          .setMode(DataMode.REQUIRED)
+          .setPrecision(10)
+          .build());
+
+    ValueVector vector8 = cache.addOrGet(varchar3);
+    assertTrue(vector8.getField().isEquivalent(varchar3));
+    assertSame(vector7, cache.addOrGet(varchar2));
+    assertSame(vector8, cache.addOrGet(varchar3));
+
+    ((ResultVectorCacheImpl) cache).close();
+  }
+
+  @SuppressWarnings("resource")
+  @Test
+  public void testPermissive() {
+    ResultVectorCache cache = new ResultVectorCacheImpl(fixture.allocator(), true);
+
+    // Create a nullable vector
+
+    MaterializedField optional = MaterializedField.create("a",
+        MajorType.newBuilder()
+          .setMinorType(MinorType.INT)
+          .setMode(DataMode.OPTIONAL)
+          .build());
+    ValueVector vector1 = cache.addOrGet(optional);
+
+    // Ask for a required version of the same name and type.
+    // Should return the nullable version.
+
+    MaterializedField required = MaterializedField.create("a",
+        MajorType.newBuilder()
+          .setMinorType(MinorType.INT)
+          .setMode(DataMode.REQUIRED)
+          .build());
+    ValueVector vector2 = cache.addOrGet(required);
+    assertTrue(vector2.getField().isEquivalent(optional));
+    assertSame(vector1, vector2);
+
+    // Repeat with Varchar
+
+    MaterializedField varchar1 = MaterializedField.create("a",
+        MajorType.newBuilder()
+          .setMinorType(MinorType.VARCHAR)
+          .setMode(DataMode.OPTIONAL)
+          .build());
+
+    ValueVector vector3 = cache.addOrGet(varchar1);
+
+    MaterializedField varchar2 = MaterializedField.create("a",
+        MajorType.newBuilder()
+          .setMinorType(MinorType.VARCHAR)
+          .setMode(DataMode.REQUIRED)
+          .build());
+
+    ValueVector vector4 = cache.addOrGet(varchar2);
+    assertSame(vector3, vector4);
+
+    // Larger precision. Needs new vector.
+
+    MaterializedField varchar3 = MaterializedField.create("a",
+        MajorType.newBuilder()
+          .setMinorType(MinorType.VARCHAR)
+          .setMode(DataMode.OPTIONAL)
+          .setPrecision(10)
+          .build());
+
+    ValueVector vector5 = cache.addOrGet(varchar3);
+    assertTrue(vector5.getField().isEquivalent(varchar3));
+    assertNotSame(vector4, vector5);
+
+    // Smaller precision, reuse vector.
+
+    ValueVector vector6 = cache.addOrGet(varchar1);
+    assertTrue(vector6.getField().isEquivalent(varchar3));
+    assertSame(vector5, vector6);
+
+    // Smaller precision, required mode: reuse vector.
+
+    MaterializedField varchar4 = MaterializedField.create("a",
+        MajorType.newBuilder()
+          .setMinorType(MinorType.VARCHAR)
+          .setMode(DataMode.REQUIRED)
+          .setPrecision(5)
+          .build());
+
+    ValueVector vector7 = cache.addOrGet(varchar4);
+    assertTrue(vector7.getField().isEquivalent(varchar3));
+    assertSame(vector5, vector7);
+
+    // TODO: Repeat with decimal precision and scale.
+
+    ((ResultVectorCacheImpl) cache).close();
+  }
+
+  @SuppressWarnings("resource")
+  @Test
+  public void testClose() {
+    ResultVectorCache cache = new ResultVectorCacheImpl(fixture.allocator());
+
+    // Create a vector
+
+    MaterializedField required = MaterializedField.create("a",
+        MajorType.newBuilder()
+          .setMinorType(MinorType.INT)
+          .setMode(DataMode.REQUIRED)
+          .build());
+    IntVector vector1 = (IntVector) cache.addOrGet(required);
+    vector1.allocateNew(100);
+
+    // Close the cache. Note: close is on the implementation, not
+    // the interface, because only the implementation should decide
+    // when to close.
+
+    // Close should release the allocated vector. If not, then
+    // this test suite will fail with a memory leak when shutting
+    // down the root allocator.
+
+    ((ResultVectorCacheImpl) cache).close();
+    assertEquals(0, vector1.getBuffer().capacity());
+  }
+}
diff --git a/exec/vector/src/main/codegen/templates/BasicTypeHelper.java b/exec/vector/src/main/codegen/templates/BasicTypeHelper.java
index f818c236f62..02e1c279071 100644
--- a/exec/vector/src/main/codegen/templates/BasicTypeHelper.java
+++ b/exec/vector/src/main/codegen/templates/BasicTypeHelper.java
@@ -67,12 +67,6 @@ public static int getSize(MajorType major) {
     throw new UnsupportedOperationException(buildErrorMessage("get size", major));
   }
 
-  public static ValueVector getNewVector(String name, BufferAllocator allocator, MajorType type, CallBack callback){
-    MaterializedField field = MaterializedField.create(name, type);
-    return getNewVector(field, allocator, callback);
-  }
-  
-  
   public static Class<? extends ValueVector> getValueVectorClass(MinorType type, DataMode mode){
     switch (type) {
     case UNION:
@@ -233,11 +227,23 @@ public static ValueVector getNewVector(String name, BufferAllocator allocator, M
       throw new UnsupportedOperationException(buildErrorMessage("get holder reader implementation", type, mode));
   }
   
+  public static ValueVector getNewVector(String name, BufferAllocator allocator, MajorType type, CallBack callback) {
+    MaterializedField field = MaterializedField.create(name, type);
+    return getNewVector(field, allocator, callback);
+  }
+  
   public static ValueVector getNewVector(MaterializedField field, BufferAllocator allocator){
     return getNewVector(field, allocator, null);
   }
-  public static ValueVector getNewVector(MaterializedField field, BufferAllocator allocator, CallBack callBack){
-    MajorType type = field.getType();
+  
+  public static ValueVector getNewVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) {
+    return getNewVector(field, field.getType(), allocator, callBack);
+  }
+  
+  // Creates an internal or external vector. Internal vectors may have
+  // types that disagree with their materialized field.
+  
+  public static ValueVector getNewVector(MaterializedField field, MajorType type, BufferAllocator allocator, CallBack callBack) {
 
     switch (type.getMinorType()) {
     
@@ -589,5 +595,4 @@ public static MajorType getType(${dataMode}${minor.class}Holder holder) {
   </#if>
   </#list>
   </#list>
-
 }
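
A hedged sketch of the new four-argument getNewVector() overload above
(the UNION example is illustrative, not taken from the patch):

  // A union's internal "types" child is stored as UINT1 even though
  // the field itself is declared as a UNION.
  MaterializedField field = MaterializedField.create("u",
      Types.optional(MinorType.UNION));
  ValueVector typesVector = BasicTypeHelper.getNewVector(
      field, Types.required(MinorType.UINT1), allocator, null);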
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
index 915c9941d89..82e90e9cec3 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
@@ -22,7 +22,6 @@
 
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.ProjectionType;
-import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter;
 import org.apache.drill.exec.vector.accessor.writer.dummy.DummyArrayWriter;
@@ -181,19 +180,21 @@ public static TupleObjectWriter buildMap(ColumnMetadata schema, MapVector vector
   }
 
   public static ArrayObjectWriter buildMapArray(ColumnMetadata schema,
-      UInt4Vector offsetVector,
+      RepeatedMapVector mapVector,
       List<AbstractObjectWriter> writers) {
     MapWriter mapWriter;
     if (schema.isProjected()) {
+      assert mapVector != null;
       mapWriter = new ArrayMapWriter(schema, writers);
     } else {
+      assert mapVector == null;
       mapWriter = new DummyArrayMapWriter(schema, writers);
     }
     TupleObjectWriter mapArray = new TupleObjectWriter(mapWriter);
     AbstractArrayWriter arrayWriter;
     if (schema.isProjected()) {
       arrayWriter = new ObjectArrayWriter(schema,
-          offsetVector,
+          mapVector.getOffsetVector(),
           mapArray);
     } else  {
       arrayWriter = new DummyArrayWriter(schema, mapArray);
@@ -206,8 +207,7 @@ public static AbstractObjectWriter buildMapWriter(ColumnMetadata schema,
       List<AbstractObjectWriter> writers) {
     if (schema.isArray()) {
       return MapWriter.buildMapArray(schema,
-          vector == null ? null :
-          ((RepeatedMapVector) vector).getOffsetVector(), writers);
+          (RepeatedMapVector) vector, writers);
     } else {
       return MapWriter.buildMap(schema, (MapVector) vector, writers);
     }


 
