You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/12/21 05:19:41 UTC

[14/15] drill git commit: DRILL-5657: Size-aware vector writer structure

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java
new file mode 100644
index 0000000..b875e7e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java
@@ -0,0 +1,775 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import java.util.Collection;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.TupleState.RowState;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+
+/**
+ * Implementation of the result set loader.
+ * @see {@link ResultSetLoader}
+ */
+
+public class ResultSetLoaderImpl implements ResultSetLoader {
+
+  /**
+   * Read-only set of options for the result set loader.
+   */
+
+  public static class ResultSetOptions {
+    public final int vectorSizeLimit;
+    public final int rowCountLimit;
+    public final ResultVectorCache vectorCache;
+    public final Collection<SchemaPath> projection;
+    public final TupleMetadata schema;
+    public final long maxBatchSize;
+
+    public ResultSetOptions() {
+      vectorSizeLimit = ValueVector.MAX_BUFFER_SIZE;
+      rowCountLimit = DEFAULT_ROW_COUNT;
+      projection = null;
+      vectorCache = null;
+      schema = null;
+      maxBatchSize = -1;
+    }
+
+    public ResultSetOptions(OptionBuilder builder) {
+      this.vectorSizeLimit = builder.vectorSizeLimit;
+      this.rowCountLimit = builder.rowCountLimit;
+      this.projection = builder.projection;
+      this.vectorCache = builder.vectorCache;
+      this.schema = builder.schema;
+      this.maxBatchSize = builder.maxBatchSize;
+    }
+
+    public void dump(HierarchicalFormatter format) {
+      format
+        .startObject(this)
+        .attribute("vectorSizeLimit", vectorSizeLimit)
+        .attribute("rowCountLimit", rowCountLimit)
+        .attribute("projection", projection)
+        .endObject();
+    }
+  }
+
+  private enum State {
+    /**
+     * Before the first batch.
+     */
+
+    START,
+
+    /**
+     * Writing to a batch normally.
+     */
+
+    ACTIVE,
+
+    /**
+     * Batch overflowed a vector while writing. Can continue
+     * to write to a temporary "overflow" batch until the
+     * end of the current row.
+     */
+
+    OVERFLOW,
+
+    /**
+     * Temporary state to avoid batch-size related overflow while
+     * an overflow is in progress.
+     */
+
+    IN_OVERFLOW,
+
+    /**
+     * Batch is full due to reaching the row count limit
+     * when saving a row.
+     * No more writes allowed until harvesting the current batch.
+     */
+
+    FULL_BATCH,
+
+    /**
+     * Current batch was harvested: data is gone. No lookahead
+     * batch exists.
+     */
+
+    HARVESTED,
+
+    /**
+     * Current batch was harvested and its data is gone. However,
+     * overflow occurred during that batch and the data exists
+     * in the overflow vectors.
+     * <p>
+     * This state needs special consideration. The column writer
+     * structure maintains its state (offsets, etc.) from the OVERFLOW
+     * state, but the buffers currently in the vectors are from the
+     * complete batch. <b>No writes can be done in this state!</b>
+     * The writer state does not match the data in the buffers.
+     * The code here does what it can to catch this state. But, if
+     * some client tries to write to a column writer in this state,
+     * bad things will happen. Doing so is invalid (the write is outside
+     * of a batch), so this is not a terrible restriction.
+     * <p>
+     * Said another way, the current writer state is invalid with respect
+     * to the active buffers, but only if the writers try to act on the
+     * buffers. Since the writers won't do so, this temporary state is
+     * fine. The correct buffers are restored once a new batch is started
+     * and the state moves to ACTIVE.
+     */
+
+    LOOK_AHEAD,
+
+    /**
+     * Mutator is closed: no more operations are allowed.
+     */
+
+    CLOSED
+  }
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResultSetLoaderImpl.class);
+
+  /**
+   * Options provided to this loader.
+   */
+
+  private final ResultSetOptions options;
+
+  /**
+   * Allocator for vectors created by this loader.
+   */
+
+  final BufferAllocator allocator;
+
+  /**
+   * Internal structure used to work with the vectors (real or dummy) used
+   * by this loader.
+   */
+
+  final RowState rootState;
+
+  /**
+   * Top-level writer index that steps through the rows as they are written.
+   * When an overflow batch is in effect, indexes into that batch instead.
+   * Since a batch is really a tree of tuples, in which some branches of
+   * the tree are arrays, the root indexes here feeds into array indexes
+   * within the writer structure that points to the current position within
+   * an array column.
+   */
+
+  private final WriterIndexImpl writerIndex;
+
+  /**
+   * The row-level writer for stepping through rows as they are written,
+   * and for accessing top-level columns.
+   */
+
+  private final RowSetLoaderImpl rootWriter;
+
+  /**
+   * Vector cache for this loader.
+   * @see {@link OptionBuilder#setVectorCache()}.
+   */
+
+  private final ResultVectorCache vectorCache;
+
+  /**
+   * Tracks the state of the row set loader. Handling vector overflow requires
+   * careful stepping through a variety of states as the write proceeds.
+   */
+
+  private State state = State.START;
+
+  /**
+   * Track the current schema as seen by the writer. Each addition of a column
+   * anywhere in the schema causes the active schema version to increase by one.
+   * This allows very easy checks for schema changes: save the prior version number
+   * and compare it against the current version number.
+   */
+
+  private int activeSchemaVersion;
+
+  /**
+   * Track the current schema as seen by the consumer of the batches that this
+   * loader produces. The harvest schema version can be behind the active schema
+   * version in the case in which new columns are added to the overflow row.
+   * Since the overflow row won't be visible to the harvested batch, that batch
+   * sees the schema as it existed at a prior version: the harvest schema
+   * version.
+   */
+
+  private int harvestSchemaVersion;
+
+  /**
+   * Builds the harvest vector container that includes only the columns that
+   * are included in the harvest schema version. That is, it excludes columns
+   * added while writing the overflow row.
+   */
+
+  private VectorContainerBuilder containerBuilder;
+
+  /**
+   * Counts the batches harvested (sent downstream) from this loader. Does
+   * not include the current, in-flight batch.
+   */
+
+  private int harvestBatchCount;
+
+  /**
+   * Counts the rows included in previously-harvested batches. Does not
+   * include the number of rows in the current batch.
+   */
+
+  private int previousRowCount;
+
+  /**
+   * Number of rows in the harvest batch. If an overflow batch is in effect,
+   * then this is the number of rows in the "main" batch before the overflow;
+   * that is the number of rows in the batch that will be harvested. If no
+   * overflow row is in effect, then this number is undefined (and should be
+   * zero.)
+   */
+
+  private int pendingRowCount;
+
+  /**
+   * The number of rows per batch. Starts with the configured amount. Can be
+   * adjusted between batches, perhaps based on the actual observed size of
+   * input data.
+   */
+
+  private int targetRowCount;
+
+  /**
+   * Total bytes allocated to the current batch.
+   */
+
+  protected int accumulatedBatchSize;
+
+  protected final ProjectionSet projectionSet;
+
+  public ResultSetLoaderImpl(BufferAllocator allocator, ResultSetOptions options) {
+    this.allocator = allocator;
+    this.options = options;
+    targetRowCount = options.rowCountLimit;
+    writerIndex = new WriterIndexImpl(this);
+
+    if (options.vectorCache == null) {
+      vectorCache = new NullResultVectorCacheImpl(allocator);
+    } else {
+      vectorCache = options.vectorCache;
+    }
+
+    // If projection, build the projection map.
+
+    projectionSet = ProjectionSetImpl.parse(options.projection);
+
+    // Build the row set model depending on whether a schema is provided.
+
+    rootState = new RowState(this);
+    rootWriter = rootState.rootWriter();
+
+    // If no schema, columns will be added incrementally as they
+    // are discovered. Start with an empty model.
+
+    if (options.schema != null) {
+
+      // Schema provided. Populate a model (and create vectors) for the
+      // provided schema. The schema can be extended later, but normally
+      // won't be if known up front.
+
+      logger.debug("Schema: " + options.schema.toString());
+      rootState.buildSchema(options.schema);
+    }
+  }
+
+  private void updateCardinality() {
+    rootState.updateCardinality(targetRowCount());
+  }
+
+  public ResultSetLoaderImpl(BufferAllocator allocator) {
+    this(allocator, new ResultSetOptions());
+  }
+
+  public BufferAllocator allocator() { return allocator; }
+
+  protected int bumpVersion() {
+
+    // Update the active schema version. We cannot update the published
+    // schema version at this point because a column later in this same
+    // row might cause overflow, and any new columns in this row will
+    // be hidden until a later batch. But, if we are between batches,
+    // then it is fine to add the column to the schema.
+
+    activeSchemaVersion++;
+    switch (state) {
+    case HARVESTED:
+    case START:
+    case LOOK_AHEAD:
+      harvestSchemaVersion = activeSchemaVersion;
+      break;
+    default:
+      break;
+
+    }
+    return activeSchemaVersion;
+  }
+
+  @Override
+  public int schemaVersion() { return harvestSchemaVersion; }
+
+  @Override
+  public void startBatch() {
+    switch (state) {
+    case HARVESTED:
+    case START:
+      logger.trace("Start batch");
+      accumulatedBatchSize = 0;
+      updateCardinality();
+      rootState.startBatch();
+      checkInitialAllocation();
+
+      // The previous batch ended without overflow, so start
+      // a new batch, and reset the write index to 0.
+
+      writerIndex.reset();
+      rootWriter.startWrite();
+      break;
+
+    case LOOK_AHEAD:
+
+      // A row overflowed so keep the writer index at its current value
+      // as it points to the second row in the overflow batch. However,
+      // the last write position of each writer must be restored on
+      // a column-by-column basis, which is done by the visitor.
+
+      logger.trace("Start batch after overflow");
+      rootState.startBatch();
+
+      // Note: no need to do anything with the writers; they were left
+      // pointing to the correct positions in the look-ahead batch.
+      // The above simply puts the look-ahead vectors back "under"
+      // the writers.
+
+      break;
+
+    default:
+      throw new IllegalStateException("Unexpected state: " + state);
+    }
+
+    // Update the visible schema with any pending overflow batch
+    // updates.
+
+    harvestSchemaVersion = activeSchemaVersion;
+    pendingRowCount = 0;
+    state = State.ACTIVE;
+  }
+
+  @Override
+  public RowSetLoader writer() {
+    if (state == State.CLOSED) {
+      throw new IllegalStateException("Unexpected state: " + state);
+    }
+    return rootWriter;
+  }
+
+  @Override
+  public ResultSetLoader setRow(Object... values) {
+    startRow();
+    writer().setTuple(values);
+    saveRow();
+    return this;
+  }
+
+  /**
+   * Called before writing a new row. Implementation of
+   * {@link RowSetLoader#start()}.
+   */
+
+  protected void startRow() {
+    switch (state) {
+    case ACTIVE:
+
+      // Update the visible schema with any pending overflow batch
+      // updates.
+
+      harvestSchemaVersion = activeSchemaVersion;
+      rootWriter.startRow();
+      break;
+    default:
+      throw new IllegalStateException("Unexpected state: " + state);
+    }
+  }
+
+  /**
+   * Finalize the current row. Implementation of
+   * {@link RowSetLoader#save()}.
+   */
+
+  protected void saveRow() {
+    switch (state) {
+    case ACTIVE:
+      rootWriter.endArrayValue();
+      rootWriter.saveRow();
+      if (! writerIndex.next()) {
+        state = State.FULL_BATCH;
+      }
+
+      // No overflow row. Advertise the schema version to the client.
+
+      harvestSchemaVersion = activeSchemaVersion;
+      break;
+
+    case OVERFLOW:
+
+      // End the value of the look-ahead row in the look-ahead vectors.
+
+      rootWriter.endArrayValue();
+      rootWriter.saveRow();
+
+      // Advance the writer index relative to the look-ahead batch.
+
+      writerIndex.next();
+
+      // Stay in the overflow state. Doing so will cause the writer
+      // to report that it is full.
+      //
+      // Also, do not change the harvest schema version. We will
+      // expose to the downstream operators the schema in effect
+      // at the start of the row. Columns added within the row won't
+      // appear until the next batch.
+
+      break;
+
+    default:
+      throw new IllegalStateException("Unexpected state: " + state);
+    }
+  }
+
+  /**
+   * Implementation of {@link RowSetLoader#isFull()}
+   * @return true if the batch is full (reached vector capacity or the
+   * row count limit), false if more rows can be added
+   */
+
+  protected boolean isFull() {
+    switch (state) {
+    case ACTIVE:
+      return ! writerIndex.valid();
+    case OVERFLOW:
+    case FULL_BATCH:
+      return true;
+    default:
+      return false;
+    }
+  }
+
+  @Override
+  public boolean writeable() {
+    return state == State.ACTIVE || state == State.OVERFLOW;
+  }
+
+  private boolean isBatchActive() {
+    return state == State.ACTIVE || state == State.OVERFLOW ||
+           state == State.FULL_BATCH ;
+  }
+
+  /**
+   * Implementation for {#link {@link RowSetLoader#rowCount()}.
+   *
+   * @return the number of rows to be sent downstream for this
+   * batch. Does not include the overflow row.
+   */
+
+  protected int rowCount() {
+    switch (state) {
+    case ACTIVE:
+    case FULL_BATCH:
+      return writerIndex.size();
+    case OVERFLOW:
+      return pendingRowCount;
+    default:
+      return 0;
+    }
+  }
+
+  protected WriterIndexImpl writerIndex() { return writerIndex; }
+
+  @Override
+  public void setTargetRowCount(int rowCount) {
+    targetRowCount = Math.max(1, rowCount);
+  }
+
+  @Override
+  public int targetRowCount() { return targetRowCount; }
+
+  @Override
+  public int targetVectorSize() { return options.vectorSizeLimit; }
+
+  protected void overflowed() {
+    logger.trace("Vector overflow");
+
+    // If we see overflow when we are already handling overflow, it means
+    // that a single value is too large to fit into an entire vector.
+    // Fail the query.
+    //
+    // Note that this is a judgment call. It is possible to allow the
+    // vector to double beyond the limit, but that will require a bit
+    // of thought to get right -- and, of course, completely defeats
+    // the purpose of limiting vector size to avoid memory fragmentation...
+    //
+    // Individual columns handle the case in which overflow occurs on the
+    // first row of the main batch. This check handles the pathological case
+    // in which we successfully overflowed, but then another column
+    // overflowed during the overflow row -- that indicates that that one
+    // column can't fit in an empty vector. That is, this check is for a
+    // second-order overflow.
+
+    if (state == State.OVERFLOW) {
+      throw UserException
+          .memoryError("A single column value is larger than the maximum allowed size of 16 MB")
+          .build(logger);
+    }
+    if (state != State.ACTIVE) {
+      throw new IllegalStateException("Unexpected state: " + state);
+    }
+    state = State.IN_OVERFLOW;
+
+    // Preserve the number of rows in the now-complete batch.
+
+    pendingRowCount = writerIndex.vectorIndex();
+
+    // Roll-over will allocate new vectors. Update with the latest
+    // array cardinality.
+
+    updateCardinality();
+
+//    rootWriter.dump(new HierarchicalPrinter());
+
+    // Wrap up the completed rows into a batch. Sets
+    // vector value counts. The rollover data still exists so
+    // it can be moved, but it is now past the recorded
+    // end of the vectors (though, obviously, not past the
+    // physical end.)
+
+    rootWriter.preRollover();
+
+    // Roll over vector values.
+
+    accumulatedBatchSize = 0;
+    rootState.rollover();
+
+    // Adjust writer state to match the new vector values. This is
+    // surprisingly easy if we not that the current row is shifted to
+    // the 0 position in the new vector, so we just shift all offsets
+    // downward by the current row position at each repeat level.
+
+    rootWriter.postRollover();
+
+    // The writer index is reset back to 0. Because of the above roll-over
+    // processing, some vectors may now already have values in the 0 slot.
+    // However, the vector that triggered overflow has not yet written to
+    // the current record, and so will now write to position 0. After the
+    // completion of the row, all 0-position values should be written (or
+    // at least those provided by the client.)
+    //
+    // For arrays, the writer might have written a set of values
+    // (v1, v2, v3), and v4 might have triggered the overflow. In this case,
+    // the array values have been moved, offset vectors adjusted, the
+    // element writer adjusted, so that v4 will be written to index 3
+    // to produce (v1, v2, v3, v4, v5, ...) in the look-ahead vector.
+
+    writerIndex.rollover();
+    checkInitialAllocation();
+
+    // Remember that overflow is in effect.
+
+    state = State.OVERFLOW;
+  }
+
+  protected boolean hasOverflow() { return state == State.OVERFLOW; }
+
+  @Override
+  public VectorContainer harvest() {
+    int rowCount;
+    switch (state) {
+    case ACTIVE:
+    case FULL_BATCH:
+      rowCount = harvestNormalBatch();
+      logger.trace("Harvesting {} rows", rowCount);
+      break;
+    case OVERFLOW:
+      rowCount = harvestOverflowBatch();
+      logger.trace("Harvesting {} rows after overflow", rowCount);
+      break;
+    default:
+      throw new IllegalStateException("Unexpected state: " + state);
+    }
+
+    // Build the output container
+
+    VectorContainer container = outputContainer();
+    container.setRecordCount(rowCount);
+
+    // Finalize: update counts, set state.
+
+    harvestBatchCount++;
+    previousRowCount += rowCount;
+    return container;
+  }
+
+  private int harvestNormalBatch() {
+
+    // Wrap up the vectors: final fill-in, set value count, etc.
+
+    rootWriter.endBatch();
+    harvestSchemaVersion = activeSchemaVersion;
+    state = State.HARVESTED;
+    return writerIndex.size();
+  }
+
+  private int harvestOverflowBatch() {
+    rootState.harvestWithLookAhead();
+    state = State.LOOK_AHEAD;
+    return pendingRowCount;
+  }
+
+  @Override
+  public VectorContainer outputContainer() {
+    // Build the output container.
+
+    if (containerBuilder == null) {
+      containerBuilder = new VectorContainerBuilder(this);
+    }
+    containerBuilder.update(harvestSchemaVersion);
+    return containerBuilder.container();
+  }
+
+  @Override
+  public TupleMetadata harvestSchema() {
+    return containerBuilder.schema();
+  }
+
+  @Override
+  public void close() {
+    if (state == State.CLOSED) {
+      return;
+    }
+    rootState.close();
+
+    // Do not close the vector cache; the caller owns that and
+    // will, presumably, reuse those vectors for another writer.
+
+    state = State.CLOSED;
+  }
+
+  @Override
+  public int batchCount() {
+    return harvestBatchCount + (rowCount() == 0 ? 0 : 1);
+  }
+
+  @Override
+  public int totalRowCount() {
+    int total = previousRowCount;
+    if (isBatchActive()) {
+      total += pendingRowCount + writerIndex.size();
+    }
+    return total;
+  }
+
+  public ResultVectorCache vectorCache() { return vectorCache; }
+  public RowState rootState() { return rootState; }
+
+  /**
+   * Return whether a vector within the current batch can expand. Limits
+   * are enforce only if a limit was provided in the options.
+   *
+   * @param delta increase in vector size
+   * @return true if the vector can expand, false if an overflow
+   * event should occur
+   */
+
+  public boolean canExpand(int delta) {
+    accumulatedBatchSize += delta;
+    return state == State.IN_OVERFLOW ||
+           options.maxBatchSize <= 0 ||
+           accumulatedBatchSize <= options.maxBatchSize;
+  }
+
+  /**
+   * Accumulate the initial vector allocation sizes.
+   *
+   * @param allocationBytes number of bytes allocated to a vector
+   * in the batch setup step
+   */
+
+  public void tallyAllocations(int allocationBytes) {
+    accumulatedBatchSize += allocationBytes;
+  }
+
+  /**
+   * Log and check the initial vector allocation. If a batch size
+   * limit is set, warn if the initial allocation exceeds the limit.
+   * This will occur if the target row count is incorrect for the
+   * data size.
+   */
+
+  private void checkInitialAllocation() {
+    if (options.maxBatchSize < 0) {
+      logger.debug("Initial vector allocation: {}, no batch limit specified",
+          accumulatedBatchSize);
+    }
+    else if (accumulatedBatchSize > options.maxBatchSize) {
+      logger.warn("Initial vector allocation: {}, but batch size limit is: {}",
+          accumulatedBatchSize, options.maxBatchSize);
+    } else {
+      logger.debug("Initial vector allocation: {}, batch size limit: {}",
+          accumulatedBatchSize, options.maxBatchSize);
+    }
+  }
+
+  public void dump(HierarchicalFormatter format) {
+    format
+      .startObject(this)
+      .attribute("options");
+    options.dump(format);
+    format
+      .attribute("index", writerIndex.vectorIndex())
+      .attribute("state", state)
+      .attribute("activeSchemaVersion", activeSchemaVersion)
+      .attribute("harvestSchemaVersion", harvestSchemaVersion)
+      .attribute("pendingRowCount", pendingRowCount)
+      .attribute("targetRowCount", targetRowCount)
+      ;
+    format.attribute("root");
+    rootState.dump(format);
+    format.attribute("rootWriter");
+    rootWriter.dump(format);
+    format.endObject();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultVectorCacheImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultVectorCacheImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultVectorCacheImpl.java
new file mode 100644
index 0000000..c7288b2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultVectorCacheImpl.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Manages an inventory of value vectors used across row batch readers.
+ * Drill semantics for batches is complex. Each operator logically returns
+ * a batch of records on each call of the Drill Volcano iterator protocol
+ * <tt>next()</tt> operation. However, the batches "returned" are not
+ * separate objects. Instead, Drill enforces the following semantics:
+ * <ul>
+ * <li>If a <tt>next()</tt> call returns <tt>OK</tt> then the set of vectors
+ * in the "returned" batch must be identical to those in the prior batch. Not
+ * just the same type; they must be the same <tt>ValueVector</tt> objects.
+ * (The buffers within the vectors will be different.)</li>
+ * <li>If the set of vectors changes in any way (add a vector, remove a
+ * vector, change the type of a vector), then the <tt>next()</tt> call
+ * <b>must</b> return <tt>OK_NEW_SCHEMA</tt>.</ul>
+ * </ul>
+ * These rules create interesting constraints for the scan operator.
+ * Conceptually, each batch is distinct. But, it must share vectors. The
+ * {@link ResultSetLoader} class handles this by managing the set of vectors
+ * used by a single reader.
+ * <p>
+ * Readers are independent: each may read a distinct schema (as in JSON.)
+ * Yet, the Drill protocol requires minimizing spurious <tt>OK_NEW_SCHEMA</tt>
+ * events. As a result, two readers run by the same scan operator must
+ * share the same set of vectors, despite the fact that they may have
+ * different schemas and thus different <tt>ResultSetLoader</tt>s.
+ * <p>
+ * The purpose of this inventory is to persist vectors across readers, even
+ * when, say, reader B does not use a vector that reader A created.
+ * <p>
+ * The semantics supported by this class include:
+ * <ul>
+ * <li>Ability to "pre-declare" columns based on columns that appear in
+ * an explicit select list. This ensures that the columns are known (but
+ * not their types).</li>
+ * <li>Ability to reuse a vector across readers if the column retains the same
+ * name and type (minor type and mode.)</li>
+ * <li>Ability to flush unused vectors for readers with changing schemas
+ * if a schema change occurs.</li>
+ * <li>Support schema "hysteresis"; that is, the a "sticky" schema that
+ * minimizes spurious changes. Once a vector is declared, it can be included
+ * in all subsequent batches (provided the column is nullable or an array.)</li>
+ * </ul>
+ */
+public class ResultVectorCacheImpl implements ResultVectorCache {
+
+  /**
+   * State of a projected vector. At first all we have is a name.
+   * Later, we'll discover the type.
+   */
+
+  private static class VectorState {
+    protected final String name;
+    protected ValueVector vector;
+    protected boolean touched;
+
+    public VectorState(String name) {
+      this.name = name;
+    }
+
+    public boolean satisfies(MaterializedField colSchema) {
+      if (vector == null) {
+        return false;
+      }
+      MaterializedField vectorSchema = vector.getField();
+      return vectorSchema.getType().equals(colSchema.getType());
+    }
+  }
+
+  private final BufferAllocator allocator;
+  private final Map<String, VectorState> vectors = new HashMap<>();
+
+  public ResultVectorCacheImpl(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  @Override
+  public BufferAllocator allocator() { return allocator; }
+
+  public void predefine(List<String> selected) {
+    for (String colName : selected) {
+      addVector(colName);
+    }
+  }
+
+  private VectorState addVector(String colName) {
+    VectorState vs = new VectorState(colName);
+    vectors.put(vs.name, vs);
+    return vs;
+  }
+
+  public void newBatch() {
+    for (VectorState vs : vectors.values()) {
+      vs.touched = false;
+    }
+  }
+
+  public void trimUnused() {
+    List<VectorState> unused = new ArrayList<>();
+    for (VectorState vs : vectors.values()) {
+      if (! vs.touched) {
+        unused.add(vs);
+      }
+    }
+    if (unused.isEmpty()) {
+      return;
+    }
+    for (VectorState vs : unused) {
+      vectors.remove(vs.name);
+    }
+  }
+
+  @Override
+  public ValueVector addOrGet(MaterializedField colSchema) {
+    VectorState vs = vectors.get(colSchema.getName());
+
+    // If the vector is found, and is of the right type, reuse it.
+
+    if (vs != null && vs.satisfies(colSchema)) {
+      return vs.vector;
+    }
+
+    // If no vector, this is a late schema. Create the vector.
+
+    if (vs == null) {
+      vs = addVector(colSchema.getName());
+
+    // Else, if the vector changed type, close the old one.
+
+    } else if (vs.vector != null) {
+      vs.vector.close();
+      vs.vector = null;
+    }
+
+    // Create the new vector.
+
+    vs.touched = true;
+    vs.vector = TypeHelper.getNewVector(colSchema, allocator, null);
+    return vs.vector;
+  }
+
+  public MajorType getType(String name) {
+    VectorState vs = vectors.get(name);
+    if (vs == null || vs.vector == null) {
+      return null;
+    }
+    return vs.vector.getField().getType();
+  }
+
+  public void close() {
+    for (VectorState vs : vectors.values()) {
+      vs.vector.close();
+    }
+    vectors.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RowSetLoaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RowSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RowSetLoaderImpl.java
new file mode 100644
index 0000000..ec61ae7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RowSetLoaderImpl.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import java.util.ArrayList;
+
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter;
+
+/**
+ * Implementation of the row set loader. Provides row-level operations, leaving the
+ * result set loader to provide batch-level operations. However, all control
+ * operations are actually delegated to the result set loader, which handles
+ * the details of working with overflow rows.
+ */
+
+public class RowSetLoaderImpl extends AbstractTupleWriter implements RowSetLoader {
+
+  private final ResultSetLoaderImpl rsLoader;
+
+  protected RowSetLoaderImpl(ResultSetLoaderImpl rsLoader, TupleMetadata schema) {
+    super(schema, new ArrayList<AbstractObjectWriter>());
+    this.rsLoader = rsLoader;
+    bindIndex(rsLoader.writerIndex());
+  }
+
+  @Override
+  public ResultSetLoader loader() { return rsLoader; }
+
+  @Override
+  public RowSetLoader addRow(Object...values) {
+    if (! start()) {
+      throw new IllegalStateException("Batch is full.");
+    }
+    setObject(values);
+    save();
+    return this;
+  }
+
+  @Override
+  public int rowIndex() { return rsLoader.writerIndex().vectorIndex(); }
+
+  @Override
+  public void save() { rsLoader.saveRow(); }
+
+  @Override
+  public boolean start() {
+    if (rsLoader.isFull()) {
+
+      // Full batch? Return false.
+
+      return false;
+    } else if (state == State.IN_ROW) {
+
+      // Already in a row? Rewind the to start of the row.
+
+      restartRow();
+    } else {
+
+      // Otherwise, advance to the next row.
+
+      rsLoader.startRow();
+    }
+    return true;
+  }
+
+  public void endBatch() {
+    if (state == State.IN_ROW) {
+      restartRow();
+      state = State.IN_WRITE;
+    }
+    endWrite();
+  }
+
+  @Override
+  public boolean isFull( ) { return rsLoader.isFull(); }
+
+  @Override
+  public int rowCount() { return rsLoader.rowCount(); }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java
new file mode 100644
index 0000000..f6bc5f3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter;
+import org.apache.drill.exec.vector.accessor.writer.OffsetVectorWriter;
+
+/**
+ * Base class for a single vector. Handles the bulk of work for that vector.
+ * Subclasses are specialized for offset vectors or values vectors.
+ * (The "single vector" name contrasts with classes that manage compound
+ * vectors, such as a data and offsets vector.)
+ */
+
+public abstract class SingleVectorState implements VectorState {
+
+  /**
+   * State for a scalar value vector. The vector might be for a simple (non-array)
+   * vector, or might be the payload part of a scalar array (repeated scalar)
+   * vector.
+   */
+
+  public static class ValuesVectorState extends SingleVectorState {
+
+    private final ColumnMetadata schema;
+
+    public ValuesVectorState(ColumnMetadata schema, AbstractScalarWriter writer, ValueVector mainVector) {
+      super(writer, mainVector);
+      this.schema = schema;
+    }
+
+    @Override
+    public int allocateVector(ValueVector vector, int cardinality) {
+      if (schema.isVariableWidth()) {
+
+        // Cap the allocated size to the maximum.
+
+        int size = (int) Math.min(ValueVector.MAX_BUFFER_SIZE, (long) cardinality * schema.expectedWidth());
+        ((VariableWidthVector) vector).allocateNew(size, cardinality);
+      } else {
+        ((FixedWidthVector) vector).allocateNew(cardinality);
+      }
+      return vector.getBufferSize();
+    }
+
+    @Override
+    protected void copyOverflow(int sourceStartIndex, int sourceEndIndex) {
+      int newIndex = 0;
+      ResultSetLoaderImpl.logger.trace("Vector {} of type {}: copy {} values from {} to {}",
+          mainVector.getField().toString(),
+          mainVector.getClass().getSimpleName(),
+          Math.max(0, sourceEndIndex - sourceStartIndex + 1),
+          sourceStartIndex, newIndex);
+
+      // Copy overflow values from the full vector to the new
+      // look-ahead vector. Uses vector-level operations for convenience.
+      // These aren't very efficient, but overflow does not happen very
+      // often.
+
+      for (int src = sourceStartIndex; src <= sourceEndIndex; src++, newIndex++) {
+        mainVector.copyEntry(newIndex, backupVector, src);
+      }
+    }
+  }
+
+  /**
+   * Special case for an offset vector. Offset vectors are managed like any other
+   * vector with respect to overflow and allocation. This means that the loader
+   * classes avoid the use of the RepeatedVector class methods, instead working
+   * with the offsets vector (here) or the values vector to allow the needed
+   * fine control over overflow operations.
+   */
+
+  public static class OffsetVectorState extends SingleVectorState {
+
+    private final AbstractObjectWriter childWriter;
+
+    public OffsetVectorState(AbstractScalarWriter writer, ValueVector mainVector,
+        AbstractObjectWriter childWriter) {
+      super(writer, mainVector);
+      this.childWriter = childWriter;
+    }
+
+    @Override
+    public int allocateVector(ValueVector toAlloc, int cardinality) {
+      ((UInt4Vector) toAlloc).allocateNew(cardinality);
+      return toAlloc.getBufferSize();
+    }
+
+    public int rowStartOffset() {
+      return ((OffsetVectorWriter) writer).rowStartOffset();
+    }
+
+    @Override
+    protected void copyOverflow(int sourceStartIndex, int sourceEndIndex) {
+
+      if (sourceStartIndex > sourceEndIndex) {
+        return;
+      }
+
+      // This is an offset vector. The data to copy is one greater
+      // than the row index.
+
+      sourceStartIndex++;
+      sourceEndIndex++;
+
+      // Copy overflow values from the full vector to the new
+      // look-ahead vector. Since this is an offset vector, values must
+      // be adjusted as they move across.
+      //
+      // Indexing can be confusing. Offset vectors have values offset
+      // from their row by one position. The offset vector position for
+      // row i has the start value for row i. The offset vector position for
+      // i+1 has the start of the next value. The difference between the
+      // two is the element length. As a result, the offset vector always has
+      // one more value than the number of rows, and position 0 is always 0.
+      //
+      // The index passed in here is that of the row that overflowed. That
+      // offset vector position contains the offset of the start of the data
+      // for the current row. We must subtract that offset from each copied
+      // value to adjust the offset for the destination.
+
+      UInt4Vector.Accessor sourceAccessor = ((UInt4Vector) backupVector).getAccessor();
+      UInt4Vector.Mutator destMutator = ((UInt4Vector) mainVector).getMutator();
+      int offset = childWriter.events().writerIndex().rowStartIndex();
+      int newIndex = 1;
+      ResultSetLoaderImpl.logger.trace("Offset vector: copy {} values from {} to {} with offset {}",
+          Math.max(0, sourceEndIndex - sourceStartIndex + 1),
+          sourceStartIndex, newIndex, offset);
+      assert offset == sourceAccessor.get(sourceStartIndex - 1);
+
+      // Position zero is special and will be filled in by the writer
+      // later.
+
+      for (int src = sourceStartIndex; src <= sourceEndIndex; src++, newIndex++) {
+        destMutator.set(newIndex, sourceAccessor.get(src) - offset);
+      }
+//      VectorPrinter.printOffsets((UInt4Vector) backupVector, sourceStartIndex - 1, sourceEndIndex - sourceStartIndex + 3);
+//      VectorPrinter.printOffsets((UInt4Vector) mainVector, 0, newIndex);
+    }
+  }
+
+  protected final AbstractScalarWriter writer;
+  protected final ValueVector mainVector;
+  protected ValueVector backupVector;
+
+  public SingleVectorState(AbstractScalarWriter writer, ValueVector mainVector) {
+    this.writer = writer;
+    this.mainVector = mainVector;
+  }
+
+  @Override
+  public ValueVector vector() { return mainVector; }
+
+  @Override
+  public int allocate(int cardinality) {
+    return allocateVector(mainVector, cardinality);
+  }
+
+  protected abstract int allocateVector(ValueVector vector, int cardinality);
+
+  /**
+   * A column within the row batch overflowed. Prepare to absorb the rest of
+   * the in-flight row by rolling values over to a new vector, saving the
+   * complete vector for later. This column could have a value for the overflow
+   * row, or for some previous row, depending on exactly when and where the
+   * overflow occurs.
+   *
+   * @param sourceStartIndex the index of the row that caused the overflow, the
+   * values of which should be copied to a new "look-ahead" vector. If the
+   * vector is an array, then the overflowIndex is the position of the first
+   * element to be moved, and multiple elements may need to move
+   */
+
+  @Override
+  public void rollover(int cardinality) {
+
+    int sourceStartIndex = writer.writerIndex().rowStartIndex();
+
+    // Remember the last write index for the original vector.
+    // This tells us the end of the set of values to move, while the
+    // sourceStartIndex above tells us the start.
+
+    int sourceEndIndex = writer.lastWriteIndex();
+
+    // Switch buffers between the backup vector and the writer's output
+    // vector. Done this way because writers are bound to vectors and
+    // we wish to keep the binding.
+
+    if (backupVector == null) {
+      backupVector = TypeHelper.getNewVector(mainVector.getField(), mainVector.getAllocator(), null);
+    }
+    assert cardinality > 0;
+    allocateVector(backupVector, cardinality);
+    mainVector.exchange(backupVector);
+
+    // Copy overflow values from the full vector to the new
+    // look-ahead vector.
+
+    copyOverflow(sourceStartIndex, sourceEndIndex);
+
+    // At this point, the writer is positioned to write to the look-ahead
+    // vector at the position after the copied values. The original vector
+    // is saved along with a last write position that is no greater than
+    // the retained values.
+  }
+
+  protected abstract void copyOverflow(int sourceStartIndex, int sourceEndIndex);
+
+  /**
+    * Exchange the data from the backup vector and the main vector, putting
+    * the completed buffers back into the main vectors, and stashing the
+    * overflow buffers away in the backup vector.
+    * Restore the main vector's last write position.
+    */
+
+  @Override
+  public void harvestWithLookAhead() {
+    mainVector.exchange(backupVector);
+  }
+
+  /**
+   * The previous full batch has been sent downstream and the client is
+   * now ready to start writing to the next batch. Initialize that new batch
+   * with the look-ahead values saved during overflow of the previous batch.
+   */
+
+  @Override
+  public void startBatchWithLookAhead() {
+    mainVector.exchange(backupVector);
+    backupVector.clear();
+  }
+
+  @Override
+  public void reset() {
+    mainVector.clear();
+    if (backupVector != null) {
+      backupVector.clear();
+    }
+  }
+
+  @Override
+  public void dump(HierarchicalFormatter format) {
+    format
+      .startObject(this)
+      .attributeIdentity("writer", writer)
+      .attributeIdentity("mainVector", mainVector)
+      .attributeIdentity("backupVector", backupVector)
+      .endObject();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java
new file mode 100644
index 0000000..de41ee4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.exec.physical.rowSet.impl.ColumnState.BaseMapColumnState;
+import org.apache.drill.exec.physical.rowSet.impl.ColumnState.MapArrayColumnState;
+import org.apache.drill.exec.physical.rowSet.impl.ColumnState.MapColumnState;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.TupleSchema;
+import org.apache.drill.exec.record.TupleSchema.AbstractColumnMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.ObjectWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter;
+import org.apache.drill.exec.vector.accessor.writer.ColumnWriterFactory;
+
+/**
+ * Represents the loader state for a tuple: a row or a map. This is "state" in
+ * the sense of variables that are carried along with each tuple. Handles
+ * write-time issues such as defining new columns, allocating memory, handling
+ * overflow, assembling the output version of the map, and so on. Each
+ * row and map in the result set has a tuple state instances associated
+ * with it.
+ * <p>
+ * Here, by "tuple" we mean a container of vectors, each of which holds
+ * a variety of values. So, the "tuple" here is structural, not a specific
+ * set of values, but rather the collection of vectors that hold tuple
+ * values.
+ */
+
+public abstract class TupleState implements TupleWriterListener {
+
+  /**
+   * Handles the details of the top-level tuple, the data row itself.
+   * Note that by "row" we mean the set of vectors that define the
+   * set of rows.
+   */
+
+  public static class RowState extends TupleState {
+
+    /**
+     * The row-level writer for stepping through rows as they are written,
+     * and for accessing top-level columns.
+     */
+
+    private final RowSetLoaderImpl writer;
+
+    public RowState(ResultSetLoaderImpl rsLoader) {
+      super(rsLoader, rsLoader.projectionSet);
+      writer = new RowSetLoaderImpl(rsLoader, schema);
+      writer.bindListener(this);
+    }
+
+    public RowSetLoaderImpl rootWriter() { return writer; }
+
+    @Override
+    public AbstractTupleWriter writer() { return writer; }
+
+    @Override
+    public int innerCardinality() { return resultSetLoader.targetRowCount();}
+  }
+
+  /**
+   * Represents a tuple defined as a Drill map: single or repeated. Note that
+   * the map vector does not exist here; it is assembled only when "harvesting"
+   * a batch. This design supports the obscure case in which a new column
+   * is added during an overflow row, so exists within this abstraction,
+   * but is not published to the map that makes up the output.
+   */
+
+  public static class MapState extends TupleState {
+
+    protected final BaseMapColumnState mapColumnState;
+    protected int outerCardinality;
+
+    public MapState(ResultSetLoaderImpl rsLoader,
+        BaseMapColumnState mapColumnState,
+        ProjectionSet projectionSet) {
+      super(rsLoader, projectionSet);
+      this.mapColumnState = mapColumnState;
+      mapColumnState.writer().bindListener(this);
+    }
+
+    /**
+     * Return the tuple writer for the map. If this is a single
+     * map, then it is the writer itself. If this is a map array,
+     * then the tuple is nested inside the array.
+     */
+
+    @Override
+    public AbstractTupleWriter writer() {
+      AbstractObjectWriter objWriter = mapColumnState.writer();
+      TupleWriter tupleWriter;
+      if (objWriter.type() == ObjectType.ARRAY) {
+        tupleWriter = objWriter.array().tuple();
+      } else {
+        tupleWriter = objWriter.tuple();
+      }
+      return (AbstractTupleWriter) tupleWriter;
+    }
+
+    /**
+     * In order to allocate the correct-sized vectors, the map must know
+     * its member cardinality: the number of elements in each row. This
+     * is 1 for a single map, but may be any number for a map array. Then,
+     * this value is recursively pushed downward to compute the cardinality
+     * of lists of maps that contains lists of maps, and so on.
+     */
+
+    @Override
+    public void updateCardinality(int outerCardinality) {
+      this.outerCardinality = outerCardinality;
+      super.updateCardinality(outerCardinality);
+    }
+
+    @Override
+    public int innerCardinality() {
+      return outerCardinality * mapColumnState.schema().expectedElementCount();
+    }
+
+    @Override
+    public void dump(HierarchicalFormatter format) {
+      format
+        .startObject(this)
+        .attribute("column", mapColumnState.schema().name())
+        .attribute("cardinality", outerCardinality)
+        .endObject();
+    }
+  }
+
+  protected final ResultSetLoaderImpl resultSetLoader;
+  protected final List<ColumnState> columns = new ArrayList<>();
+  protected final TupleSchema schema = new TupleSchema();
+  protected final ProjectionSet projectionSet;
+
+  protected TupleState(ResultSetLoaderImpl rsLoader, ProjectionSet projectionSet) {
+    this.resultSetLoader = rsLoader;
+    this.projectionSet = projectionSet;
+  }
+
+  public abstract int innerCardinality();
+
+  /**
+   * Returns an ordered set of the columns which make up the tuple.
+   * Column order is the same as that defined by the map's schema,
+   * to allow indexed access. New columns always appear at the end
+   * of the list to preserve indexes.
+   *
+   * @return ordered list of column states for the columns within
+   * this tuple
+   */
+
+  public List<ColumnState> columns() { return columns; }
+
+  public TupleMetadata schema() { return writer().schema(); }
+
+  public abstract AbstractTupleWriter writer();
+
+  @Override
+  public ObjectWriter addColumn(TupleWriter tupleWriter, MaterializedField column) {
+    return addColumn(tupleWriter, TupleSchema.fromField(column));
+  }
+
+  @Override
+  public ObjectWriter addColumn(TupleWriter tupleWriter, ColumnMetadata columnSchema) {
+
+    // Verify name is not a (possibly case insensitive) duplicate.
+
+    TupleMetadata tupleSchema = schema();
+    String colName = columnSchema.name();
+    if (tupleSchema.column(colName) != null) {
+      throw new IllegalArgumentException("Duplicate column: " + colName);
+    }
+
+    return addColumn(columnSchema);
+  }
+
+  /**
+   * Implementation of the work to add a new column to this tuple given a
+   * schema description of the column.
+   *
+   * @param columnSchema schema of the column
+   * @return writer for the new column
+   */
+
+  private AbstractObjectWriter addColumn(ColumnMetadata columnSchema) {
+
+    // Indicate projection in the metadata.
+
+    ((AbstractColumnMetadata) columnSchema).setProjected(
+        projectionSet.isProjected(columnSchema.name()));
+
+    // Build the column
+
+    ColumnState colState;
+    if (columnSchema.isMap()) {
+      colState = buildMap(columnSchema);
+    } else {
+      colState = buildPrimitive(columnSchema);
+    }
+    columns.add(colState);
+    colState.updateCardinality(innerCardinality());
+    colState.allocateVectors();
+    return colState.writer();
+  }
+
+  /**
+   * Build a primitive column. Check if the column is projected. If not,
+   * allocate a dummy writer for the column. If projected, then allocate
+   * a vector, a writer, and the column state which binds the two together
+   * and manages the column.
+   *
+   * @param columnSchema schema of the new primitive column
+   * @return column state for the new column
+   */
+
+  @SuppressWarnings("resource")
+  private ColumnState buildPrimitive(ColumnMetadata columnSchema) {
+    ValueVector vector;
+    if (columnSchema.isProjected()) {
+
+      // Create the vector for the column.
+
+      vector = resultSetLoader.vectorCache().addOrGet(columnSchema.schema());
+    } else {
+
+      // Column is not projected. No materialized backing for the column.
+
+      vector = null;
+    }
+
+    // Create the writer. Will be returned to the tuple writer.
+
+    AbstractObjectWriter colWriter = ColumnWriterFactory.buildColumnWriter(columnSchema, vector);
+
+    if (columnSchema.isArray()) {
+      return PrimitiveColumnState.newPrimitiveArray(resultSetLoader, vector, colWriter);
+    } else {
+      return PrimitiveColumnState.newPrimitive(resultSetLoader, vector, colWriter);
+    }
+  }
+
+  /**
+   * Build a new map (single or repeated) column. No map vector is created
+   * here, instead we create a tuple state to hold the columns, and defer the
+   * map vector (or vector container) until harvest time.
+   *
+   * @param columnSchema description of the map column
+   * @return column state for the map column
+   */
+
+  private ColumnState buildMap(ColumnMetadata columnSchema) {
+
+    // When dynamically adding columns, must add the (empty)
+    // map by itself, then add columns to the map via separate
+    // calls.
+
+    assert columnSchema.isMap();
+    assert columnSchema.mapSchema().size() == 0;
+
+    // Create the writer. Will be returned to the tuple writer.
+
+    ProjectionSet childProjection = projectionSet.mapProjection(columnSchema.name());
+    if (columnSchema.isArray()) {
+      return MapArrayColumnState.build(resultSetLoader,
+          columnSchema,
+          childProjection);
+    } else {
+      return new MapColumnState(resultSetLoader,
+          columnSchema,
+          childProjection);
+    }
+  }
+
+  /**
+   * When creating a schema up front, provide the schema of the desired tuple,
+   * then build vectors and writers to match. Allows up-front schema definition
+   * in addition to on-the-fly schema creation handled elsewhere.
+   *
+   * @param schema desired tuple schema to be materialized
+   */
+
+  public void buildSchema(TupleMetadata schema) {
+    for (int i = 0; i < schema.size(); i++) {
+      ColumnMetadata colSchema = schema.metadata(i);
+      AbstractObjectWriter colWriter;
+      if (colSchema.isMap()) {
+        colWriter = addColumn(colSchema.cloneEmpty());
+        BaseMapColumnState mapColState = (BaseMapColumnState) columns.get(columns.size() - 1);
+        mapColState.mapState().buildSchema(colSchema.mapSchema());
+      } else {
+        colWriter = addColumn(colSchema);
+      }
+      writer().addColumnWriter(colWriter);
+    }
+  }
+
+  public void updateCardinality(int cardinality) {
+    for (ColumnState colState : columns) {
+      colState.updateCardinality(cardinality);
+    }
+  }
+
+  /**
+   * A column within the row batch overflowed. Prepare to absorb the rest of the
+   * in-flight row by rolling values over to a new vector, saving the complete
+   * vector for later. This column could have a value for the overflow row, or
+   * for some previous row, depending on exactly when and where the overflow
+   * occurs.
+   */
+
+  public void rollover() {
+    for (ColumnState colState : columns) {
+      colState.rollover();
+    }
+  }
+
+  /**
+   * Writing of a row batch is complete, and an overflow occurred. Prepare the
+   * vector for harvesting to send downstream. Set aside the look-ahead vector
+   * and put the full vector buffer back into the active vector.
+   */
+
+  public void harvestWithLookAhead() {
+    for (ColumnState colState : columns) {
+      colState.harvestWithLookAhead();
+    }
+  }
+
+  /**
+   * Start a new batch by shifting the overflow buffers back into the main
+   * write vectors and updating the writers.
+   */
+
+  public void startBatch() {
+    for (ColumnState colState : columns) {
+      colState.startBatch();
+    }
+  }
+
+  /**
+   * Clean up state (such as backup vectors) associated with the state
+   * for each vector.
+   */
+
+  public void close() {
+    for (ColumnState colState : columns) {
+      colState.close();
+    }
+  }
+
+  public void dump(HierarchicalFormatter format) {
+    format
+      .startObject(this)
+      .attributeArray("columns");
+    for (int i = 0; i < columns.size(); i++) {
+      format.element(i);
+      columns.get(i).dump(format);
+    }
+    format
+      .endArray()
+      .endObject();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorContainerBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorContainerBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorContainerBuilder.java
new file mode 100644
index 0000000..faa68cb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorContainerBuilder.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import java.util.List;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.rowSet.impl.ColumnState.BaseMapColumnState;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.TupleSchema;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+
+/**
+ * Builds the harvest vector container that includes only the columns that
+ * are included in the harvest schema version. That is, it excludes columns
+ * added while writing an overflow row.
+ * <p>
+ * Because a Drill row is actually a hierarchy, walks the internal hierarchy
+ * and builds a corresponding output hierarchy.
+ * <ul>
+ * <li>The root node is the row itself (vector container),</li>
+ * <li>Internal nodes are maps (structures),</li>
+ * <li>Leaf notes are primitive vectors (which may be arrays).</li>
+ * </ul>
+ * The basic algorithm is to identify the version of the output schema,
+ * then add any new columns added up to that version. This object maintains
+ * the output container across batches, meaning that updates are incremental:
+ * we need only add columns that are new since the last update. And, those new
+ * columns will always appear directly after all existing columns in the row
+ * or in a map.
+ * <p>
+ * As special case occurs when columns are added in the overflow row. These
+ * columns <i>do not</i> appear in the output container for the main part
+ * of the batch; instead they appear in the <i>next</i> output container
+ * that includes the overflow row.
+ * <p>
+ * Since the container here may contain a subset of the internal columns, an
+ * interesting case occurs for maps. The maps in the output container are
+ * <b>not</b> the same as those used internally. Since a map column can contain
+ * either one list of columns or another, the internal and external maps must
+ * differ. The set of child vectors (except for child maps) are shared.
+ */
+
+public class VectorContainerBuilder {
+
+  /**
+   * Drill vector containers and maps are both tuples, but they irritatingly
+   * have completely different APIs for working with their child vectors.
+   * This class acts as a proxy to wrap the two APIs to provide a common
+   * view for the use of the container builder.
+   */
+
+  public static abstract class TupleProxy {
+    protected TupleMetadata schema;
+
+    public TupleProxy(TupleMetadata schema) {
+      this.schema = schema;
+    }
+
+    protected abstract int size();
+    protected abstract ValueVector vector(int index);
+    protected abstract void add(ValueVector vector);
+
+    protected TupleProxy mapProxy(int index) {
+      return new MapProxy(
+          schema.metadata(index).mapSchema(),
+          (AbstractMapVector) vector(index));
+    }
+  }
+
+  /**
+   * Proxy wrapper class for a vector container.
+   */
+
+  protected static class ContainerProxy extends TupleProxy {
+
+    private VectorContainer container;
+
+    protected ContainerProxy(TupleMetadata schema, VectorContainer container) {
+      super(schema);
+      this.container = container;
+    }
+
+    @Override
+    protected int size() {
+      return container.getNumberOfColumns();
+    }
+
+    @Override
+    protected ValueVector vector(int index) {
+      return container.getValueVector(index).getValueVector();
+    }
+
+    @Override
+    protected void add(ValueVector vector) {
+      container.add(vector);
+    }
+  }
+
+  /**
+   * Proxy wrapper for a map container.
+   */
+
+  protected static class MapProxy extends TupleProxy {
+
+    private AbstractMapVector mapVector;
+
+    protected MapProxy(TupleMetadata schema, AbstractMapVector mapVector) {
+      super(schema);
+      this.mapVector = mapVector;
+    }
+
+    @Override
+    protected int size() {
+      return mapVector.size();
+    }
+
+    @Override
+    protected ValueVector vector(int index) {
+      return mapVector.getChildByOrdinal(index);
+    }
+
+    @Override
+    protected void add(ValueVector vector) {
+      mapVector.putChild(vector.getField().getName(), vector);
+    }
+  }
+
+  private final ResultSetLoaderImpl resultSetLoader;
+  private int outputSchemaVersion = -1;
+  private TupleMetadata schema;
+  private VectorContainer container;
+
+  public VectorContainerBuilder(ResultSetLoaderImpl rsLoader) {
+    this.resultSetLoader = rsLoader;
+    container = new VectorContainer(rsLoader.allocator);
+    schema = new TupleSchema();
+  }
+
+  public void update(int targetVersion) {
+    if (outputSchemaVersion >= targetVersion) {
+      return;
+    }
+    outputSchemaVersion = targetVersion;
+    updateTuple(resultSetLoader.rootState(), new ContainerProxy(schema, container));
+    container.buildSchema(SelectionVectorMode.NONE);
+  }
+
+  public VectorContainer container() { return container; }
+
+  public int outputSchemaVersion() { return outputSchemaVersion; }
+
+  public BufferAllocator allocator() {
+     return resultSetLoader.allocator();
+  }
+
+  private void updateTuple(TupleState sourceModel, TupleProxy destProxy) {
+    int prevCount = destProxy.size();
+    List<ColumnState> cols = sourceModel.columns();
+    int currentCount = cols.size();
+
+    // Scan any existing maps for column additions
+
+    for (int i = 0; i < prevCount; i++) {
+      ColumnState colState = cols.get(i);
+      if (! colState.schema().isProjected()) {
+        continue;
+      }
+      if (colState.schema().isMap()) {
+        updateTuple((TupleState) ((BaseMapColumnState) colState).mapState(), destProxy.mapProxy(i));
+      }
+    }
+
+    // Add new columns, which may be maps
+
+    for (int i = prevCount; i < currentCount; i++) {
+      ColumnState colState = cols.get(i);
+      if (! colState.schema().isProjected()) {
+        continue;
+      }
+
+      // If the column was added after the output schema version cutoff,
+      // skip that column for now.
+
+      if (colState.addVersion > outputSchemaVersion) {
+        break;
+      }
+      if (colState.schema().isMap()) {
+        buildMap(destProxy, (BaseMapColumnState) colState);
+      } else {
+        destProxy.add(colState.vector());
+        destProxy.schema.addColumn(colState.schema());
+        assert destProxy.size() == destProxy.schema.size();
+      }
+    }
+  }
+
+  @SuppressWarnings("resource")
+  private void buildMap(TupleProxy parentTuple, BaseMapColumnState colModel) {
+
+    // Creating the map vector will create its contained vectors if we
+    // give it a materialized field with children. So, instead pass a clone
+    // without children so we can add them.
+
+    ColumnMetadata mapColSchema = colModel.schema().cloneEmpty();
+
+    // Don't get the map vector from the vector cache. Map vectors may
+    // have content that varies from batch to batch. Only the leaf
+    // vectors can be cached.
+
+    AbstractMapVector mapVector;
+    if (mapColSchema.isArray()) {
+
+      // A repeated map shares an offset vector with the internal
+      // repeated map.
+
+      UInt4Vector offsets = (UInt4Vector) colModel.vector();
+      mapVector = new RepeatedMapVector(mapColSchema.schema(), offsets, null);
+    } else {
+      mapVector = new MapVector(mapColSchema.schema(), allocator(), null);
+    }
+
+    // Add the map vector and schema to the parent tuple
+
+    parentTuple.add(mapVector);
+    int index = parentTuple.schema.addColumn(mapColSchema);
+    assert parentTuple.size() == parentTuple.size();
+
+    // Update the tuple, which will add the new columns in the map
+
+    updateTuple(colModel.mapState(), parentTuple.mapProxy(index));
+  }
+
+  public TupleMetadata schema() { return schema; }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorState.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorState.java
new file mode 100644
index 0000000..4a1c698
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorState.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+
+/**
+ * Handles batch and overflow operation for a (possibly compound) vector.
+ * <p>
+ * The data model is the following:
+ * <ul>
+ * <li>Column model<ul>
+ *   <li>Value vector itself</li>
+ *   <li>Column writer</li>
+ *   <li>Column schema</li>
+ *   <li>Column coordinator (this class)</li>
+ * </ul></li></ul>
+ * The vector state coordinates events between the result set loader
+ * on the one side and the vectors, writers and schema on the other.
+ * For example:
+ * <pre><code>
+ * Result Set       Vector
+ *   Loader   <-->  State   <-->    Vectors
+ * </code></pre>
+ * Events from the row set loader deal with allocation, roll-over,
+ * harvesting completed batches and so on. Events from the writer,
+ * via the tuple model deal with adding columns and column
+ * overflow.
+ */
+
+public interface VectorState {
+
+  /**
+   * Allocate a new vector with the number of elements given. If the vector
+   * is an array, then the cardinality given is the number of arrays.
+   * @param cardinality number of elements desired in the allocated
+   * vector
+   *
+   * @return the number of bytes allocated
+   */
+
+  int allocate(int cardinality);
+
+  /**
+   * A vector has overflowed. Create a new look-ahead vector of the given
+   * cardinality, then copy the overflow values from the main vector to the
+   * look-ahead vector.
+   *
+   * @param cardinality the number of elements in the new vector. If this
+   * vector is an array, then this is the number of arrays
+   * @return the new next write position for the vector index associated
+   * with the writer for this vector
+   */
+
+  void rollover(int cardinality);
+
+  /**
+   * A batch is being harvested after an overflow. Put the full batch
+   * back into the main vector so it can be harvested.
+   */
+
+  void harvestWithLookAhead();
+
+  /**
+   * A new batch is starting while an look-ahead vector exists. Move
+   * the look-ahead buffers into the main vector to prepare for writing
+   * the rest of the batch.
+   */
+
+  void startBatchWithLookAhead();
+
+  /**
+   * Clear the vector(s) associated with this state.
+   */
+
+  void reset();
+
+  /**
+   * Underlying vector: the one presented to the consumer of the
+   * result set loader.
+   */
+
+  ValueVector vector();
+
+  void dump(HierarchicalFormatter format);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java
new file mode 100644
index 0000000..2158dd1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
+
+/**
+ * Writer index that points to each row in the row set. The index starts at
+ * the 0th row and advances one row on each increment. This allows writers to
+ * start positioned at the first row. Writes happen in the current row.
+ * Calling <tt>next()</tt> advances to the next position, effectively saving
+ * the current row. The most recent row can be abandoned easily simply by not
+ * calling <tt>next()</tt>. This means that the number of completed rows is
+ * the same as the row index.
+ * <p>
+ * The writer index enforces the row count limit for a new batch. The
+ * limit is set by the result set loader and can vary from batch to batch
+ * if the client chooses in order to adjust the row count based on actual
+ * data size.
+ */
+
+class WriterIndexImpl implements ColumnWriterIndex {
+
+  private final ResultSetLoader rsLoader;
+  private int rowIndex = 0;
+
+  public WriterIndexImpl(ResultSetLoader rsLoader) {
+    this.rsLoader = rsLoader;
+  }
+
+  @Override
+  public int vectorIndex() { return rowIndex; }
+
+  @Override
+  public int rowStartIndex() { return rowIndex; }
+
+  public boolean next() {
+    if (++rowIndex < rsLoader.targetRowCount()) {
+      return true;
+    } else {
+      // Should not call next() again once batch is full.
+      rowIndex = rsLoader.targetRowCount();
+      return false;
+    }
+  }
+
+  public int size() {
+
+    // The index always points to the next slot past the
+    // end of valid rows.
+
+    return rowIndex;
+  }
+
+  public boolean valid() { return rowIndex < rsLoader.targetRowCount(); }
+
+  @Override
+  public void rollover() {
+
+    // The top level index always rolls over to 0 --
+    // the first row position in the new vectors.
+
+    reset();
+  }
+
+  public void reset() { rowIndex = 0; }
+
+  @Override
+  public void nextElement() { }
+
+  @Override
+  public ColumnWriterIndex outerIndex() { return null; }
+
+  @Override
+  public String toString() {
+    return new StringBuilder()
+      .append("[")
+      .append(getClass().getSimpleName())
+      .append(" rowIndex = ")
+      .append(rowIndex)
+      .append("]")
+      .toString();
+  }
+}