Posted to commits@drill.apache.org by pr...@apache.org on 2017/12/21 05:19:36 UTC

[09/15] drill git commit: DRILL-5657: Size-aware vector writer structure

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
index 030f95a..e1e18dc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
@@ -17,23 +17,13 @@
  */
 package org.apache.drill.test.rowSet;
 
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.physical.rowSet.model.ReaderIndex;
+import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.MetadataRetrieval;
+import org.apache.drill.exec.physical.rowSet.model.single.BaseReaderBuilder;
+import org.apache.drill.exec.record.TupleMetadata;
 import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.accessor.impl.AbstractColumnReader;
-import org.apache.drill.exec.vector.accessor.impl.ColumnAccessorFactory;
-import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.test.rowSet.RowSetSchema.FlattenedSchema;
-import org.apache.drill.test.rowSet.RowSetSchema.LogicalColumn;
-import org.apache.drill.test.rowSet.RowSetSchema.PhysicalSchema;
 
 /**
  * Base class for row sets backed by a single record batch.
@@ -41,151 +31,27 @@ import org.apache.drill.test.rowSet.RowSetSchema.PhysicalSchema;
 
 public abstract class AbstractSingleRowSet extends AbstractRowSet implements SingleRowSet {
 
-  /**
-   * Internal helper class to organize a set of value vectors for use by the
-   * row set class. Subclasses either build vectors from a schema, or map an
-   * existing vector container into the row set structure. The row set
-   * structure is based on a flattened structure; all vectors appear in
-   * a single vector array. Maps are set aside in a separate map list.
-   */
-
-  public abstract static class StructureBuilder {
-    protected final PhysicalSchema schema;
-    protected final BufferAllocator allocator;
-    protected final ValueVector[] valueVectors;
-    protected final MapVector[] mapVectors;
-    protected int vectorIndex;
-    protected int mapIndex;
-
-    public StructureBuilder(BufferAllocator allocator, RowSetSchema schema) {
-      this.allocator = allocator;
-      this.schema = schema.physical();
-      FlattenedSchema flatSchema = schema.flatAccess();
-      valueVectors = new ValueVector[flatSchema.count()];
-      if (flatSchema.mapCount() == 0) {
-        mapVectors = null;
-      } else {
-        mapVectors = new MapVector[flatSchema.mapCount()];
-      }
-    }
-  }
-
-  /**
-   * Create a set of value vectors given a schema, then map them into both
-   * the value container and the row set structure.
-   */
-
-  public static class VectorBuilder extends StructureBuilder {
-
-    public VectorBuilder(BufferAllocator allocator, RowSetSchema schema) {
-      super(allocator, schema);
-    }
-
-    public ValueVector[] buildContainer(VectorContainer container) {
-      for (int i = 0; i < schema.count(); i++) {
-        LogicalColumn colSchema = schema.column(i);
-        @SuppressWarnings("resource")
-        ValueVector v = TypeHelper.getNewVector(colSchema.field, allocator, null);
-        container.add(v);
-        if (colSchema.field.getType().getMinorType() == MinorType.MAP) {
-          MapVector mv = (MapVector) v;
-          mapVectors[mapIndex++] = mv;
-          buildMap(mv, colSchema.mapSchema);
-        } else {
-          valueVectors[vectorIndex++] = v;
-        }
-      }
-      container.buildSchema(SelectionVectorMode.NONE);
-      return valueVectors;
-    }
-
-    private void buildMap(MapVector mapVector, PhysicalSchema mapSchema) {
-      for (int i = 0; i < mapSchema.count(); i++) {
-        LogicalColumn colSchema = mapSchema.column(i);
-        MajorType type = colSchema.field.getType();
-        Class<? extends ValueVector> vectorClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode());
-        @SuppressWarnings("resource")
-        ValueVector v = mapVector.addOrGet(colSchema.field.getName(), type, vectorClass);
-        if (type.getMinorType() == MinorType.MAP) {
-          MapVector mv = (MapVector) v;
-          mapVectors[mapIndex++] = mv;
-          buildMap(mv, colSchema.mapSchema);
-        } else {
-          valueVectors[vectorIndex++] = v;
-        }
-      }
-    }
-  }
-
-  /**
-   * Build a row set given an existing vector container. In this case,
-   * the vectors exist and we simply need to pull them out of the container
-   * and maps and put them into the row set arrays.
-   */
-
-  public static class VectorMapper extends StructureBuilder {
-
-    public VectorMapper(BufferAllocator allocator, RowSetSchema schema) {
-      super(allocator, schema);
-    }
+  public static class RowSetReaderBuilder extends BaseReaderBuilder {
 
-    public ValueVector[] mapContainer(VectorContainer container) {
-      for (VectorWrapper<?> w : container) {
-        @SuppressWarnings("resource")
-        ValueVector v = w.getValueVector();
-        if (v.getField().getType().getMinorType() == MinorType.MAP) {
-          MapVector mv = (MapVector) v;
-          mapVectors[mapIndex++] = mv;
-          buildMap(mv);
-        } else {
-          valueVectors[vectorIndex++] = v;
-        }
-      }
-      return valueVectors;
+    public RowSetReader buildReader(AbstractSingleRowSet rowSet, ReaderIndex rowIndex) {
+      TupleMetadata schema = rowSet.schema();
+      return new RowSetReaderImpl(schema, rowIndex,
+          buildContainerChildren(rowSet.container(),
+          new MetadataRetrieval(schema)));
     }
-
-    private void buildMap(MapVector mapVector) {
-      for (ValueVector v : mapVector) {
-        if (v.getField().getType().getMinorType() == MinorType.MAP) {
-          MapVector mv = (MapVector) v;
-          mapVectors[mapIndex++] = mv;
-          buildMap(mv);
-        } else {
-          valueVectors[vectorIndex++] = v;
-        }
-      }
-    }
-  }
-
-  /**
-   * Flattened representation of value vectors using a depth-first
-   * traversal of maps. Order of vectors here correspond to the column
-   * indexes used to access columns in a reader or writer.
-   */
-
-  protected final ValueVector[] valueVectors;
-
-  public AbstractSingleRowSet(BufferAllocator allocator, BatchSchema schema) {
-    super(allocator, schema, new VectorContainer());
-    valueVectors = new VectorBuilder(allocator, super.schema).buildContainer(container);
-  }
-
-  public AbstractSingleRowSet(BufferAllocator allocator, VectorContainer container) {
-    super(allocator, container.getSchema(), container);
-    valueVectors = new VectorMapper(allocator, super.schema).mapContainer(container);
   }
 
   public AbstractSingleRowSet(AbstractSingleRowSet rowSet) {
-    super(rowSet.allocator, rowSet.schema.batch(), rowSet.container);
-    valueVectors = rowSet.valueVectors;
+    super(rowSet.container, rowSet.schema);
   }
 
-  @Override
-  public ValueVector[] vectors() { return valueVectors; }
+  public AbstractSingleRowSet(VectorContainer container, TupleMetadata schema) {
+    super(container, schema);
+  }
 
   @Override
   public long size() {
-    RecordBatchSizer sizer = new RecordBatchSizer(container);
+    RecordBatchSizer sizer = new RecordBatchSizer(container());
     return sizer.actualSize();
   }
 
@@ -197,21 +63,7 @@ public abstract class AbstractSingleRowSet extends AbstractRowSet implements Sin
    * (non-map) vectors.
    */
 
-  protected RowSetReader buildReader(RowSetIndex rowIndex) {
-    FlattenedSchema accessSchema = schema().flatAccess();
-    ValueVector[] valueVectors = vectors();
-    AbstractColumnReader[] readers = new AbstractColumnReader[valueVectors.length];
-    for (int i = 0; i < readers.length; i++) {
-      MinorType type = accessSchema.column(i).getType().getMinorType();
-      if (type == MinorType.MAP) {
-        readers[i] = null; // buildMapAccessor(i);
-      } else if (type == MinorType.LIST) {
-        readers[i] = null; // buildListAccessor(i);
-      } else {
-        readers[i] = ColumnAccessorFactory.newReader(valueVectors[i].getField().getType());
-        readers[i].bind(rowIndex, valueVectors[i]);
-      }
-    }
-    return new RowSetReaderImpl(accessSchema, rowIndex, readers);
+  protected RowSetReader buildReader(ReaderIndex rowIndex) {
+    return new RowSetReaderBuilder().buildReader(this, rowIndex);
   }
 }
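
The reader for a single row set is now assembled by the shared BaseReaderBuilder, which walks the vector container and produces a tree of object readers keyed by the tuple metadata. For orientation, a minimal sketch of how test code drives the resulting reader; the row set and its columns are assumptions for illustration, not part of this patch:

    // Assumes a SingleRowSet "rowSet" with columns (a: INT, b: VARCHAR)
    // built earlier in the test.
    RowSetReader reader = rowSet.reader();
    while (reader.next()) {
      int a = reader.column(0).scalar().getInt();       // access by position
      String b = reader.column(1).scalar().getString(); // works for any scalar type
    }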

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
index 29a1702..5972f05 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
@@ -18,19 +18,21 @@
 package org.apache.drill.test.rowSet;
 
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.rowSet.model.ReaderIndex;
+import org.apache.drill.exec.physical.rowSet.model.SchemaInference;
+import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.MetadataRetrieval;
+import org.apache.drill.exec.physical.rowSet.model.single.BaseWriterBuilder;
+import org.apache.drill.exec.physical.rowSet.model.single.BuildVectorsFromMetadata;
+import org.apache.drill.exec.physical.rowSet.model.single.VectorAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.TupleSchema;
 import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.accessor.TupleAccessor.TupleSchema;
-import org.apache.drill.exec.vector.accessor.impl.AbstractColumnWriter;
-import org.apache.drill.exec.vector.accessor.impl.ColumnAccessorFactory;
-import org.apache.drill.exec.vector.accessor.impl.TupleWriterImpl;
 import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
+import org.apache.drill.test.rowSet.RowSetWriterImpl.WriterIndexImpl;
 
 /**
  * Implementation of a single row set with no indirection (selection)
@@ -46,118 +48,54 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS
   * the first. (This is the JDBC ResultSet convention.)
    */
 
-  private static class DirectRowIndex extends BoundedRowIndex {
+  private static class DirectRowIndex extends ReaderIndex {
 
     public DirectRowIndex(int rowCount) {
       super(rowCount);
     }
 
     @Override
-    public int index() { return rowIndex; }
+    public int vectorIndex() { return rowIndex; }
 
     @Override
-    public int batch() { return 0; }
+    public int batchIndex() { return 0; }
   }
 
-  /**
-   * Writer index that points to each row in the row set. The index starts at
-   * the 0th row and advances one row on each increment. This allows writers to
-   * start positioned at the first row. Writes happen in the current row.
-   * Calling <tt>next()</tt> advances to the next position, effectively saving
-   * the current row. The most recent row can be abandoned easily simply by not
-   * calling <tt>next()</tt>. This means that the number of completed rows is
-   * the same as the row index.
-   */
-
-  private static class ExtendableRowIndex extends RowSetIndex {
-
-    private final int maxSize;
-
-    public ExtendableRowIndex(int maxSize) {
-      this.maxSize = maxSize;
-      rowIndex = 0;
-    }
+  public static class RowSetWriterBuilder extends BaseWriterBuilder {
 
-    @Override
-    public int index() { return rowIndex; }
-
-    @Override
-    public boolean next() {
-      if (++rowIndex <= maxSize ) {
-        return true;
-      } else {
-        rowIndex--;
-        return false;
-      }
+    public RowSetWriter buildWriter(DirectRowSet rowSet) {
+      WriterIndexImpl index = new WriterIndexImpl();
+      TupleMetadata schema = rowSet.schema();
+      RowSetWriterImpl writer = new RowSetWriterImpl(rowSet, schema, index,
+          buildContainerChildren(rowSet.container(),
+          new MetadataRetrieval(schema)));
+      return writer;
     }
-
-    @Override
-    public int size() { return rowIndex; }
-
-    @Override
-    public boolean valid() { return rowIndex < maxSize; }
-
-    @Override
-    public int batch() { return 0; }
   }
 
-  /**
-   * Implementation of a row set writer. Only available for newly-created,
-   * empty, direct, single row sets. Rewriting is not allowed, nor is writing
-   * to a hyper row set.
-   */
-
-  public class RowSetWriterImpl extends TupleWriterImpl implements RowSetWriter {
-
-    private final ExtendableRowIndex index;
-    private final ExtendableRowSet rowSet;
-
-    protected RowSetWriterImpl(ExtendableRowSet rowSet, TupleSchema schema, ExtendableRowIndex index, AbstractColumnWriter[] writers) {
-      super(schema, writers);
-      this.rowSet = rowSet;
-      this.index = index;
-      start();
-    }
-
-    @Override
-    public void setRow(Object...values) {
-      if (! index.valid()) {
-        throw new IndexOutOfBoundsException("Write past end of row set");
-      }
-      for (int i = 0; i < values.length;  i++) {
-        set(i, values[i]);
-      }
-      save();
-    }
-
-    @Override
-    public boolean valid() { return index.valid(); }
-
-    @Override
-    public int index() { return index.position(); }
+  private DirectRowSet(VectorContainer container, TupleMetadata schema) {
+    super(container, schema);
+  }
 
-    @Override
-    public void save() {
-      index.next();
-      start();
-    }
+  public DirectRowSet(AbstractSingleRowSet from) {
+    super(from);
+  }
 
-    @Override
-    public void done() {
-      rowSet.setRowCount(index.size());
-    }
+  public static DirectRowSet fromSchema(BufferAllocator allocator, BatchSchema schema) {
+    return fromSchema(allocator, TupleSchema.fromFields(schema));
   }
 
-  public DirectRowSet(BufferAllocator allocator, BatchSchema schema) {
-    super(allocator, schema);
+  public static DirectRowSet fromSchema(BufferAllocator allocator, TupleMetadata schema) {
+    BuildVectorsFromMetadata builder = new BuildVectorsFromMetadata(allocator);
+    return new DirectRowSet(builder.build(schema), schema);
   }
 
-  public DirectRowSet(BufferAllocator allocator, VectorContainer container) {
-    super(allocator, container);
+  public static DirectRowSet fromContainer(VectorContainer container) {
+    return new DirectRowSet(container, new SchemaInference().infer(container));
   }
 
-  public DirectRowSet(BufferAllocator allocator, VectorAccessible va) {
-    super(allocator, toContainer(va, allocator));
+  public static DirectRowSet fromVectorAccessible(BufferAllocator allocator, VectorAccessible va) {
+    return fromContainer(toContainer(va, allocator));
   }
 
   private static VectorContainer toContainer(VectorAccessible va, BufferAllocator allocator) {
@@ -168,16 +106,8 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS
   }
 
   @Override
-  public void allocate(int recordCount) {
-    for (final ValueVector v : valueVectors) {
-      AllocationHelper.allocate(v, recordCount, 50, 10);
-    }
-  }
-
-  @Override
-  public void setRowCount(int rowCount) {
-    container.setRecordCount(rowCount);
-    VectorAccessibleUtilities.setValueCount(container, rowCount);
+  public void allocate(int rowCount) {
+    new VectorAllocator(container()).allocate(rowCount, schema());
   }
 
   @Override
@@ -187,29 +117,11 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS
 
   @Override
   public RowSetWriter writer(int initialRowCount) {
-    if (container.hasRecordCount()) {
+    if (container().hasRecordCount()) {
       throw new IllegalStateException("Row set already contains data");
     }
     allocate(initialRowCount);
-    return buildWriter(new ExtendableRowIndex(Character.MAX_VALUE));
-  }
-
-  /**
-   * Build writer objects for each column based on the column type.
-   *
-   * @param rowIndex the index which points to each row
-   * @return an array of writers
-   */
-
-  protected RowSetWriter buildWriter(ExtendableRowIndex rowIndex) {
-    ValueVector[] valueVectors = vectors();
-    AbstractColumnWriter[] writers = new AbstractColumnWriter[valueVectors.length];
-    for (int i = 0; i < writers.length; i++) {
-      writers[i] = ColumnAccessorFactory.newWriter(valueVectors[i].getField().getType());
-      writers[i].bind(rowIndex, valueVectors[i]);
-    }
-    TupleSchema accessSchema = schema().hierarchicalAccess();
-    return new RowSetWriterImpl(this, accessSchema, rowIndex, writers);
+    return new RowSetWriterBuilder().buildWriter(this);
   }
 
   @Override
@@ -233,9 +145,4 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS
 
   @Override
   public SelectionVector2 getSv2() { return null; }
-
-  @Override
-  public RowSet merge(RowSet other) {
-    return new DirectRowSet(allocator, container().merge(other.container()));
-  }
 }
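
With the writer machinery moved into BaseWriterBuilder, a test obtains a writer from an extendable row set, sets column values, and harvests an immutable row set. A sketch under the new API, assuming the allocator and schema come from the test fixture:

    DirectRowSet rowSet = DirectRowSet.fromSchema(allocator, schema);
    RowSetWriter writer = rowSet.writer(100); // allocate for about 100 rows
    writer.scalar(0).setInt(10);
    writer.scalar(1).setString("fred");
    writer.save();                            // commit the row, advance to the next
    SingleRowSet result = writer.done();      // seal into an immutable row set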

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
index afc2e6e..8a3db9f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
@@ -17,27 +17,14 @@
  */
 package org.apache.drill.test.rowSet;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.MetadataRetrieval;
+import org.apache.drill.exec.physical.rowSet.model.SchemaInference;
+import org.apache.drill.exec.physical.rowSet.model.hyper.BaseReaderBuilder;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.HyperVectorWrapper;
-import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TupleMetadata;
 import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.accessor.AccessorUtilities;
-import org.apache.drill.exec.vector.accessor.impl.AbstractColumnReader;
-import org.apache.drill.exec.vector.accessor.impl.AbstractColumnReader.VectorAccessor;
-import org.apache.drill.exec.vector.accessor.impl.ColumnAccessorFactory;
-import org.apache.drill.exec.vector.complex.AbstractMapVector;
 import org.apache.drill.test.rowSet.RowSet.HyperRowSet;
-import org.apache.drill.test.rowSet.RowSetSchema.FlattenedSchema;
-import org.apache.drill.test.rowSet.RowSetSchema.LogicalColumn;
-import org.apache.drill.test.rowSet.RowSetSchema.PhysicalSchema;
 
 /**
  * Implements a row set wrapper around a collection of "hyper vectors."
@@ -52,176 +39,14 @@ import org.apache.drill.test.rowSet.RowSetSchema.PhysicalSchema;
 
 public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet {
 
-  /**
-   * Read-only row index into the hyper row set with batch and index
-   * values mapping via an SV4.
-   */
-
-  public static class HyperRowIndex extends BoundedRowIndex {
-
-    private final SelectionVector4 sv4;
-
-    public HyperRowIndex(SelectionVector4 sv4) {
-      super(sv4.getCount());
-      this.sv4 = sv4;
-    }
-
-    @Override
-    public int index() {
-      return AccessorUtilities.sv4Index(sv4.get(rowIndex));
-    }
-
-    @Override
-    public int batch( ) {
-      return AccessorUtilities.sv4Batch(sv4.get(rowIndex));
-    }
-  }
-
-  /**
-   * Vector accessor used by the column accessors to obtain the vector for
-   * each column value. That is, position 0 might be batch 4, index 3,
-   * while position 1 might be batch 1, index 7, and so on.
-   */
-
-  public static class HyperVectorAccessor implements VectorAccessor {
+  public static class RowSetReaderBuilder extends BaseReaderBuilder {
 
-    private final HyperRowIndex rowIndex;
-    private final ValueVector[] vectors;
-
-    public HyperVectorAccessor(HyperVectorWrapper<ValueVector> hvw, HyperRowIndex rowIndex) {
-      this.rowIndex = rowIndex;
-      vectors = hvw.getValueVectors();
-    }
-
-    @Override
-    public ValueVector vector() {
-      return vectors[rowIndex.batch()];
-    }
-  }
-
-  /**
-   * Build a hyper row set by restructuring a hyper vector bundle into a uniform
-   * shape. Consider this schema: <pre><code>
-   * { a: 10, b: { c: 20, d: { e: 30 } } }</code></pre>
-   * <p>
-   * The hyper container, with two batches, has this structure:
-   * <table border="1">
-   * <tr><th>Batch</th><th>a</th><th>b</th></tr>
-   * <tr><td>0</td><td>Int vector</td><td>Map Vector(Int vector, Map Vector(Int vector))</td></th>
-   * <tr><td>1</td><td>Int vector</td><td>Map Vector(Int vector, Map Vector(Int vector))</td></th>
-   * </table>
-   * <p>
-   * The above table shows that top-level scalar vectors (such as the Int Vector for column
-   * a) appear "end-to-end" as a hyper-vector. Maps also appear end-to-end. But, the
-   * contents of the map (column c) do not appear end-to-end. Instead, they appear as
-   * contents in the map vector. To get to c, one indexes into the map vector, steps inside
-   * the map to find c and indexes to the right row.
-   * <p>
-   * Similarly, the maps for d do not appear end-to-end, one must step to the right batch
-   * in b, then step to d.
-   * <p>
-   * Finally, to get to e, one must step
-   * into the hyper vector for b, then steps to the proper batch, steps to d, step to e
-   * and finally step to the row within e. This is a very complex, costly indexing scheme
-   * that differs depending on map nesting depth.
-   * <p>
-   * To simplify access, this class restructures the maps to flatten the scalar vectors
-   * into end-to-end hyper vectors. For example, for the above:
-   * <p>
-   * <table border="1">
-   * <tr><th>Batch</th><th>a</th><th>c</th><th>d</th></tr>
-   * <tr><td>0</td><td>Int vector</td><td>Int vector</td><td>Int vector</td></th>
-   * <tr><td>1</td><td>Int vector</td><td>Int vector</td><td>Int vector</td></th>
-   * </table>
-   *
-   * The maps are still available as hyper vectors, but separated into map fields.
-   * (Scalar access no longer needs to access the maps.) The result is a uniform
-   * addressing scheme for both top-level and nested vectors.
-   */
-
-  public static class HyperVectorBuilder {
-
-    protected final HyperVectorWrapper<?> valueVectors[];
-    protected final HyperVectorWrapper<AbstractMapVector> mapVectors[];
-    private final List<ValueVector> nestedScalars[];
-    private int vectorIndex;
-    private int mapIndex;
-    private final PhysicalSchema physicalSchema;
-
-    @SuppressWarnings("unchecked")
-    public HyperVectorBuilder(RowSetSchema schema) {
-      physicalSchema = schema.physical();
-      FlattenedSchema flatSchema = schema.flatAccess();
-      valueVectors = new HyperVectorWrapper<?>[schema.hierarchicalAccess().count()];
-      if (flatSchema.mapCount() == 0) {
-        mapVectors = null;
-        nestedScalars = null;
-      } else {
-        mapVectors = (HyperVectorWrapper<AbstractMapVector>[])
-            new HyperVectorWrapper<?>[flatSchema.mapCount()];
-        nestedScalars = new ArrayList[flatSchema.count()];
-      }
-    }
-
-    @SuppressWarnings("unchecked")
-    public HyperVectorWrapper<ValueVector>[] mapContainer(VectorContainer container) {
-      int i = 0;
-      for (VectorWrapper<?> w : container) {
-        HyperVectorWrapper<?> hvw = (HyperVectorWrapper<?>) w;
-        if (w.getField().getType().getMinorType() == MinorType.MAP) {
-          HyperVectorWrapper<AbstractMapVector> mw = (HyperVectorWrapper<AbstractMapVector>) hvw;
-          mapVectors[mapIndex++] = mw;
-          buildHyperMap(physicalSchema.column(i).mapSchema(), mw);
-        } else {
-          valueVectors[vectorIndex++] = hvw;
-        }
-        i++;
-      }
-      if (nestedScalars != null) {
-        buildNestedHyperVectors();
-      }
-      return (HyperVectorWrapper<ValueVector>[]) valueVectors;
-    }
-
-    private void buildHyperMap(PhysicalSchema mapSchema, HyperVectorWrapper<AbstractMapVector> mapWrapper) {
-      createHyperVectors(mapSchema);
-      for (AbstractMapVector mapVector : mapWrapper.getValueVectors()) {
-        buildMap(mapSchema, mapVector);
-      }
-    }
-
-    private void buildMap(PhysicalSchema mapSchema, AbstractMapVector mapVector) {
-      for (ValueVector v : mapVector) {
-        LogicalColumn col = mapSchema.column(v.getField().getName());
-        if (col.isMap()) {
-          buildMap(col.mapSchema, (AbstractMapVector) v);
-        } else {
-          nestedScalars[col.accessIndex()].add(v);
-        }
-      }
-    }
-
-    private void createHyperVectors(PhysicalSchema mapSchema) {
-      for (int i = 0; i < mapSchema.count(); i++) {
-        LogicalColumn col = mapSchema.column(i);
-        if (col.isMap()) {
-          createHyperVectors(col.mapSchema);
-        } else {
-          nestedScalars[col.accessIndex()] = new ArrayList<ValueVector>();
-        }
-      }
-    }
-
-    private void buildNestedHyperVectors() {
-      for (int i = 0;  i < nestedScalars.length; i++) {
-        if (nestedScalars[i] == null) {
-          continue;
-        }
-        ValueVector vectors[] = new ValueVector[nestedScalars[i].size()];
-        nestedScalars[i].toArray(vectors);
-        assert valueVectors[i] == null;
-        valueVectors[i] = new HyperVectorWrapper<ValueVector>(vectors[0].getField(), vectors, false);
-      }
+    public RowSetReader buildReader(HyperRowSet rowSet, SelectionVector4 sv4) {
+      TupleMetadata schema = rowSet.schema();
+      HyperRowIndex rowIndex = new HyperRowIndex(sv4);
+      return new RowSetReaderImpl(schema, rowIndex,
+          buildContainerChildren(rowSet.container(),
+              new MetadataRetrieval(schema)));
     }
   }
 
@@ -231,18 +56,9 @@ public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet {
 
   private final SelectionVector4 sv4;
 
-  /**
-   * Collection of hyper vectors in flattened order: a left-to-right,
-   * depth first ordering of vectors in maps. Order here corresponds to
-   * the order used for column indexes in the row set reader.
-   */
-
-  private final HyperVectorWrapper<ValueVector> hvw[];
-
-  public HyperRowSetImpl(BufferAllocator allocator, VectorContainer container, SelectionVector4 sv4) {
-    super(allocator, container.getSchema(), container);
+  public HyperRowSetImpl(VectorContainer container, SelectionVector4 sv4) {
+    super(container, new SchemaInference().infer(container));
     this.sv4 = sv4;
-    hvw = new HyperVectorBuilder(schema).mapContainer(container);
   }
 
   @Override
@@ -252,33 +68,8 @@ public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet {
   public boolean isWritable() { return false; }
 
   @Override
-  public RowSetWriter writer() {
-    throw new UnsupportedOperationException("Cannot write to a hyper vector");
-  }
-
-  @Override
   public RowSetReader reader() {
-    return buildReader(new HyperRowIndex(sv4));
-  }
-
-  /**
-   * Internal method to build the set of column readers needed for
-   * this row set. Used when building a row set reader.
-   * @param rowIndex object that points to the current row
-   * @return an array of column readers: in the same order as the
-   * (non-map) vectors.
-   */
-
-  protected RowSetReader buildReader(HyperRowIndex rowIndex) {
-    FlattenedSchema accessSchema = schema().flatAccess();
-    AbstractColumnReader readers[] = new AbstractColumnReader[accessSchema.count()];
-    for (int i = 0; i < readers.length; i++) {
-      MaterializedField field = accessSchema.column(i);
-      readers[i] = ColumnAccessorFactory.newReader(field.getType());
-      HyperVectorWrapper<ValueVector> hvw = getHyperVector(i);
-      readers[i].bind(rowIndex, field, new HyperVectorAccessor(hvw, rowIndex));
-    }
-    return new RowSetReaderImpl(accessSchema, rowIndex, readers);
+    return new RowSetReaderBuilder().buildReader(this, sv4);
   }
 
   @Override
@@ -288,13 +79,5 @@ public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet {
   public SelectionVector4 getSv4() { return sv4; }
 
   @Override
-  public HyperVectorWrapper<ValueVector> getHyperVector(int i) { return hvw[i]; }
-
-  @Override
   public int rowCount() { return sv4.getCount(); }
-
-  @Override
-  public RowSet merge(RowSet other) {
-    return new HyperRowSetImpl(allocator, container().merge(other.container()), sv4);
-  }
 }
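
With the map-flattening logic moved into the hyper-vector BaseReaderBuilder, the row set itself reduces to a thin wrapper over the container and SV4. A sketch of reading a hyper-batch, assuming the container and SV4 come from the operator under test:

    HyperRowSet hyperSet = new HyperRowSetImpl(hyperContainer, sv4);
    RowSetReader reader = hyperSet.reader();
    while (reader.next()) {
      // batchIndex() and rowIndex() are resolved through the SV4.
      System.out.println(reader.batchIndex() + ":" + reader.rowIndex());
    }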

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
index 1914705..e729bba 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
@@ -20,6 +20,8 @@ package org.apache.drill.test.rowSet;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
+import org.apache.drill.exec.physical.rowSet.model.ReaderIndex;
+import org.apache.drill.exec.physical.rowSet.model.SchemaInference;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -33,14 +35,14 @@ public class IndirectRowSet extends AbstractSingleRowSet {
 
   /**
    * Reader index that points to each row indirectly through the
-   * selection vector. The {@link #index()} method points to the
+   * selection vector. The {@link #vectorIndex()} method points to the
    * actual data row, while the {@link #position()} method gives
    * the position relative to the indirection vector. That is,
    * the position increases monotonically, but the index jumps
    * around as specified by the indirection vector.
    */
 
-  private static class IndirectRowIndex extends BoundedRowIndex {
+  private static class IndirectRowIndex extends ReaderIndex {
 
     private final SelectionVector2 sv2;
 
@@ -50,21 +52,25 @@ public class IndirectRowSet extends AbstractSingleRowSet {
     }
 
     @Override
-    public int index() { return sv2.getIndex(rowIndex); }
+    public int vectorIndex() { return sv2.getIndex(rowIndex); }
 
     @Override
-    public int batch() { return 0; }
+    public int batchIndex() { return 0; }
   }
 
   private final SelectionVector2 sv2;
 
-  public IndirectRowSet(BufferAllocator allocator, VectorContainer container) {
-    this(allocator, container, makeSv2(allocator, container));
+  private IndirectRowSet(VectorContainer container, SelectionVector2 sv2) {
+    super(container, new SchemaInference().infer(container));
+    this.sv2 = sv2;
   }
 
-  public IndirectRowSet(BufferAllocator allocator, VectorContainer container, SelectionVector2 sv2) {
-    super(allocator, container);
-    this.sv2 = sv2;
+  public static IndirectRowSet fromContainer(VectorContainer container) {
+    return new IndirectRowSet(container, makeSv2(container.getAllocator(), container));
+  }
+
+  public static IndirectRowSet fromSv2(VectorContainer container, SelectionVector2 sv2) {
+    return new IndirectRowSet(container, sv2);
   }
 
   private static SelectionVector2 makeSv2(BufferAllocator allocator, VectorContainer container) {
@@ -83,7 +89,7 @@ public class IndirectRowSet extends AbstractSingleRowSet {
 
   public IndirectRowSet(DirectRowSet directRowSet) {
     super(directRowSet);
-    sv2 = makeSv2(allocator, container);
+    sv2 = makeSv2(allocator(), container());
   }
 
   @Override
@@ -96,11 +102,6 @@ public class IndirectRowSet extends AbstractSingleRowSet {
   }
 
   @Override
-  public RowSetWriter writer() {
-    throw new UnsupportedOperationException("Cannot write to an existing row set");
-  }
-
-  @Override
   public RowSetReader reader() {
     return buildReader(new IndirectRowIndex(getSv2()));
   }
@@ -119,12 +120,7 @@ public class IndirectRowSet extends AbstractSingleRowSet {
 
   @Override
   public long size() {
-    RecordBatchSizer sizer = new RecordBatchSizer(container, sv2);
+    RecordBatchSizer sizer = new RecordBatchSizer(container(), sv2);
     return sizer.actualSize();
   }
-
-  @Override
-  public RowSet merge(RowSet other) {
-    return new IndirectRowSet(allocator, container().merge(other.container()), sv2);
-  }
 }
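
The two factory methods make the construction paths explicit. A sketch, assuming the container and SV2 already exist in the test:

    // Impose a default, all-rows SV2 on an existing container:
    IndirectRowSet allRows = IndirectRowSet.fromContainer(container);

    // Or wrap an SV2 produced by, say, a filter operator:
    IndirectRowSet filtered = IndirectRowSet.fromSv2(container, sv2);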

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
index 474508c..f2435de 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
@@ -20,25 +20,24 @@ package org.apache.drill.test.rowSet;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.HyperVectorWrapper;
+import org.apache.drill.exec.record.TupleMetadata;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.accessor.ColumnReader;
-import org.apache.drill.exec.vector.accessor.ColumnWriter;
-import org.apache.drill.exec.vector.accessor.TupleReader;
-import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.accessor.ScalarReader;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
 
 /**
  * A row set is a collection of rows stored as value vectors. Elsewhere in
  * Drill we call this a "record batch", but that term has been overloaded to
- * mean the runtime implementation of an operator...
+ * mean the runtime implementation of an operator.
  * <p>
  * A row set encapsulates a set of vectors and provides access to Drill's
  * various "views" of vectors: {@link VectorContainer},
- * {@link VectorAccessible}, etc.
+ * {@link VectorAccessible}, etc. The row set wraps a {@link TupleModel}
+ * which holds the vectors and column metadata. This form is optimized
+ * for easy use in testing; use other implementations for production code.
  * <p>
  * A row set is defined by a {@link RowSetSchema}. For testing purposes, a row
  * set has a fixed schema; we don't allow changing the set of vectors
@@ -52,7 +51,7 @@ import org.apache.drill.exec.vector.accessor.TupleWriter;
  * Drill provides a large number of vector (data) types. Each requires a
  * type-specific way to set data. The row set writer uses a {@link ColumnWriter}
  * to set each value in a way unique to the specific data type. Similarly, the
- * row set reader provides a {@link ColumnReader} interface. In both cases,
+ * row set reader provides a {@link ScalarReader} interface. In both cases,
  * columns can be accessed by index number (as defined in the schema) or
  * by name.
  * <p>
@@ -78,54 +77,6 @@ import org.apache.drill.exec.vector.accessor.TupleWriter;
 
 public interface RowSet {
 
-  /**
-   * Interface for writing values to a row set. Only available
-   * for newly-created, single, direct row sets. Eventually, if
-   * we want to allow updating a row set, we have to create a
-   * new row set with the updated columns, then merge the new
-   * and old row sets to create a new immutable row set.
-   */
-  interface RowSetWriter extends TupleWriter {
-    void setRow(Object...values);
-    boolean valid();
-    int index();
-    void save();
-    void done();
-  }
-
-  /**
-   * Reader for all types of row sets.
-   */
-  interface RowSetReader extends TupleReader {
-
-    /**
-     * Total number of rows in the row set.
-     * @return total number of rows
-     */
-    int size();
-
-    boolean next();
-    int index();
-    void set(int index);
-
-    /**
-     * Batch index: 0 for a single batch, batch for the current
-     * row is a hyper-batch.
-     * @return index of the batch for the current row
-     */
-    int batchIndex();
-
-    /**
-     * The index of the underlying row which may be indexed by an
-     * Sv2 or Sv4.
-     *
-     * @return
-     */
-
-    int rowIndex();
-    boolean valid();
-  }
-
   boolean isExtendable();
 
   boolean isWritable();
@@ -136,13 +87,11 @@ public interface RowSet {
 
   int rowCount();
 
-  RowSetWriter writer();
-
   RowSetReader reader();
 
   void clear();
 
-  RowSetSchema schema();
+  TupleMetadata schema();
 
   BufferAllocator allocator();
 
@@ -157,17 +106,16 @@ public interface RowSet {
    *
    * @return memory size in bytes
    */
-  long size();
 
-  RowSet merge(RowSet other);
+  long size();
 
   BatchSchema batchSchema();
 
   /**
    * Row set that manages a single batch of rows.
    */
-  interface SingleRowSet extends RowSet {
-    ValueVector[] vectors();
+
+  public interface SingleRowSet extends RowSet {
     SingleRowSet toIndirect();
     SelectionVector2 getSv2();
   }
@@ -177,9 +125,10 @@ public interface RowSet {
    * Once writing is complete, the row set becomes an
    * immutable direct row set.
    */
+
   interface ExtendableRowSet extends SingleRowSet {
     void allocate(int recordCount);
-    void setRowCount(int rowCount);
+    RowSetWriter writer();
     RowSetWriter writer(int initialRowCount);
   }
 
@@ -187,8 +136,8 @@ public interface RowSet {
    * Row set comprised of multiple single row sets, along with
    * an indirection vector (SV4).
    */
+
   interface HyperRowSet extends RowSet {
     SelectionVector4 getSv4();
-    HyperVectorWrapper<ValueVector> getHyperVector(int i);
   }
 }
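
The interfaces above imply a one-way life cycle: extendable while writing, then direct (immutable), then optionally indirect. A sketch of that flow, assuming an allocator and schema from a test fixture:

    ExtendableRowSet empty = DirectRowSet.fromSchema(allocator, schema);
    RowSetWriter writer = empty.writer(10);
    // ... write rows here ...
    SingleRowSet direct = writer.done();          // now immutable
    SingleRowSet indirect = direct.toIndirect();  // adds an SV2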

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
index 6f9a8d9..7b1554c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
@@ -19,7 +19,10 @@ package org.apache.drill.test.rowSet;
 
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.TupleSchema;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 
 /**
@@ -40,14 +43,20 @@ public final class RowSetBuilder {
   private boolean withSv2;
 
   public RowSetBuilder(BufferAllocator allocator, BatchSchema schema) {
+    this(allocator, TupleSchema.fromFields(schema), 10);
+  }
+
+  public RowSetBuilder(BufferAllocator allocator, TupleMetadata schema) {
     this(allocator, schema, 10);
   }
 
-  public RowSetBuilder(BufferAllocator allocator, BatchSchema schema, int capacity) {
-    rowSet = new DirectRowSet(allocator, schema);
+  public RowSetBuilder(BufferAllocator allocator, TupleMetadata schema, int capacity) {
+    rowSet = DirectRowSet.fromSchema(allocator, schema);
     writer = rowSet.writer(capacity);
   }
 
+  public TupleWriter writer() { return writer; }
+
   /**
    * Add a new row using column values passed as variable-length arguments. Expects
    * map values to be flattened. A schema of (a:int, b:map(c:varchar)) would be
@@ -56,17 +65,18 @@ public final class RowSetBuilder {
    * <tt>add(10, new int[] {100, 200});</tt><br>
    * @param values column values in column index order
    * @return this builder
-   * @see {@link #addSingleCol(Object)} to create a row of a single column when
-   * the value to <tt>add()</tt> is ambiguous
+   * @throws IllegalStateException if the batch, or any vector in the batch,
+   * becomes full. This method is designed to be used in tests where we will
+   * seldom create a full vector of data.
    */
 
-  public RowSetBuilder add(Object...values) {
+  public RowSetBuilder addRow(Object...values) {
     writer.setRow(values);
     return this;
   }
 
   /**
-   * The {@link #add(Object...)} method uses Java variable-length arguments to
+   * The {@link #addRow(Object...)} method uses Java variable-length arguments to
    * pass a row of values. But, when the row consists of a single array, Java
    * gets confused: is that an array for variable-arguments or is it the value
    * of the first argument? This method clearly states that the single value
@@ -93,7 +103,7 @@ public final class RowSetBuilder {
    */
 
   public RowSetBuilder addSingleCol(Object value) {
-    return add(new Object[] { value });
+    return addRow(new Object[] { value });
   }
 
   /**
@@ -110,10 +120,10 @@ public final class RowSetBuilder {
   }
 
   public SingleRowSet build() {
-    writer.done();
+    SingleRowSet result = writer.done();
     if (withSv2) {
-      return rowSet.toIndirect();
+      return result.toIndirect();
     }
-    return rowSet;
+    return result;
   }
 }
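
Putting the pieces together, a test builds a schema and a row set in one fluent chain. The SchemaBuilder and the column names below are illustrative assumptions from the surrounding test framework, not part of this diff:

    BatchSchema schema = new SchemaBuilder()
        .add("a", MinorType.INT)
        .add("b", MinorType.VARCHAR)
        .build();
    SingleRowSet rowSet = new RowSetBuilder(allocator, schema)
        .addRow(10, "fred")
        .addRow(20, "wilma")
        .build();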

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
index 6e72923..1cae64f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
@@ -21,8 +21,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.drill.exec.vector.accessor.ArrayReader;
-import org.apache.drill.exec.vector.accessor.ColumnReader;
-import org.apache.drill.test.rowSet.RowSet.RowSetReader;
+import org.apache.drill.exec.vector.accessor.ObjectReader;
+import org.apache.drill.exec.vector.accessor.ScalarElementReader;
+import org.apache.drill.exec.vector.accessor.ScalarReader;
+import org.apache.drill.exec.vector.accessor.TupleReader;
 import org.bouncycastle.util.Arrays;
 
 import java.util.Comparator;
@@ -31,19 +33,48 @@ import java.util.Comparator;
  * For testing, compare the contents of two row sets (record batches)
  * to verify that they are identical. Supports masks to exclude certain
  * columns from comparison.
+ * <p>
+ * Drill rows are analogous to JSON documents: they can have scalars,
+ * arrays and maps, with maps and lists holding maps, arrays and scalars.
+ * This class walks the row structure tree to compare each structure
+ * of two row sets checking counts, types and values to ensure that the
+ * "actual" result set (result of a test) matches the "expected" result
+ * set.
+ * <p>
+ * This class acts as an example of how to use the suite of reader
+ * abstractions.
  */
 
 public class RowSetComparison {
 
+  /**
+   * Row set with the expected outcome of a test. This is the "golden"
+   * copy defined in the test itself.
+   */
   private RowSet expected;
+  /**
+   * Some tests wish to ignore certain (top-level) columns. If a
+   * mask is provided, then only those columns whose mask entry is
+   * <tt>true</tt> are verified.
+   */
   private boolean mask[];
+  /**
+   * Floats and doubles do not compare exactly. This delta is used
+   * by JUnit for such comparisons.
+   */
   private double delta = 0.001;
+  /**
+   * Tests can skip the first n rows.
+   */
   private int offset;
   private int span = -1;
 
   public RowSetComparison(RowSet expected) {
     this.expected = expected;
-    mask = new boolean[expected.schema().hierarchicalAccess().count()];
+
+    // TODO: The mask only works at the top level presently
+
+    mask = new boolean[expected.schema().size()];
     for (int i = 0; i < mask.length; i++) {
       mask[i] = true;
     }
@@ -134,7 +165,8 @@ public class RowSetComparison {
     for (int i = 0; i < testLength; i++) {
       er.next();
       ar.next();
-      verifyRow(er, ar);
+      String label = Integer.toString(er.index() + 1);
+      verifyRow(label, er, ar);
     }
   }
 
@@ -167,22 +199,50 @@ public class RowSetComparison {
     }
   }
 
-  private void verifyRow(RowSetReader er, RowSetReader ar) {
+  private void verifyRow(String label, TupleReader er, TupleReader ar) {
+    String prefix = label + ":";
     for (int i = 0; i < mask.length; i++) {
       if (! mask[i]) {
         continue;
       }
-      ColumnReader ec = er.column(i);
-      ColumnReader ac = ar.column(i);
-      String label = (er.index() + 1) + ":" + i;
-      assertEquals(label, ec.valueType(), ac.valueType());
-      if (ec.isNull()) {
-        assertTrue(label + " - column not null", ac.isNull());
-        continue;
-      }
-      if (! ec.isNull()) {
-        assertTrue(label + " - column is null", ! ac.isNull());
-      }
+      verifyColumn(prefix + i, er.column(i), ar.column(i));
+    }
+  }
+
+  private void verifyColumn(String label, ObjectReader ec, ObjectReader ac) {
+    assertEquals(label, ec.type(), ac.type());
+    switch (ec.type()) {
+    case ARRAY:
+      verifyArray(label, ec.array(), ac.array());
+      break;
+    case SCALAR:
+      verifyScalar(label, ec.scalar(), ac.scalar());
+      break;
+    case TUPLE:
+      verifyTuple(label, ec.tuple(), ac.tuple());
+      break;
+    default:
+      throw new IllegalStateException("Unexpected type: " + ec.type());
+    }
+  }
+
+  private void verifyTuple(String label, TupleReader er, TupleReader ar) {
+    assertEquals(label + " - tuple count", er.columnCount(), ar.columnCount());
+    String prefix = label + ":";
+    for (int i = 0; i < er.columnCount(); i++) {
+      verifyColumn(prefix + i, er.column(i), ar.column(i));
+    }
+  }
+
+  private void verifyScalar(String label, ScalarReader ec, ScalarReader ac) {
+    assertEquals(label + " - value type", ec.valueType(), ac.valueType());
+    if (ec.isNull()) {
+      assertTrue(label + " - column not null", ac.isNull());
+      return;
+    }
+    assertTrue(label + " - column is null", ! ac.isNull());
     switch (ec.valueType()) {
     case BYTES: {
        byte expected[] = ec.getBytes();
@@ -209,24 +269,42 @@ public class RowSetComparison {
      case PERIOD:
        assertEquals(label, ec.getPeriod(), ac.getPeriod());
        break;
-     case ARRAY:
-       verifyArray(label, ec.array(), ac.array());
-       break;
      default:
         throw new IllegalStateException( "Unexpected type: " + ec.valueType());
-      }
     }
   }
 
-  private void verifyArray(String colLabel, ArrayReader ea,
+  private void verifyArray(String label, ArrayReader ea,
       ArrayReader aa) {
+    assertEquals(label, ea.entryType(), aa.entryType());
+    assertEquals(label, ea.size(), aa.size());
+    switch (ea.entryType()) {
+    case ARRAY:
+      throw new UnsupportedOperationException();
+    case SCALAR:
+      verifyScalarArray(label, ea.elements(), aa.elements());
+      break;
+    case TUPLE:
+      verifyTupleArray(label, ea, aa);
+      break;
+    default:
+      throw new IllegalStateException("Unexpected type: " + ea.entryType());
+    }
+  }
+
+  private void verifyTupleArray(String label, ArrayReader ea, ArrayReader aa) {
+    for (int i = 0; i < ea.size(); i++) {
+      verifyTuple(label + "[" + i + "]", ea.tuple(i), aa.tuple(i));
+    }
+  }
+
+  private void verifyScalarArray(String colLabel, ScalarElementReader ea,
+      ScalarElementReader aa) {
     assertEquals(colLabel, ea.valueType(), aa.valueType());
     assertEquals(colLabel, ea.size(), aa.size());
     for (int i = 0; i < ea.size(); i++) {
       String label = colLabel + "[" + i + "]";
       switch (ea.valueType()) {
-      case ARRAY:
-        throw new IllegalStateException("Arrays of arrays not supported yet");
       case BYTES: {
         byte expected[] = ea.getBytes(i);
         byte actual[] = aa.getBytes(i);
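
In practice a test builds an expected row set and hands the operator's actual output to the comparison. The verifyAndClearAll() call below is assumed from the wider test framework; it does not appear in this hunk:

    SingleRowSet expected = new RowSetBuilder(allocator, schema)
        .addRow(10, "fred")
        .build();
    new RowSetComparison(expected)
        .verifyAndClearAll(actual); // compare, then release both row sets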

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java
index 42a7e63..e730987 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java
@@ -20,8 +20,8 @@ package org.apache.drill.test.rowSet;
 import java.io.PrintStream;
 
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.vector.accessor.TupleAccessor.TupleSchema;
-import org.apache.drill.test.rowSet.RowSet.RowSetReader;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.record.TupleMetadata;
 
 /**
  * Print a row set in CSV-like format. Primarily for debugging.
@@ -41,21 +41,21 @@ public class RowSetPrinter {
   public void print(PrintStream out) {
     SelectionVectorMode selectionMode = rowSet.indirectionType();
     RowSetReader reader = rowSet.reader();
-    int colCount = reader.schema().count();
-    printSchema(out, selectionMode);
+    int colCount = reader.schema().size();
+    printSchema(out, selectionMode, reader);
     while (reader.next()) {
       printHeader(out, reader, selectionMode);
       for (int i = 0; i < colCount; i++) {
         if (i > 0) {
           out.print(", ");
         }
-        out.print(reader.getAsString(i));
+        out.print(reader.column(i).getAsString());
       }
       out.println();
     }
   }
 
-  private void printSchema(PrintStream out, SelectionVectorMode selectionMode) {
+  private void printSchema(PrintStream out, SelectionVectorMode selectionMode, RowSetReader reader) {
     out.print("#");
     switch (selectionMode) {
     case FOUR_BYTE:
@@ -68,14 +68,24 @@ public class RowSetPrinter {
       break;
     }
     out.print(": ");
-    TupleSchema schema = rowSet.schema().hierarchicalAccess();
-    for (int i = 0; i < schema.count(); i++) {
+    TupleMetadata schema = reader.schema();
+    printTupleSchema(out, schema);
+    out.println();
+  }
+
+  private void printTupleSchema(PrintStream out, TupleMetadata schema) {
+    for (int i = 0; i < schema.size(); i++) {
       if (i > 0) {
         out.print(", ");
       }
-      out.print(schema.column(i).getName());
+      ColumnMetadata colSchema = schema.metadata(i);
+      out.print(colSchema.name());
+      if (colSchema.isMap()) {
+        out.print("(");
+        printTupleSchema(out, colSchema.mapSchema());
+        out.print(")");
+      }
     }
-    out.println();
   }
 
   private void printHeader(PrintStream out, RowSetReader reader, SelectionVectorMode selectionMode) {
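
Usage is a one-liner when debugging a test; the single-argument constructor is assumed from the unchanged portion of the class:

    new RowSetPrinter(rowSet).print(System.out);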

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReader.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReader.java
new file mode 100644
index 0000000..3e27529
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReader.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.exec.vector.accessor.TupleReader;
+
+/**
+ * Reader for all types of row sets.
+ */
+
+public interface RowSetReader extends TupleReader {
+
+  /**
+   * Total number of rows in the row set.
+   * @return total number of rows
+   */
+  int rowCount();
+
+  boolean next();
+  int index();
+  void set(int index);
+
+  /**
+   * Batch index: 0 for a single batch, or the index of the batch
+   * that holds the current row when reading a hyper-batch.
+   * @return index of the batch for the current row
+   */
+  int batchIndex();
+
+  /**
+   * The index of the underlying data row, which may be selected
+   * indirectly through an SV2 or SV4.
+   *
+   * @return the index of the underlying data row
+   */
+
+  int rowIndex();
+  boolean valid();
+}
\ No newline at end of file
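
A sketch of the positioning protocol this interface defines: next() advances and repositions the column readers, set(n) jumps to an absolute position:

    RowSetReader reader = rowSet.reader();
    while (reader.next()) {
      assert reader.index() < reader.rowCount();
    }
    reader.set(0); // jump back to the first row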

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReaderImpl.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReaderImpl.java
new file mode 100644
index 0000000..2bae085
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReaderImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.rowSet.model.ReaderIndex;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.reader.AbstractObjectReader;
+import org.apache.drill.exec.vector.accessor.reader.AbstractTupleReader;
+
+/**
+ * Reader implementation for a row set.
+ */
+
+public class RowSetReaderImpl extends AbstractTupleReader implements RowSetReader {
+
+  protected final ReaderIndex readerIndex;
+
+  public RowSetReaderImpl(TupleMetadata schema, ReaderIndex index, AbstractObjectReader[] readers) {
+    super(schema, readers);
+    this.readerIndex = index;
+    bindIndex(index);
+  }
+
+  public RowSetReaderImpl(TupleMetadata schema, ReaderIndex index,
+      List<AbstractObjectReader> readers) {
+    this(schema, index,
+        readers.toArray(new AbstractObjectReader[readers.size()]));
+  }
+
+  @Override
+  public boolean next() {
+    if (! readerIndex.next()) {
+      return false;
+    }
+    reposition();
+    return true;
+  }
+
+  @Override
+  public boolean valid() { return readerIndex.valid(); }
+
+  @Override
+  public int index() { return readerIndex.position(); }
+
+  @Override
+  public int rowCount() { return readerIndex.size(); }
+
+  @Override
+  public int rowIndex() { return readerIndex.vectorIndex(); }
+
+  @Override
+  public int batchIndex() { return readerIndex.batchIndex(); }
+
+  @Override
+  public void set(int index) {
+    this.readerIndex.set(index);
+    reposition();
+  }
+}
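
Because set() calls reposition(), the reader supports random access in
addition to the forward scan via next(). A minimal sketch:

    reader.set(5);                       // jump to the sixth row
    int id = reader.scalar(0).getInt();  // column readers now see row 5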

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java
deleted file mode 100644
index 55b5f12..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.test.rowSet;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.vector.accessor.TupleAccessor.TupleSchema;
-import org.apache.drill.exec.record.MaterializedField;
-
-/**
- * Row set schema presented as a number of distinct "views" for various
- * purposes:
- * <ul>
- * <li>Batch schema: the schema used by a VectorContainer.</li>
- * <li>Physical schema: the schema expressed as a hierarchy of
- * tuples with the top tuple representing the row, nested tuples
- * representing maps.</li>
- * <li>Access schema: a flattened schema with all scalar columns
- * at the top level, and with map columns pulled out into a separate
- * collection. The flattened-scalar view is the one used to write to,
- * and read from, the row set.</li>
- * </ul>
- * Allows easy creation of multiple row sets from the same schema.
- * Each schema is immutable, which is fine for tests in which we
- * want known inputs and outputs.
- */
-
-public class RowSetSchema {
-
-  /**
-   * Logical description of a column. A logical column is a
-   * materialized field. For maps, also includes a logical schema
-   * of the map.
-   */
-
-  public static class LogicalColumn {
-    protected final String fullName;
-    protected final int accessIndex;
-    protected int flatIndex;
-    protected final MaterializedField field;
-
-    /**
-     * Schema of the map. Includes only those fields directly within
-     * the map; does not include fields from nested tuples.
-     */
-
-    protected PhysicalSchema mapSchema;
-
-    public LogicalColumn(String fullName, int accessIndex, MaterializedField field) {
-      this.fullName = fullName;
-      this.accessIndex = accessIndex;
-      this.field = field;
-    }
-
-    private void updateStructure(int index, PhysicalSchema children) {
-      flatIndex = index;
-      mapSchema = children;
-    }
-
-    public int accessIndex() { return accessIndex; }
-    public int flatIndex() { return flatIndex; }
-    public boolean isMap() { return mapSchema != null; }
-    public PhysicalSchema mapSchema() { return mapSchema; }
-    public MaterializedField field() { return field; }
-    public String fullName() { return fullName; }
-  }
-
-  /**
-   * Implementation of a tuple name space. Tuples allow both indexed and
-   * named access to their members.
-   *
-   * @param <T> the type of object representing each column
-   */
-
-  public static class NameSpace<T> {
-    private final Map<String,Integer> nameSpace = new HashMap<>();
-    private final List<T> columns = new ArrayList<>();
-
-    public int add(String key, T value) {
-      int index = columns.size();
-      nameSpace.put(key, index);
-      columns.add(value);
-      return index;
-    }
-
-    public T get(int index) {
-      return columns.get(index);
-    }
-
-    public T get(String key) {
-      int index = getIndex(key);
-      if (index == -1) {
-        return null;
-      }
-      return get(index);
-    }
-
-    public int getIndex(String key) {
-      Integer index = nameSpace.get(key);
-      if (index == null) {
-        return -1;
-      }
-      return index;
-    }
-
-    public int count() { return columns.size(); }
-  }
-
-  /**
-   * Provides a non-flattened, physical view of the schema. The top-level
-   * row includes maps, maps expand to a nested tuple schema. This view
-   * corresponds, more-or-less, to the physical storage of vectors in
-   * a vector accessible or vector container.
-   */
-
-  private static class TupleSchemaImpl implements TupleSchema {
-
-    private NameSpace<LogicalColumn> columns;
-
-    public TupleSchemaImpl(NameSpace<LogicalColumn> ns) {
-      this.columns = ns;
-    }
-
-    @Override
-    public MaterializedField column(int index) {
-      return logicalColumn(index).field();
-    }
-
-    public LogicalColumn logicalColumn(int index) { return columns.get(index); }
-
-    @Override
-    public MaterializedField column(String name) {
-      LogicalColumn col = columns.get(name);
-      return col == null ? null : col.field();
-    }
-
-    @Override
-    public int columnIndex(String name) {
-      return columns.getIndex(name);
-    }
-
-    @Override
-    public int count() { return columns.count(); }
-  }
-
-  /**
-   * Represents the flattened view of the schema used to get and set columns.
-   * Represents a left-to-right, depth-first traversal of the row and map
-   * columns. Holds only materialized vectors (non-maps). For completeness,
-   * provides access to maps also via separate methods, but this is generally
-   * of little use.
-   */
-
-  public static class FlattenedSchema extends TupleSchemaImpl {
-    protected final TupleSchemaImpl maps;
-
-    public FlattenedSchema(NameSpace<LogicalColumn> cols, NameSpace<LogicalColumn> maps) {
-      super(cols);
-      this.maps = new TupleSchemaImpl(maps);
-    }
-
-    public LogicalColumn logicalMap(int index) { return maps.logicalColumn(index); }
-    public MaterializedField map(int index) { return maps.column(index); }
-    public MaterializedField map(String name) { return maps.column(name); }
-    public int mapIndex(String name) { return maps.columnIndex(name); }
-    public int mapCount() { return maps.count(); }
-  }
-
-  /**
-   * Physical schema of a row set showing the logical hierarchy of fields
-   * with map fields as first-class fields. Map members appear as children
-   * under the map, much as they appear in the physical value-vector
-   * implementation.
-   */
-
-  public static class PhysicalSchema {
-    protected final NameSpace<LogicalColumn> schema = new NameSpace<>();
-
-    public LogicalColumn column(int index) {
-      return schema.get(index);
-    }
-
-    public LogicalColumn column(String name) {
-      return schema.get(name);
-    }
-
-    public int count() { return schema.count(); }
-
-    public NameSpace<LogicalColumn> nameSpace() { return schema; }
-  }
-
-  private static class SchemaExpander {
-    private final PhysicalSchema physicalSchema;
-    private final NameSpace<LogicalColumn> cols = new NameSpace<>();
-    private final NameSpace<LogicalColumn> maps = new NameSpace<>();
-
-    public SchemaExpander(BatchSchema schema) {
-      physicalSchema = expand("", schema);
-    }
-
-    private PhysicalSchema expand(String prefix, Iterable<MaterializedField> fields) {
-      PhysicalSchema physical = new PhysicalSchema();
-      for (MaterializedField field : fields) {
-        String name = prefix + field.getName();
-        int index;
-        LogicalColumn colSchema = new LogicalColumn(name, physical.count(), field);
-        physical.schema.add(field.getName(), colSchema);
-        PhysicalSchema children = null;
-        if (field.getType().getMinorType() == MinorType.MAP) {
-          index = maps.add(name, colSchema);
-          children = expand(name + ".", field.getChildren());
-        } else {
-          index = cols.add(name, colSchema);
-        }
-        colSchema.updateStructure(index, children);
-      }
-      return physical;
-    }
-  }
-
-  private final BatchSchema batchSchema;
-  private final TupleSchemaImpl accessSchema;
-  private final FlattenedSchema flatSchema;
-  private final PhysicalSchema physicalSchema;
-
-  public RowSetSchema(BatchSchema schema) {
-    batchSchema = schema;
-    SchemaExpander expander = new SchemaExpander(schema);
-    physicalSchema = expander.physicalSchema;
-    accessSchema = new TupleSchemaImpl(physicalSchema.nameSpace());
-    flatSchema = new FlattenedSchema(expander.cols, expander.maps);
-  }
-
-  /**
-   * A hierarchical schema that includes maps, with maps expanding
-   * to a nested tuple schema. Not used at present; this is intended
-   * to be the bases of non-flattened accessors if we find the need.
-   * @return the hierarchical access schema
-   */
-
-  public TupleSchema hierarchicalAccess() { return accessSchema; }
-
-  /**
-   * A flattened (left-to-right, depth-first traversal) of the non-map
-   * columns in the row. Used to define the column indexes in the
-   * get methods for row readers and the set methods for row writers.
-   * @return the flattened access schema
-   */
-
-  public FlattenedSchema flatAccess() { return flatSchema; }
-
-  /**
-   * Internal physical schema in hierarchical order. Mostly used to create
-   * the other schemas, but may be of use in special cases. Has the same
-   * structure as the batch schema, but with additional information.
-   * @return a tree-structured physical schema
-   */
-
-  public PhysicalSchema physical() { return physicalSchema; }
-
-  /**
-   * The batch schema used by the Drill runtime. Represents a tree-structured
-   * list of top-level fields, including maps. Maps contain a nested schema.
-   * @return the batch schema used by the Drill runtime
-   */
-
-  public BatchSchema batch() { return batchSchema; }
-
-  /**
-   * Convert this schema to a new batch schema that includes the specified
-   * selection vector mode.
-   * @param svMode selection vector mode for the new schema
-   * @return the new batch schema
-   */
-
-  public BatchSchema toBatchSchema(SelectionVectorMode svMode) {
-    List<MaterializedField> fields = new ArrayList<>();
-    for (MaterializedField field : batchSchema) {
-      fields.add(field);
-    }
-    return new BatchSchema(svMode, fields);
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
index 261a9c1..32b61ca 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
@@ -17,12 +17,18 @@
  */
 package org.apache.drill.test.rowSet;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.math.BigDecimal;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.vector.accessor.AccessorUtilities;
-import org.apache.drill.exec.vector.accessor.ColumnAccessor.ValueType;
-import org.apache.drill.exec.vector.accessor.ColumnWriter;
-import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.ValueType;
+import org.bouncycastle.util.Arrays;
 import org.joda.time.Duration;
 import org.joda.time.Period;
 
@@ -62,11 +68,42 @@ public class RowSetUtilities {
    */
 
   public static void setFromInt(RowSetWriter rowWriter, int index, int value) {
-    ColumnWriter writer = rowWriter.column(index);
-    if (writer.valueType() == ValueType.PERIOD) {
-      setPeriodFromInt(writer, rowWriter.schema().column(index).getType().getMinorType(), value);
-    } else {
-      AccessorUtilities.setFromInt(writer, value);
+    ScalarWriter writer = rowWriter.scalar(index);
+    MaterializedField field = rowWriter.schema().column(index);
+    writer.setObject(testDataFromInt(writer.valueType(), field.getType(), value));
+  }
+
+  public static Object testDataFromInt(ValueType valueType, MajorType dataType, int value) {
+    switch (valueType) {
+    case BYTES:
+      return Integer.toHexString(value).getBytes();
+    case DOUBLE:
+      return (double) value;
+    case INTEGER:
+      switch (dataType.getMinorType()) {
+      case BIT:
+        return value & 0x01;
+      case SMALLINT:
+        return value % 32768;
+      case UINT2:
+        return value & 0xFFFF;
+      case TINYINT:
+        return value % 128;
+      case UINT1:
+        return value & 0xFF;
+      default:
+        return value;
+      }
+    case LONG:
+      return (long) value;
+    case STRING:
+      return Integer.toString(value);
+    case DECIMAL:
+      return BigDecimal.valueOf(value, dataType.getScale());
+    case PERIOD:
+      return periodFromInt(dataType.getMinorType(), value);
+    default:
+      throw new IllegalStateException("Unknown writer type: " + valueType);
     }
   }
 
@@ -81,26 +118,56 @@ public class RowSetUtilities {
    * @param writer column writer for a period column
    * @param minorType the Drill data type
    * @param value the integer value to apply
+   * @return the {@link Period} value that encodes the given integer
    */
 
-  public static void setPeriodFromInt(ColumnWriter writer, MinorType minorType,
-      int value) {
+  public static Period periodFromInt(MinorType minorType, int value) {
     switch (minorType) {
     case INTERVAL:
-      writer.setPeriod(Duration.millis(value).toPeriod());
-      break;
+      return Duration.millis(value).toPeriod();
     case INTERVALYEAR:
-      writer.setPeriod(Period.years(value / 12).withMonths(value % 12));
-      break;
+      return Period.years(value / 12).withMonths(value % 12);
     case INTERVALDAY:
       int sec = value % 60;
       value = value / 60;
       int min = value % 60;
       value = value / 60;
-      writer.setPeriod(Period.days(value).withMinutes(min).withSeconds(sec));
-      break;
+      return Period.days(value).withMinutes(min).withSeconds(sec);
     default:
       throw new IllegalArgumentException("Writer is not an interval: " + minorType);
     }
   }
+
+  public static void assertEqualValues(ValueType type, Object expectedObj, Object actualObj) {
+    assertEqualValues(type.toString(), type, expectedObj, actualObj);
+  }
+
+  public static void assertEqualValues(String msg, ValueType type, Object expectedObj, Object actualObj) {
+    switch (type) {
+    case BYTES: {
+      byte[] expected = (byte[]) expectedObj;
+      byte[] actual = (byte[]) actualObj;
+      assertEquals(msg + " - byte lengths differ", expected.length, actual.length);
+      assertTrue(msg, Arrays.areEqual(expected, actual));
+      break;
+    }
+    case DOUBLE:
+      assertEquals(msg, (double) expectedObj, (double) actualObj, 0.0001);
+      break;
+    case INTEGER:
+    case LONG:
+    case STRING:
+    case DECIMAL:
+      assertEquals(msg, expectedObj, actualObj);
+      break;
+    case PERIOD: {
+      Period expected = (Period) expectedObj;
+      Period actual = (Period) actualObj;
+      assertEquals(msg, expected.normalizedStandard(), actual.normalizedStandard());
+      break;
+    }
+    default:
+      throw new IllegalStateException("Unexpected type: " + type);
+    }
+  }
 }
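
Since these conversions are deterministic, expected test values can be
computed by hand. A sketch (Types.required() is the standard Drill
helper for building a required MajorType; the values are illustrative):

    // An INTEGER writer over a SMALLINT column wraps the value into range:
    Object v = RowSetUtilities.testDataFromInt(
        ValueType.INTEGER, Types.required(MinorType.SMALLINT), 40000);
    // v is the Integer 7232, that is, 40000 % 32768

    // periodFromInt() peels off seconds and minutes; the rest becomes days:
    Period p = RowSetUtilities.periodFromInt(MinorType.INTERVALDAY, 3661);
    // p is Period.days(1).withMinutes(1).withSeconds(1)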

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriter.java
new file mode 100644
index 0000000..874c0e1
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+
+/**
+ * Interface for writing values to a row set. Only available
+ * for newly-created, single, direct row sets. Eventually, if
+ * we want to allow updating a row set, we have to create a
+ * new row set with the updated columns, then merge the new
+ * and old row sets to create a new immutable row set.
+ * <p>
+ * Typical usage:
+ * <pre><code>
+ * void writeABatch() {
+ *   RowSetWriter writer = ...
+ *   while (! writer.isFull()) {
+ *     writer.scalar(0).setInt(10);
+ *     writer.scalar(1).setString("foo");
+ *     ...
+ *     writer.save();
+ *   }
+ * }</code></pre>
+ * The above writes until the batch is full, based on size. If values
+ * are large enough to potentially cause vector overflow, do the
+ * following instead:
+ * <pre><code>
+ * void writeABatch() {
+ *   RowSetWriter writer = ...
+ *   while (! writer.isFull()) {
+ *     writer.scalar(0).setInt(10);
+ *     try {
+ *        writer.scalar(1).setString("foo");
+ *     } catch (VectorOverflowException e) { break; }
+ *     ...
+ *     writer.save();
+ *   }
+ *   // Do something with the partially-written last row.
+ * }</code></pre>
+ * <p>
+ * This writer is for testing, so no provision is made to handle a
+ * partial last row. (Elsewhere in Drill there are classes that handle that case.)
+ */
+
+public interface RowSetWriter extends TupleWriter {
+
+  /**
+   * Write a row of values, given by Java objects. Object type must
+   * match the expected column type. Stops writing if any value causes
+   * vector overflow. Value format:
+   * <ul>
+   * <li>For scalars, the value as a suitable Java type (int or
+   * Integer, say, for <tt>INTEGER</tt> values).</li>
+   * <li>For scalar arrays, an array of a suitable Java primitive type.
+   * For example, <tt>int[]</tt> for an <tt>INTEGER</tt>
+   * column.</li>
+   * <li>For a map, an <tt>Object</tt> array with values encoded as above.
+   * (In fact, the row format is the same as the map format.)</li>
+   * <li>For a list (repeated map, list of list), an <tt>Object</tt>
+   * array with values encoded as above. (So, for a repeated map, an outer
+   * <tt>Object</tt> array encodes the array, and an inner one encodes the
+   * map members.)</li>
+   * </ul>
+   *
+   * @param values variable-length argument list of column values
+   */
+
+  void setRow(Object...values);
+
+  /**
+   * Indicates whether the batch is full, that is, whether the
+   * current row position is no longer valid for writing. Will be
+   * false on the first row, and on all subsequent rows, until either
+   * the maximum number of rows is written or a vector overflows.
+   * After that, will return true. The method returns true as soon
+   * as any column writer overflows, even in the middle of a row
+   * write. That is, this writer does not automatically handle
+   * overflow rows because that added complexity is seldom needed
+   * for tests.
+   *
+   * @return true if the batch is full and no more rows can be
+   * written, false if the current row can be written
+   */
+
+  boolean isFull();
+  int rowIndex();
+
+  /**
+   * Saves the current row and moves to the next row.
+   * Done automatically if using <tt>setRow()</tt>.
+   */
+
+  void save();
+
+  /**
+   * Finish writing and finalize the row set being
+   * written.
+   * @return the completed, read-only row set without a
+   * selection vector
+   */
+
+  SingleRowSet done();
+}
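
For reference, setRow() collapses the loops above to one call per row.
A minimal sketch, assuming an (INT, VARCHAR, repeated INT) schema
chosen here only for illustration:

    RowSetWriter writer = ...
    writer.setRow(10, "fred", new int[] {1, 2, 3});
    writer.setRow(20, "wilma", new int[] {4, 5});
    SingleRowSet rowSet = writer.done();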

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java
new file mode 100644
index 0000000..074842d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import java.util.List;
+
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
+import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter;
+import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+
+/**
+ * Implementation of a row set writer. Only available for newly-created,
+ * empty, direct, single row sets. Rewriting is not allowed, nor is writing
+ * to a hyper row set.
+ */
+
+public class RowSetWriterImpl extends AbstractTupleWriter implements RowSetWriter {
+
+  /**
+   * Writer index that points to each row in the row set. The index starts at
+   * the 0th row and advances one row on each increment. This allows writers to
+   * start positioned at the first row. Writes happen in the current row.
+   * Calling <tt>next()</tt> advances to the next position, effectively saving
+   * the current row. The most recent row can be abandoned simply by not
+   * calling <tt>next()</tt>. This means that the number of completed rows is
+   * the same as the row index.
+   */
+
+  static class WriterIndexImpl implements ColumnWriterIndex {
+
+    public enum State { OK, VECTOR_OVERFLOW, END_OF_BATCH }
+
+    private int rowIndex = 0;
+    private State state = State.OK;
+
+    @Override
+    public final int vectorIndex() { return rowIndex; }
+
+    public final boolean next() {
+      if (++rowIndex < ValueVector.MAX_ROW_COUNT) {
+        return true;
+      }
+      // Should not call next() again once batch is full.
+      assert rowIndex == ValueVector.MAX_ROW_COUNT;
+      rowIndex = ValueVector.MAX_ROW_COUNT;
+      state = state == State.OK ? State.END_OF_BATCH : state;
+      return false;
+    }
+
+    public int size() {
+      // The index always points to the next slot past the
+      // end of valid rows.
+      return rowIndex;
+    }
+
+    public boolean valid() { return state == State.OK; }
+
+    public boolean hasOverflow() { return state == State.VECTOR_OVERFLOW; }
+
+    @Override
+    public final void nextElement() { }
+
+    @Override
+    public void rollover() {
+      throw new UnsupportedOperationException("Rollover not supported in the row set writer.");
+    }
+
+    @Override
+    public int rowStartIndex() { return rowIndex; }
+
+    @Override
+    public ColumnWriterIndex outerIndex() { return null; }
+
+    @Override
+    public String toString() {
+      return new StringBuilder()
+        .append("[")
+        .append(getClass().getSimpleName())
+        .append(" state = ")
+        .append(state)
+        .append(", rowIndex = ")
+        .append(rowIndex)
+        .append("]")
+        .toString();
+    }
+  }
+
+  private final WriterIndexImpl writerIndex;
+  private final ExtendableRowSet rowSet;
+
+  protected RowSetWriterImpl(ExtendableRowSet rowSet, TupleMetadata schema, WriterIndexImpl index, List<AbstractObjectWriter> writers) {
+    super(schema, writers);
+    this.rowSet = rowSet;
+    this.writerIndex = index;
+    bindIndex(index);
+    startWrite();
+    startRow();
+  }
+
+  @Override
+  public void setRow(Object...values) {
+    setObject(values);
+    save();
+  }
+
+  @Override
+  public int rowIndex() { return writerIndex.vectorIndex(); }
+
+  @Override
+  public void save() {
+    endArrayValue();
+    saveRow();
+
+    // For convenience, start a new row after each save.
+    // The last (unused) row is abandoned when the batch is full.
+
+    if (writerIndex.next()) {
+      startRow();
+    }
+  }
+
+  @Override
+  public boolean isFull() { return ! writerIndex.valid(); }
+
+  @Override
+  public SingleRowSet done() {
+    endWrite();
+    rowSet.container().setRecordCount(writerIndex.vectorIndex());
+    return rowSet;
+  }
+
+  @Override
+  public int lastWriteIndex() {
+    return writerIndex.vectorIndex();
+  }
+}
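
Putting the pieces together, the index semantics above mean a
full-batch write needs no explicit row count. A minimal sketch,
assuming the extendable row set exposes a writer() factory:

    RowSetWriter writer = rowSet.writer();
    int count = 0;
    while (! writer.isFull()) {
      writer.scalar(0).setInt(count++);
      writer.save();
    }
    SingleRowSet result = writer.done();  // row count == completed rows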