Posted to commits@drill.apache.org by pr...@apache.org on 2017/12/21 05:19:38 UTC

[11/15] drill git commit: DRILL-5657: Size-aware vector writer structure

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java
new file mode 100644
index 0000000..b23eb0d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderMaps.java
@@ -0,0 +1,810 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleReader;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetReader;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Test (non-array) map support in the result set loader and related classes.
+ */
+
+public class TestResultSetLoaderMaps extends SubOperatorTest {
+
+  @Test
+  public void testBasics() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMap("m")
+          .add("c", MinorType.INT)
+          .add("d", MinorType.VARCHAR)
+          .buildMap()
+        .add("e", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Verify structure and schema
+
+    assertEquals(5, rsLoader.schemaVersion());
+    TupleMetadata actualSchema = rootWriter.schema();
+    assertEquals(3, actualSchema.size());
+    assertTrue(actualSchema.metadata(1).isMap());
+    assertEquals(2, actualSchema.metadata("m").mapSchema().size());
+    assertEquals(2, actualSchema.column("m").getChildren().size());
+
+    rsLoader.startBatch();
+
+    // Write a row the way that clients will do.
+
+    ScalarWriter aWriter = rootWriter.scalar("a");
+    TupleWriter mWriter = rootWriter.tuple("m");
+    ScalarWriter cWriter = mWriter.scalar("c");
+    ScalarWriter dWriter = mWriter.scalar("d");
+    ScalarWriter eWriter = rootWriter.scalar("e");
+
+    rootWriter.start();
+    aWriter.setInt(10);
+    cWriter.setInt(110);
+    dWriter.setString("fred");
+    eWriter.setString("pebbles");
+    rootWriter.save();
+
+    // Try adding a duplicate column.
+
+    try {
+      mWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.OPTIONAL));
+      fail();
+    } catch (IllegalArgumentException e) {
+      // Expected
+    }
+
+    // Write another using the test-time conveniences
+
+    rootWriter.addRow(20, new Object[] {210, "barney"}, "bam-bam");
+
+    // Harvest the batch
+
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    assertEquals(5, rsLoader.schemaVersion());
+    assertEquals(2, actual.rowCount());
+
+    // Validate data
+
+    SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(10, new Object[] {110, "fred"}, "pebbles")
+        .addRow(20, new Object[] {210, "barney"}, "bam-bam")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(actual);
+    rsLoader.close();
+  }
+
+  /**
+   * Create a schema with a map, then add columns to the map
+   * after delivering the first batch. The new columns should appear
+   * in the second-batch output.
+   */
+
+  @Test
+  public void testMapEvolution() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMap("m")
+          .add("b", MinorType.VARCHAR)
+          .buildMap()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    assertEquals(3, rsLoader.schemaVersion());
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    rootWriter
+      .addRow(10, new Object[] {"fred"})
+      .addRow(20, new Object[] {"barney"});
+
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    assertEquals(3, rsLoader.schemaVersion());
+    assertEquals(2, actual.rowCount());
+
+    // Validate first batch
+
+    SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(10, new Object[] {"fred"})
+        .addRow(20, new Object[] {"barney"})
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(actual);
+
+    // Add three columns in the second batch. One before
+    // the batch starts, one before the first row, and one after
+    // the first row.
+
+    TupleWriter mapWriter = rootWriter.tuple("m");
+    mapWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.REQUIRED));
+
+    rsLoader.startBatch();
+    mapWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.BIGINT, DataMode.REQUIRED));
+
+    rootWriter.addRow(30, new Object[] {"wilma", 130, 130_000L});
+
+    mapWriter.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.REQUIRED));
+    rootWriter.addRow(40, new Object[] {"betty", 140, 140_000L, "bam-bam"});
+
+    actual = fixture.wrap(rsLoader.harvest());
+    assertEquals(6, rsLoader.schemaVersion());
+    assertEquals(2, actual.rowCount());
+
+    // Validate second batch
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMap("m")
+          .add("b", MinorType.VARCHAR)
+          .add("c", MinorType.INT)
+          .add("d", MinorType.BIGINT)
+          .add("e", MinorType.VARCHAR)
+          .buildMap()
+        .buildSchema();
+    expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(30, new Object[] {"wilma", 130, 130_000L, ""})
+        .addRow(40, new Object[] {"betty", 140, 140_000L, "bam-bam"})
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(actual);
+
+    rsLoader.close();
+  }
+
+  /**
+   * Test adding a map to a loader after writing the first row.
+   */
+
+  @Test
+  public void testMapAddition() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    assertEquals(1, rsLoader.schemaVersion());
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Start without the map. Add a map after the first row.
+
+    rsLoader.startBatch();
+    rootWriter.addRow(10);
+
+    int mapIndex = rootWriter.addColumn(SchemaBuilder.columnSchema("m", MinorType.MAP, DataMode.REQUIRED));
+    TupleWriter mapWriter = rootWriter.tuple(mapIndex);
+
+    // Add a column to the map with the same name as the top-level column.
+    // Verifies that the name spaces are independent.
+
+    mapWriter.addColumn(SchemaBuilder.columnSchema("a", MinorType.VARCHAR, DataMode.REQUIRED));
+
+    rootWriter
+      .addRow(20, new Object[]{"fred"})
+      .addRow(30, new Object[]{"barney"});
+
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    assertEquals(3, rsLoader.schemaVersion());
+    assertEquals(3, actual.rowCount());
+
+    // Validate first batch
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMap("m")
+          .add("a", MinorType.VARCHAR)
+          .buildMap()
+        .buildSchema();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(10, new Object[] {""})
+        .addRow(20, new Object[] {"fred"})
+        .addRow(30, new Object[] {"barney"})
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(actual);
+
+    rsLoader.close();
+  }
+
+  /**
+   * Test adding an empty map to a loader after writing the first row.
+   * Then add columns in another batch. Yes, this is a bizarre condition,
+   * but we must check it anyway for robustness.
+   */
+
+  @Test
+  public void testEmptyMapAddition() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    assertEquals(1, rsLoader.schemaVersion());
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Start without the map. Add a map after the first row.
+
+    rsLoader.startBatch();
+    rootWriter.addRow(10);
+
+    int mapIndex = rootWriter.addColumn(SchemaBuilder.columnSchema("m", MinorType.MAP, DataMode.REQUIRED));
+    TupleWriter mapWriter = rootWriter.tuple(mapIndex);
+
+    rootWriter
+      .addRow(20, new Object[]{})
+      .addRow(30, new Object[]{});
+
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    assertEquals(2, rsLoader.schemaVersion());
+    assertEquals(3, actual.rowCount());
+
+    // Validate first batch
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMap("m")
+          .buildMap()
+        .buildSchema();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(10, new Object[] {})
+        .addRow(20, new Object[] {})
+        .addRow(30, new Object[] {})
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(actual);
+
+    // Now add another column to the map
+
+    rsLoader.startBatch();
+    mapWriter.addColumn(SchemaBuilder.columnSchema("a", MinorType.VARCHAR, DataMode.REQUIRED));
+
+    rootWriter
+      .addRow(40, new Object[]{"fred"})
+      .addRow(50, new Object[]{"barney"});
+
+    actual = fixture.wrap(rsLoader.harvest());
+    assertEquals(3, rsLoader.schemaVersion());
+    assertEquals(2, actual.rowCount());
+
+    // Validate second batch
+
+    expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMap("m")
+          .add("a", MinorType.VARCHAR)
+          .buildMap()
+        .buildSchema();
+    expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(40, new Object[] {"fred"})
+        .addRow(50, new Object[] {"barney"})
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(actual);
+
+    rsLoader.close();
+  }
+
+  /**
+   * Create nested maps. Then, add columns to each map
+   * on the fly. Use required, variable-width columns since
+   * those require the most processing and are most likely to
+   * fail if anything is out of place.
+   */
+
+  @Test
+  public void testNestedMapsRequired() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMap("m1")
+          .add("b", MinorType.VARCHAR)
+          .addMap("m2")
+            .add("c", MinorType.VARCHAR)
+            .buildMap()
+          .buildMap()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    assertEquals(5, rsLoader.schemaVersion());
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    rootWriter.addRow(10, new Object[] {"b1", new Object[] {"c1"}});
+
+    // Validate first batch
+
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    assertEquals(5, rsLoader.schemaVersion());
+    SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(10, new Object[] {"b1", new Object[] {"c1"}})
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(actual);
+
+    // Now add columns in the second batch.
+
+    rsLoader.startBatch();
+    rootWriter.addRow(20, new Object[] {"b2", new Object[] {"c2"}});
+
+    TupleWriter m1Writer = rootWriter.tuple("m1");
+    m1Writer.addColumn(SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.REQUIRED));
+    TupleWriter m2Writer = m1Writer.tuple("m2");
+    m2Writer.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.REQUIRED));
+
+    rootWriter.addRow(30, new Object[] {"b3", new Object[] {"c3", "e3"}, "d3"});
+
+    // And another set while the write proceeds.
+
+    m1Writer.addColumn(SchemaBuilder.columnSchema("f", MinorType.VARCHAR, DataMode.REQUIRED));
+    m2Writer.addColumn(SchemaBuilder.columnSchema("g", MinorType.VARCHAR, DataMode.REQUIRED));
+
+    rootWriter.addRow(40, new Object[] {"b4", new Object[] {"c4", "e4", "g4"}, "d4", "e4"});
+
+    // Validate second batch
+
+    actual = fixture.wrap(rsLoader.harvest());
+    assertEquals(9, rsLoader.schemaVersion());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMap("m1")
+          .add("b", MinorType.VARCHAR)
+          .addMap("m2")
+            .add("c", MinorType.VARCHAR)
+            .add("e", MinorType.VARCHAR)
+            .add("g", MinorType.VARCHAR)
+            .buildMap()
+          .add("d", MinorType.VARCHAR)
+          .add("f", MinorType.VARCHAR)
+          .buildMap()
+        .buildSchema();
+    expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(20, new Object[] {"b2", new Object[] {"c2", "",   ""  }, "",    "" })
+        .addRow(30, new Object[] {"b3", new Object[] {"c3", "e3", ""  }, "d3",  "" })
+        .addRow(40, new Object[] {"b4", new Object[] {"c4", "e4", "g4"}, "d4", "e4"})
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(actual);
+
+    rsLoader.close();
+  }
+
+  /**
+   * Create nested maps. Then, add columns to each map
+   * on the fly. This time, with nullable types.
+   */
+
+  @Test
+  public void testNestedMapsNullable() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMap("m1")
+          .addNullable("b", MinorType.VARCHAR)
+          .addMap("m2")
+            .addNullable("c", MinorType.VARCHAR)
+            .buildMap()
+          .buildMap()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    rootWriter.addRow(10, new Object[] {"b1", new Object[] {"c1"}});
+
+    // Validate first batch
+
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(10, new Object[] {"b1", new Object[] {"c1"}})
+        .build();
+//    actual.print();
+//    expected.print();
+
+    new RowSetComparison(expected).verifyAndClearAll(actual);
+
+    // Now add columns in the second batch.
+
+    rsLoader.startBatch();
+    rootWriter.addRow(20, new Object[] {"b2", new Object[] {"c2"}});
+
+    TupleWriter m1Writer = rootWriter.tuple("m1");
+    m1Writer.addColumn(SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.OPTIONAL));
+    TupleWriter m2Writer = m1Writer.tuple("m2");
+    m2Writer.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.OPTIONAL));
+
+    rootWriter.addRow(30, new Object[] {"b3", new Object[] {"c3", "e3"}, "d3"});
+
+    // And another set while the write proceeds.
+
+    m1Writer.addColumn(SchemaBuilder.columnSchema("f", MinorType.VARCHAR, DataMode.OPTIONAL));
+    m2Writer.addColumn(SchemaBuilder.columnSchema("g", MinorType.VARCHAR, DataMode.OPTIONAL));
+
+    rootWriter.addRow(40, new Object[] {"b4", new Object[] {"c4", "e4", "g4"}, "d4", "e4"});
+
+    // Validate second batch
+
+    actual = fixture.wrap(rsLoader.harvest());
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMap("m1")
+          .addNullable("b", MinorType.VARCHAR)
+          .addMap("m2")
+            .addNullable("c", MinorType.VARCHAR)
+            .addNullable("e", MinorType.VARCHAR)
+            .addNullable("g", MinorType.VARCHAR)
+            .buildMap()
+          .addNullable("d", MinorType.VARCHAR)
+          .addNullable("f", MinorType.VARCHAR)
+          .buildMap()
+        .buildSchema();
+    expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(20, new Object[] {"b2", new Object[] {"c2", null, null}, null, null})
+        .addRow(30, new Object[] {"b3", new Object[] {"c3", "e3", null}, "d3", null})
+        .addRow(40, new Object[] {"b4", new Object[] {"c4", "e4", "g4"}, "d4", "e4"})
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(actual);
+
+    rsLoader.close();
+  }
+
+  /**
+   * Test a map that contains a scalar array. There is no reason to suspect that
+   * this will have problems, as the array writer is fully tested in the accessor
+   * subsystem. Still, we need to test the cardinality methods of the loader
+   * layer.
+   */
+
+  @Test
+  public void testMapWithArray() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMap("m")
+          .addArray("c", MinorType.INT)
+          .addArray("d", MinorType.VARCHAR)
+          .buildMap()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Write some rows
+
+    rsLoader.startBatch();
+    rootWriter
+      .addRow(10, new Object[] {new int[] {110, 120, 130},
+                                new String[] {"d1.1", "d1.2", "d1.3", "d1.4"}})
+      .addRow(20, new Object[] {new int[] {210}, new String[] {}})
+      .addRow(30, new Object[] {new int[] {}, new String[] {"d3.1"}})
+      ;
+
+    // Validate first batch
+
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+    SingleRowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(10, new Object[] {new int[] {110, 120, 130},
+                                  new String[] {"d1.1", "d1.2", "d1.3", "d1.4"}})
+        .addRow(20, new Object[] {new int[] {210}, new String[] {}})
+        .addRow(30, new Object[] {new int[] {}, new String[] {"d3.1"}})
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(actual);
+
+    // Add another array after the first row in the second batch.
+
+    rsLoader.startBatch();
+    rootWriter
+      .addRow(40, new Object[] {new int[] {410, 420}, new String[] {"d4.1", "d4.2"}})
+      .addRow(50, new Object[] {new int[] {510}, new String[] {"d5.1"}})
+      ;
+
+    TupleWriter mapWriter = rootWriter.tuple("m");
+    mapWriter.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.REPEATED));
+    rootWriter
+      .addRow(60, new Object[] {new int[] {610, 620}, new String[] {"d6.1", "d6.2"}, new String[] {"e6.1", "e6.2"}})
+      .addRow(70, new Object[] {new int[] {710}, new String[] {}, new String[] {"e7.1", "e7.2"}})
+      ;
+
+    // Validate second batch. The new array should have been back-filled with
+    // empty offsets for the missing rows.
+
+    actual = fixture.wrap(rsLoader.harvest());
+//    System.out.println(actual.schema().toString());
+    expected = fixture.rowSetBuilder(actual.schema())
+        .addRow(40, new Object[] {new int[] {410, 420}, new String[] {"d4.1", "d4.2"}, new String[] {}})
+        .addRow(50, new Object[] {new int[] {510}, new String[] {"d5.1"}, new String[] {}})
+        .addRow(60, new Object[] {new int[] {610, 620}, new String[] {"d6.1", "d6.2"}, new String[] {"e6.1", "e6.2"}})
+        .addRow(70, new Object[] {new int[] {710}, new String[] {}, new String[] {"e7.1", "e7.2"}})
+        .build();
+//    expected.print();
+
+    new RowSetComparison(expected).verifyAndClearAll(actual);
+
+    rsLoader.close();
+  }
+
+  /**
+   * Create a schema with a map, then trigger an overflow on one of the columns
+   * in the map. Proper overflow handling should occur regardless of nesting
+   * depth.
+   */
+
+  @Test
+  public void testMapWithOverflow() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMap("m1")
+          .add("b", MinorType.INT)
+          .addMap("m2")
+            .add("c", MinorType.INT) // Before overflow, written
+            .add("d", MinorType.VARCHAR)
+            .add("e", MinorType.INT) // After overflow, not yet written
+            .buildMap()
+          .buildMap()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    byte value[] = new byte[512];
+    Arrays.fill(value, (byte) 'X');
+    int count = 0;
+    rsLoader.startBatch();
+    while (! rootWriter.isFull()) {
+      rootWriter.addRow(count, new Object[] {count * 10, new Object[] {count * 100, value, count * 1000}});
+      count++;
+    }
+
+    // Our row count should include the overflow row
+
+    int expectedCount = ValueVector.MAX_BUFFER_SIZE / value.length;
+    assertEquals(expectedCount + 1, count);
+
+    // Loader's row count should include only "visible" rows
+
+    assertEquals(expectedCount, rootWriter.rowCount());
+
+    // Total count should include invisible and look-ahead rows.
+
+    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+    // Result should exclude the overflow row
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(expectedCount, result.rowCount());
+    result.clear();
+
+    // Next batch should start with the overflow row
+
+    rsLoader.startBatch();
+    assertEquals(1, rootWriter.rowCount());
+    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+    result = fixture.wrap(rsLoader.harvest());
+    assertEquals(1, result.rowCount());
+    result.clear();
+
+    rsLoader.close();
+  }
+
+  /**
+   * Test the case in which a new column is added during the overflow row. Unlike
+   * the top-level schema case, internally we must create a copy of the map and
+   * move vectors across only when the harvested result is to include the schema
+   * version that contains the target column. For overflow, the new column does not
+   * appear in the first batch; it appears in the second batch, which contains the
+   * overflow row in which the column was added.
+   */
+
+  @Test
+  public void testMapOverflowWithNewColumn() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMap("m")
+          .add("b", MinorType.INT)
+          .add("c", MinorType.VARCHAR)
+          .buildMap()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    assertEquals(4, rsLoader.schemaVersion());
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Can't use the shortcut to populate rows when doing a schema
+    // change.
+
+    ScalarWriter aWriter = rootWriter.scalar("a");
+    TupleWriter mWriter = rootWriter.tuple("m");
+    ScalarWriter bWriter = mWriter.scalar("b");
+    ScalarWriter cWriter = mWriter.scalar("c");
+
+    byte value[] = new byte[512];
+    Arrays.fill(value, (byte) 'X');
+    int count = 0;
+    rsLoader.startBatch();
+    while (! rootWriter.isFull()) {
+      rootWriter.start();
+      aWriter.setInt(count);
+      bWriter.setInt(count * 10);
+      cWriter.setBytes(value, value.length);
+      if (rootWriter.isFull()) {
+
+        // Overflow just occurred. Add another column.
+
+        mWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.INT, DataMode.OPTIONAL));
+        mWriter.scalar("d").setInt(count * 100);
+      }
+      rootWriter.save();
+      count++;
+    }
+
+    // Result set should include the original columns, but not d.
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+
+    assertEquals(4, rsLoader.schemaVersion());
+    assertTrue(schema.isEquivalent(result.schema()));
+    BatchSchema expectedSchema = new BatchSchema(SelectionVectorMode.NONE, schema.toFieldList());
+    assertTrue(expectedSchema.isEquivalent(result.batchSchema()));
+
+    // Use a reader to validate row-by-row. Too large to create an expected
+    // result set.
+
+    RowSetReader reader = result.reader();
+    TupleReader mapReader = reader.tuple("m");
+    int rowId = 0;
+    while (reader.next()) {
+      assertEquals(rowId, reader.scalar("a").getInt());
+      assertEquals(rowId * 10, mapReader.scalar("b").getInt());
+      assertTrue(Arrays.equals(value, mapReader.scalar("c").getBytes()));
+      rowId++;
+    }
+    result.clear();
+
+    // Next batch should start with the overflow row
+
+    rsLoader.startBatch();
+    assertEquals(1, rootWriter.rowCount());
+    result = fixture.wrap(rsLoader.harvest());
+    assertEquals(1, result.rowCount());
+
+    reader = result.reader();
+    mapReader = reader.tuple("m");
+    while (reader.next()) {
+      assertEquals(rowId, reader.scalar("a").getInt());
+      assertEquals(rowId * 10, mapReader.scalar("b").getInt());
+      assertTrue(Arrays.equals(value, mapReader.scalar("c").getBytes()));
+      assertEquals(rowId * 100, mapReader.scalar("d").getInt());
+    }
+    result.clear();
+
+    rsLoader.close();
+  }
+
+  /**
+   * Version of the {@link TestResultSetLoaderProtocol#testOverwriteRow()} test
+   * that uses nested columns.
+   */
+
+  @Test
+  public void testOverwriteRow() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addMap("m")
+          .add("b", MinorType.INT)
+          .add("c", MinorType.VARCHAR)
+        .buildMap()
+      .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Can't use the shortcut to populate rows when doing overwrites.
+
+    ScalarWriter aWriter = rootWriter.scalar("a");
+    TupleWriter mWriter = rootWriter.tuple("m");
+    ScalarWriter bWriter = mWriter.scalar("b");
+    ScalarWriter cWriter = mWriter.scalar("c");
+
+    // Write 100,000 rows, overwriting 99% of them. This will cause vector
+    // overflow and data corruption if overwrite does not work; but will happily
+    // produce the correct result if everything works as it should.
+
+    byte value[] = new byte[512];
+    Arrays.fill(value, (byte) 'X');
+    int count = 0;
+    rsLoader.startBatch();
+    while (count < 100_000) {
+      rootWriter.start();
+      count++;
+      aWriter.setInt(count);
+      bWriter.setInt(count * 10);
+      cWriter.setBytes(value, value.length);
+      if (count % 100 == 0) {
+        rootWriter.save();
+      }
+    }
+
+    // Verify using a reader.
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(count / 100, result.rowCount());
+    RowSetReader reader = result.reader();
+    TupleReader mReader = reader.tuple("m");
+    int rowId = 1;
+    while (reader.next()) {
+      assertEquals(rowId * 100, reader.scalar("a").getInt());
+      assertEquals(rowId * 1000, mReader.scalar("b").getInt());
+      assertTrue(Arrays.equals(value, mReader.scalar("c").getBytes()));
+      rowId++;
+    }
+
+    result.clear();
+    rsLoader.close();
+  }
+}
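
For orientation, the map tests above all exercise the same load cycle: define a schema, create a loader, write rows through per-column writers, then harvest the completed batch. The following is only a condensed sketch of that cycle, not part of the commit, using the calls that appear in the tests; the "allocator" argument is assumed to be the Drill BufferAllocator that the test fixture supplies via fixture.allocator().

    // Sketch of the ResultSetLoader write/harvest cycle exercised by the tests above.
    TupleMetadata schema = new SchemaBuilder()
        .add("a", MinorType.INT)
        .addMap("m")
          .add("c", MinorType.INT)
          .add("d", MinorType.VARCHAR)
          .buildMap()
        .buildSchema();
    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
        .setSchema(schema)
        .build();
    ResultSetLoader rsLoader = new ResultSetLoaderImpl(allocator, options);
    RowSetLoader rootWriter = rsLoader.writer();

    rsLoader.startBatch();
    rootWriter.start();                          // begin a row
    rootWriter.scalar("a").setInt(10);           // top-level column
    TupleWriter mapWriter = rootWriter.tuple("m");
    mapWriter.scalar("c").setInt(110);           // map members are ordinary nested columns
    mapWriter.scalar("d").setString("fred");
    rootWriter.save();                           // commit the row

    rsLoader.harvest();                          // hand the batch to a downstream consumer
    rsLoader.close();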

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOmittedValues.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOmittedValues.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOmittedValues.java
new file mode 100644
index 0000000..2c4c87b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOmittedValues.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetReader;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.Test;
+
+public class TestResultSetLoaderOmittedValues extends SubOperatorTest {
+
+  /**
+   * Test "holes" in the middle of a batch, and unset columns at
+   * the end. Ending the batch should fill in missing values.
+   */
+
+  @Test
+  public void testOmittedValuesAtEnd() {
+
+    // Create columns up front
+
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .addNullable("c", MinorType.VARCHAR)
+        .add("d", MinorType.INT)
+        .addNullable("e", MinorType.INT)
+        .addArray("f", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    int rowCount = 0;
+    ScalarWriter arrayWriter;
+    for (int i = 0; i < 2;  i++) { // Row 0, 1
+      rootWriter.start();
+      rowCount++;
+      rootWriter.scalar(0).setInt(rowCount);
+      rootWriter.scalar(1).setString("b_" + rowCount);
+      rootWriter.scalar(2).setString("c_" + rowCount);
+      rootWriter.scalar(3).setInt(rowCount * 10);
+      rootWriter.scalar(4).setInt(rowCount * 100);
+      arrayWriter = rootWriter.column(5).array().scalar();
+      arrayWriter.setString("f_" + rowCount + "-1");
+      arrayWriter.setString("f_" + rowCount + "-2");
+      rootWriter.save();
+    }
+
+    // Holes in half the columns
+
+    for (int i = 0; i < 2;  i++) { // Rows 2, 3
+      rootWriter.start();
+      rowCount++;
+      rootWriter.scalar(0).setInt(rowCount);
+      rootWriter.scalar(1).setString("b_" + rowCount);
+      rootWriter.scalar(3).setInt(rowCount * 10);
+      arrayWriter = rootWriter.column(5).array().scalar();
+      arrayWriter.setString("f_" + rowCount + "-1");
+      arrayWriter.setString("f_" + rowCount + "-2");
+      rootWriter.save();
+    }
+
+    // Holes in the other half
+
+    for (int i = 0; i < 2;  i++) { // Rows 4, 5
+      rootWriter.start();
+      rowCount++;
+      rootWriter.scalar(0).setInt(rowCount);
+      rootWriter.scalar(2).setString("c_" + rowCount);
+      rootWriter.scalar(4).setInt(rowCount * 100);
+      rootWriter.save();
+    }
+
+    // All columns again.
+
+    for (int i = 0; i < 2;  i++) { // Rows 6, 7
+      rootWriter.start();
+      rowCount++;
+      rootWriter.scalar(0).setInt(rowCount);
+      rootWriter.scalar(1).setString("b_" + rowCount);
+      rootWriter.scalar(2).setString("c_" + rowCount);
+      rootWriter.scalar(3).setInt(rowCount * 10);
+      rootWriter.scalar(4).setInt(rowCount * 100);
+      arrayWriter = rootWriter.column(5).array().scalar();
+      arrayWriter.setString("f_" + rowCount + "-1");
+      arrayWriter.setString("f_" + rowCount + "-2");
+      rootWriter.save();
+    }
+
+    // Omit all but the key column at end
+
+    for (int i = 0; i < 2;  i++) { // Rows 8, 9
+      rootWriter.start();
+      rowCount++;
+      rootWriter.scalar(0).setInt(rowCount);
+      rootWriter.save();
+    }
+
+    // Harvest the row and verify.
+
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+//    actual.print();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .addNullable("c", MinorType.VARCHAR)
+        .add("3", MinorType.INT)
+        .addNullable("e", MinorType.INT)
+        .addArray("f", MinorType.VARCHAR)
+        .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(  1, "b_1", "c_1",  10,  100, new String[] {"f_1-1",  "f_1-2"})
+        .addRow(  2, "b_2", "c_2",  20,  200, new String[] {"f_2-1",  "f_2-2"})
+        .addRow(  3, "b_3", null,   30, null, new String[] {"f_3-1",  "f_3-2"})
+        .addRow(  4, "b_4", null,   40, null, new String[] {"f_4-1",  "f_4-2"})
+        .addRow(  5, "",    "c_5",   0,  500, new String[] {})
+        .addRow(  6, "",    "c_6",   0,  600, new String[] {})
+        .addRow(  7, "b_7", "c_7",  70,  700, new String[] {"f_7-1",  "f_7-2"})
+        .addRow(  8, "b_8", "c_8",  80,  800, new String[] {"f_8-1",  "f_8-2"})
+        .addRow(  9, "",    null,    0, null, new String[] {})
+        .addRow( 10, "",    null,    0, null, new String[] {})
+        .build();
+
+    new RowSetComparison(expected)
+        .verifyAndClearAll(actual);
+    rsLoader.close();
+  }
+
+  /**
+   * Test "holes" at the end of a batch when batch overflows. Completed
+   * batch must be finalized correctly, new batch initialized correct,
+   * for the missing values.
+   */
+
+  @Test
+  public void testOmittedValuesAtEndWithOverflow() {
+    TupleMetadata schema = new SchemaBuilder()
+        // Row index
+        .add("a", MinorType.INT)
+        // Column that forces overflow
+        .add("b", MinorType.VARCHAR)
+        // Column with all holes
+        .addNullable("c", MinorType.VARCHAR)
+        // Column with some holes
+        .addNullable("d", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Fill the batch. Column d has some values. Column c is worst case: no values.
+
+    rsLoader.startBatch();
+    byte value[] = new byte[533];
+    Arrays.fill(value, (byte) 'X');
+    int rowNumber = 0;
+    while (! rootWriter.isFull()) {
+      rootWriter.start();
+      rowNumber++;
+      rootWriter.scalar(0).setInt(rowNumber);
+      rootWriter.scalar(1).setBytes(value, value.length);
+      if (rowNumber < 10_000) {
+        rootWriter.scalar(3).setString("d-" + rowNumber);
+      }
+      rootWriter.save();
+      assertEquals(rowNumber, rsLoader.totalRowCount());
+    }
+
+    // Harvest and verify
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(rowNumber - 1, result.rowCount());
+    RowSetReader reader = result.reader();
+    int rowIndex = 0;
+    while (reader.next()) {
+      int expectedRowNumber = 1 + rowIndex;
+      assertEquals(expectedRowNumber, reader.scalar(0).getInt());
+      assertTrue(reader.scalar(2).isNull());
+      if (expectedRowNumber < 10_000) {
+        assertEquals("d-" + expectedRowNumber, reader.scalar(3).getString());
+      } else {
+        assertTrue(reader.scalar(3).isNull());
+      }
+      rowIndex++;
+    }
+
+    // Start count for this batch is one less than current
+    // count, because of the overflow row.
+
+    int startRowNumber = rowNumber;
+
+    // Write a few more rows to the next batch
+
+    rsLoader.startBatch();
+    for (int i = 0; i < 10; i++) {
+      rootWriter.start();
+      rowNumber++;
+      rootWriter.scalar(0).setInt(rowNumber);
+      rootWriter.scalar(1).setBytes(value, value.length);
+      if (i > 5) {
+        rootWriter.scalar(3).setString("d-" + rowNumber);
+      }
+      rootWriter.save();
+      assertEquals(rowNumber, rsLoader.totalRowCount());
+    }
+
+    // Verify that holes were preserved.
+
+    result = fixture.wrap(rsLoader.harvest());
+    assertEquals(rowNumber, rsLoader.totalRowCount());
+    assertEquals(rowNumber - startRowNumber + 1, result.rowCount());
+//    result.print();
+    reader = result.reader();
+    rowIndex = 0;
+    while (reader.next()) {
+      int expectedRowNumber = startRowNumber + rowIndex;
+      assertEquals(expectedRowNumber, reader.scalar(0).getInt());
+      assertTrue(reader.scalar(2).isNull());
+      if (rowIndex > 6) {
+        assertEquals("d-" + expectedRowNumber, reader.scalar(3).getString());
+      } else {
+        assertTrue("Row " + rowIndex + " col d should be null", reader.scalar(3).isNull());
+      }
+      rowIndex++;
+    }
+    assertEquals(11, rowIndex);
+
+    rsLoader.close();
+  }
+
+  /**
+   * Test that omitting the call to save() effectively discards
+   * the row. Note that the vectors still contain values in the
+   * discarded position; just the various pointers are unset. If
+   * the batch ends before the discarded values are overwritten, the
+   * discarded values just exist at the end of the vector. Since vectors
+   * start with garbage contents, the discarded values are simply a different
+   * kind of garbage. But, if the client writes a new row, then the new
+   * row overwrites the discarded row. This works because we only change
+   * the tail part of a vector; never the internals.
+   */
+
+  @Test
+  public void testSkipRows() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("b", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    int rowNumber = 0;
+    for (int i = 0; i < 14; i++) {
+      rootWriter.start();
+      rowNumber++;
+      rootWriter.scalar(0).setInt(rowNumber);
+      if (i % 3 == 0) {
+        rootWriter.scalar(1).setNull();
+      } else {
+        rootWriter.scalar(1).setString("b-" + rowNumber);
+      }
+      if (i % 2 == 0) {
+        rootWriter.save();
+      }
+    }
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+//    result.print();
+    SingleRowSet expected = fixture.rowSetBuilder(result.batchSchema())
+        .addRow( 1, null)
+        .addRow( 3, "b-3")
+        .addRow( 5, "b-5")
+        .addRow( 7, null)
+        .addRow( 9, "b-9")
+        .addRow(11, "b-11")
+        .addRow(13, null)
+        .build();
+//    expected.print();
+    new RowSetComparison(expected)
+      .verifyAndClearAll(result);
+
+    rsLoader.close();
+  }
+
+  /**
+   * Test that discarding a row works even if that row happens to be an
+   * overflow row.
+   */
+
+  @Test
+  public void testSkipOverflowRow() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("b", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    byte value[] = new byte[512];
+    Arrays.fill(value, (byte) 'X');
+    int count = 0;
+    while (! rootWriter.isFull()) {
+      rootWriter.start();
+      rootWriter.scalar(0).setInt(count);
+      rootWriter.scalar(1).setBytes(value, value.length);
+
+      // Relies on the fact that isFull() becomes true right after
+      // a vector overflows; we don't have to wait for save().
+      // Keep all rows, but discard the overflow row.
+
+      if (! rootWriter.isFull()) {
+        rootWriter.save();
+      }
+      count++;
+    }
+
+    // Discard the results.
+
+    rsLoader.harvest().zeroVectors();
+
+    // Harvest the next batch. Will be empty (because overflow row
+    // was discarded.)
+
+    rsLoader.startBatch();
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(0, result.rowCount());
+    result.clear();
+
+    rsLoader.close();
+  }
+}
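
The overflow tests in the next file (and testMapWithOverflow above) all rely on the same contract: keep writing rows until isFull() reports true, harvest the batch (which excludes the row that triggered overflow), then start the next batch, which begins with that saved overflow row. A rough sketch of that loop, assuming a loader and writer created as in the earlier sketch and the test fixture used throughout these tests:

    // Write until a vector fills; the row in progress is moved to a look-ahead batch.
    byte[] value = new byte[512];
    Arrays.fill(value, (byte) 'X');

    rsLoader.startBatch();
    while (!rootWriter.isFull()) {               // flips to true as soon as a vector overflows
      rootWriter.start();
      rootWriter.scalar(0).setBytes(value, value.length);
      rootWriter.save();
    }

    // The harvested batch excludes the overflow row...
    RowSet visible = fixture.wrap(rsLoader.harvest());
    int visibleRows = visible.rowCount();
    visible.clear();

    // ...which reappears as the first row of the next batch.
    rsLoader.startBatch();
    assertEquals(1, rootWriter.rowCount());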

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java
new file mode 100644
index 0000000..0146cfe
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetOptions;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ScalarElementReader;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetReader;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Exercise the vector overflow functionality for the result set loader.
+ */
+
+public class TestResultSetLoaderOverflow extends SubOperatorTest {
+
+  /**
+   * Test that the writer detects a vector overflow. The offending column
+   * value should be moved to the next batch.
+   */
+
+  @Test
+  public void testVectorSizeLimit() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("s", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    byte value[] = new byte[512];
+    Arrays.fill(value, (byte) 'X');
+    int count = 0;
+    while (! rootWriter.isFull()) {
+      rootWriter.start();
+      rootWriter.scalar(0).setBytes(value, value.length);
+      rootWriter.save();
+      count++;
+    }
+
+    // Number of rows should be driven by vector size.
+    // Our row count should include the overflow row
+
+    int expectedCount = ValueVector.MAX_BUFFER_SIZE / value.length;
+    assertEquals(expectedCount + 1, count);
+
+    // Loader's row count should include only "visible" rows
+
+    assertEquals(expectedCount, rootWriter.rowCount());
+
+    // Total count should include invisible and look-ahead rows.
+
+    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+    // Result should exclude the overflow row
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(expectedCount, result.rowCount());
+    result.clear();
+
+    // Next batch should start with the overflow row
+
+    rsLoader.startBatch();
+    assertEquals(1, rootWriter.rowCount());
+    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+    result = fixture.wrap(rsLoader.harvest());
+    assertEquals(1, result.rowCount());
+    result.clear();
+
+    rsLoader.close();
+  }
+
+  /**
+   * Test that the writer detects when the configured batch size limit is
+   * reached. The offending column value should be moved to the next batch.
+   */
+
+  @Test
+  public void testBatchSizeLimit() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("s", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchema(schema)
+        .setBatchSizeLimit(
+            8 * 1024 * 1024 + // Data
+            2 * ValueVector.MAX_ROW_COUNT * 4) // Offsets, doubled because of +1
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    byte value[] = new byte[512];
+    Arrays.fill(value, (byte) 'X');
+    int count = 0;
+    while (! rootWriter.isFull()) {
+      rootWriter.start();
+      rootWriter.scalar(0).setBytes(value, value.length);
+      rootWriter.save();
+      count++;
+    }
+
+    // Our row count should include the overflow row
+
+    int expectedCount = 8 * 1024 * 1024 / value.length;
+    assertEquals(expectedCount + 1, count);
+
+    // Loader's row count should include only "visible" rows
+
+    assertEquals(expectedCount, rootWriter.rowCount());
+
+    // Total count should include invisible and look-ahead rows.
+
+    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+    // Result should exclude the overflow row
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(expectedCount, result.rowCount());
+    result.clear();
+
+    // Next batch should start with the overflow row
+
+    rsLoader.startBatch();
+    assertEquals(1, rootWriter.rowCount());
+    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+    result = fixture.wrap(rsLoader.harvest());
+    assertEquals(1, result.rowCount());
+    result.clear();
+
+    rsLoader.close();
+  }
+
+  /**
+   * Load a batch to overflow. Then, close the loader with the overflow
+   * batch unharvested. The Loader should release the memory allocated
+   * to the unused overflow vectors.
+   */
+
+  @Test
+  public void testCloseWithOverflow() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("s", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    byte value[] = new byte[512];
+    Arrays.fill(value, (byte) 'X');
+    int count = 0;
+    while (! rootWriter.isFull()) {
+      rootWriter.start();
+      rootWriter.scalar(0).setBytes(value, value.length);
+      rootWriter.save();
+      count++;
+    }
+
+    assertTrue(count < ValueVector.MAX_ROW_COUNT);
+
+    // Harvest the full batch
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    result.clear();
+
+    // Close without harvesting the overflow batch.
+
+    rsLoader.close();
+  }
+
+  /**
+   * Case where a single array fills up the vector to the maximum size
+   * limit. Overflow won't work here; the attempt will fail with a user
+   * exception.
+   */
+
+  @Test
+  public void testOversizeArray() {
+    TupleMetadata schema = new SchemaBuilder()
+        .addArray("s", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Create a single array as the column value in the first row. When
+    // this overflows, an exception is thrown since overflow is not possible.
+
+    rsLoader.startBatch();
+    byte value[] = new byte[473];
+    Arrays.fill(value, (byte) 'X');
+    rootWriter.start();
+    ScalarWriter array = rootWriter.array(0).scalar();
+    try {
+      for (int i = 0; i < ValueVector.MAX_ROW_COUNT; i++) {
+        array.setBytes(value, value.length);
+      }
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getMessage().contains("column value is larger than the maximum"));
+    }
+    rsLoader.close();
+  }
+
+  /**
+   * Test a row with a single array column which overflows. Verifies
+   * that all the fiddly bits about offset vectors and so on work
+   * correctly. Run this test (the simplest case) if you change anything
+   * about the array handling code.
+   */
+
+  @Test
+  public void testSizeLimitOnArray() {
+    TupleMetadata schema = new SchemaBuilder()
+        .addArray("s", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Fill the batch with rows that each hold a single array of 13 values. Tack on
+    // a suffix to each value so we can be sure the proper data is written and
+    // moved to the overflow batch.
+
+    rsLoader.startBatch();
+    byte value[] = new byte[473];
+    Arrays.fill(value, (byte) 'X');
+    String strValue = new String(value, Charsets.UTF_8);
+    int count = 0;
+    int rowSize = 0;
+    int totalSize = 0;
+    int valuesPerArray = 13;
+    while (rootWriter.start()) {
+      totalSize += rowSize;
+      rowSize = 0;
+      ScalarWriter array = rootWriter.array(0).scalar();
+      for (int i = 0; i < valuesPerArray; i++) {
+        String cellValue = strValue + (count + 1) + "." + i;
+        array.setString(cellValue);
+        rowSize += cellValue.length();
+      }
+      rootWriter.save();
+      count++;
+    }
+
+    // Row count should include the overflow row.
+
+    int expectedCount = count - 1;
+
+    // Size without overflow row should fit in the vector, size
+    // with overflow should not.
+
+    assertTrue(totalSize <= ValueVector.MAX_BUFFER_SIZE);
+    assertTrue(totalSize + rowSize > ValueVector.MAX_BUFFER_SIZE);
+
+    // Result should exclude the overflow row. Last row
+    // should hold the last full array.
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(expectedCount, result.rowCount());
+    RowSetReader reader = result.reader();
+    reader.set(expectedCount - 1);
+    ScalarElementReader arrayReader = reader.column(0).elements();
+    assertEquals(valuesPerArray, arrayReader.size());
+    for (int i = 0; i < valuesPerArray; i++) {
+      String cellValue = strValue + (count - 1) + "." + i;
+      assertEquals(cellValue, arrayReader.getString(i));
+    }
+    result.clear();
+
+    // Next batch should start with the overflow row.
+    // The only row in this next batch should be the whole
+    // array being written at the time of overflow.
+
+    rsLoader.startBatch();
+//    VectorPrinter.printStrings((VarCharVector) ((VarCharColumnWriter) rootWriter.array(0).scalar()).vector(), 0, 5);
+//    ((ResultSetLoaderImpl) rsLoader).dump(new HierarchicalPrinter());
+    assertEquals(1, rootWriter.rowCount());
+    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+    result = fixture.wrap(rsLoader.harvest());
+//    VectorPrinter.printStrings((VarCharVector) ((VarCharColumnWriter) rootWriter.array(0).scalar()).vector(), 0, 5);
+    assertEquals(1, result.rowCount());
+    reader = result.reader();
+    reader.next();
+    arrayReader = reader.column(0).elements();
+    assertEquals(valuesPerArray, arrayReader.size());
+    for (int i = 0; i < valuesPerArray; i++) {
+      String cellValue = strValue + (count) + "." + i;
+      assertEquals(cellValue, arrayReader.getString(i));
+    }
+    result.clear();
+
+    rsLoader.close();
+  }
+
+  /**
+   * Test the complete set of array overflow cases:
+   * <ul>
+   * <li>Array a is written before the column that has overflow,
+   * and must be copied, in its entirety, to the overflow row.</li>
+   * <li>Column b causes the overflow.</li>
+   * <li>Column c is written after the overflow, and should go
+   * to the look-ahead row.</li>
+   * <li>Column d is written for a while, then has empties before
+   * the overflow row, but is written in the overflow row.</li>
+   * <li>Column e is like d, but is not written in the overflow
+   * row.</li>
+   * </ul>
+   */
+
+  @Test
+  public void testArrayOverflowWithOtherArrays() {
+    TupleMetadata schema = new SchemaBuilder()
+        .addArray("a", MinorType.INT)
+        .addArray("b", MinorType.VARCHAR)
+        .addArray("c", MinorType.INT)
+        .addArray("d", MinorType.INT)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Fill the batch with rows that each contain four arrays: a, c and d hold
+    // ints, while b holds large VARCHAR values that eventually trigger
+    // overflow. Columns c and d are omitted for part of the batch. Tack on a
+    // suffix to each VARCHAR value so we can be sure the proper data is
+    // written and moved to the overflow batch.
+
+    byte value[] = new byte[512];
+    Arrays.fill(value, (byte) 'X');
+    String strValue = new String(value, Charsets.UTF_8);
+
+    int aCount = 3;
+    int bCount = 11;
+    int cCount = 5;
+    int dCount = 7;
+
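+    // cCutoff is roughly half the number of rows that fit before the large
+    // VARCHAR values in column b overflow their vector. Columns c and d are
+    // written only up to that cutoff (d is also written in the overflow row,
+    // as noted below).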
+    int cCutoff = ValueVector.MAX_BUFFER_SIZE / value.length / bCount / 2;
+
+    ScalarWriter aWriter = rootWriter.array("a").scalar();
+    ScalarWriter bWriter = rootWriter.array("b").scalar();
+    ScalarWriter cWriter = rootWriter.array("c").scalar();
+    ScalarWriter dWriter = rootWriter.array("d").scalar();
+
+    int count = 0;
+    rsLoader.startBatch();
+    while (rootWriter.start()) {
+      for (int i = 0; i < aCount; i++) {
+        aWriter.setInt(count * aCount + i);
+      }
+      for (int i = 0; i < bCount; i++) {
+        String cellValue = strValue + (count * bCount + i);
+        bWriter.setString(cellValue);
+      }
+      if (count < cCutoff) {
+        for (int i = 0; i < cCount; i++) {
+          cWriter.setInt(count * cCount + i);
+        }
+      }
+
+      // Relies on fact that isFull becomes true right after
+      // a vector overflows; don't have to wait for saveRow().
+
+      if (count < cCutoff || rootWriter.isFull()) {
+        for (int i = 0; i < dCount; i++) {
+          dWriter.setInt(count * dCount + i);
+        }
+      }
+      rootWriter.save();
+      count++;
+    }
+
+    // Verify
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(count - 1, result.rowCount());
+
+    RowSetReader reader = result.reader();
+    ScalarElementReader aReader = reader.array("a").elements();
+    ScalarElementReader bReader = reader.array("b").elements();
+    ScalarElementReader cReader = reader.array("c").elements();
+    ScalarElementReader dReader = reader.array("d").elements();
+
+    while (reader.next()) {
+      int rowId = reader.rowIndex();
+      assertEquals(aCount, aReader.size());
+      for (int i = 0; i < aCount; i++) {
+        assertEquals(rowId * aCount + i, aReader.getInt(i));
+      }
+      assertEquals(bCount, bReader.size());
+      for (int i = 0; i < bCount; i++) {
+        String cellValue = strValue + (rowId * bCount + i);
+        assertEquals(cellValue, bReader.getString(i));
+      }
+      if (rowId < cCutoff) {
+        assertEquals(cCount, cReader.size());
+        for (int i = 0; i < cCount; i++) {
+          assertEquals(rowId * cCount + i, cReader.getInt(i));
+        }
+        assertEquals(dCount, dReader.size());
+        for (int i = 0; i < dCount; i++) {
+          assertEquals(rowId * dCount + i, dReader.getInt(i));
+        }
+      } else {
+        assertEquals(0, cReader.size());
+        assertEquals(0, dReader.size());
+      }
+    }
+    result.clear();
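+
+    // Remember how many rows landed in the first batch so that expected
+    // values for the second batch can be computed from the overall row index.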
+    int firstCount = count - 1;
+
+    // One row (the overflow row) is already in the new batch. Write more
+    // rows, omitting values for columns c and d in the first few. Column d
+    // carries a rolled-over value; column c's roll-over is empty.
+
+    rsLoader.startBatch();
+    for (int j = 0; j < 5; j++) {
+      rootWriter.start();
+      for (int i = 0; i < aCount; i++) {
+        aWriter.setInt(count * aCount + i);
+      }
+      for (int i = 0; i < bCount; i++) {
+        String cellValue = strValue + (count * bCount + i);
+        bWriter.setString(cellValue);
+      }
+      if (j > 3) {
+        for (int i = 0; i < cCount; i++) {
+          cWriter.setInt(count * cCount + i);
+        }
+        for (int i = 0; i < dCount; i++) {
+          dWriter.setInt(count * dCount + i);
+        }
+      }
+      rootWriter.save();
+      count++;
+    }
+
+    result = fixture.wrap(rsLoader.harvest());
+    assertEquals(6, result.rowCount());
+
+    reader = result.reader();
+    aReader = reader.array("a").elements();
+    bReader = reader.array("b").elements();
+    cReader = reader.array("c").elements();
+    dReader = reader.array("d").elements();
+
+    int j = 0;
+    while (reader.next()) {
+      int rowId = firstCount + reader.rowIndex();
+      assertEquals(aCount, aReader.size());
+      for (int i = 0; i < aCount; i++) {
+        assertEquals("Index " + i, rowId * aCount + i, aReader.getInt(i));
+      }
+      assertEquals(bCount, bReader.size());
+      for (int i = 0; i < bCount; i++) {
+        String cellValue = strValue + (rowId * bCount + i);
+        assertEquals(cellValue, bReader.getString(i));
+      }
+      if (j > 4) {
+        assertEquals(cCount, cReader.size());
+        for (int i = 0; i < cCount; i++) {
+          assertEquals(rowId * cCount + i, cReader.getInt(i));
+        }
+      } else {
+        assertEquals(0, cReader.size());
+      }
+      if (j == 0 || j > 4) {
+        assertEquals(dCount, dReader.size());
+        for (int i = 0; i < dCount; i++) {
+          assertEquals(rowId * dCount + i, dReader.getInt(i));
+        }
+      } else {
+        assertEquals(0, dReader.size());
+      }
+      j++;
+    }
+    result.clear();
+
+    rsLoader.close();
+  }
+
+  /**
+   * Create an array that contains more than 64K values. Drill has no practical
+   * limit on array lengths; the theoretical limit is about 2 billion elements,
+   * which, even for single-byte values, is too large to fit into a vector.
+   */
+
+  @Test
+  public void testLargeArray() {
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator());
+    RowSetLoader rootWriter = rsLoader.writer();
+    MaterializedField field = SchemaBuilder.columnSchema("a", MinorType.INT, DataMode.REPEATED);
+    rootWriter.addColumn(field);
+
+    // Create a single array as the column value in the first row. When the
+    // array grows too large, an exception is thrown, since overflow is not
+    // possible for a row that is itself larger than a vector.
+
+    rsLoader.startBatch();
+    rootWriter.start();
+    ScalarWriter array = rootWriter.array(0).scalar();
+    try {
+      for (int i = 0; i < Integer.MAX_VALUE; i++) {
+        array.setInt(i+1);
+      }
+      fail();
+    } catch (UserException e) {
+      // Expected
+    }
+    rsLoader.close();
+  }
+
+  /**
+   * Test the case in which an array has "missing" values before the overflow.
+   */
+
+  @Test
+  public void testMissingArrayValues() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .addArray("c", MinorType.INT)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    byte value[] = new byte[512];
+    Arrays.fill(value, (byte) 'X');
+
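+    // The 512-byte values in column b fill their vector after roughly
+    // MAX_BUFFER_SIZE / 512 rows. Stop writing the array column c about
+    // two-thirds of the way there, so the overflow row has an empty array.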
+    int blankAfter = ValueVector.MAX_BUFFER_SIZE / 512 * 2 / 3;
+    ScalarWriter cWriter = rootWriter.array("c").scalar();
+
+    rsLoader.startBatch();
+    int rowId = 0;
+    while (rootWriter.start()) {
+      rootWriter.scalar("a").setInt(rowId);
+      rootWriter.scalar("b").setBytes(value, value.length);
+      if (rowId < blankAfter) {
+        for (int i = 0; i < 3; i++) {
+          cWriter.setInt(rowId * 3 + i);
+        }
+      }
+      rootWriter.save();
+      rowId++;
+    }
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(rowId - 1, result.rowCount());
+    RowSetReader reader = result.reader();
+    ScalarElementReader cReader = reader.array("c").elements();
+    while (reader.next()) {
+      assertEquals(reader.rowIndex(), reader.scalar("a").getInt());
+      assertTrue(Arrays.equals(value, reader.scalar("b").getBytes()));
+      if (reader.rowIndex() < blankAfter) {
+        assertEquals(3, cReader.size());
+        for (int i = 0; i < 3; i++) {
+          assertEquals(reader.rowIndex() * 3 + i, cReader.getInt(i));
+        }
+      } else {
+        assertEquals(0, cReader.size());
+      }
+    }
+    result.clear();
+    rsLoader.close();
+  }
+
+  @Test
+  public void testOverflowWithNullables() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("n", MinorType.INT)
+        .addNullable("a", MinorType.VARCHAR)
+        .addNullable("b", MinorType.VARCHAR)
+        .addNullable("c", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    byte value[] = new byte[512];
+    Arrays.fill(value, (byte) 'X');
+    int count = 0;
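+
+    // Write rows until the 512-byte values in column "b" force an overflow;
+    // the nullable columns "a" and "c" remain null throughout.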
+    while (! rootWriter.isFull()) {
+      rootWriter.start();
+      rootWriter.scalar(0).setInt(count);
+      rootWriter.scalar(1).setNull();
+      rootWriter.scalar(2).setBytes(value, value.length);
+      rootWriter.scalar(3).setNull();
+      rootWriter.save();
+      count++;
+    }
+
+    // Result should exclude the overflow row
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(count - 1, result.rowCount());
+
+    RowSetReader reader = result.reader();
+    while (reader.next()) {
+      assertEquals(reader.rowIndex(), reader.scalar(0).getInt());
+      assertTrue(reader.scalar(1).isNull());
+      assertTrue(Arrays.equals(value, reader.scalar(2).getBytes()));
+      assertTrue(reader.scalar(3).isNull());
+    }
+    result.clear();
+
+    // Next batch should start with the overflow row
+
+    rsLoader.startBatch();
+    result = fixture.wrap(rsLoader.harvest());
+    reader = result.reader();
+    assertEquals(1, result.rowCount());
+    assertTrue(reader.next());
+    assertEquals(count - 1, reader.scalar(0).getInt());
+    assertTrue(reader.scalar(1).isNull());
+    assertTrue(Arrays.equals(value, reader.scalar(2).getBytes()));
+    assertTrue(reader.scalar(3).isNull());
+    result.clear();
+
+    rsLoader.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java
new file mode 100644
index 0000000..5c6ff7b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProjection.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetOptions;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Test of the basics of the projection mechanism.
+ */
+
+public class TestResultSetLoaderProjection extends SubOperatorTest {
+
+  @Test
+  public void testProjectionMap() {
+
+    // A null projection list means everything is projected
+
+    {
+      ProjectionSet projSet = ProjectionSetImpl.parse(null);
+      assertTrue(projSet instanceof NullProjectionSet);
+      assertTrue(projSet.isProjected("foo"));
+    }
+
+    // Empty list means everything is projected
+
+    {
+      ProjectionSet projSet = ProjectionSetImpl.parse(new ArrayList<SchemaPath>());
+      assertTrue(projSet instanceof NullProjectionSet);
+      assertTrue(projSet.isProjected("foo"));
+    }
+
+    // Simple non-map columns
+
+    {
+      List<SchemaPath> projCols = new ArrayList<>();
+      projCols.add(SchemaPath.getSimplePath("foo"));
+      projCols.add(SchemaPath.getSimplePath("bar"));
+      ProjectionSet projSet = ProjectionSetImpl.parse(projCols);
+      assertTrue(projSet instanceof ProjectionSetImpl);
+      assertTrue(projSet.isProjected("foo"));
+      assertTrue(projSet.isProjected("bar"));
+      assertFalse(projSet.isProjected("mumble"));
+    }
+
+    // Whole-map projection (note, fully projected maps are
+    // identical to projected simple columns at this level of
+    // abstraction.)
+
+    {
+      List<SchemaPath> projCols = new ArrayList<>();
+      projCols.add(SchemaPath.getSimplePath("map"));
+      ProjectionSet projSet = ProjectionSetImpl.parse(projCols);
+      assertTrue(projSet instanceof ProjectionSetImpl);
+      assertTrue(projSet.isProjected("map"));
+      assertFalse(projSet.isProjected("another"));
+      ProjectionSet mapProj = projSet.mapProjection("map");
+      assertNotNull(mapProj);
+      assertTrue(mapProj instanceof NullProjectionSet);
+      assertTrue(mapProj.isProjected("foo"));
+      assertNotNull(projSet.mapProjection("another"));
+      assertFalse(projSet.mapProjection("another").isProjected("anyCol"));
+    }
+
+    // Selected map projection, multiple levels, full projection
+    // at leaf level.
+
+    {
+      List<SchemaPath> projCols = new ArrayList<>();
+      projCols.add(SchemaPath.getCompoundPath("map", "a"));
+      projCols.add(SchemaPath.getCompoundPath("map", "b"));
+      projCols.add(SchemaPath.getCompoundPath("map", "map2", "x"));
+      ProjectionSet projSet = ProjectionSetImpl.parse(projCols);
+      assertTrue(projSet instanceof ProjectionSetImpl);
+      assertTrue(projSet.isProjected("map"));
+
+      // Map: an explicit map at top level
+
+      ProjectionSet mapProj = projSet.mapProjection("map");
+      assertTrue(mapProj instanceof ProjectionSetImpl);
+      assertTrue(mapProj.isProjected("a"));
+      assertTrue(mapProj.isProjected("b"));
+      assertTrue(mapProj.isProjected("map2"));
+      assertFalse(projSet.isProjected("bogus"));
+
+      // Map b: an implied nested map
+
+      ProjectionSet bMapProj = mapProj.mapProjection("b");
+      assertNotNull(bMapProj);
+      assertTrue(bMapProj instanceof NullProjectionSet);
+      assertTrue(bMapProj.isProjected("foo"));
+
+      // Map2, a nested map, has an explicit projection
+
+      ProjectionSet map2Proj = mapProj.mapProjection("map2");
+      assertNotNull(map2Proj);
+      assertTrue(map2Proj instanceof ProjectionSetImpl);
+      assertTrue(map2Proj.isProjected("x"));
+      assertFalse(map2Proj.isProjected("bogus"));
+    }
+  }
+
+  /**
+   * Test imposing a selection mask between the client and the underlying
+   * vector container.
+   */
+
+  @Test
+  public void testProjectionStatic() {
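+    // Project columns c, b and e. Column "e" does not exist in the schema;
+    // "a" and "d" are defined but not projected.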
+    List<SchemaPath> selection = Lists.newArrayList(
+        SchemaPath.getSimplePath("c"),
+        SchemaPath.getSimplePath("b"),
+        SchemaPath.getSimplePath("e"));
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.INT)
+        .add("c", MinorType.INT)
+        .add("d", MinorType.INT)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+
+    doProjectionTest(rsLoader);
+  }
+
+  @Test
+  public void testProjectionDynamic() {
+    List<SchemaPath> selection = Lists.newArrayList(
+        SchemaPath.getSimplePath("c"),
+        SchemaPath.getSimplePath("b"),
+        SchemaPath.getSimplePath("e"));
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
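+
+    // Same projection as above, but the columns are added dynamically to the
+    // writer rather than declared up front in the loader options.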
+    rootWriter.addColumn(SchemaBuilder.columnSchema("a", MinorType.INT, DataMode.REQUIRED));
+    rootWriter.addColumn(SchemaBuilder.columnSchema("b", MinorType.INT, DataMode.REQUIRED));
+    rootWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.REQUIRED));
+    rootWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.INT, DataMode.REQUIRED));
+
+    doProjectionTest(rsLoader);
+  }
+
+  private void doProjectionTest(ResultSetLoader rsLoader) {
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // All columns appear, including non-projected ones.
+
+    TupleMetadata actualSchema = rootWriter.schema();
+    assertEquals(4, actualSchema.size());
+    assertEquals("a", actualSchema.column(0).getName());
+    assertEquals("b", actualSchema.column(1).getName());
+    assertEquals("c", actualSchema.column(2).getName());
+    assertEquals("d", actualSchema.column(3).getName());
+    assertEquals(0, actualSchema.index("A"));
+    assertEquals(3, actualSchema.index("d"));
+    assertEquals(-1, actualSchema.index("e"));
+
+    // Non-projected columns identify themselves via metadata
+
+    assertFalse(actualSchema.metadata("a").isProjected());
+    assertTrue(actualSchema.metadata("b").isProjected());
+    assertTrue(actualSchema.metadata("c").isProjected());
+    assertFalse(actualSchema.metadata("d").isProjected());
+
+    // Write some data. Doesn't need much.
+
+    rsLoader.startBatch();
+    for (int i = 1; i < 3; i++) {
+      rootWriter.start();
+      rootWriter.scalar(0).setInt(i * 5);
+      rootWriter.scalar(1).setInt(i);
+      rootWriter.scalar(2).setInt(i * 10);
+      rootWriter.scalar(3).setInt(i * 20);
+      rootWriter.save();
+    }
+
+    // Verify. The result should contain only the projected columns that the
+    // loader actually defined, in the order of definition.
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("b", MinorType.INT)
+        .add("c", MinorType.INT)
+        .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow(1, 10)
+        .addRow(2, 20)
+        .build();
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+//    actual.print();
+    new RowSetComparison(expected)
+        .verifyAndClearAll(actual);
+    rsLoader.close();
+  }
+
+  @Test
+  public void testMapProjection() {
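+    // Project all of map m1, only member "d" of map m2, and nothing of map m3.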
+    List<SchemaPath> selection = Lists.newArrayList(
+        SchemaPath.getSimplePath("m1"),
+        SchemaPath.getCompoundPath("m2", "d"));
+    TupleMetadata schema = new SchemaBuilder()
+        .addMap("m1")
+          .add("a", MinorType.INT)
+          .add("b", MinorType.INT)
+          .buildMap()
+        .addMap("m2")
+          .add("c", MinorType.INT)
+          .add("d", MinorType.INT)
+          .buildMap()
+        .addMap("m3")
+          .add("e", MinorType.INT)
+          .add("f", MinorType.INT)
+          .buildMap()
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Verify the projected columns
+
+    TupleMetadata actualSchema = rootWriter.schema();
+    ColumnMetadata m1Md = actualSchema.metadata("m1");
+    assertTrue(m1Md.isMap());
+    assertTrue(m1Md.isProjected());
+    assertEquals(2, m1Md.mapSchema().size());
+    assertTrue(m1Md.mapSchema().metadata("a").isProjected());
+    assertTrue(m1Md.mapSchema().metadata("b").isProjected());
+
+    ColumnMetadata m2Md = actualSchema.metadata("m2");
+    assertTrue(m2Md.isMap());
+    assertTrue(m2Md.isProjected());
+    assertEquals(2, m2Md.mapSchema().size());
+    assertFalse(m2Md.mapSchema().metadata("c").isProjected());
+    assertTrue(m2Md.mapSchema().metadata("d").isProjected());
+
+    ColumnMetadata m3Md = actualSchema.metadata("m3");
+    assertTrue(m3Md.isMap());
+    assertFalse(m3Md.isProjected());
+    assertEquals(2, m3Md.mapSchema().size());
+    assertFalse(m3Md.mapSchema().metadata("e").isProjected());
+    assertFalse(m3Md.mapSchema().metadata("f").isProjected());
+
+    // Write a couple of rows.
+
+    rsLoader.startBatch();
+    rootWriter.start();
+    rootWriter.tuple("m1").scalar("a").setInt(1);
+    rootWriter.tuple("m1").scalar("b").setInt(2);
+    rootWriter.tuple("m2").scalar("c").setInt(3);
+    rootWriter.tuple("m2").scalar("d").setInt(4);
+    rootWriter.tuple("m3").scalar("e").setInt(5);
+    rootWriter.tuple("m3").scalar("f").setInt(6);
+    rootWriter.save();
+
+    rootWriter.start();
+    rootWriter.tuple("m1").scalar("a").setInt(11);
+    rootWriter.tuple("m1").scalar("b").setInt(12);
+    rootWriter.tuple("m2").scalar("c").setInt(13);
+    rootWriter.tuple("m2").scalar("d").setInt(14);
+    rootWriter.tuple("m3").scalar("e").setInt(15);
+    rootWriter.tuple("m3").scalar("f").setInt(16);
+    rootWriter.save();
+
+    // Verify. Only the projected columns appear in the result set.
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+      .addMap("m1")
+        .add("a", MinorType.INT)
+        .add("b", MinorType.INT)
+        .buildMap()
+      .addMap("m2")
+        .add("d", MinorType.INT)
+        .buildMap()
+      .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+      .addRow(new Object[] {1, 2}, new Object[] {4})
+      .addRow(new Object[] {11, 12}, new Object[] {14})
+      .build();
+    new RowSetComparison(expected)
+        .verifyAndClearAll(fixture.wrap(rsLoader.harvest()));
+    rsLoader.close();
+  }
+
+  /**
+   * Test a map array. Use the convenience methods to set values.
+   * Only the projected members of the map arrays should appear in the
+   * harvested results.
+   */
+
+  @Test
+  public void testMapArrayProjection() {
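+    // Same style of projection as above: all of m1, only m2.d, nothing of m3.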
+    List<SchemaPath> selection = Lists.newArrayList(
+        SchemaPath.getSimplePath("m1"),
+        SchemaPath.getCompoundPath("m2", "d"));
+    TupleMetadata schema = new SchemaBuilder()
+        .addMapArray("m1")
+          .add("a", MinorType.INT)
+          .add("b", MinorType.INT)
+          .buildMap()
+        .addMapArray("m2")
+          .add("c", MinorType.INT)
+          .add("d", MinorType.INT)
+          .buildMap()
+        .addMapArray("m3")
+          .add("e", MinorType.INT)
+          .add("f", MinorType.INT)
+          .buildMap()
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Write a couple of rows.
+
+    rsLoader.startBatch();
+    rootWriter.addRow(
+        new Object[] { new Object[] {10, 20}, new Object[] {11, 21}},
+        new Object[] { new Object[] {30, 40}, new Object[] {31, 42}},
+        new Object[] { new Object[] {50, 60}, new Object[] {51, 62}});
+    rootWriter.addRow(
+        new Object[] { new Object[] {110, 120}, new Object[] {111, 121}},
+        new Object[] { new Object[] {130, 140}, new Object[] {131, 142}},
+        new Object[] { new Object[] {150, 160}, new Object[] {151, 162}});
+
+    // Verify. Only the projected columns appear in the result set.
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+      .addMapArray("m1")
+        .add("a", MinorType.INT)
+        .add("b", MinorType.INT)
+        .buildMap()
+      .addMapArray("m2")
+        .add("d", MinorType.INT)
+        .buildMap()
+      .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+      .addRow(
+          new Object[] { new Object[] {10, 20}, new Object[] {11, 21}},
+          new Object[] { new Object[] {40}, new Object[] {42}})
+      .addRow(
+          new Object[] { new Object[] {110, 120}, new Object[] {111, 121}},
+          new Object[] { new Object[] {140}, new Object[] {142}})
+      .build();
+    new RowSetComparison(expected)
+        .verifyAndClearAll(fixture.wrap(rsLoader.harvest()));
+    rsLoader.close();
+  }
+
+  /**
+   * Verify that the projection code plays nice with vector overflow. Overflow
+   * is the most complex operation in this subsystem with many specialized
+   * methods that must work together flawlessly. This test ensures that
+   * non-projected columns stay in the background and don't interfere
+   * with overflow logic.
+   */
+
+  @Test
+  public void testProjectWithOverflow() {
+    List<SchemaPath> selection = Lists.newArrayList(
+        SchemaPath.getSimplePath("small"),
+        SchemaPath.getSimplePath("dummy"));
+    TupleMetadata schema = new SchemaBuilder()
+        .add("big", MinorType.VARCHAR)
+        .add("small", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setProjection(selection)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    byte big[] = new byte[600];
+    Arrays.fill(big, (byte) 'X');
+    byte small[] = new byte[512];
+    Arrays.fill(small, (byte) 'X');
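+
+    // The unprojected "big" column receives 600 bytes per row, but because it
+    // is not projected it should not drive the overflow decision; only the
+    // projected 512-byte "small" column should.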
+
+    rsLoader.startBatch();
+    int count = 0;
+    while (! rootWriter.isFull()) {
+      rootWriter.start();
+      rootWriter.scalar(0).setBytes(big, big.length);
+      rootWriter.scalar(1).setBytes(small, small.length);
+      rootWriter.save();
+      count++;
+    }
+
+    // The number of rows should be driven by the size of the projected
+    // vector ("small"), not by the larger, unprojected "big" vector.
+    // Our row count should include the overflow row.
+
+    int expectedCount = ValueVector.MAX_BUFFER_SIZE / small.length;
+    assertEquals(expectedCount + 1, count);
+
+    // Loader's row count should include only "visible" rows
+
+    assertEquals(expectedCount, rootWriter.rowCount());
+
+    // Total count should include invisible and look-ahead rows.
+
+    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+    // Result should exclude the overflow row
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(expectedCount, result.rowCount());
+    result.clear();
+
+    // Next batch should start with the overflow row
+
+    rsLoader.startBatch();
+    assertEquals(1, rootWriter.rowCount());
+    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+    result = fixture.wrap(rsLoader.harvest());
+    assertEquals(1, result.rowCount());
+    result.clear();
+
+    rsLoader.close();
+  }
+}