You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/12/21 05:19:40 UTC

[13/15] drill git commit: DRILL-5657: Size-aware vector writer structure

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/package-info.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/package-info.java
new file mode 100644
index 0000000..4c11499
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/package-info.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Handles the details of the result set loader implementation.
+ * <p>
+ * The primary purpose of this loader, and the most complex to understand and
+ * maintain, is overflow handling.
+ *
+ * <h4>Detailed Use Cases</h4>
+ *
+ * Let's examine it by considering a number of
+ * use cases.
+ * <table style="border: 1px solid; border-collapse: collapse;">
+ * <tr><th>Row</th><th>a</th><th>b</th><th>c</th><th>d</th><th>e</th><th>f</th><th>g</th><th>h</th></tr>
+ * <tr><td>n-2</td><td>X</td><td>X</td><td>X</td><td>X</td><td>X</td><td>X</td><td>-</td><td>-</td></tr>
+ * <tr><td>n-1</td><td>X</td><td>X</td><td>X</td><td>X</td><td> </td><td> </td><td>-</td><td>-</td></tr>
+ * <tr><td>n  </td><td>X</td><td>!</td><td>O</td><td> </td><td>O</td><td> </td><td>O</td><td> </td></tr>
+ * </table>
+ * Here:
+ * <ul>
+ * <li>n-2, n-1, and n are rows. n is the overflow row.</li>
+ * <li>X indicates a value was written before overflow.</li>
+ * <li>Blank indicates no value was written in that row.</li>
+ * <li>! indicates the value that triggered overflow.</li>
+ * <li>- indicates a column that did not exist prior to overflow.</li>
+ * <li>O indicates a value written after overflow.</li>
+ * </ul>
+ * Column a is written before overflow occurs, b causes overflow, and all other
+ * columns either are not written, or written after overflow.
+ * <p>
+ * The scenarios, identified by column names above, are:
+ * <dl>
+ * <dt>a</dt>
+ * <dd>a contains values for all three rows.
+ * <ul>
+ * <li>Two values were written in the "main" batch, while a third was written to
+ * what becomes the overflow row.</li>
+ * <li>When overflow occurs, the last write position is at n. It must be moved
+ * back to n-1.</li>
+ * <li>Since data was written to the overflow row, it is copied to the look-
+ * ahead batch.</li>
+ * <li>The last write position in the lookahead batch is 0 (since data was
+ * copied into the 0th row.</li>
+ * <li>When harvesting, no empty-filling is needed. Values in the main
+ * batch are zero-filled when the batch is finished, values in the look-ahead
+ * batch are back-filled when the first value is written.</li>
+ * <li>When starting the next batch, the last write position must be set to 0 to
+ * reflect the presence of the value for row n.</li>
+ * </ul>
+ * </dd>
+ * <dt>b</dt>
+ * <dd>b contains values for all three rows. The value for row n triggers
+ * overflow.
+ * <ul>
+ * <li>The last write position is at n-1, which is kept for the "main"
+ * vector.</li>
+ * <li>A new overflow vector is created and starts empty, with the last write
+ * position at -1.</li>
+ * <li>Once created, b is immediately written to the overflow vector, advancing
+ * the last write position to 0.</li>
+ * <li>Harvesting, and starting the next for column b works the same as column
+ * a.</li>
+ * </ul>
+ * </dd>
+ * <dt>c</dt>
+ * <dd>Column c has values for all rows.
+ * <ul>
+ * <li>The value for row n is written after overflow.</li>
+ * <li>At overflow, the last write position is at n-1.</li>
+ * <li>At overflow, a new lookahead vector is created with the last write
+ * position at -1.</li>
+ * <li>The value of c is written to the lookahead vector, advancing the last
+ * write position to -1.</li>
+ * <li>Harvesting, and starting the next for column c works the same as column
+ * a.</li>
+ * </ul>
+ * </dd>
+ * <dt>d</dt>
+ * <dd>Column d writes values to the last two rows before overflow, but not to
+ * the overflow row.
+ * <ul>
+ * <li>The last write position for the main batch is at n-1.</li>
+ * <li>The last write position in the lookahead batch remains at -1.</li>
+ * <li>Harvesting for column d requires filling an empty value for row n-1.</li>
+ * <li>When starting the next batch, the last write position must be set to -1,
+ * indicating no data yet written.</li>
+ * </ul>
+ * </dd>
+ * <dt>f</dt>
+ * <dd>Column f has no data in the last position of the main batch, and no data
+ * in the overflow row.
+ * <ul>
+ * <li>The last write position is at n-2.</li>
+ * <li>An empty value must be written into position n-1 during harvest.</li>
+ * <li>On start of the next batch, the last write position starts at -1.</li>
+ * </ul>
+ * </dd>
+ * <dt>g</dt>
+ * <dd>Column g is added after overflow, and has a value written to the overflow
+ * row.
+ * <ul>
+ * <li>On harvest, column g is simply skipped.</li>
+ * <li>On start of the next row, the last write position can be left unchanged
+ * since no "exchange" was done.</li>
+ * </ul>
+ * </dd>
+ * <dt>h</dt>
+ * <dd>Column h is added after overflow, but does not have data written to it
+ * during the overflow row. Similar to column g, but the last write position
+ * starts at -1 for the next batch.</dd>
+ * </dl>
+ *
+ * <h4>General Rules</h4>
+ *
+ * The above can be summarized into a smaller set of rules:
+ * <p>
+ * At the time of overflow on row n:
+ * <ul>
+ * <li>Create or clear the lookahead vector.</li>
+ * <li>Copy (last write position - n  + 1) values from row n in the old vector to 0
+ * in the new one. If the copy count is negative, copy nothing. (A negative
+ * copy count means that the last write position is behind the current
+ * row position. Should not occur after back-filling.)</li>
+ * <li>Save the last write position from the old vector, clamped at n.
+ * (That is, if the last write position is before n, keep it. If at
+ * n+1, set it back to n.)</li>
+ * <li>Set the last write position of the overflow vector to (original last
+ * write position - n), clamped at -1. That is, if the original last write
+ * position was before n, the new one is -1. If the original last write
+ * position is after n, shift it down by n places.</li>
+ * <li>Swap buffers from the main vectors and the overflow vectors. This sets
+ * aside the main values, and allows writing to continue using the overflow
+ * buffers.</li>
+ * </ul>
+ * <p>
+ * As the overflow write proceeds:
+ * <ul>
+ * <li>For existing columns, write as normal. The last write position moves from
+ * -1 to 0.</li>
+ * <li>Columns not written leave the last write position at -1.</li>
+ * <li>If a new column appears, set its last write position to -1. If it is then
+ * written, proceed as in the first point above.</li>
+ * </ul>
+ * <p>
+ * At harvest time:
+ * <ul>
+ * <li>For every writer, save the last write position.</li>
+ * <li>Swap the overflow and main buffers to put the main batch back into the
+ * main vectors.</li>
+ * <li>Reset the last write position for all writers to the values saved at
+ * overflow time above.</li>
+ * <li>Finish the batch for the main vectors as normal. No special handling
+ * needed.</li>
+ * </ul>
+ * <p>
+ * When starting the next batch:
+ * <ul>
+ * <li>Swap buffers again, putting the overflow row back into the main vectors.
+ * (At this point, the harvested vectors should all have zero buffers.)</li>
+ * <li>Restore the last write position saved during harvest.</li>
+ * </ul>
+ * <h4>Constraints</h4>
+ * A number of constraints are worth pointing out:
+ * <ul>
+ * <li>Writers are bound to vectors, so we can't easily swap vectors during
+ * overflow.</li>
+ * <li>The project operator to which this operator feeds data also binds to
+ * vectors, so the same set of vectors must be presented on every batch.</li>
+ * <li>The client binds to writers, so we cannot swap writers between main and
+ * overflow batches.</li>
+ * <li>Therefore, the unit of swapping is the buffer that backs the vectors.
+ * </li>
+ * <li>Swapping is not copying; it is only exchanging pointers.</li>
+ * <li>The only copying in this entire process occurs when moving previously-
+ * written values in the overflow row to the new vector at the time of
+ * overflow.</li>
+ * </ul>
+ *
+ * <h4>Arrays</h4>
+ *
+ * The above covers the case of scalar, top-level columns. The extension to
+ * scalar maps is straightforward: at run time, the members of maps are just
+ * simple scalar vectors that reside in a map name space, but the structure
+ * of map fields is the same as for top-level fields. (Think of map fields
+ * as being "flattened" into the top-level tuple.)
+ * <p>
+ * Arrays are a different matter: each row can have many values associated
+ * with it. Consider an array of scalars. We have:
+ * <pre><code>
+ *    Row 0   Row 1     Row 2
+ *    0 1 2   3 4 5     6 7 8
+ * [ [a b c] [d e f] | [g h i] ]
+ * </code></pre>
+ * Here, the letters indicate values. The brackets show the overall vector
+ * (outer brackets) and individual rows (inner brackets). The vertical line
+ * shows where overflow occurred. The same rules as discussed earier still
+ * apply, but we must consider both the row indexes and the array indexes.
+ * <ul>
+ * <li>Overflow occurs at the row level. Here row 2 overflowed and must
+ * be moved to the look-ahead vector.</li>
+ * <li>Value movement occurs at the value level. Here, values 6, 7 and 8
+ * must be move to the look-ahead vector.</li>
+ * </ul>
+ * The result, after overflow, is:
+ * <pre><code>
+ *    Row 0   Row 1       Row 0
+ *    0 1 2   3 4 5       0 1 2
+ * [ [a b c] [d e f] ] [ [g h i] ]
+ * </code></pre>
+ * Further, we must consider lists: a column may consist of a list of
+ * arrays. Or, a column may consist of an array of maps, one of which is
+ * a list of arrays. So, the above reasoning must apply recursively down
+ * the value tree.
+ * <p>
+ * As it turns out, there is a simple recursive algorithm, which is a
+ * simple extension of the reasoning for the top-level scalar case, that can
+ * handle arrays:
+ * <ul>
+ * <li>Start with the row index of the overflow row.</li>
+ * <li>If column c, say, is an array, obtain the index of the first value for
+ * the overflow row.</li>
+ * <li>If c is a list, or a repeated map, then repeat the above, for each
+ * member of c (a single column for a list, a set of columns for a map), but
+ * replace the row index with the index of the first element.</li>
+ * </ul>
+ * The result will be a walk of the value tree in which the overflow index
+ * starts as an index relative to the result set (a row index), and is
+ * recursively replaced with an array offset for each level of the array.
+ *
+ * <h4>Resynching Writers after Overflow</h4>
+ *
+ * When an overflow occurs, our focus is starts with the single top-level row
+ * that will not fit into the current batch. We move this row to the look-ahead
+ * vectors. Doing so is quite simple when each row is a simple tuple. As
+ * described above, the work is quite a bit more complex when the structure
+ * is a JSON-like tree flattened into vectors.
+ * <p>
+ * Consider the writers. Each writer corresponds to a single vector. Writers
+ * are grouped into logical tree nodes. Those in the root node write to
+ * (single, scalar) columns that are either top-level columns, or nested
+ * some level down in single-value (not array) tuples. Another tree level
+ * occurs in an array: the elements of the array use a different
+ * (faster-changing) index than the top (row-level) writers. Different arrays
+ * have different indexes: a row may have, say, four elements in array A,
+ * but 20 elements in array B.
+ * <p>
+ * Further, arrays can be singular (a repeated int, say) or for an entire
+ * tuple (a repeated map.) And, since Drill supports the full JSON model, in
+ * the most general case, there is a tree of array indexes that can be nested
+ * to an arbitrary level. (A row can have an array of maps which contains a
+ * column that is, itself, a list of repeated maps, a field of which is an
+ * array of ints.)
+ * <p>
+ * Writers handle this index tree via a tree of {@link ColumnWriterIndex}
+ * objects, often specialized for various tasks.
+ * <p>
+ * Now we can get to the key concept in this section: how we update those indexes
+ * after an overflow. The top-level index reverts to zero. (We start writing
+ * the 0th row in the new look-ahead batch.) But, nested indexes (those for arrays)
+ * will start at some other position depending on the number elements already
+ * written in an overflow row. The number of such elements is determined by a
+ * top-down traversal of the tree (to determine the start offset of each array
+ * for the row.) Resetting the writer indexes is a bottom-up process: based on
+ * the number of elements in that array, the writer index is reset to match.
+ * <p>
+ * This flow is the opposite of the "normal" case in which a new batch is started
+ * top-down, with each index being reset to zero.
+ *
+ * <h4>The Need for a Uniform Structure</h4>
+ *
+ * Drill has vastly different implementations and interfaces for:
+ * <ul>
+ * <li>Result sets (as a {@link VectorContainer}),</li>
+ * <li>Arrays (as a generated repeated vector),</li>
+ * <li>Lists (as a {@link ListVector}),</li>
+ * <li>Repeated lists (as a {@link RepeatedList vector}, and</li>
+ * <li>Repeated maps ({@link RepeatedMapVector}.</li>
+ * </ul>
+ * If we were to work directly with the above abstractions the code would be
+ * vastly complex. Instead, we abstract out the common structure into the
+ * {@link TupleMode} abstraction. In particular, we use the
+ * single tuple model which works with a single batch. This model provides a
+ * simple, uniform interface to work with columns and tuples (rows, maps),
+ * and a simple way to work with arrays. This interface reduces the above
+ * array algorithm to a simple set of recursive method calls.
+ */
+
+package org.apache.drill.exec.physical.rowSet.impl;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/BaseTupleModel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/BaseTupleModel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/BaseTupleModel.java
new file mode 100644
index 0000000..40da4ec
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/BaseTupleModel.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.model;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.TupleSchema;
+import org.apache.drill.exec.record.TupleSchema.AbstractColumnMetadata;
+import org.apache.drill.exec.record.VectorContainer;
+
+/**
+ * Base implementation for a tuple model which is common to the "single"
+ * and "hyper" cases. Deals primarily with the structure of the model,
+ * which is common between the two physical implementations.
+ */
+
+public abstract class BaseTupleModel implements TupleModel {
+
+  public static abstract class BaseColumnModel implements ColumnModel {
+
+    /**
+     * Extended schema associated with a column.
+     */
+
+    protected final ColumnMetadata schema;
+
+    public BaseColumnModel(ColumnMetadata schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public ColumnMetadata schema() { return schema; }
+
+    @Override
+    public TupleModel mapModel() { return null; }
+  }
+
+  /**
+   * Columns within the tuple. Columns may, themselves, be represented
+   * as tuples.
+   */
+
+  protected final List<ColumnModel> columns;
+
+  /**
+   * Descriptive schema associated with the columns above. Unlike a
+   * {@link VectorContainer}, this abstraction keeps the schema in sync
+   * with vectors as columns are added.
+   */
+
+  protected final TupleSchema schema;
+
+  public BaseTupleModel() {
+
+    // Schema starts empty and is built as columns are added.
+    // This ensures that the schema stays in sync with the
+    // backing vectors.
+
+    schema = new TupleSchema();
+    columns = new ArrayList<>();
+  }
+
+  public BaseTupleModel(TupleSchema schema, List<ColumnModel> columns) {
+    this.schema = schema;
+    this.columns = columns;
+    assert schema.size() == columns.size();
+  }
+
+  @Override
+  public TupleMetadata schema() { return schema; }
+
+  @Override
+  public int size() { return schema.size(); }
+
+  @Override
+  public ColumnModel column(int index) {
+    return columns.get(index);
+  }
+
+  @Override
+  public ColumnModel column(String name) {
+    return column(schema.index(name));
+  }
+
+  /**
+   * Perform the work of keeping the list of columns and schema in-sync
+   * as columns are added. This is protected because derived classes
+   * must add logic to keep the new column in sync with the underlying
+   * container or map vector.
+   *
+   * @param column column implementation to add
+   */
+
+  protected void addBaseColumn(BaseColumnModel column) {
+    schema.add((AbstractColumnMetadata) column.schema());
+    columns.add(column);
+    assert columns.size() == schema.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ContainerVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ContainerVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ContainerVisitor.java
new file mode 100644
index 0000000..28c8c59
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ContainerVisitor.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.model;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector;
+import org.apache.drill.exec.vector.complex.ListVector;
+import org.apache.drill.exec.vector.complex.RepeatedListVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+
+public class ContainerVisitor<R, A> {
+
+  public R apply(VectorContainer container, A arg) {
+    return visitContainer(container, arg);
+  }
+
+  private R visitContainer(VectorContainer container, A arg) {
+    return visitChildren(container, arg);
+  }
+
+  public R visitChildren(VectorContainer container, A arg) {
+    for (int i = 0; i < container.getNumberOfColumns(); i++) {
+      @SuppressWarnings("resource")
+      ValueVector vector = container.getValueVector(i).getValueVector();
+      apply(vector, arg);
+    }
+    return null;
+  }
+
+  protected R apply(ValueVector vector, A arg) {
+    MaterializedField schema = vector.getField();
+    MajorType majorType = schema.getType();
+    MinorType type = majorType.getMinorType();
+    DataMode mode = majorType.getMode();
+    switch (type) {
+    case MAP:
+      if (mode == DataMode.REPEATED) {
+        return visitRepeatedMap((RepeatedMapVector) vector, arg);
+      } else {
+        return visitMap((AbstractMapVector) vector, arg);
+      }
+    case LIST:
+      if (mode == DataMode.REPEATED) {
+        return visitRepeatedList((RepeatedListVector) vector, arg);
+      } else {
+        return visitList((ListVector) vector, arg);
+      }
+    default:
+      if (mode == DataMode.REPEATED) {
+        return visitRepeatedPrimitive((BaseRepeatedValueVector) vector, arg);
+      } else {
+        return visitPrimitive(vector, arg);
+      }
+    }
+  }
+
+  protected R visitRepeatedMap(RepeatedMapVector vector, A arg) {
+    visitChildren(vector, arg);
+    return visitVector(vector, arg);
+  }
+
+  protected R visitMap(AbstractMapVector vector, A arg) {
+    visitChildren(vector, arg);
+    return visitVector(vector, arg);
+  }
+
+  private R visitChildren(AbstractMapVector vector, A arg) {
+    for (int i = 0; i < vector.size(); i++) {
+      apply(vector.getChildByOrdinal(i), arg);
+    }
+    return null;
+  }
+
+  protected R visitRepeatedList(RepeatedListVector vector, A arg) {
+    return visitVector(vector, arg);
+  }
+
+  protected R visitList(ListVector vector, A arg) {
+    return visitVector(vector, arg);
+  }
+
+  protected R visitRepeatedPrimitive(BaseRepeatedValueVector vector, A arg) {
+    return visitVector(vector, arg);
+  }
+
+  protected R visitPrimitive(ValueVector vector, A arg) {
+    return visitVector(vector, arg);
+  }
+
+  protected R visitVector(ValueVector vector, A arg) {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/MetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/MetadataProvider.java
new file mode 100644
index 0000000..bb5e18e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/MetadataProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.model;
+
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.TupleSchema;
+
+/**
+ * Interface for retrieving and/or creating metadata given
+ * a vector.
+ */
+
+public interface MetadataProvider {
+  ColumnMetadata metadata(int index, MaterializedField field);
+  MetadataProvider childProvider(ColumnMetadata colMetadata);
+  TupleMetadata tuple();
+
+  public static class VectorDescrip {
+    public final MetadataProvider parent;
+    public final ColumnMetadata metadata;
+
+    public VectorDescrip(MetadataProvider provider, int index,
+        MaterializedField field) {
+      parent = provider;
+      metadata = provider.metadata(index, field);
+    }
+  }
+
+  public static class MetadataCreator implements MetadataProvider {
+
+    private final TupleSchema tuple;
+
+    public MetadataCreator() {
+      tuple = new TupleSchema();
+    }
+
+    public MetadataCreator(TupleSchema tuple) {
+      this.tuple = tuple;
+    }
+
+    @Override
+    public ColumnMetadata metadata(int index, MaterializedField field) {
+      return tuple.addView(field);
+    }
+
+    @Override
+    public MetadataProvider childProvider(ColumnMetadata colMetadata) {
+      return new MetadataCreator((TupleSchema) colMetadata.mapSchema());
+    }
+
+    @Override
+    public TupleMetadata tuple() { return tuple; }
+  }
+
+  public static class MetadataRetrieval implements MetadataProvider {
+
+    private final TupleMetadata tuple;
+
+    public MetadataRetrieval(TupleMetadata schema) {
+      tuple = schema;
+    }
+
+    @Override
+    public ColumnMetadata metadata(int index, MaterializedField field) {
+      return tuple.metadata(index);
+    }
+
+    @Override
+    public MetadataProvider childProvider(ColumnMetadata colMetadata) {
+      return new MetadataRetrieval((TupleSchema) colMetadata.mapSchema());
+    }
+
+    @Override
+    public TupleMetadata tuple() { return tuple; }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ReaderIndex.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ReaderIndex.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ReaderIndex.java
new file mode 100644
index 0000000..c4b0415
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ReaderIndex.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.model;
+
+import org.apache.drill.exec.vector.accessor.ColumnReaderIndex;
+
+/**
+ * Row set index base class used when indexing rows within a row
+ * set for a row set reader. Keeps track of the current position,
+ * which starts before the first row, meaning that the client
+ * must call <tt>next()</tt> to advance to the first row.
+ */
+
+public abstract class ReaderIndex implements ColumnReaderIndex {
+
+  protected int rowIndex = -1;
+  protected final int rowCount;
+
+  public ReaderIndex(int rowCount) {
+    this.rowCount = rowCount;
+  }
+
+  public int position() { return rowIndex; }
+  public void set(int index) { rowIndex = index; }
+
+  public boolean next() {
+    if (++rowIndex < rowCount ) {
+      return true;
+    } else {
+      rowIndex--;
+      return false;
+    }
+  }
+
+  public int size() { return rowCount; }
+
+  public boolean valid() { return rowIndex < rowCount; }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/SchemaInference.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/SchemaInference.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/SchemaInference.java
new file mode 100644
index 0000000..3db01dd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/SchemaInference.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.model;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.TupleSchema;
+import org.apache.drill.exec.record.VectorContainer;
+
+/**
+ * Produce a metadata schema from a vector container. Used when given a
+ * record batch without metadata.
+ */
+
+public class SchemaInference {
+
+  public TupleMetadata infer(VectorContainer container) {
+    List<ColumnMetadata> columns = new ArrayList<>();
+    for (int i = 0; i < container.getNumberOfColumns(); i++) {
+      MaterializedField field = container.getValueVector(i).getField();
+      columns.add(inferVector(field));
+    }
+    return TupleSchema.fromColumns(columns);
+  }
+
+  private ColumnMetadata inferVector(MaterializedField field) {
+    if (field.getType().getMinorType() == MinorType.MAP) {
+      return TupleSchema.newMap(field, inferMapSchema(field));
+    } else {
+      return TupleSchema.fromField(field);
+    }
+  }
+
+  private TupleSchema inferMapSchema(MaterializedField field) {
+    List<ColumnMetadata> columns = new ArrayList<>();
+    for (MaterializedField child : field.getChildren()) {
+      columns.add(inferVector(child));
+    }
+    return TupleSchema.fromColumns(columns);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/TupleModel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/TupleModel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/TupleModel.java
new file mode 100644
index 0000000..5fcba73
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/TupleModel.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.model;
+
+import javax.sql.RowSet;
+
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.TupleSchema;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
+
+/**
+ * Common interface to access a tuple backed by a vector container or a
+ * map vector. Provides a visitor interface to apply tasks such as vector
+ * allocation, reader or writer creation, and so on. Allows either static
+ * or dynamic vector allocation.
+ * <p>
+ * The terminology used here:
+ * <dl>
+ * <dt>Row set</dt>
+ * <dd>A collection of rows stored as value vectors. Elsewhere in
+ * Drill we call this a "record batch", but that term has been overloaded to
+ * mean the runtime implementation of an operator.</dd>
+ * <dt>Tuple</dt>
+ * <dd>The relational-theory term for a row. Drill maps have a fixed schema.
+ * Impala, Hive and other tools use the term "structure" (or "struct") for
+ * what Drill calls a map. A structure is simply a nested tuple, modeled
+ * here by the same tuple abstraction used for rows.</dd>
+ * <dt>Column</dt>
+ * <dd>A column is represented by a vector (which may have internal
+ * null-flag or offset vectors.) Maps are a kind of column that has an
+ * associated tuple. Because this abstraction models structure, array
+ * columns are grouped with single values: the array-ness is just cardinality.</dd>
+ * <dt>Visitor</dt>
+ * <dd>The visitor abstraction (classic Gang-of-Four pattern) allows adding
+ * functionality without complicating the structure classes. Allows the same
+ * abstraction to be used for the testing {@link RowSet} abstractions and
+ * the scan operator "loader" classes.</dd>
+ * <dt>Metadata</dt>
+ * <dd>Metadata is simply data about data. Here, data about tuples and columns.
+ * The column metadata mostly expands on that available in {@link MaterializedField},
+ * but also adds allocation hints.
+ * </dl>
+ * <p>
+ * This abstraction is the physical dual of a {@link VectorContainer}.
+ * The vectors are "owned" by
+ * the associated container. The structure here simply applies additional
+ * metadata and visitor behavior to allow much easier processing that is
+ * possible with the raw container structure.
+ * <p>
+ * A key value of this abstraction is the extended {@link TupleSchema}
+ * associated with the structure.  Unlike a
+ * {@link VectorContainer}, this abstraction keeps the schema in sync
+ * with vectors as columns are added.
+ * <p>
+ * Some future version may wish to merge the two concepts. That way, metadata
+ * discovered by one operator will be available to another. Complex recursive
+ * functions can be replace by a visitor with the recursion handled inside
+ * implementations of this interface.
+ * <p>
+ * Tuples provide access to columns by both index and name. Both the schema and
+ * model classes follow this convention. Compared with the VectorContainer and
+ * {@link AbstractMapVector} classes, the vector index is a first-class concept:
+ * the column model and schema are guaranteed to reside at the same index relative
+ * to the enclosing tuple. In addition, name access is efficient using a hash
+ * index.
+ * <p>
+ * Visitor classes are defined by the "simple" (single batch) and "hyper"
+ * (multi-batch) implementations to allow vector implementations to work
+ * with the specifics of each type of batch.
+ */
+
+public interface TupleModel {
+
+  /**
+   * Common interface to access a column vector, its metadata, and its
+   * tuple definition (for maps.) Provides a visitor interface for common
+   * vector tasks.
+   */
+
+  public interface ColumnModel {
+    ColumnMetadata schema();
+    TupleModel mapModel();
+  }
+
+  /**
+   * Tuple-model interface for the top-level row (tuple) structure.
+   * Provides access to the {@link VectorContainer} representation of the
+   * row set (record batch.)
+   */
+
+  public interface RowSetModel extends TupleModel {
+    VectorContainer container();
+  }
+
+  TupleMetadata schema();
+  int size();
+  ColumnModel column(int index);
+  ColumnModel column(String name);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/BaseReaderBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/BaseReaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/BaseReaderBuilder.java
new file mode 100644
index 0000000..ee856be
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/BaseReaderBuilder.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.model.hyper;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.model.MetadataProvider;
+import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.VectorDescrip;
+import org.apache.drill.exec.physical.rowSet.model.ReaderIndex;
+import org.apache.drill.exec.record.HyperVectorWrapper;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ColumnReaderIndex;
+import org.apache.drill.exec.vector.accessor.impl.AccessorUtilities;
+import org.apache.drill.exec.vector.accessor.reader.AbstractObjectReader;
+import org.apache.drill.exec.vector.accessor.reader.ColumnReaderFactory;
+import org.apache.drill.exec.vector.accessor.reader.MapReader;
+import org.apache.drill.exec.vector.accessor.reader.ObjectArrayReader;
+import org.apache.drill.exec.vector.accessor.reader.VectorAccessor;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
+
+public abstract class BaseReaderBuilder {
+
+  /**
+   * Read-only row index into the hyper row set with batch and index
+   * values mapping via an SV4.
+   */
+
+  public static class HyperRowIndex extends ReaderIndex {
+
+    private final SelectionVector4 sv4;
+
+    public HyperRowIndex(SelectionVector4 sv4) {
+      super(sv4.getCount());
+      this.sv4 = sv4;
+    }
+
+    @Override
+    public int vectorIndex() {
+      return AccessorUtilities.sv4Index(sv4.get(rowIndex));
+    }
+
+    @Override
+    public int batchIndex( ) {
+      return AccessorUtilities.sv4Batch(sv4.get(rowIndex));
+    }
+  }
+
+  /**
+   * Vector accessor used by the column accessors to obtain the vector for
+   * each column value. That is, position 0 might be batch 4, index 3,
+   * while position 1 might be batch 1, index 7, and so on.
+   */
+
+  public static class HyperVectorAccessor implements VectorAccessor {
+
+    private final ValueVector[] vectors;
+    private ColumnReaderIndex rowIndex;
+
+    public HyperVectorAccessor(VectorWrapper<?> vw) {
+      vectors = vw.getValueVectors();
+    }
+
+    @Override
+    public void bind(ColumnReaderIndex index) {
+      rowIndex = index;
+    }
+
+    @Override
+    public ValueVector vector() {
+      return vectors[rowIndex.batchIndex()];
+    }
+  }
+
+
+  protected AbstractObjectReader[] buildContainerChildren(
+      VectorContainer container, MetadataProvider mdProvider) {
+    List<AbstractObjectReader> readers = new ArrayList<>();
+    for (int i = 0; i < container.getNumberOfColumns(); i++) {
+      VectorWrapper<?> vw = container.getValueVector(i);
+      VectorDescrip descrip = new VectorDescrip(mdProvider, i, vw.getField());
+      readers.add(buildVectorReader(vw, descrip));
+    }
+    return readers.toArray(new AbstractObjectReader[readers.size()]);
+  }
+
+  @SuppressWarnings("unchecked")
+  private AbstractObjectReader buildVectorReader(VectorWrapper<?> vw, VectorDescrip descrip) {
+    MajorType type = vw.getField().getType();
+    if (type.getMinorType() == MinorType.MAP) {
+      if (type.getMode() == DataMode.REPEATED) {
+        return buildMapArrayReader((HyperVectorWrapper<? extends AbstractMapVector>) vw, descrip);
+      } else {
+        return buildMapReader((HyperVectorWrapper<? extends AbstractMapVector>) vw, descrip);
+      }
+    } else {
+      return buildPrimitiveReader(vw, descrip);
+    }
+  }
+
+  private AbstractObjectReader buildMapArrayReader(HyperVectorWrapper<? extends AbstractMapVector> vectors, VectorDescrip descrip) {
+    AbstractObjectReader mapReader = MapReader.build(descrip.metadata, buildMap(vectors, descrip));
+    return ObjectArrayReader.build(new HyperVectorAccessor(vectors), mapReader);
+  }
+
+  private AbstractObjectReader buildMapReader(HyperVectorWrapper<? extends AbstractMapVector> vectors, VectorDescrip descrip) {
+    return MapReader.build(descrip.metadata, buildMap(vectors, descrip));
+  }
+
+  private AbstractObjectReader buildPrimitiveReader(VectorWrapper<?> vw, VectorDescrip descrip) {
+    return ColumnReaderFactory.buildColumnReader(
+        vw.getField().getType(), new HyperVectorAccessor(vw));
+  }
+
+  private List<AbstractObjectReader> buildMap(HyperVectorWrapper<? extends AbstractMapVector> vectors, VectorDescrip descrip) {
+    List<AbstractObjectReader> readers = new ArrayList<>();
+    MetadataProvider provider = descrip.parent.childProvider(descrip.metadata);
+    MaterializedField mapField = vectors.getField();
+    for (int i = 0; i < mapField.getChildren().size(); i++) {
+      HyperVectorWrapper<? extends ValueVector> child = (HyperVectorWrapper<? extends ValueVector>) vectors.getChildWrapper(new int[] {i});
+      VectorDescrip childDescrip = new VectorDescrip(provider, i, child.getField());
+      readers.add(buildVectorReader(child, childDescrip));
+      i++;
+    }
+    return readers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/package-info.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/package-info.java
new file mode 100644
index 0000000..433231e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/package-info.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of a row set model for hyper-batches. A hyper batch is
+ * one that contains a list of batches. The batch is logically comprised
+ * of "hyper-vectors" which are the individual vectors from each batch
+ * stacked "end-to-end."
+ * <p>
+ * Hyper batches allow only reading. So, the only services here are to
+ * parse a hyper-container into a row set model, then use that model to
+ * create a matching set of readers.
+ */
+
+package org.apache.drill.exec.physical.rowSet.model.hyper;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/package-info.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/package-info.java
new file mode 100644
index 0000000..6f24d33
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/package-info.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The "row set model" provides a "dual" of the vector structure used to create,
+ * allocate and work with a collection of vectors. The model provides an enhanced
+ * "metadata" schema, given by {@link TupleMetadata} and {@link ColumnMetadata},
+ * with allocation hints that goes beyond the {@link MaterializedField}
+ * used by value vectors.
+ * <p>
+ * In an ideal world, this structure would not be necessary; the vectors could, by
+ * themselves, provide the needed structure. However, vectors are used in many
+ * places, in many ways, and are hard to evolve. Further, Drill may eventually
+ * choose to move to Arrow, which would not have the structure provided here.
+ * <p>
+ * A set of visitor classes provide the logic to traverse the vector structure,
+ * avoiding the need for multiple implementations of vector traversal. (Traversal
+ * is needed because maps contain vectors, some of which can be maps, resulting
+ * in a tree structure. Further, the API provided by containers (a top-level
+ * tuple) differs from that of a map vector (nested tuple.) This structure provides
+ * a uniform API for both cases.
+ * <p>
+ * Three primary tasks provided by this structure are:
+ * <ol>
+ * <li>Create writers for a set of vectors. Allow incremental write-time
+ * addition of columns, keeping the vectors, columns and metadata all in
+ * sync.</li>
+ * <li>Create readers for a set of vectors. Vectors are immutable once written,
+ * so the reader mechanism does not provide any dynamic schema change
+ * support.</li>
+ * <li>Allocate vectors based on metadata provided. Allocation metadata
+ * includes estimated widths for variable-width columns and estimated
+ * cardinality for array columns.</li>
+ * </ol>
+ * <p>
+ * Drill supports two kinds of batches, reflected by two implementations of
+ * the structure:
+ * <dl>
+ * <dt>Single batch</dt>
+ * <dd>Represents a single batch in which each column is backed by a single
+ * value vector. Single batches support both reading and writing. Writing can
+ * be done only for "new" batches; reading can be done only after writing
+ * is complete. Modeled by the {#link org.apache.drill.exec.physical.rowSet.model.single
+ * single} package.</dd>
+ * <dt>Hyper batch</dt>
+ * <dd>Represents a stacked set of batches in which each column is backed
+ * by a list of columns. A hyper batch is indexed by an "sv4" (four-byte
+ * selection vector.) A hyper batch allows only reading. Modeled by the
+ * {@link org.apache.drill.exec.physical.rowSet.model.hyper hyper} package.</dd>
+ * </dl>
+ */
+
+package org.apache.drill.exec.physical.rowSet.model;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseReaderBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseReaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseReaderBuilder.java
new file mode 100644
index 0000000..80ad19f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseReaderBuilder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.model.single;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.model.MetadataProvider;
+import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.VectorDescrip;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.reader.AbstractObjectReader;
+import org.apache.drill.exec.vector.accessor.reader.ColumnReaderFactory;
+import org.apache.drill.exec.vector.accessor.reader.MapReader;
+import org.apache.drill.exec.vector.accessor.reader.ObjectArrayReader;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+
+public abstract class BaseReaderBuilder {
+
+  protected List<AbstractObjectReader> buildContainerChildren(
+      VectorContainer container, MetadataProvider mdProvider) {
+    List<AbstractObjectReader> writers = new ArrayList<>();
+    for (int i = 0; i < container.getNumberOfColumns(); i++) {
+      @SuppressWarnings("resource")
+      ValueVector vector = container.getValueVector(i).getValueVector();
+      VectorDescrip descrip = new VectorDescrip(mdProvider, i, vector.getField());
+      writers.add(buildVectorReader(vector, descrip));
+    }
+    return writers;
+  }
+
+  private AbstractObjectReader buildVectorReader(ValueVector vector, VectorDescrip descrip) {
+    MajorType type = vector.getField().getType();
+    if (type.getMinorType() == MinorType.MAP) {
+      if (type.getMode() == DataMode.REPEATED) {
+        return buildMapArrayReader((RepeatedMapVector) vector, descrip);
+      } else {
+        return buildMapReader((MapVector) vector, descrip);
+      }
+    } else {
+      return buildPrimitiveReader(vector, descrip);
+    }
+  }
+
+  private AbstractObjectReader buildMapArrayReader(RepeatedMapVector vector, VectorDescrip descrip) {
+    AbstractObjectReader mapReader = MapReader.build(descrip.metadata, buildMap(vector, descrip));
+    return ObjectArrayReader.build(vector, mapReader);
+  }
+
+  private AbstractObjectReader buildMapReader(MapVector vector, VectorDescrip descrip) {
+    return MapReader.build(descrip.metadata, buildMap(vector, descrip));
+  }
+
+  private AbstractObjectReader buildPrimitiveReader(ValueVector vector, VectorDescrip descrip) {
+    return ColumnReaderFactory.buildColumnReader(vector);
+  }
+
+  private List<AbstractObjectReader> buildMap(AbstractMapVector vector, VectorDescrip descrip) {
+    List<AbstractObjectReader> readers = new ArrayList<>();
+    MetadataProvider provider = descrip.parent.childProvider(descrip.metadata);
+    int i = 0;
+    for (ValueVector child : vector) {
+      VectorDescrip childDescrip = new VectorDescrip(provider, i, child.getField());
+      readers.add(buildVectorReader(child, childDescrip));
+      i++;
+    }
+    return readers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseWriterBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseWriterBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseWriterBuilder.java
new file mode 100644
index 0000000..bab7b39
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseWriterBuilder.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.model.single;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.model.MetadataProvider;
+import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.VectorDescrip;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.ColumnWriterFactory;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
+
+/**
+ * Build a set of writers for a single (non-hyper) vector container.
+ */
+
+public abstract class BaseWriterBuilder {
+
+  protected List<AbstractObjectWriter> buildContainerChildren(VectorContainer container, MetadataProvider mdProvider) {
+    List<AbstractObjectWriter> writers = new ArrayList<>();
+    for (int i = 0; i < container.getNumberOfColumns(); i++) {
+      @SuppressWarnings("resource")
+      ValueVector vector = container.getValueVector(i).getValueVector();
+      VectorDescrip descrip = new VectorDescrip(mdProvider, i, vector.getField());
+      writers.add(buildVectorWriter(vector, descrip));
+    }
+    return writers;
+  }
+
+  private AbstractObjectWriter buildVectorWriter(ValueVector vector, VectorDescrip descrip) {
+    MajorType type = vector.getField().getType();
+    if (type.getMinorType() == MinorType.MAP) {
+      return ColumnWriterFactory.buildMapWriter(descrip.metadata,
+          (AbstractMapVector) vector,
+          buildMap((AbstractMapVector) vector, descrip));
+    } else {
+      return ColumnWriterFactory.buildColumnWriter(descrip.metadata, vector);
+    }
+  }
+
+  private List<AbstractObjectWriter> buildMap(AbstractMapVector vector, VectorDescrip descrip) {
+    List<AbstractObjectWriter> writers = new ArrayList<>();
+    MetadataProvider provider = descrip.parent.childProvider(descrip.metadata);
+    int i = 0;
+    for (ValueVector child : vector) {
+      VectorDescrip childDescrip = new VectorDescrip(provider, i, child.getField());
+      writers.add(buildVectorWriter(child, childDescrip));
+      i++;
+    }
+    return writers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BuildVectorsFromMetadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BuildVectorsFromMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BuildVectorsFromMetadata.java
new file mode 100644
index 0000000..30f60b3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BuildVectorsFromMetadata.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.model.single;
+
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
+
+/**
+ * Build (materialize) as set of vectors based on a provided
+ * metadata schema.
+ */
+
+public class BuildVectorsFromMetadata {
+
+  private final BufferAllocator allocator;
+
+  public BuildVectorsFromMetadata(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  public VectorContainer build(TupleMetadata schema) {
+    VectorContainer container = new VectorContainer(allocator);
+    for (int i = 0; i < schema.size(); i++) {
+      container.add(buildVector(schema.metadata(i)));
+    }
+
+    // Build the row set from a matching triple of schema, container and
+    // column models.
+
+    container.buildSchema(SelectionVectorMode.NONE);
+    return container;
+  }
+
+  private ValueVector buildVector(ColumnMetadata metadata) {
+    if (metadata.isMap()) {
+      return buildMap(metadata);
+    } else {
+      return TypeHelper.getNewVector(metadata.schema(), allocator, null);
+    }
+  }
+
+  /**
+   * Build a map column including the members of the map given a map
+   * column schema.
+   *
+   * @param schema the schema of the map column
+   * @return the completed map vector column model
+   */
+
+  private AbstractMapVector buildMap(ColumnMetadata schema) {
+
+    // Creating the map vector will create its contained vectors if we
+    // give it a materialized field with children. So, instead pass a clone
+    // without children so we can add them.
+
+    MaterializedField mapField = schema.schema();
+    MaterializedField emptyClone = MaterializedField.create(mapField.getName(), mapField.getType());
+
+    // Don't get the map vector from the vector cache. Map vectors may
+    // have content that varies from batch to batch. Only the leaf
+    // vectors can be cached.
+
+    AbstractMapVector mapVector = (AbstractMapVector) TypeHelper.getNewVector(emptyClone, allocator, null);
+
+    // Create the contents building the model as we go.
+
+    TupleMetadata mapSchema = schema.mapSchema();
+    for (int i = 0; i < mapSchema.size(); i++) {
+      ColumnMetadata childSchema = mapSchema.metadata(i);
+      mapVector.putChild(childSchema.name(), buildVector(childSchema));
+    }
+
+    return mapVector;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/VectorAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/VectorAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/VectorAllocator.java
new file mode 100644
index 0000000..34a6960
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/VectorAllocator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.model.single;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
+import org.apache.drill.exec.physical.rowSet.model.MetadataProvider;
+import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.MetadataCreator;
+import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.MetadataRetrieval;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+
+/**
+ * Given a vector container, and a metadata schema that matches the container,
+ * walk the schema tree to allocate new vectors according to a given
+ * row count and the size information provided in column metadata.
+ * <p>
+ * @see {@link AllocationHelper} - the class which this one replaces
+ * @see {@link VectorInitializer} - an earlier cut at implementation
+ * based on data from the {@link RecordBatchSizer}
+ */
+
+// TODO: Does not yet handle lists; lists are a simple extension
+// of the array-handling logic below.
+
+public class VectorAllocator {
+
+  private final VectorContainer container;
+
+  public VectorAllocator(VectorContainer container) {
+    this.container = container;
+  }
+
+  public void allocate(int rowCount) {
+    allocate(rowCount, new MetadataCreator());
+  }
+
+  public void allocate(int rowCount, TupleMetadata schema) {
+    allocate(rowCount, new MetadataRetrieval(schema));
+  }
+
+  public void allocate(int rowCount, MetadataProvider mdProvider) {
+    for (int i = 0; i < container.getNumberOfColumns(); i++) {
+      @SuppressWarnings("resource")
+      ValueVector vector = container.getValueVector(i).getValueVector();
+      allocateVector(vector, mdProvider.metadata(i, vector.getField()), rowCount, mdProvider);
+    }
+  }
+
+  private void allocateVector(ValueVector vector, ColumnMetadata metadata, int valueCount, MetadataProvider mdProvider) {
+    MajorType type = vector.getField().getType();
+    assert vector.getField().getName().equals(metadata.name());
+    assert type.getMinorType() == metadata.type();
+    if (type.getMinorType() == MinorType.MAP) {
+      if (type.getMode() == DataMode.REPEATED) {
+        allocateMapArray((RepeatedMapVector) vector, metadata, valueCount, mdProvider);
+      } else {
+        allocateMap((AbstractMapVector) vector, metadata, valueCount, mdProvider);
+      }
+    } else {
+      allocatePrimitive(vector, metadata, valueCount);
+    }
+  }
+
+  private void allocatePrimitive(ValueVector vector,
+      ColumnMetadata metadata, int valueCount) {
+    AllocationHelper.allocatePrecomputedChildCount(vector,
+        valueCount,
+        metadata.expectedWidth(),
+        metadata.expectedElementCount());
+  }
+
+  private void allocateMapArray(RepeatedMapVector vector,
+      ColumnMetadata metadata, int valueCount, MetadataProvider mdProvider) {
+    ((RepeatedMapVector) vector).getOffsetVector().allocateNew(valueCount);
+    int expectedValueCount = valueCount * metadata.expectedElementCount();
+    allocateMap(vector, metadata, expectedValueCount, mdProvider);
+  }
+
+  private void allocateMap(AbstractMapVector vector, ColumnMetadata metadata, int valueCount, MetadataProvider mdProvider) {
+    MetadataProvider mapProvider = mdProvider.childProvider(metadata);
+    TupleMetadata mapSchema = metadata.mapSchema();
+    assert mapSchema != null;
+    int i = 0;
+    for (ValueVector child : vector) {
+      allocateVector(child, mapProvider.metadata(i, child.getField()), valueCount, mapProvider);
+      i++;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/package-info.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/package-info.java
new file mode 100644
index 0000000..6cb6f27
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This set of classes models the structure of a batch consisting
+ * of single vectors (as contrasted with a hyper batch.) Provides tools
+ * or metdata-based construction, allocation, reading and writing of
+ * the vectors.
+ * <p>
+ * The classes here walk the container/map/vector tree to apply
+ * operations.
+ */
+
+package org.apache.drill.exec.physical.rowSet.model.single;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/package-info.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/package-info.java
new file mode 100644
index 0000000..d92c6b7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/package-info.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Provides a second-generation row set (AKA "record batch") writer used
+ * by client code to<ul>
+ * <li>Define the schema of a result set.</li>
+ * <li>Write data into the vectors backing a row set.</li></ul>
+ * <p>
+ * <h4>Terminology</h4>
+ * The code here follows the "row/column" naming convention rather than
+ * the "record/field" convention.
+ * <dl>
+ * <dt>Result set</dt>
+ * <dd>A set of zero or more row sets that hold rows of data.<dd>
+ * <dt>Row set</dt>
+ * <dd>A collection of rows with a common schema. Also called a "row
+ * batch" or "record batch." (But, in Drill, the term "record batch" also
+ * usually means an operator on that set of records. Here, a row set is
+ * just the rows &nash; separate from operations on that data.</dd>
+ * <dt>Row</dt>
+ * <dd>A single row of data, in the usual database sense. Here, a row is
+ * a kind of tuple (see below) allowing both name and index access to
+ * columns.</dd>
+ * <dt>Tuple</dt>
+ * <dd>In relational theory, a row is a tuple: a collection of values
+ * defined by a schema. Tuple values are indexed by position or name.</dd>
+ * <dt>Column</dt>
+ * <dd>A single value within a row or row set. (Generally, the context
+ * makes clear if the term refers to single value or all values for a
+ * column for a row set. Columns are backed by value vectors.</dd>
+ * <dt>Map</dt>
+ * <dd>In Drill, a map is what other systems call a "structure". It is,
+ * in fact, a nested tuple. In a Java or Python map, each map instance has
+ * a distinct set of name/value pairs. But, in Drill, all map instances have
+ * the same schema; hence the so-called "map" is really a tuple. This
+ * implementation exploits that fact and treats the row, and nested maps,
+ * almost identically: both provide columns indexed by name or position.</dd>
+ * <dt>Row Set Mutator</dt>
+ * <dd>An awkward name, but retains the "mutator" name from the previous
+ * generation. The mechanism to build a result set as series of row sets.</dd>
+ * <dt>Tuple Loader</dt>
+ * <dd>Mechanism to build a single tuple (row or map) by providing name
+ * or index access to columns. A better name would b "tuple writer", but
+ * that name is already used elsewhere.</dd>
+ * <dt>Column Loader</dt>
+ * <dd>Mechanism to write values to a single column.<dd>
+ * </dl>
+ * <h4>Building the Schema</h4>
+ * The row set mutator works for two cases: a known schema or a discovered
+ * schema. A known schema occurs in the case, such as JDBC, where the
+ * underlying data source can describe the schema before reading any rows.
+ * In this case, client code can build the schema and pass that schema to
+ * the mutator directly. Alternatively, the client code can build the
+ * schema column-by-column before the first row is read.
+ * <p>
+ * Readers that discover schema can build the schema incrementally: add
+ * a column, load data for that column for one row, discover the next
+ * column, and so on. Almost any kind of column can be added at any time
+ * within the first batch:<ul>
+ * <li>Required columns are "back-filled" with zeros in the active batch,
+ * if that value
+ * makes sense for the column. (Date and Interval columns will throw an
+ * exception if added after the first row as there is no good "zero"
+ * value for that column. Varchar columns are back-filled with blanks.<li>
+ * <li>Optional (nullable) columns can be added at any time; they are
+ * back-filled with nulls in the active batch. In general, if a column is
+ * added after the first row, it should be nullable, not required, unless
+ * the data source has a "missing = blank or zero" policy.</li>
+ * <li>Repeated (array) columns can be added at any time; they are
+ * back-filled with empty entries in the first batch. Arrays can also be
+ * safely added at any time.</li></ul>
+ * Client code must be aware of the semantics of adding columns at various
+ * times.<ul>
+ * <li>Columns added before or during the first row are the trivial case;
+ * this works for all data types and modes.</li>
+ * <li>Required (non-nullable0 structured columns (Date, Period) cannot be
+ * added after the first row (as there is no good zero-fill value.)</li>
+ * <li>Columns added within the first batch appear to the rest of Drill as
+ * if they were added before the first row: the downstream operators see the
+ * same schema from batch to batch.</li>
+ * <li>Columns added <i>after</i> the first batch will trigger a
+ * schema-change event downstream.</li>
+ * <li>The above is true during an "overflow row" (see below.) Once
+ * overflow occurs, columns added later in that overflow row will actually
+ * appear in the next batch, and will trigger a schema change when that
+ * batch is returned. That is, overflow "time shifts" a row addition from
+ * one batch to the next, and so it also time-shifts the column addition.
+ * </li></ul>
+ * Use the {@link LoaderSchema} class to build the schema. The schema class is
+ * part of the {@link TupleLoader} object available from the
+ * {@link #root()} method.
+ * <h4>Using the Schema</h4>
+ * Presents columns using a physical schema. That is, map columns appear
+ * as columns that provide a nested map schema. Presumes that column
+ * access is primarily structural: first get a map, then process all
+ * columns for the map.
+ * <p>
+ * If the input is a flat structure, then the physical schema has a
+ * flattened schema as the degenerate case.
+ * <p>
+ * In both cases, access to columns is by index or by name. If new columns
+ * are added while loading, their index is always at the end of the existing
+ * columns.
+ * <h4>Writing Data to the Batch</h4>
+ * Each batch is delimited by a call to {@link #startBatch()} and a call to
+ * {@link #harvestWithLookAhead()} to obtain the completed batch. Note that readers do not
+ * call these methods; the scan operator does this work.
+ * <p>
+ * Each row is delimited by a call to {@link #startValue()} and a call to
+ * {@link #saveRow()}. <tt>startRow()</tt> performs initialization necessary
+ * for some vectors such as repeated vectors. <tt>saveRow()</tt> moves the
+ * row pointer ahead.
+ * <p>
+ * A reader can easily reject a row by calling <tt>startRow()</tt>, begin
+ * to load a row, but omitting the call to <tt>saveRow()</tt> In this case,
+ * the next call to <tt>startRow()</tt> repositions the row pointer to the
+ * same row, and new data will overwrite the previous data, effectively erasing
+ * the unwanted row. This also works for the last row; omitting the call to
+ * <tt>saveRow()</tt> causes the batch to hold only the rows actually
+ * saved.
+ * <p>
+ * Readers then write to each column. Columns are accessible via index
+ * ({@link TupleLoader#column(int)} or by name
+ * ({@link TupleLoader#column(String)}. Indexed access is much faster.
+ * Column indexes are defined by the order that columns are added. The first
+ * column is column 0, the second is column 1 and so on.
+ * <p>
+ * Each call to the above methods returns the same column writer, allowing the
+ * reader to cache column writers for additional performance.
+ * <p>
+ * All column writers are of the same class; there is no need to cast to a
+ * type corresponding to the vector. Instead, they provide a variety of
+ * <tt>set<i>Type</i></tt> methods, where the type is one of various Java
+ * primitive or structured types. Most vectors provide just one method, but
+ * others (such as VarChar) provide two. The implementation will throw an
+ * exception if the vector does not support a particular type.
+ * <p>
+ * Note that this class uses the term "loader" for row and column writers
+ * since the term "writer" is already used by the legacy record set mutator
+ * and column writers.
+ * <h4>Handling Batch Limits</h4>
+ * The mutator enforces two sets of batch limits:<ol>
+ * <li>The number of rows per batch. The limit defaults to 64K (the Drill
+ * maximum), but can be set lower by the client.</li>
+ * <li>The size of the largest vector, which is capped at 16 MB. (A future
+ * version may allow adjustable caps, or cap the memory of the entire
+ * batch.</li></ol>
+ * Both limits are presented to the client via the {@link #isFull()}
+ * method. After each call to {@link #saveRow()}, the client should call
+ * <tt>isFull()</tt> to determine if the client can add another row. Note
+ * that failing to do this check will cause the next call to
+ * {@link #startBatch()} to throw an exception.
+ * <p>
+ * The limits have subtle differences, however. Row limits are simple: at
+ * the end of the last row, the mutator notices that no more rows are possible,
+ * and so does not allow starting a new row.
+ * <p>
+ * Vector overflow is more complex. A row may consist of columns (a, b, c).
+ * The client may write column a, but then column b might trigger a vector
+ * overflow. (For example, b is a Varchar, and the value for b is larger than
+ * the space left in the vector.) The client cannot stop and rewrite a. Instead,
+ * the client simply continues writing the row. The mutator, internally, moves
+ * this "overflow" row to a new batch. The overflow row becomes the first row
+ * of the next batch rather than the first row of the current batch.
+ * <p>
+ * For this reason, the client can treat the two overflow cases identically,
+ * as described above.
+ * <p>
+ * There are some subtle differences between the two cases that clients may
+ * occasionally may need to expect:<ul>
+ * <li>When a vector overflow occurs, the returned batch will have one
+ * fewer rows than the client might expect if it is simply counting the rows
+ * written.</li>
+ * <li>A new column added to the batch after overflow occurs will appear in
+ * the <i>next</i> batch, triggering a schema change between the current and
+ * next batches.</li></ul>
+ */
+package org.apache.drill.exec.physical.rowSet;

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index 0497cfd..2d01ef4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -96,6 +96,9 @@ public class BatchSchema implements Iterable<MaterializedField> {
     return result;
   }
 
+  // DRILL-5525: the semantics of this method are badly broken.
+  // Caveat emptor.
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
@@ -108,13 +111,24 @@ public class BatchSchema implements Iterable<MaterializedField> {
       return false;
     }
     BatchSchema other = (BatchSchema) obj;
+    if (selectionVectorMode != other.selectionVectorMode) {
+      return false;
+    }
     if (fields == null) {
-      if (other.fields != null) {
-        return false;
-      }
-    } else if (!fields.equals(other.fields)) {
+      return other.fields == null;
+    }
+
+    // Compare names.
+    // (DRILL-5525: actually compares all fields.)
+
+    if (!fields.equals(other.fields)) {
       return false;
     }
+
+    // Compare types
+    // (DRILL-5525: this code is redundant because any differences
+    // will fail above.)
+
     for (int i = 0; i < fields.size(); i++) {
       MajorType t1 = fields.get(i).getType();
       MajorType t2 = other.fields.get(i).getType();
@@ -128,13 +142,25 @@ public class BatchSchema implements Iterable<MaterializedField> {
         }
       }
     }
-    if (selectionVectorMode != other.selectionVectorMode) {
-      return false;
-    }
     return true;
   }
 
+  /**
+   * Compare that two schemas are identical according to the rules defined
+   * in {@ link MaterializedField#isEquivalent(MaterializedField)}. In particular,
+   * this method requires that the fields have a 1:1 ordered correspondence
+   * in the two schemas.
+   *
+   * @param other another non-null batch schema
+   * @return <tt>true</tt> if the two schemas are equivalent according to
+   * the {@link MaterializedField#isEquivalent(MaterializedField)} rules,
+   * false otherwise
+   */
+
   public boolean isEquivalent(BatchSchema other) {
+    if (this == other) {
+      return true;
+    }
     if (fields == null || other.fields == null) {
       return fields == other.fields;
     }
@@ -172,7 +198,7 @@ public class BatchSchema implements Iterable<MaterializedField> {
   }
 
   /**
-   * Merge two schema to produce a new, merged schema. The caller is responsible
+   * Merge two schemas to produce a new, merged schema. The caller is responsible
    * for ensuring that column names are unique. The order of the fields in the
    * new schema is the same as that of this schema, with the other schema's fields
    * appended in the order defined in the other schema.

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index b4ae2d2..acb7a9b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.record;
 
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.vector.ValueVector;
 
 /**
  * A record batch contains a set of field values for a particular range of
@@ -38,7 +39,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 public interface RecordBatch extends VectorAccessible {
 
   /** max batch size, limited by 2-byte length in SV2: 65536 = 2^16 */
-  public static final int MAX_BATCH_SIZE = 65536;
+  public static final int MAX_BATCH_SIZE = ValueVector.MAX_ROW_COUNT;
 
   /**
    * Describes the outcome of incrementing RecordBatch forward by a call to