Posted to commits@drill.apache.org by pr...@apache.org on 2017/12/21 05:19:39 UTC

[12/15] drill git commit: DRILL-5657: Size-aware vector writer structure

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/record/TupleSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TupleSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TupleSchema.java
new file mode 100644
index 0000000..27a88f0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TupleSchema.java
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+
+/**
+ * Defines the schema of a tuple: either the top-level row or a nested
+ * "map" (really structure). A schema is a collection of columns (backed
+ * by vectors in the loader itself.) Columns are accessible by name or
+ * index. New columns may be added at any time; the new column takes the
+ * next available index.
+ */
+
+public class TupleSchema implements TupleMetadata {
+
+  /**
+   * Abstract definition of column metadata. Allows applications to create
+   * specialized forms of a column metadata object by extending from this
+   * abstract class.
+   * <p>
+   * Note that, by design, primitive columns do not have a link to their
+   * tuple parent, or their index within that parent. This allows the same
+   * metadata to be shared between two views of a tuple, perhaps physical
+   * and projected views. This restriction does not apply to map columns,
+   * since maps (and the row itself) will, by definition, differ between
+   * the two views.
+   */
+
+  public static abstract class AbstractColumnMetadata implements ColumnMetadata {
+
+    protected MaterializedField schema;
+    protected boolean projected = true;
+
+    /**
+     * Predicted number of elements per array entry. Defaults to the
+     * value of 10 that is often hard-coded elsewhere in Drill.
+     */
+
+    protected int expectedElementCount = 1;
+
+    public AbstractColumnMetadata(MaterializedField schema) {
+      this.schema = schema;
+      if (isArray()) {
+        expectedElementCount = DEFAULT_ARRAY_SIZE;
+      }
+    }
+
+    public AbstractColumnMetadata(AbstractColumnMetadata from) {
+      schema = from.schema;
+      expectedElementCount = from.expectedElementCount;
+    }
+
+    protected void bind(TupleSchema parentTuple) { }
+
+    @Override
+    public MaterializedField schema() { return schema; }
+
+    public void replaceField(MaterializedField field) {
+      this.schema = field;
+    }
+
+    @Override
+    public String name() { return schema().getName(); }
+
+    @Override
+    public MajorType majorType() { return schema().getType(); }
+
+    @Override
+    public MinorType type() { return schema().getType().getMinorType(); }
+
+    @Override
+    public DataMode mode() { return schema().getDataMode(); }
+
+    @Override
+    public boolean isNullable() { return mode() == DataMode.OPTIONAL; }
+
+    @Override
+    public boolean isArray() { return mode() == DataMode.REPEATED; }
+
+    @Override
+    public boolean isList() { return false; }
+
+    @Override
+    public boolean isVariableWidth() {
+      MinorType type = type();
+      return type == MinorType.VARCHAR || type == MinorType.VAR16CHAR || type == MinorType.VARBINARY;
+    }
+
+    @Override
+    public boolean isEquivalent(ColumnMetadata other) {
+      return schema().isEquivalent(other.schema());
+    }
+
+    @Override
+    public int expectedWidth() { return 0; }
+
+    @Override
+    public void setExpectedWidth(int width) { }
+
+    @Override
+    public void setExpectedElementCount(int childCount) {
+      // The allocation utilities don't like an array size of zero, so
+      // use 1 as the minimum. Adjusting here avoids trivial failures if
+      // the caller passes a bad count.
+
+      if (isArray()) {
+        expectedElementCount = Math.max(1, childCount);
+      }
+    }
+
+    @Override
+    public int expectedElementCount() { return expectedElementCount; }
+
+    @Override
+    public void setProjected(boolean projected) {
+      this.projected = projected;
+    }
+
+    @Override
+    public boolean isProjected() { return projected; }
+
+    @Override
+    public String toString() {
+      StringBuilder buf = new StringBuilder()
+          .append("[")
+          .append(getClass().getSimpleName())
+          .append(" ")
+          .append(schema().toString())
+          .append(",")
+          .append(projected ? "" : "not ")
+          .append("projected");
+      if (isArray()) {
+        buf.append(", cardinality: ")
+           .append(expectedElementCount);
+      }
+      return buf
+          .append("]")
+          .toString();
+    }
+
+    public abstract AbstractColumnMetadata copy();
+  }
+
+  /**
+   * Primitive (non-map) column. Describes non-nullable, nullable and
+   * array types (which differ only in mode, not in metadata structure).
+   */
+
+  public static class PrimitiveColumnMetadata extends AbstractColumnMetadata {
+
+    protected int expectedWidth;
+
+    public PrimitiveColumnMetadata(MaterializedField schema) {
+      super(schema);
+      expectedWidth = TypeHelper.getSize(majorType());
+      if (isVariableWidth()) {
+
+        // The above getSize() method uses the deprecated getWidth()
+        // method to get the expected VarChar size. If zero (which
+        // it will be), try the revised precision field.
+
+        int precision = majorType().getPrecision();
+        if (precision > 0) {
+          expectedWidth = precision;
+        } else {
+          // TypeHelper includes the offset vector width
+
+          expectedWidth = expectedWidth - 4;
+        }
+      }
+    }
+
+    public PrimitiveColumnMetadata(PrimitiveColumnMetadata from) {
+      super(from);
+      expectedWidth = from.expectedWidth;
+    }
+
+    @Override
+    public AbstractColumnMetadata copy() {
+      return new PrimitiveColumnMetadata(this);
+    }
+
+    @Override
+    public ColumnMetadata.StructureType structureType() { return ColumnMetadata.StructureType.PRIMITIVE; }
+
+    @Override
+    public TupleMetadata mapSchema() { return null; }
+
+    @Override
+    public boolean isMap() { return false; }
+
+    @Override
+    public int expectedWidth() { return expectedWidth; }
+
+    @Override
+    public void setExpectedWidth(int width) {
+      // The allocation utilities don't like a width of zero, so use 1
+      // as the minimum. Adjusting here avoids trivial failures if the
+      // caller passes a bad width.
+
+      if (isVariableWidth()) {
+        expectedWidth = Math.max(1, width);
+      }
+    }
+
+    @Override
+    public ColumnMetadata cloneEmpty() {
+      return new PrimitiveColumnMetadata(this);
+    }
+  }
+
+  /**
+   * Describes a map or a repeated map. Both are tuples that have a tuple
+   * schema as part of the column definition.
+   */
+
+  public static class MapColumnMetadata extends AbstractColumnMetadata {
+    private TupleMetadata parentTuple;
+    private final TupleSchema mapSchema;
+
+    /**
+     * Build a new map column from the field provided
+     *
+     * @param schema materialized field description of the map
+     */
+
+    public MapColumnMetadata(MaterializedField schema) {
+      this(schema, null);
+    }
+
+    /**
+     * Build map column metadata from the materialized field provided,
+     * using the given tuple schema for the map's contents. If no map
+     * schema is given, an empty one is created.
+     *
+     * @param schema materialized field that describes the map column
+     * @param mapSchema metadata for the columns within the map, or null
+     * to start with an empty map schema
+     */
+
+    private MapColumnMetadata(MaterializedField schema, TupleSchema mapSchema) {
+      super(schema);
+      if (mapSchema == null) {
+        this.mapSchema = new TupleSchema();
+      } else {
+        this.mapSchema = mapSchema;
+      }
+      this.mapSchema.bind(this);
+    }
+
+    @Override
+    public AbstractColumnMetadata copy() {
+      return new MapColumnMetadata(schema, (TupleSchema) mapSchema.copy());
+    }
+
+    @Override
+    protected void bind(TupleSchema parentTuple) {
+      this.parentTuple = parentTuple;
+    }
+
+    @Override
+    public ColumnMetadata.StructureType structureType() { return ColumnMetadata.StructureType.TUPLE; }
+
+    @Override
+    public TupleMetadata mapSchema() { return mapSchema; }
+
+    @Override
+    public int expectedWidth() { return 0; }
+
+    @Override
+    public boolean isMap() { return true; }
+
+    public TupleMetadata parentTuple() { return parentTuple; }
+
+    public TupleSchema mapSchemaImpl() { return mapSchema; }
+
+    @Override
+    public ColumnMetadata cloneEmpty() {
+      return new MapColumnMetadata(schema().cloneEmpty(), null);
+    }
+  }
+
+  private MapColumnMetadata parentMap;
+  private final TupleNameSpace<ColumnMetadata> nameSpace = new TupleNameSpace<>();
+
+  public void bind(MapColumnMetadata parentMap) {
+    this.parentMap = parentMap;
+  }
+
+  public static TupleSchema fromFields(Iterable<MaterializedField> fields) {
+    TupleSchema tuple = new TupleSchema();
+    for (MaterializedField field : fields) {
+      tuple.add(field);
+    }
+    return tuple;
+  }
+
+  public TupleMetadata copy() {
+    TupleMetadata tuple = new TupleSchema();
+    for (ColumnMetadata md : this) {
+      tuple.addColumn(((AbstractColumnMetadata) md).copy());
+    }
+    return tuple;
+  }
+
+  /**
+   * Create a column metadata object that holds the given
+   * {@link MaterializedField}. The type of the object will be either a
+   * primitive or map column, depending on the field's type.
+   *
+   * @param field the materialized field to wrap
+   * @return the column metadata that wraps the field
+   */
+
+  public static AbstractColumnMetadata fromField(MaterializedField field) {
+    if (field.getType().getMinorType() == MinorType.MAP) {
+      return newMap(field);
+    } else {
+      return new PrimitiveColumnMetadata(field);
+    }
+  }
+
+  public static AbstractColumnMetadata fromView(MaterializedField field) {
+    if (field.getType().getMinorType() == MinorType.MAP) {
+      return new MapColumnMetadata(field, null);
+    } else {
+      return new PrimitiveColumnMetadata(field);
+    }
+  }
+
+  /**
+   * Create a tuple given the list of columns that make up the tuple.
+   * Creates nested maps as needed.
+   *
+   * @param columns list of columns that make up the tuple
+   * @return a tuple metadata object that contains the columns
+   */
+
+  public static TupleSchema fromColumns(List<ColumnMetadata> columns) {
+    TupleSchema tuple = new TupleSchema();
+    for (ColumnMetadata column : columns) {
+      tuple.add((AbstractColumnMetadata) column);
+    }
+    return tuple;
+  }
+
+  /**
+   * Create a column metadata object for a map column, given the
+   * {@link MaterializedField} that describes the column, and a list
+   * of column metadata objects that describe the columns in the map.
+   *
+   * @param field the materialized field that describes the map column
+   * @param schema metadata that describes the tuple of columns in
+   * the map
+   * @return a map column metadata for the map
+   */
+
+  public static MapColumnMetadata newMap(MaterializedField field, TupleSchema schema) {
+    return new MapColumnMetadata(field, schema);
+  }
+
+  public static MapColumnMetadata newMap(MaterializedField field) {
+    return new MapColumnMetadata(field, fromFields(field.getChildren()));
+  }
+
+  @Override
+  public ColumnMetadata add(MaterializedField field) {
+    AbstractColumnMetadata md = fromField(field);
+    add(md);
+    return md;
+  }
+
+  public ColumnMetadata addView(MaterializedField field) {
+    AbstractColumnMetadata md = fromView(field);
+    add(md);
+    return md;
+  }
+
+  /**
+   * Add a column metadata column created by the caller. Used for specialized
+   * cases beyond those handled by {@link #add(MaterializedField)}.
+   *
+   * @param md the custom column metadata which must have the correct
+   * index set (from {@link #size()})
+   */
+
+  public void add(AbstractColumnMetadata md) {
+    md.bind(this);
+    nameSpace.add(md.name(), md);
+    if (parentMap != null) {
+      parentMap.schema.addChild(md.schema());
+    }
+  }
+
+  @Override
+  public int addColumn(ColumnMetadata column) {
+    add((AbstractColumnMetadata) column);
+    return size() - 1;
+  }
+
+  @Override
+  public MaterializedField column(String name) {
+    ColumnMetadata md = metadata(name);
+    return md == null ? null : md.schema();
+  }
+
+  @Override
+  public ColumnMetadata metadata(String name) {
+    return nameSpace.get(name);
+  }
+
+  @Override
+  public int index(String name) {
+    return nameSpace.indexOf(name);
+  }
+
+  @Override
+  public MaterializedField column(int index) {
+    return metadata(index).schema();
+  }
+
+  @Override
+  public ColumnMetadata metadata(int index) {
+    return nameSpace.get(index);
+  }
+
+  @Override
+  public MapColumnMetadata parent() { return parentMap; }
+
+  @Override
+  public int size() { return nameSpace.count(); }
+
+  @Override
+  public boolean isEmpty() { return nameSpace.count() == 0; }
+
+  @Override
+  public Iterator<ColumnMetadata> iterator() {
+    return nameSpace.iterator();
+  }
+
+  @Override
+  public boolean isEquivalent(TupleMetadata other) {
+    TupleSchema otherSchema = (TupleSchema) other;
+    if (nameSpace.count() != otherSchema.nameSpace.count()) {
+      return false;
+    }
+    for (int i = 0; i < nameSpace.count(); i++) {
+      if (! nameSpace.get(i).isEquivalent(otherSchema.nameSpace.get(i))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public List<MaterializedField> toFieldList() {
+    List<MaterializedField> cols = new ArrayList<>();
+    for (ColumnMetadata md : nameSpace) {
+      cols.add(md.schema());
+    }
+    return cols;
+  }
+
+  public BatchSchema toBatchSchema(SelectionVectorMode svMode) {
+    return new BatchSchema(svMode, toFieldList());
+  }
+
+  @Override
+  public String fullName(int index) {
+    return fullName(metadata(index));
+  }
+
+  @Override
+  public String fullName(ColumnMetadata column) {
+    String quotedName = column.name();
+    if (quotedName.contains(".")) {
+      quotedName = "`" + quotedName + "`";
+    }
+    if (isRoot()) {
+      return column.name();
+    } else {
+      return fullName() + "." + quotedName;
+    }
+  }
+
+  public String fullName() {
+    if (isRoot()) {
+      return "<root>";
+    } else {
+      return parentMap.parentTuple().fullName(parentMap);
+    }
+  }
+
+  public boolean isRoot() { return parentMap == null; }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder()
+        .append("[")
+        .append(getClass().getSimpleName())
+        .append(" ");
+    boolean first = true;
+    for (ColumnMetadata md : nameSpace) {
+      if (! first) {
+        buf.append(", ");
+      }
+      first = false;
+      buf.append(md.toString());
+    }
+    buf.append("]");
+    return buf.toString();
+  }
+}

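For orientation, here is a minimal sketch of how the new metadata API above
composes, using only methods visible in the diff. MaterializedField.create()
and Types.required()/optional() are assumed from Drill's existing type
utilities; this is illustrative, not part of the commit.

    import java.util.List;
    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.common.types.Types;
    import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;

    // Build a row schema one column at a time; each new column takes
    // the next available index.
    TupleSchema tuple = new TupleSchema();
    tuple.add(MaterializedField.create("id", Types.required(MinorType.INT)));
    tuple.add(MaterializedField.create("name", Types.optional(MinorType.VARCHAR)));

    // Columns are reachable by name or by insertion-order index.
    ColumnMetadata name = tuple.metadata("name");
    assert tuple.index("name") == 1;
    assert name.isNullable();

    // Convert back to the legacy forms when needed.
    List<MaterializedField> fields = tuple.toFieldList();
    BatchSchema batchSchema = tuple.toBatchSchema(SelectionVectorMode.NONE);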
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index a38a7fe..42f3473 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -24,10 +24,16 @@ import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.record.DeadBuf;
 
 /**
- * A selection vector that fronts, at most, a
+ * A selection vector that fronts, at most, 64K values.
+ * The selection vector is used for two cases:
+ * <ol>
+ * <li>To create a list of values retained by a filter.</li>
+ * <li>To provide a redirection level for sorted
+ * batches.</li>
+ * </ol>
  */
+
 public class SelectionVector2 implements AutoCloseable {
-  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector2.class);
 
   private final BufferAllocator allocator;
   private int recordCount;
@@ -39,9 +45,19 @@ public class SelectionVector2 implements AutoCloseable {
     this.allocator = allocator;
   }
 
+  /**
+   * Create a selection vector with the given buffer. The selection vector
+   * increments the buffer's reference count, taking ownership of the buffer.
+   *
+   * @param allocator allocator used to allocate the buffer
+   * @param buf the buffer containing the selection vector's data
+   * @param count the number of values in the selection vector
+   */
+
   public SelectionVector2(BufferAllocator allocator, DrillBuf buf, int count) {
     this.allocator = allocator;
     buffer = buf;
+    buffer.retain(1);
     recordCount = count;
   }
 

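The retain(1) added above changes the ownership contract: the three-argument
constructor now takes its own reference on the buffer, so a caller that
allocated the buffer keeps, and must release, its own reference. A sketch of
the intended lifecycle, assuming the standard BufferAllocator.buffer() and
DrillBuf.release() calls and SelectionVector2's two bytes per entry:

    // Caller allocates a buffer of two-byte selection indexes.
    DrillBuf buf = allocator.buffer(2 * recordCount);
    // ... fill buf with record offsets ...

    // The constructor retains buf; the vector now holds its own reference.
    SelectionVector2 sv2 = new SelectionVector2(allocator, buf, recordCount);
    buf.release();   // drop the caller's reference; sv2 still owns one

    // ... use sv2 in a filter or sort ...
    sv2.close();     // releases the vector's reference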
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
index cfb8645..a283924 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
@@ -27,15 +27,16 @@ import java.io.InputStream;
 import java.io.OutputStream;
 
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.cache.VectorSerializer.Reader;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.test.DrillTest;
 import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
-import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.test.rowSet.RowSetComparison;
 import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.apache.drill.test.rowSet.RowSetWriter;
 import org.apache.drill.test.rowSet.SchemaBuilder;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -73,7 +74,7 @@ public class TestBatchSerialization extends DrillTest {
       if (i % 2 == 0) {
         RowSetUtilities.setFromInt(writer, 0, i);
       } else {
-        writer.column(0).setNull();
+        writer.scalar(0).setNull();
       }
       writer.save();
     }
@@ -125,9 +126,8 @@ public class TestBatchSerialization extends DrillTest {
 
     RowSet result;
     try (InputStream in = new BufferedInputStream(new FileInputStream(outFile))) {
-      result = fixture.wrap(
-        VectorSerializer.reader(fixture.allocator(), in)
-          .read());
+      Reader reader = VectorSerializer.reader(fixture.allocator(), in);
+      result = fixture.wrap(reader.read(), reader.sv2());
     }
 
     new RowSetComparison(expected)
@@ -163,17 +163,17 @@ public class TestBatchSerialization extends DrillTest {
 
   private SingleRowSet buildMapSet(BatchSchema schema) {
     return fixture.rowSetBuilder(schema)
-        .add(1, 100, "first")
-        .add(2, 200, "second")
-        .add(3, 300, "third")
+        .addRow(1, new Object[] {100, "first"})
+        .addRow(2, new Object[] {200, "second"})
+        .addRow(3, new Object[] {300, "third"})
         .build();
   }
 
   private SingleRowSet buildArraySet(BatchSchema schema) {
     return fixture.rowSetBuilder(schema)
-        .add(1, new String[] { "first, second, third" } )
-        .add(2, null)
-        .add(3, new String[] { "third, fourth, fifth" } )
+        .addRow(1, new String[] { "first, second, third" } )
+        .addRow(2, null)
+        .addRow(3, new String[] { "third, fourth, fifth" } )
         .build();
   }
 

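The rename from add() to addRow() in these tests also makes the value
convention explicit: addRow() takes one Java object per top-level column, a
map column's members travel in an Object[] (nested maps in nested
Object[]s), and an array column is a typed Java array, with null for a
missing value. Restating the patterns above, with hypothetical mapSchema and
arraySchema variables standing in for the schemas built earlier:

    // Map column: the map's members are wrapped in an Object[].
    fixture.rowSetBuilder(mapSchema)
        .addRow(1, new Object[] {100, "first"})
        .build();

    // Array column: a typed Java array; null yields an empty entry.
    fixture.rowSetBuilder(arraySchema)
        .addRow(1, new String[] {"first", "second"})
        .addRow(2, null)
        .build();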
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java
index fa6e318..e7d0a97 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java
@@ -80,16 +80,16 @@ public class TopNBatchTest extends PopUnitTestBase {
 
     try (RootAllocator allocator = new RootAllocator(100_000_000)) {
       expectedRowSet = new RowSetBuilder(allocator, batchSchema)
-        .add(110, 10)
-        .add(109, 9)
-        .add(108, 8)
-        .add(107, 7)
-        .add(106, 6)
-        .add(105, 5)
-        .add(104, 4)
-        .add(103, 3)
-        .add(102, 2)
-        .add(101, 1)
+        .addRow(110, 10)
+        .addRow(109, 9)
+        .addRow(108, 8)
+        .addRow(107, 7)
+        .addRow(106, 6)
+        .addRow(105, 5)
+        .addRow(104, 4)
+        .addRow(103, 3)
+        .addRow(102, 2)
+        .addRow(101, 1)
         .build();
 
       PriorityQueue queue;
@@ -121,10 +121,10 @@ public class TopNBatchTest extends PopUnitTestBase {
 
         for (int batchCounter = 0; batchCounter < numBatches; batchCounter++) {
           RowSetBuilder rowSetBuilder = new RowSetBuilder(allocator, batchSchema);
-          rowSetBuilder.add((batchCounter + bound), batchCounter);
+          rowSetBuilder.addRow((batchCounter + bound), batchCounter);
 
           for (int recordCounter = 0; recordCounter < numRecordsPerBatch; recordCounter++) {
-            rowSetBuilder.add(random.nextInt(bound), random.nextInt(bound));
+            rowSetBuilder.addRow(random.nextInt(bound), random.nextInt(bound));
           }
 
           VectorContainer vectorContainer = rowSetBuilder.build().container();
@@ -135,7 +135,7 @@ public class TopNBatchTest extends PopUnitTestBase {
         VectorContainer resultContainer = queue.getHyperBatch();
         resultContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
-        RowSet.HyperRowSet actualHyperSet = new HyperRowSetImpl(allocator, resultContainer, queue.getFinalSv4());
+        RowSet.HyperRowSet actualHyperSet = new HyperRowSetImpl(resultContainer, queue.getFinalSv4());
         new RowSetComparison(expectedRowSet).verify(actualHyperSet);
       } finally {
         if (expectedRowSet != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java
index eafb4c8..202a0f1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java
@@ -68,10 +68,10 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
         .build();
 
     SingleRowSet batch = fixture.rowSetBuilder(schema)
-        .add(10, 100)
-        .add(20, 120)
-        .add(30, null)
-        .add(40, 140)
+        .addRow(10, 100)
+        .addRow(20, 120)
+        .addRow(30, null)
+        .addRow(40, 140)
         .build();
 
     BatchValidator validator = new BatchValidator(batch.vectorAccessible(), true);
@@ -88,10 +88,10 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
         .build();
 
     SingleRowSet batch = fixture.rowSetBuilder(schema)
-        .add("col1.1", "col1.2")
-        .add("col2.1", "col2.2")
-        .add("col3.1", null)
-        .add("col4.1", "col4.2")
+        .addRow("col1.1", "col1.2")
+        .addRow("col2.1", "col2.2")
+        .addRow("col3.1", null)
+        .addRow("col4.1", "col4.2")
         .build();
 
     BatchValidator validator = new BatchValidator(batch.vectorAccessible(), true);
@@ -108,9 +108,9 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
         .build();
 
     SingleRowSet batch = fixture.rowSetBuilder(schema)
-        .add(new int[] {}, new String[] {})
-        .add(new int[] {1, 2, 3}, new String[] {"fred", "barney", "wilma"})
-        .add(new int[] {4}, new String[] {"dino"})
+        .addRow(new int[] {}, new String[] {})
+        .addRow(new int[] {1, 2, 3}, new String[] {"fred", "barney", "wilma"})
+        .addRow(new int[] {4}, new String[] {"dino"})
         .build();
 
     BatchValidator validator = new BatchValidator(batch.vectorAccessible(), true);
@@ -126,9 +126,9 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
         .build();
 
     SingleRowSet batch = fixture.rowSetBuilder(schema)
-        .add("x")
-        .add("y")
-        .add("z")
+        .addRow("x")
+        .addRow("y")
+        .addRow("z")
         .build();
 
     // Here we are evil: stomp on the last offset to simulate corruption.
@@ -160,9 +160,9 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
         .build();
 
     SingleRowSet batch = fixture.rowSetBuilder(schema)
-        .add("x")
-        .add("y")
-        .add("z")
+        .addRow("x")
+        .addRow("y")
+        .addRow("z")
         .build();
 
     zapOffset(batch, 0, 1);
@@ -198,9 +198,9 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
         .build();
 
     SingleRowSet batch = fixture.rowSetBuilder(schema)
-        .add("xx")
-        .add("yy")
-        .add("zz")
+        .addRow("xx")
+        .addRow("yy")
+        .addRow("zz")
         .build();
 
     zapOffset(batch, 2, 1);
@@ -222,9 +222,9 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
         .build();
 
     SingleRowSet batch = fixture.rowSetBuilder(schema)
-        .add("xx")
-        .add("yy")
-        .add("zz")
+        .addRow("xx")
+        .addRow("yy")
+        .addRow("zz")
         .build();
 
     zapOffset(batch, 1, 10);
@@ -246,9 +246,9 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
         .build();
 
     SingleRowSet batch = fixture.rowSetBuilder(schema)
-        .add("xx")
-        .add("yy")
-        .add("zz")
+        .addRow("xx")
+        .addRow("yy")
+        .addRow("zz")
         .build();
 
     zapOffset(batch, 3, 100_000);
@@ -270,9 +270,9 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
         .build();
 
     SingleRowSet batch = fixture.rowSetBuilder(schema)
-        .add((Object) new String[] {})
-        .add((Object) new String[] {"fred", "barney", "wilma"})
-        .add((Object) new String[] {"dino"})
+        .addRow((Object) new String[] {})
+        .addRow((Object) new String[] {"fred", "barney", "wilma"})
+        .addRow((Object) new String[] {"dino"})
         .build();
 
     VectorAccessible va = batch.vectorAccessible();
@@ -298,9 +298,9 @@ public class TestBatchValidator /* TODO: extends SubOperatorTest */ {
         .build();
 
     SingleRowSet batch = fixture.rowSetBuilder(schema)
-        .add((Object) new String[] {})
-        .add((Object) new String[] {"fred", "barney", "wilma"})
-        .add((Object) new String[] {"dino"})
+        .addRow((Object) new String[] {})
+        .addRow((Object) new String[] {"fred", "barney", "wilma"})
+        .addRow((Object) new String[] {"dino"})
         .build();
 
     VectorAccessible va = batch.vectorAccessible();

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
index c52f1a9..563d97e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
@@ -60,7 +60,7 @@ public class TestExternalSort extends BaseTestQuery {
       final RowSetBuilder rowSetBuilder = new RowSetBuilder(allocator, schema);
 
       for (int i = 0; i <= record_count; i += 2) {
-        rowSetBuilder.add(i);
+        rowSetBuilder.addRow(i);
       }
 
       final RowSet rowSet = rowSetBuilder.build();
@@ -76,7 +76,7 @@ public class TestExternalSort extends BaseTestQuery {
       final RowSetBuilder rowSetBuilder = new RowSetBuilder(allocator, schema);
 
       for (int i = 1; i <= record_count; i += 2) {
-        rowSetBuilder.add((float) i);
+        rowSetBuilder.addRow((float) i);
       }
 
       final RowSet rowSet = rowSetBuilder.build();
@@ -131,7 +131,7 @@ public class TestExternalSort extends BaseTestQuery {
       final RowSetBuilder rowSetBuilder = new RowSetBuilder(allocator, schema);
 
       for (int i = 0; i <= record_count; i += 2) {
-        rowSetBuilder.add(i);
+        rowSetBuilder.addRow(i);
       }
 
       final RowSet rowSet = rowSetBuilder.build();
@@ -147,7 +147,7 @@ public class TestExternalSort extends BaseTestQuery {
       final RowSetBuilder rowSetBuilder = new RowSetBuilder(allocator, schema);
 
       for (int i = 1; i <= record_count; i += 2) {
-        rowSetBuilder.add(i);
+        rowSetBuilder.addRow(i);
       }
 
       final RowSet rowSet = rowSetBuilder.build();
@@ -199,7 +199,7 @@ public class TestExternalSort extends BaseTestQuery {
       final RowSetBuilder rowSetBuilder = new RowSetBuilder(allocator, schema);
 
       for (int i = 0; i <= record_count; i += 2) {
-        rowSetBuilder.add(i, i);
+        rowSetBuilder.addRow(i, i);
       }
 
       final RowSet rowSet = rowSetBuilder.build();
@@ -216,7 +216,7 @@ public class TestExternalSort extends BaseTestQuery {
       final RowSetBuilder rowSetBuilder = new RowSetBuilder(allocator, schema);
 
       for (int i = 1; i <= record_count; i += 2) {
-        rowSetBuilder.add(i, i);
+        rowSetBuilder.addRow(i, i);
       }
 
       final RowSet rowSet = rowSetBuilder.build();

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
index c58abd6..cd408cb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
@@ -33,12 +33,12 @@ import org.apache.drill.exec.physical.impl.xsort.managed.PriorityQueueCopierWrap
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.TupleMetadata;
 import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.test.rowSet.DirectRowSet;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.test.rowSet.RowSetComparison;
-import org.apache.drill.test.rowSet.RowSetSchema;
 import org.apache.drill.test.rowSet.SchemaBuilder;
 
 import com.google.common.collect.Lists;
@@ -93,7 +93,7 @@ public class SortTestUtilities {
     public void run() throws Exception {
       PriorityQueueCopierWrapper copier = makeCopier(fixture, sortOrder, nullOrder);
       List<BatchGroup> batches = new ArrayList<>();
-      RowSetSchema schema = null;
+      TupleMetadata schema = null;
       for (SingleRowSet rowSet : rowSets) {
         batches.add(new BatchGroup.InputBatch(rowSet.container(), rowSet.getSv2(),
                     fixture.allocator(), rowSet.size()));
@@ -103,7 +103,7 @@ public class SortTestUtilities {
       }
       int rowCount = outputRowCount();
       VectorContainer dest = new VectorContainer();
-      BatchMerger merger = copier.startMerge(schema.toBatchSchema(SelectionVectorMode.NONE),
+      BatchMerger merger = copier.startMerge(new BatchSchema(SelectionVectorMode.NONE, schema.toFieldList()),
                                              batches, dest, rowCount, null);
 
       verifyResults(merger, dest);
@@ -121,7 +121,7 @@ public class SortTestUtilities {
     protected void verifyResults(BatchMerger merger, VectorContainer dest) {
       for (RowSet expectedSet : expected) {
         assertTrue(merger.next());
-        RowSet rowSet = new DirectRowSet(fixture.allocator(), dest);
+        RowSet rowSet = DirectRowSet.fromContainer(dest);
         new RowSetComparison(expectedSet)
               .verifyAndClearAll(rowSet);
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
index f1c622f..5d438ee 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
@@ -29,15 +29,13 @@ import org.apache.drill.exec.physical.impl.xsort.managed.PriorityQueueCopierWrap
 import org.apache.drill.exec.physical.impl.xsort.managed.SortTestUtilities.CopierTester;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.test.DrillTest;
 import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
-import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.apache.drill.test.rowSet.RowSetWriter;
 import org.apache.drill.test.rowSet.SchemaBuilder;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -51,19 +49,7 @@ import org.junit.experimental.categories.Category;
  */
 
 @Category(OperatorTest.class)
-public class TestCopier extends DrillTest {
-
-  public static OperatorFixture fixture;
-
-  @BeforeClass
-  public static void setup() {
-    fixture = OperatorFixture.builder().build();
-  }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    fixture.close();
-  }
+public class TestCopier extends SubOperatorTest {
 
   @Test
   public void testEmptyInput() throws Exception {
@@ -101,12 +87,12 @@ public class TestCopier extends DrillTest {
     BatchSchema schema = SortTestUtilities.nonNullSchema();
     CopierTester tester = new CopierTester(fixture);
     tester.addInput(fixture.rowSetBuilder(schema)
-          .add(10, "10")
+          .addRow(10, "10")
           .withSv2()
           .build());
 
     tester.addOutput(fixture.rowSetBuilder(schema)
-          .add(10, "10")
+          .addRow(10, "10")
           .build());
     tester.run();
   }
@@ -116,17 +102,17 @@ public class TestCopier extends DrillTest {
     BatchSchema schema = SortTestUtilities.nonNullSchema();
     CopierTester tester = new CopierTester(fixture);
     tester.addInput(fixture.rowSetBuilder(schema)
-          .add(10, "10")
+          .addRow(10, "10")
           .withSv2()
           .build());
     tester.addInput(fixture.rowSetBuilder(schema)
-          .add(20, "20")
+          .addRow(20, "20")
           .withSv2()
           .build());
 
     tester.addOutput(fixture.rowSetBuilder(schema)
-          .add(10, "10")
-          .add(20, "20")
+          .addRow(10, "10")
+          .addRow(20, "20")
           .build());
     tester.run();
   }
@@ -137,7 +123,7 @@ public class TestCopier extends DrillTest {
     int value = first;
     for (int i = 0; i < count; i++, value += step) {
       RowSetUtilities.setFromInt(writer, 0, value);
-      writer.column(1).setString(Integer.toString(value));
+      writer.scalar(1).setString(Integer.toString(value));
       writer.save();
     }
     writer.done();
@@ -188,25 +174,25 @@ public class TestCopier extends DrillTest {
     tester.sortOrder = Ordering.ORDER_ASC;
     tester.nullOrder = Ordering.NULLS_LAST;
     tester.addInput(fixture.rowSetBuilder(schema)
-        .add(1, "1")
-        .add(4, "4")
-        .add(null, "null")
+        .addRow(1, "1")
+        .addRow(4, "4")
+        .addRow(null, "null")
         .withSv2()
         .build());
     tester.addInput(fixture.rowSetBuilder(schema)
-        .add(2, "2")
-        .add(3, "3")
-        .add(null, "null")
+        .addRow(2, "2")
+        .addRow(3, "3")
+        .addRow(null, "null")
         .withSv2()
         .build());
 
     tester.addOutput(fixture.rowSetBuilder(schema)
-        .add(1, "1")
-        .add(2, "2")
-        .add(3, "3")
-        .add(4, "4")
-        .add(null, "null")
-        .add(null, "null")
+        .addRow(1, "1")
+        .addRow(2, "2")
+        .addRow(3, "3")
+        .addRow(4, "4")
+        .addRow(null, "null")
+        .addRow(null, "null")
         .build());
 
     tester.run();
@@ -220,25 +206,25 @@ public class TestCopier extends DrillTest {
     tester.sortOrder = Ordering.ORDER_ASC;
     tester.nullOrder = Ordering.NULLS_FIRST;
     tester.addInput(fixture.rowSetBuilder(schema)
-        .add(null, "null")
-        .add(1, "1")
-        .add(4, "4")
+        .addRow(null, "null")
+        .addRow(1, "1")
+        .addRow(4, "4")
         .withSv2()
         .build());
     tester.addInput(fixture.rowSetBuilder(schema)
-        .add(null, "null")
-        .add(2, "2")
-        .add(3, "3")
+        .addRow(null, "null")
+        .addRow(2, "2")
+        .addRow(3, "3")
         .withSv2()
         .build());
 
     tester.addOutput(fixture.rowSetBuilder(schema)
-        .add(null, "null")
-        .add(null, "null")
-        .add(1, "1")
-        .add(2, "2")
-        .add(3, "3")
-        .add(4, "4")
+        .addRow(null, "null")
+        .addRow(null, "null")
+        .addRow(1, "1")
+        .addRow(2, "2")
+        .addRow(3, "3")
+        .addRow(4, "4")
         .build());
 
     tester.run();
@@ -252,25 +238,25 @@ public class TestCopier extends DrillTest {
     tester.sortOrder = Ordering.ORDER_DESC;
     tester.nullOrder = Ordering.NULLS_LAST;
     tester.addInput(fixture.rowSetBuilder(schema)
-        .add(4, "4")
-        .add(1, "1")
-        .add(null, "null")
+        .addRow(4, "4")
+        .addRow(1, "1")
+        .addRow(null, "null")
         .withSv2()
         .build());
     tester.addInput(fixture.rowSetBuilder(schema)
-        .add(3, "3")
-        .add(2, "2")
-        .add(null, "null")
+        .addRow(3, "3")
+        .addRow(2, "2")
+        .addRow(null, "null")
         .withSv2()
         .build());
 
     tester.addOutput(fixture.rowSetBuilder(schema)
-        .add(4, "4")
-        .add(3, "3")
-        .add(2, "2")
-        .add(1, "1")
-        .add(null, "null")
-        .add(null, "null")
+        .addRow(4, "4")
+        .addRow(3, "3")
+        .addRow(2, "2")
+        .addRow(1, "1")
+        .addRow(null, "null")
+        .addRow(null, "null")
         .build());
 
     tester.run();
@@ -284,25 +270,25 @@ public class TestCopier extends DrillTest {
     tester.sortOrder = Ordering.ORDER_DESC;
     tester.nullOrder = Ordering.NULLS_FIRST;
     tester.addInput(fixture.rowSetBuilder(schema)
-        .add(null, "null")
-        .add(4, "4")
-        .add(1, "1")
+        .addRow(null, "null")
+        .addRow(4, "4")
+        .addRow(1, "1")
         .withSv2()
         .build());
     tester.addInput(fixture.rowSetBuilder(schema)
-        .add(null, "null")
-        .add(3, "3")
-        .add(2, "2")
+        .addRow(null, "null")
+        .addRow(3, "3")
+        .addRow(2, "2")
         .withSv2()
         .build());
 
     tester.addOutput(fixture.rowSetBuilder(schema)
-        .add(null, "null")
-        .add(null, "null")
-        .add(4, "4")
-        .add(3, "3")
-        .add(2, "2")
-        .add(1, "1")
+        .addRow(null, "null")
+        .addRow(null, "null")
+        .addRow(4, "4")
+        .addRow(3, "3")
+        .addRow(2, "2")
+        .addRow(1, "1")
         .build());
 
     tester.run();
@@ -362,22 +348,22 @@ public class TestCopier extends DrillTest {
 
     CopierTester tester = new CopierTester(fixture);
     tester.addInput(fixture.rowSetBuilder(schema)
-        .add(1, 10, 100)
-        .add(5, 50, 500)
+        .addRow(1, new Object[] {10, new Object[] {100}})
+        .addRow(5, new Object[] {50, new Object[] {500}})
         .withSv2()
         .build());
 
     tester.addInput(fixture.rowSetBuilder(schema)
-        .add(2, 20, 200)
-        .add(6, 60, 600)
+        .addRow(2, new Object[] {20, new Object[] {200}})
+        .addRow(6, new Object[] {60, new Object[] {600}})
         .withSv2()
         .build());
 
     tester.addOutput(fixture.rowSetBuilder(schema)
-        .add(1, 10, 100)
-        .add(2, 20, 200)
-        .add(5, 50, 500)
-        .add(6, 60, 600)
+        .addRow(1, new Object[] {10, new Object[] {100}})
+        .addRow(2, new Object[] {20, new Object[] {200}})
+        .addRow(5, new Object[] {50, new Object[] {500}})
+        .addRow(6, new Object[] {60, new Object[] {600}})
         .build());
 
     tester.run();

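Two related renames run through the sort tests: RowSetWriter and
RowSetReader moved from inner classes of RowSet to top-level classes (see
the import changes), and the untyped column(i) accessor gave way to
scalar(i). A sketch of the resulting write/read loop; setString(), save(),
done(), and getInt() appear in the diffs, while setInt() and getString() are
assumed from the same scalar accessor family:

    RowSetWriter writer = rowSet.writer();   // rowSet: an ExtendableRowSet
    writer.scalar(0).setInt(10);             // column 0: INT key
    writer.scalar(1).setString("10");        // column 1: VARCHAR value
    writer.save();                           // commit the row
    writer.done();                           // finish the batch

    RowSetReader reader = rowSet.reader();
    while (reader.next()) {
      int key = reader.scalar(0).getInt();
      String value = reader.scalar(1).getString();
    }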
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestShortArrays.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestShortArrays.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestShortArrays.java
index ba5dfce..38e3698 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestShortArrays.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestShortArrays.java
@@ -61,9 +61,9 @@ public class TestShortArrays extends SubOperatorTest {
         .addArray("b", MinorType.INT)
         .build();
     RowSetBuilder builder = fixture.rowSetBuilder(schema)
-        .add(1, new int[] {10});
+        .addRow(1, new int[] {10});
     for (int i = 2; i <= 10; i++) {
-      builder.add(i, new int[] {});
+      builder.addRow(i, new int[] {});
     }
     RowSet rows = builder.build();
 
@@ -87,9 +87,9 @@ public class TestShortArrays extends SubOperatorTest {
 
     SingleRowSet empty = fixture.rowSet(schema);
     vi.allocateBatch(empty.container(), 100);
-    assertEquals(2, empty.vectors().length);
+    assertEquals(2, empty.container().getNumberOfColumns());
     @SuppressWarnings("resource")
-    ValueVector bVector = empty.vectors()[1];
+    ValueVector bVector = empty.container().getValueVector(1).getValueVector();
     assertTrue(bVector instanceof RepeatedIntVector);
     assertEquals(16, ((RepeatedIntVector) bVector).getDataVector().getValueCapacity());
 

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
index d83a765..93411d7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
@@ -46,8 +46,8 @@ import org.apache.drill.test.rowSet.HyperRowSetImpl;
 import org.apache.drill.test.rowSet.IndirectRowSet;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
-import org.apache.drill.test.rowSet.RowSet.RowSetReader;
-import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
+import org.apache.drill.test.rowSet.RowSetReader;
+import org.apache.drill.test.rowSet.RowSetWriter;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.RowSetComparison;
 import org.apache.drill.test.rowSet.SchemaBuilder;
@@ -193,11 +193,11 @@ public class TestSortImpl extends DrillTest {
 
   private static RowSet toRowSet(OperatorFixture fixture, SortResults results, VectorContainer dest) {
     if (results.getSv4() != null) {
-      return new HyperRowSetImpl(fixture.allocator(), dest, results.getSv4());
+      return new HyperRowSetImpl(dest, results.getSv4());
     } else if (results.getSv2() != null) {
-      return new IndirectRowSet(fixture.allocator(), dest, results.getSv2());
+      return IndirectRowSet.fromSv2(dest, results.getSv2());
     } else {
-      return new DirectRowSet(fixture.allocator(), dest);
+      return DirectRowSet.fromContainer(dest);
     }
   }
 
@@ -242,10 +242,10 @@ public class TestSortImpl extends DrillTest {
       BatchSchema schema = SortTestUtilities.nonNullSchema();
       SortTestFixture sortTest = new SortTestFixture(fixture);
       sortTest.addInput(fixture.rowSetBuilder(schema)
-          .add(1, "first")
+          .addRow(1, "first")
           .build());
       sortTest.addOutput(fixture.rowSetBuilder(schema)
-          .add(1, "first")
+          .addRow(1, "first")
           .build());
       sortTest.run();
     }
@@ -262,12 +262,12 @@ public class TestSortImpl extends DrillTest {
       BatchSchema schema = SortTestUtilities.nonNullSchema();
       SortTestFixture sortTest = new SortTestFixture(fixture);
       sortTest.addInput(fixture.rowSetBuilder(schema)
-          .add(2, "second")
-          .add(1, "first")
+          .addRow(2, "second")
+          .addRow(1, "first")
           .build());
       sortTest.addOutput(fixture.rowSetBuilder(schema)
-          .add(1, "first")
-          .add(2, "second")
+          .addRow(1, "first")
+          .addRow(2, "second")
           .build());
       sortTest.run();
     }
@@ -285,14 +285,14 @@ public class TestSortImpl extends DrillTest {
       BatchSchema schema = SortTestUtilities.nonNullSchema();
       SortTestFixture sortTest = new SortTestFixture(fixture);
       sortTest.addInput(fixture.rowSetBuilder(schema)
-          .add(2, "second")
+          .addRow(2, "second")
           .build());
       sortTest.addInput(fixture.rowSetBuilder(schema)
-          .add(1, "first")
+          .addRow(1, "first")
           .build());
       sortTest.addOutput(fixture.rowSetBuilder(schema)
-          .add(1, "first")
-          .add(2, "second")
+          .addRow(1, "first")
+          .addRow(2, "second")
           .build());
       sortTest.run();
     }
@@ -356,7 +356,7 @@ public class TestSortImpl extends DrillTest {
       RowSetBuilder builder = fixture.rowSetBuilder(schema);
       int end = Math.min(batchSize, targetCount - rowCount);
       for (int i = 0; i < end; i++) {
-        builder.add(currentValue, i + ", " + currentValue);
+        builder.addRow(currentValue, i + ", " + currentValue);
         currentValue = (currentValue + step) % targetCount;
         rowCount++;
       }
@@ -387,7 +387,7 @@ public class TestSortImpl extends DrillTest {
       RowSetReader reader = output.reader();
       while (reader.next()) {
         assertEquals("Value of " + batchCount + ":" + rowCount,
-            rowCount, reader.column(0).getInt());
+            rowCount, reader.scalar(0).getInt());
         rowCount++;
       }
     }
@@ -593,18 +593,18 @@ public class TestSortImpl extends DrillTest {
         }
       };
       sortTest.addInput(fixture.rowSetBuilder(schema)
-          .add(2, "second")
+          .addRow(2, "second")
           .build());
       sortTest.addInput(fixture.rowSetBuilder(schema)
-          .add(3, "third")
+          .addRow(3, "third")
           .build());
       sortTest.addInput(fixture.rowSetBuilder(schema)
-          .add(1, "first")
+          .addRow(1, "first")
           .build());
       sortTest.addOutput(fixture.rowSetBuilder(schema)
-          .add(1, "first")
-          .add(2, "second")
-          .add(3, "third")
+          .addRow(1, "first")
+          .addRow(2, "second")
+          .addRow(3, "third")
           .build());
       sortTest.run();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
index 5f04da6..c24f1a6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
@@ -36,8 +36,8 @@ import org.apache.drill.test.DrillTest;
 import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
-import org.apache.drill.test.rowSet.RowSet.RowSetReader;
-import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
+import org.apache.drill.test.rowSet.RowSetReader;
+import org.apache.drill.test.rowSet.RowSetWriter;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.RowSetComparison;
@@ -111,12 +111,12 @@ public class TestSorter extends DrillTest {
   public void testSingleRow() throws Exception {
     BatchSchema schema = SortTestUtilities.nonNullSchema();
     SingleRowSet rowSet = new RowSetBuilder(fixture.allocator(), schema)
-          .add(0, "0")
+          .addRow(0, "0")
           .withSv2()
           .build();
 
     SingleRowSet expected = new RowSetBuilder(fixture.allocator(), schema)
-        .add(0, "0")
+        .addRow(0, "0")
         .build();
     runSorterTest(rowSet, expected);
   }
@@ -127,14 +127,14 @@ public class TestSorter extends DrillTest {
   public void testTwoRows() throws Exception {
     BatchSchema schema = SortTestUtilities.nonNullSchema();
     SingleRowSet rowSet = new RowSetBuilder(fixture.allocator(), schema)
-        .add(1, "1")
-        .add(0, "0")
+        .addRow(1, "1")
+        .addRow(0, "0")
         .withSv2()
         .build();
 
     SingleRowSet expected = new RowSetBuilder(fixture.allocator(), schema)
-        .add(0, "0")
-        .add(1, "1")
+        .addRow(0, "0")
+        .addRow(1, "1")
         .build();
     runSorterTest(rowSet, expected);
   }
@@ -207,11 +207,11 @@ public class TestSorter extends DrillTest {
       for (int i = 0; i < items.length; i++) {
         DataItem item = items[i];
         if (nullable && item.isNull) {
-          writer.column(0).setNull();
+          writer.scalar(0).setNull();
         } else {
           RowSetUtilities.setFromInt(writer, 0, item.key);
         }
-        writer.column(1).setString(Integer.toString(item.value));
+        writer.scalar(1).setString(Integer.toString(item.value));
         writer.save();
       }
       writer.done();
@@ -221,7 +221,7 @@ public class TestSorter extends DrillTest {
     private void verify(RowSet actual) {
       DataItem expected[] = Arrays.copyOf(data, data.length);
       doSort(expected);
-      RowSet expectedRows = makeDataSet(actual.allocator(), actual.schema().batch(), expected);
+      RowSet expectedRows = makeDataSet(actual.allocator(), actual.batchSchema(), expected);
       doVerify(expected, expectedRows, actual);
     }
 
@@ -369,7 +369,7 @@ public class TestSorter extends DrillTest {
         int mo = rand.nextInt(12);
         int yr = rand.nextInt(10);
         Period period = makePeriod(yr, mo, day, hr, min, sec, ms);
-        builder.add(period);
+        builder.addRow(period);
       }
       return builder.build();
     }
@@ -383,7 +383,7 @@ public class TestSorter extends DrillTest {
       int prevMonths = 0;
       long prevMs = 0;
       while (reader.next()) {
-        Period period = reader.column(0).getPeriod().normalizedStandard();
+        Period period = reader.scalar(0).getPeriod().normalizedStandard();
         int years = period.getYears();
         assertTrue(prevYears <= years);
         if (prevYears != years) {
@@ -586,16 +586,16 @@ public class TestSorter extends DrillTest {
         .build();
 
     SingleRowSet input = fixture.rowSetBuilder(schema)
-        .add(3, "third")
-        .add(1, "first")
-        .add(2, "second")
+        .addRow(3, "third")
+        .addRow(1, "first")
+        .addRow(2, "second")
         .withSv2()
         .build();
 
     SingleRowSet output = fixture.rowSetBuilder(schema)
-        .add(1, "first")
-        .add(2, "second")
-        .add(3, "third")
+        .addRow(1, "first")
+        .addRow(2, "second")
+        .addRow(3, "third")
         .build();
     Sort popConfig = makeSortConfig("map.key", Ordering.ORDER_ASC, Ordering.NULLS_LAST);
     runSorterTest(popConfig, input, output);

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderLimits.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderLimits.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderLimits.java
new file mode 100644
index 0000000..f9f5128
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderLimits.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetOptions;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Tests of the row limit functionality of the result set loader. The
+ * row limit is set up front and has a default value. Because Drill must
+ * discover data structure as it reads, the result set loader also allows changing
+ * the row limit between batches (perhaps Drill discovers that rows are much
+ * narrower or wider than expected).
+ * <p>
+ * The tests here are independent of the tests for vector allocation (which does,
+ * in fact, depend on the row count) and vector overflow (which can occur when
+ * the row limit turns out to be too large).
+ */
+
+public class TestResultSetLoaderLimits extends SubOperatorTest {
+
+  /**
+   * Verify that the writer stops when reaching the row limit.
+   * In this case there is no look-ahead row.
+   */
+
+  @Test
+  public void testRowLimit() {
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator());
+    assertEquals(ResultSetLoaderImpl.DEFAULT_ROW_COUNT, rsLoader.targetRowCount());
+    RowSetLoader rootWriter = rsLoader.writer();
+    rootWriter.addColumn(SchemaBuilder.columnSchema("s", MinorType.VARCHAR, DataMode.REQUIRED));
+
+    byte value[] = new byte[200];
+    Arrays.fill(value, (byte) 'X');
+    int count = 0;
+    rsLoader.startBatch();
+    while (! rootWriter.isFull()) {
+      rootWriter.start();
+      rootWriter.scalar(0).setBytes(value, value.length);
+      rootWriter.save();
+      count++;
+    }
+    assertEquals(ResultSetLoaderImpl.DEFAULT_ROW_COUNT, count);
+    assertEquals(count, rootWriter.rowCount());
+
+    rsLoader.harvest().clear();
+
+    // Do it again, a different way.
+
+    count = 0;
+    rsLoader.startBatch();
+    assertEquals(0, rootWriter.rowCount());
+    while (rootWriter.start()) {
+      rootWriter.scalar(0).setBytes(value, value.length);
+      rootWriter.save();
+      count++;
+    }
+    assertEquals(ResultSetLoaderImpl.DEFAULT_ROW_COUNT, count);
+    assertEquals(count, rootWriter.rowCount());
+
+    rsLoader.harvest().clear();
+
+    rsLoader.close();
+  }
+
+  private static final int TEST_ROW_LIMIT = 1024;
+
+  /**
+   * Verify that the caller can set a row limit lower than the default.
+   */
+
+  @Test
+  public void testCustomRowLimit() {
+
+    // Try to set a row limit larger than the hard maximum. The value
+    // is clamped to the maximum.
+
+    ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT + 1)
+        .build();
+    assertEquals(ValueVector.MAX_ROW_COUNT, options.rowCountLimit);
+
+    // Just a bit of paranoia that we check against the vector limit,
+    // not any previous value...
+
+    options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT + 1)
+        .setRowCountLimit(TEST_ROW_LIMIT)
+        .build();
+    assertEquals(TEST_ROW_LIMIT, options.rowCountLimit);
+
+    options = new OptionBuilder()
+        .setRowCountLimit(TEST_ROW_LIMIT)
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT + 1)
+        .build();
+    assertEquals(ValueVector.MAX_ROW_COUNT, options.rowCountLimit);
+
+    // Can't set the limit lower than 1
+
+    options = new OptionBuilder()
+        .setRowCountLimit(0)
+        .build();
+    assertEquals(1, options.rowCountLimit);
+
+    // Load with a (valid) limit lower than the default.
+
+    options = new OptionBuilder()
+        .setRowCountLimit(TEST_ROW_LIMIT)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    assertEquals(TEST_ROW_LIMIT, rsLoader.targetRowCount());
+
+    RowSetLoader rootWriter = rsLoader.writer();
+    rootWriter.addColumn(SchemaBuilder.columnSchema("s", MinorType.VARCHAR, DataMode.REQUIRED));
+
+    rsLoader.startBatch();
+    int count = fillToLimit(rootWriter);
+    assertEquals(TEST_ROW_LIMIT, count);
+    assertEquals(count, rootWriter.rowCount());
+
+    // Should fail to write beyond the row limit
+
+    assertFalse(rootWriter.start());
+    try {
+      rootWriter.save();
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+
+    rsLoader.harvest().clear();
+    rsLoader.startBatch();
+    assertEquals(0, rootWriter.rowCount());
+
+    rsLoader.close();
+  }
+
+  private int fillToLimit(RowSetLoader rootWriter) {
+    byte value[] = new byte[200];
+    Arrays.fill(value, (byte) 'X');
+    int count = 0;
+    while (! rootWriter.isFull()) {
+      rootWriter.start();
+      rootWriter.scalar(0).setBytes(value, value.length);
+      rootWriter.save();
+      count++;
+    }
+    return count;
+  }
+
+  /**
+   * Test that the row limit can change between batches.
+   */
+
+  @Test
+  public void testDynamicLimit() {
+
+    // Start with a small limit.
+
+    ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(TEST_ROW_LIMIT)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    assertEquals(TEST_ROW_LIMIT, rsLoader.targetRowCount());
+
+    RowSetLoader rootWriter = rsLoader.writer();
+    rootWriter.addColumn(SchemaBuilder.columnSchema("s", MinorType.VARCHAR, DataMode.REQUIRED));
+
+    rsLoader.startBatch();
+    int count = fillToLimit(rootWriter);
+    assertEquals(TEST_ROW_LIMIT, count);
+    assertEquals(count, rootWriter.rowCount());
+    rsLoader.harvest().clear();
+
+    // Raise the row limit and fill a second batch.
+
+    int newLimit = 8000;
+    rsLoader.setTargetRowCount(newLimit);
+    rsLoader.startBatch();
+    count = fillToLimit(rootWriter);
+    assertEquals(newLimit, count);
+    assertEquals(count, rootWriter.rowCount());
+    rsLoader.harvest().clear();
+
+    // Put the limit back to a lower number.
+
+    newLimit = 1000;
+    rsLoader.setTargetRowCount(newLimit);
+    rsLoader.startBatch();
+    count = fillToLimit(rootWriter);
+    assertEquals(newLimit, count);
+    assertEquals(count, rootWriter.rowCount());
+    rsLoader.harvest().clear();
+
+    rsLoader.close();
+  }
+}
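
Taken together, the tests above exercise a simple lifecycle: configure a
limit, fill until the writer refuses more rows, harvest, optionally
adjust the limit, and repeat. A condensed sketch using only calls that
appear in the tests (the column name and limit values are illustrative):

    ResultSetOptions options = new OptionBuilder()
        .setRowCountLimit(1024)        // clamped to [1, ValueVector.MAX_ROW_COUNT]
        .build();
    ResultSetLoader loader = new ResultSetLoaderImpl(fixture.allocator(), options);
    RowSetLoader writer = loader.writer();
    writer.addColumn(SchemaBuilder.columnSchema("s", MinorType.VARCHAR, DataMode.REQUIRED));

    loader.startBatch();
    while (writer.start()) {           // false once the row limit is reached
      writer.scalar(0).setString("value");
      writer.save();
    }
    loader.harvest().clear();          // a real operator ships the batch downstream

    loader.setTargetRowCount(8000);    // the limit may change between batches
    loader.close();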

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMapArray.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMapArray.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMapArray.java
new file mode 100644
index 0000000..115e52d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMapArray.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ArrayReader;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ScalarElementReader;
+import org.apache.drill.exec.vector.accessor.ScalarReader;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleReader;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetReader;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Test map array support in the result set loader.
+ * <p>
+ * The tests here should be considered in the "extra for experts"
+ * category: run and/or debug these tests only after the scalar
+ * tests work. Maps, and especially repeated maps, are very complex
+ * constructs not to be tackled lightly.
+ */
+
+public class TestResultSetLoaderMapArray extends SubOperatorTest {
+
+  @Test
+  public void testBasics() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMapArray("m")
+          .add("c", MinorType.INT)
+          .add("d", MinorType.VARCHAR)
+          .buildMap()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Verify structure and schema
+
+    TupleMetadata actualSchema = rootWriter.schema();
+    assertEquals(2, actualSchema.size());
+    assertTrue(actualSchema.metadata(1).isArray());
+    assertTrue(actualSchema.metadata(1).isMap());
+    assertEquals(2, actualSchema.metadata("m").mapSchema().size());
+    assertEquals(2, actualSchema.column("m").getChildren().size());
+
+    // Write a couple of rows with arrays.
+
+    rsLoader.startBatch();
+    rootWriter
+      .addRow(10, new Object[] {
+          new Object[] {110, "d1.1"},
+          new Object[] {120, "d2.2"}})
+      .addRow(20, new Object[] {})
+      .addRow(30, new Object[] {
+          new Object[] {310, "d3.1"},
+          new Object[] {320, "d3.2"},
+          new Object[] {330, "d3.3"}})
+      ;
+
+    // Verify the first batch
+
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(10, new Object[] {
+            new Object[] {110, "d1.1"},
+            new Object[] {120, "d2.2"}})
+        .addRow(20, new Object[] {})
+        .addRow(30, new Object[] {
+            new Object[] {310, "d3.1"},
+            new Object[] {320, "d3.2"},
+            new Object[] {330, "d3.3"}})
+        .build();
+    new RowSetComparison(expected).verifyAndClearAll(actual);
+
+    // In the second batch, write a row, then add a map member.
+    // It should be back-filled as null for the row already written.
+
+    rsLoader.startBatch();
+    rootWriter
+      .addRow(40, new Object[] {
+          new Object[] {410, "d4.1"},
+          new Object[] {420, "d4.2"}});
+
+    TupleWriter mapWriter = rootWriter.array("m").tuple();
+    mapWriter.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.OPTIONAL));
+
+    rootWriter
+      .addRow(50, new Object[] {
+          new Object[] {510, "d5.1", "e5.1"},
+          new Object[] {520, "d5.2", null}})
+      .addRow(60, new Object[] {
+          new Object[] {610, "d6.1", "e6.1"},
+          new Object[] {620, "d6.2", null},
+          new Object[] {630, "d6.3", "e6.3"}})
+      ;
+
+    // Verify the second batch
+
+    actual = fixture.wrap(rsLoader.harvest());
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMapArray("m")
+          .add("c", MinorType.INT)
+          .add("d", MinorType.VARCHAR)
+          .addNullable("e", MinorType.VARCHAR)
+          .buildMap()
+        .buildSchema();
+    expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(40, new Object[] {
+            new Object[] {410, "d4.1", null},
+            new Object[] {420, "d4.2", null}})
+        .addRow(50, new Object[] {
+            new Object[] {510, "d5.1", "e5.1"},
+            new Object[] {520, "d5.2", null}})
+        .addRow(60, new Object[] {
+            new Object[] {610, "d6.1", "e6.1"},
+            new Object[] {620, "d6.2", null},
+            new Object[] {630, "d6.3", "e6.3"}})
+        .build();
+    new RowSetComparison(expected).verifyAndClearAll(actual);
+
+    rsLoader.close();
+  }
+
+  @Test
+  public void testNestedArray() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMapArray("m")
+          .add("c", MinorType.INT)
+          .addArray("d", MinorType.VARCHAR)
+          .buildMap()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Write a couple of rows with arrays within arrays.
+    // (And, of course, the Varchar is actually an array of
+    // bytes, so that's three array levels.)
+
+    rsLoader.startBatch();
+    rootWriter
+      .addRow(10, new Object[] {
+          new Object[] {110, new String[] {"d1.1.1", "d1.1.2"}},
+          new Object[] {120, new String[] {"d1.2.1", "d1.2.2"}}})
+      .addRow(20, new Object[] {})
+      .addRow(30, new Object[] {
+          new Object[] {310, new String[] {"d3.1.1", "d3.2.2"}},
+          new Object[] {320, new String[] {}},
+          new Object[] {330, new String[] {"d3.3.1", "d1.2.2"}}})
+      ;
+
+    // Verify the batch
+
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(10, new Object[] {
+            new Object[] {110, new String[] {"d1.1.1", "d1.1.2"}},
+            new Object[] {120, new String[] {"d1.2.1", "d1.2.2"}}})
+        .addRow(20, new Object[] {})
+        .addRow(30, new Object[] {
+            new Object[] {310, new String[] {"d3.1.1", "d3.2.2"}},
+            new Object[] {320, new String[] {}},
+            new Object[] {330, new String[] {"d3.3.1", "d1.2.2"}}})
+        .build();
+    new RowSetComparison(expected).verifyAndClearAll(actual);
+
+    rsLoader.close();
+  }
+
+  /**
+   * Test doubly-nested arrays of maps.
+   */
+
+  @Test
+  public void testDoubleNestedArray() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMapArray("m1")
+          .add("b", MinorType.INT)
+          .addMapArray("m2")
+            .add("c", MinorType.INT)
+            .addArray("d", MinorType.VARCHAR)
+            .buildMap()
+          .buildMap()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+    rsLoader.startBatch();
+
+    ScalarWriter aWriter = rootWriter.scalar("a");
+    ArrayWriter a1Writer = rootWriter.array("m1");
+    TupleWriter m1Writer = a1Writer.tuple();
+    ScalarWriter bWriter = m1Writer.scalar("b");
+    ArrayWriter a2Writer = m1Writer.array("m2");
+    TupleWriter m2Writer = a2Writer.tuple();
+    ScalarWriter cWriter = m2Writer.scalar("c");
+    ScalarWriter dWriter = m2Writer.array("d").scalar();
+
+    for (int i = 0; i < 5; i++) {
+      rootWriter.start();
+      aWriter.setInt(i);
+      for (int j = 0; j < 4; j++) {
+        int a1Key = i + 10 + j;
+        bWriter.setInt(a1Key);
+        for (int k = 0; k < 3; k++) {
+          int a2Key = a1Key * 10 + k;
+          cWriter.setInt(a2Key);
+          for (int l = 0; l < 2; l++) {
+            dWriter.setString("d-" + (a2Key * 10 + l));
+          }
+          a2Writer.save();
+        }
+        a1Writer.save();
+      }
+      rootWriter.save();
+    }
+
+    RowSet results = fixture.wrap(rsLoader.harvest());
+    RowSetReader reader = results.reader();
+
+    ScalarReader aReader = reader.scalar("a");
+    ArrayReader a1Reader = reader.array("m1");
+    TupleReader m1Reader = a1Reader.tuple();
+    ScalarReader bReader = m1Reader.scalar("b");
+    ArrayReader a2Reader = m1Reader.array("m2");
+    TupleReader m2Reader = a2Reader.tuple();
+    ScalarReader cReader = m2Reader.scalar("c");
+    ScalarElementReader dReader = m2Reader.elements("d");
+
+    for (int i = 0; i < 5; i++) {
+      reader.next();
+      assertEquals(i, aReader.getInt());
+      for (int j = 0; j < 4; j++) {
+        a1Reader.setPosn(j);
+        int a1Key = i + 10 + j;
+        assertEquals(a1Key, bReader.getInt());
+        for (int k = 0; k < 3; k++) {
+          a2Reader.setPosn(k);
+          int a2Key = a1Key * 10 + k;
+          assertEquals(a2Key, cReader.getInt());
+          for (int l = 0; l < 2; l++) {
+            assertEquals("d-" + (a2Key * 10 + l), dReader.getString(l));
+          }
+        }
+      }
+    }
+    rsLoader.close();
+  }
+
+  /**
+   * Version of the {@link TestResultSetLoaderProtocol#testOverwriteRow()} test
+   * that uses nested columns inside an array of maps. Here we must call
+   * <tt>start()</tt> to reset the array back to the initial start position after
+   * each "discard."
+   */
+
+  @Test
+  public void testOverwriteRow() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMapArray("m")
+          .add("b", MinorType.INT)
+          .add("c", MinorType.VARCHAR)
+          .buildMap()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Can't use the shortcut to populate rows when doing overwrites.
+
+    ScalarWriter aWriter = rootWriter.scalar("a");
+    ArrayWriter maWriter = rootWriter.array("m");
+    TupleWriter mWriter = maWriter.tuple();
+    ScalarWriter bWriter = mWriter.scalar("b");
+    ScalarWriter cWriter = mWriter.scalar("c");
+
+    // Write 10,000 rows, overwriting 99% of them. This will cause vector
+    // overflow and data corruption if overwrite does not work; but will happily
+    // produce the correct result if everything works as it should.
+
+    byte value[] = new byte[512];
+    Arrays.fill(value, (byte) 'X');
+    int count = 0;
+    rsLoader.startBatch();
+    while (count < 10_000) {
+      rootWriter.start();
+      count++;
+      aWriter.setInt(count);
+      for (int i = 0; i < 10; i++) {
+        bWriter.setInt(count * 10 + i);
+        cWriter.setBytes(value, value.length);
+        maWriter.save();
+      }
+      if (count % 100 == 0) {
+        rootWriter.save();
+      }
+    }
+
+    // Verify using a reader.
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(count / 100, result.rowCount());
+    RowSetReader reader = result.reader();
+    ArrayReader maReader = reader.array("m");
+    TupleReader mReader = maReader.tuple();
+    int rowId = 1;
+    while (reader.next()) {
+      assertEquals(rowId * 100, reader.scalar("a").getInt());
+      assertEquals(10, maReader.size());
+      for (int i = 0; i < 10; i++) {
+        maReader.setPosn(i);
+        assertEquals(rowId * 1000 + i, mReader.scalar("b").getInt());
+        assertTrue(Arrays.equals(value, mReader.scalar("c").getBytes()));
+      }
+      rowId++;
+    }
+
+    result.clear();
+    rsLoader.close();
+  }
+
+  /**
+   * Check that the "fill-empties" logic descends into
+   * a repeated map.
+   */
+
+  @Test
+  public void testOmittedValues() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("id", MinorType.INT)
+        .addMapArray("m")
+          .addNullable("a", MinorType.INT)
+          .addNullable("b", MinorType.VARCHAR)
+          .buildMap()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    int mapSkip = 5;
+    int entrySkip = 3;
+    int rowCount = 1000;
+    int entryCount = 10;
+
+    rsLoader.startBatch();
+    ArrayWriter maWriter = rootWriter.array("m");
+    TupleWriter mWriter = maWriter.tuple();
+    for (int i = 0; i < rowCount; i++) {
+      rootWriter.start();
+      rootWriter.scalar(0).setInt(i);
+      if (i % mapSkip != 0) {
+        for (int j = 0; j < entryCount; j++) {
+          if (j % entrySkip != 0) {
+            mWriter.scalar(0).setInt(i * entryCount + j);
+            mWriter.scalar(1).setString("b-" + i + "." + j);
+          }
+          maWriter.save();
+        }
+      }
+      rootWriter.save();
+    }
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(rowCount, result.rowCount());
+    RowSetReader reader = result.reader();
+    ArrayReader maReader = reader.array("m");
+    TupleReader mReader = maReader.tuple();
+    for (int i = 0; i < rowCount; i++) {
+      assertTrue(reader.next());
+      assertEquals(i, reader.scalar(0).getInt());
+      if (i % mapSkip == 0) {
+        assertEquals(0, maReader.size());
+        continue;
+      }
+      assertEquals(entryCount, maReader.size());
+      for (int j = 0; j < entryCount; j++) {
+        maReader.setPosn(j);
+        if (j % entrySkip == 0) {
+          assertTrue(mReader.scalar(0).isNull());
+          assertTrue(mReader.scalar(1).isNull());
+        } else {
+          assertFalse(mReader.scalar(0).isNull());
+          assertFalse(mReader.scalar(1).isNull());
+          assertEquals(i * entryCount + j, mReader.scalar(0).getInt());
+          assertEquals("b-" + i + "." + j, mReader.scalar(1).getString());
+        }
+      }
+    }
+    result.clear();
+    rsLoader.close();
+  }
+
+  /**
+   * Test that memory is released if the loader is closed with an active
+   * batch (that is, before the batch is harvested).
+   */
+
+  @Test
+  public void testCloseWithoutHarvest() {
+    TupleMetadata schema = new SchemaBuilder()
+        .addMapArray("m")
+          .add("a", MinorType.INT)
+          .add("b", MinorType.VARCHAR)
+          .buildMap()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    ArrayWriter maWriter = rootWriter.array("m");
+    TupleWriter mWriter = maWriter.tuple();
+    rsLoader.startBatch();
+    for (int i = 0; i < 40; i++) {
+      rootWriter.start();
+      for (int j = 0; j < 3; j++) {
+        mWriter.scalar("a").setInt(i);
+        mWriter.scalar("b").setString("b-" + i);
+        maWriter.save();
+      }
+      rootWriter.save();
+    }
+
+    // Don't harvest the batch. Allocator will complain if the
+    // loader does not release memory.
+
+    rsLoader.close();
+  }
+}
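
The pattern common to all of the map-array tests above is the two-level
save: one save() on the array writer per map element, and one save() on
the row writer per row. A minimal sketch using the same writer types as
the tests (loop bounds and values are illustrative):

    ArrayWriter maWriter = rootWriter.array("m");
    TupleWriter mWriter = maWriter.tuple();
    rsLoader.startBatch();
    for (int i = 0; i < 10; i++) {
      rootWriter.start();
      for (int j = 0; j < 3; j++) {
        mWriter.scalar("a").setInt(i * 10 + j);
        mWriter.scalar("b").setString("b-" + i + "." + j);
        maWriter.save();               // closes one element of the map array
      }
      rootWriter.save();               // closes the row
    }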