Posted to commits@drill.apache.org by pr...@apache.org on 2017/12/21 05:19:37 UTC

[10/15] drill git commit: DRILL-5657: Size-aware vector writer structure

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProtocol.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProtocol.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProtocol.java
new file mode 100644
index 0000000..ffcc84a
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderProtocol.java
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter.UndefinedColumnException;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetReader;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Tests of the overall result set loader protocol focusing on which operations
+ * are valid in each state, basics of column lookup, basics of adding columns
+ * and so on. Uses the simplest possible type: a required int.
+ * <p>
+ * Run this test first to do a sanity check of the result set loader after making
+ * changes.
+ * <p>
+ * You will find that the result set loader creates a very complex tree of
+ * objects that can be quite hard to understand and debug. Please read the
+ * material in the various subsystems to see how the classes fit together
+ * to implement Drill's rich JSON-like data model.
+ * <p>
+ * To aid in debugging, you can also dump the result set loader and all its
+ * child objects as follows:<pre><code>
+ * ((ResultSetLoaderImpl) rsLoader).dump(new HierarchicalPrinter());
+ * </code></pre>
+ * Simply insert that line into these tests anywhere you want to visualize
+ * the structure. The object tree will show all the components and their
+ * current state.
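+ * <p>
+ * For orientation, the overall write protocol looks like this
+ * (an illustrative sketch using the same calls as the tests below,
+ * not a definitive recipe):<pre><code>
+ * ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator());
+ * RowSetLoader rootWriter = rsLoader.writer();
+ * rootWriter.addColumn(colSchema);      // define or evolve the schema
+ * rsLoader.startBatch();
+ * while (! rootWriter.isFull()) {
+ *   rootWriter.start();                 // begin a row
+ *   rootWriter.scalar(0).setInt(v);     // write column values
+ *   rootWriter.save();                  // commit the row
+ * }
+ * RowSet result = fixture.wrap(rsLoader.harvest());
+ * rsLoader.close();
+ * </code></pre>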
+ */
+
+public class TestResultSetLoaderProtocol extends SubOperatorTest {
+
+  @Test
+  public void testBasics() {
+    ResultSetLoaderImpl rsLoaderImpl = new ResultSetLoaderImpl(fixture.allocator());
+    ResultSetLoader rsLoader = rsLoaderImpl;
+    assertEquals(0, rsLoader.schemaVersion());
+    assertEquals(ResultSetLoader.DEFAULT_ROW_COUNT, rsLoader.targetRowCount());
+    assertEquals(ValueVector.MAX_BUFFER_SIZE, rsLoader.targetVectorSize());
+    assertEquals(0, rsLoader.writer().rowCount());
+    assertEquals(0, rsLoader.batchCount());
+    assertEquals(0, rsLoader.totalRowCount());
+
+    // Failures due to wrong state (Start)
+
+    try {
+      rsLoader.harvest();
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+
+    // Can define schema before starting the first batch.
+
+    RowSetLoader rootWriter = rsLoader.writer();
+    TupleMetadata schema = rootWriter.schema();
+    assertEquals(0, schema.size());
+
+    MaterializedField fieldA = SchemaBuilder.columnSchema("a", MinorType.INT, DataMode.REQUIRED);
+    rootWriter.addColumn(fieldA);
+
+    assertEquals(1, schema.size());
+    assertSame(fieldA, schema.column(0));
+    assertSame(fieldA, schema.column("a"));
+
+    // Error to start a row before the first batch.
+
+    try {
+      rootWriter.start();
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+
+    // Error to end a row before the first batch.
+
+    try {
+      rootWriter.save();
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+
+    // Because writing is an inner loop, no checks are
+    // done to ensure that writing occurs only in the proper
+    // state. So, we can't test setInt() in the wrong state.
+
+    rsLoader.startBatch();
+    try {
+      rsLoader.startBatch();
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+    assertFalse(rootWriter.isFull());
+
+    rootWriter.start();
+    rootWriter.scalar(0).setInt(100);
+    assertEquals(0, rootWriter.rowCount());
+    assertEquals(0, rsLoader.batchCount());
+    rootWriter.save();
+    assertEquals(1, rootWriter.rowCount());
+    assertEquals(1, rsLoader.batchCount());
+    assertEquals(1, rsLoader.totalRowCount());
+
+    // Can add a field after the first row; prior rows are
+    // "back-filled".
+
+    MaterializedField fieldB = SchemaBuilder.columnSchema("b", MinorType.INT, DataMode.OPTIONAL);
+    rootWriter.addColumn(fieldB);
+
+    assertEquals(2, schema.size());
+    assertSame(fieldB, schema.column(1));
+    assertSame(fieldB, schema.column("b"));
+
+    rootWriter.start();
+    rootWriter.scalar(0).setInt(200);
+    rootWriter.scalar(1).setInt(210);
+    rootWriter.save();
+    assertEquals(2, rootWriter.rowCount());
+    assertEquals(1, rsLoader.batchCount());
+    assertEquals(2, rsLoader.totalRowCount());
+
+    // Harvest the first batch. Version number is the number
+    // of columns added.
+
+    assertFalse(rootWriter.isFull());
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(2, rsLoader.schemaVersion());
+    assertEquals(0, rootWriter.rowCount());
+    assertEquals(1, rsLoader.batchCount());
+    assertEquals(2, rsLoader.totalRowCount());
+
+    SingleRowSet expected = fixture.rowSetBuilder(result.batchSchema())
+        .addRow(100, null)
+        .addRow(200, 210)
+        .build();
+    new RowSetComparison(expected)
+        .verifyAndClearAll(result);
+
+    // Between batches: batch-based operations fail
+
+    try {
+      rootWriter.start();
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+    try {
+      rsLoader.harvest();
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+    try {
+      rootWriter.save();
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+
+    // Create a second batch
+
+    rsLoader.startBatch();
+    assertEquals(0, rootWriter.rowCount());
+    assertEquals(1, rsLoader.batchCount());
+    assertEquals(2, rsLoader.totalRowCount());
+    rootWriter.start();
+    rootWriter.scalar(0).setInt(300);
+    rootWriter.scalar(1).setInt(310);
+    rootWriter.save();
+    assertEquals(1, rootWriter.rowCount());
+    assertEquals(2, rsLoader.batchCount());
+    assertEquals(3, rsLoader.totalRowCount());
+    rootWriter.start();
+    rootWriter.scalar(0).setInt(400);
+    rootWriter.scalar(1).setInt(410);
+    rootWriter.save();
+
+    // Harvest. Schema has not changed.
+
+    result = fixture.wrap(rsLoader.harvest());
+    assertEquals(2, rsLoader.schemaVersion());
+    assertEquals(0, rootWriter.rowCount());
+    assertEquals(2, rsLoader.batchCount());
+    assertEquals(4, rsLoader.totalRowCount());
+
+    expected = fixture.rowSetBuilder(result.batchSchema())
+        .addRow(300, 310)
+        .addRow(400, 410)
+        .build();
+    new RowSetComparison(expected)
+        .verifyAndClearAll(result);
+
+    // Next batch. Schema has changed.
+
+    rsLoader.startBatch();
+    rootWriter.start();
+    rootWriter.scalar(0).setInt(500);
+    rootWriter.scalar(1).setInt(510);
+    rootWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.OPTIONAL));
+    rootWriter.scalar(2).setInt(520);
+    rootWriter.save();
+    rootWriter.start();
+    rootWriter.scalar(0).setInt(600);
+    rootWriter.scalar(1).setInt(610);
+    rootWriter.scalar(2).setInt(620);
+    rootWriter.save();
+
+    result = fixture.wrap(rsLoader.harvest());
+    assertEquals(3, rsLoader.schemaVersion());
+    expected = fixture.rowSetBuilder(result.batchSchema())
+        .addRow(500, 510, 520)
+        .addRow(600, 610, 620)
+        .build();
+    new RowSetComparison(expected)
+        .verifyAndClearAll(result);
+
+    rsLoader.close();
+
+    // Key operations fail after close.
+
+    try {
+      rootWriter.start();
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+    try {
+      rsLoader.writer();
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+    try {
+      rsLoader.startBatch();
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+    try {
+      rsLoader.harvest();
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+    try {
+      rootWriter.save();
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+
+    // Benign to close twice
+
+    rsLoader.close();
+  }
+
+  /**
+   * Schemas are case insensitive by default. Verify that
+   * the schema mechanism works, with emphasis on
+   * case-insensitive name lookup.
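+   * <p>
+   * In sketch form (the same calls the test body makes):<pre><code>
+   * rootWriter.addColumn(colSchema);   // column named "a"
+   * assertSame(schema.column("a"), schema.column("A"));
+   * </code></pre>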
+   */
+
+  @Test
+  public void testCaseInsensitiveSchema() {
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator());
+    RowSetLoader rootWriter = rsLoader.writer();
+    TupleMetadata schema = rootWriter.schema();
+
+    // No columns defined in schema
+
+    assertNull(schema.column("a"));
+    try {
+      schema.column(0);
+      fail();
+    } catch (IndexOutOfBoundsException e) {
+      // Expected
+    }
+
+    // No columns defined in writer
+
+    try {
+      rootWriter.column("a");
+      fail();
+    } catch (UndefinedColumnException e) {
+      // Expected
+    }
+    try {
+      rootWriter.column(0);
+      fail();
+    } catch (IndexOutOfBoundsException e) {
+      // Expected
+    }
+
+    // Define a column
+
+    MaterializedField colSchema = SchemaBuilder.columnSchema("a", MinorType.VARCHAR, DataMode.REQUIRED);
+    rootWriter.addColumn(colSchema);
+
+    // Can now be found, case insensitive
+
+    assertSame(colSchema, schema.column(0));
+    assertSame(colSchema, schema.column("a"));
+    assertSame(colSchema, schema.column("A"));
+    assertNotNull(rootWriter.column(0));
+    assertNotNull(rootWriter.column("a"));
+    assertNotNull(rootWriter.column("A"));
+    assertEquals(1, schema.size());
+    assertEquals(0, schema.index("a"));
+    assertEquals(0, schema.index("A"));
+
+    // Reject a duplicate name, case insensitive
+
+    try {
+      rootWriter.addColumn(colSchema);
+      fail();
+    } catch(IllegalArgumentException e) {
+      // Expected
+    }
+    try {
+      MaterializedField testCol = SchemaBuilder.columnSchema("A", MinorType.VARCHAR, DataMode.REQUIRED);
+      rootWriter.addColumn(testCol);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // Expected
+      assertTrue(e.getMessage().contains("Duplicate"));
+    }
+
+    // Can still add required fields while writing the first row.
+
+    rsLoader.startBatch();
+    rootWriter.start();
+    rootWriter.scalar(0).setString("foo");
+
+    MaterializedField col2 = SchemaBuilder.columnSchema("b", MinorType.VARCHAR, DataMode.REQUIRED);
+    rootWriter.addColumn(col2);
+    assertSame(col2, schema.column(1));
+    assertSame(col2, schema.column("b"));
+    assertSame(col2, schema.column("B"));
+    assertEquals(2, schema.size());
+    assertEquals(1, schema.index("b"));
+    assertEquals(1, schema.index("B"));
+    rootWriter.scalar(1).setString("second");
+
+    // After the first row, can add an optional or repeated column.
+    // A required field is also allowed: its values will be back-filled.
+
+    rootWriter.save();
+    rootWriter.start();
+    rootWriter.scalar(0).setString("bar");
+    rootWriter.scalar(1).setString("");
+
+    MaterializedField col3 = SchemaBuilder.columnSchema("c", MinorType.VARCHAR, DataMode.REQUIRED);
+    rootWriter.addColumn(col3);
+    assertSame(col3, schema.column(2));
+    assertSame(col3, schema.column("c"));
+    assertSame(col3, schema.column("C"));
+    assertEquals(3, schema.size());
+    assertEquals(2, schema.index("c"));
+    assertEquals(2, schema.index("C"));
+    rootWriter.scalar("c").setString("c.2");
+
+    MaterializedField col4 = SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.OPTIONAL);
+    rootWriter.addColumn(col4);
+    assertSame(col4, schema.column(3));
+    assertSame(col4, schema.column("d"));
+    assertSame(col4, schema.column("D"));
+    assertEquals(4, schema.size());
+    assertEquals(3, schema.index("d"));
+    assertEquals(3, schema.index("D"));
+    rootWriter.scalar("d").setString("d.2");
+
+    MaterializedField col5 = SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.REPEATED);
+    rootWriter.addColumn(col5);
+    assertSame(col5, schema.column(4));
+    assertSame(col5, schema.column("e"));
+    assertSame(col5, schema.column("E"));
+    assertEquals(5, schema.size());
+    assertEquals(4, schema.index("e"));
+    assertEquals(4, schema.index("E"));
+    rootWriter.array(4).set("e1", "e2", "e3");
+    rootWriter.save();
+
+    // Verify. No reason to expect problems, but might as well check.
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(5, rsLoader.schemaVersion());
+    SingleRowSet expected = fixture.rowSetBuilder(result.batchSchema())
+        .addRow("foo", "second", "",    null,  new String[] { } )
+        .addRow("bar", "",       "c.2", "d.2", new String[] {"e1", "e2", "e3"} )
+        .build();
+    new RowSetComparison(expected)
+        .verifyAndClearAll(result);
+
+    // Handy way to test that close works to abort an in-flight batch
+    // and clean up.
+
+    rsLoader.close();
+  }
+
+  /**
+   * Provide a schema up front to the loader; schema is built before
+   * the first row.
+   * <p>
+   * Also verifies the test-time method to set a row of values using
+   * a single method.
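+   * <p>
+   * That shorthand, as used below:<pre><code>
+   * rootWriter.addRow(10, 100, "fred");   // one call writes the whole row
+   * </code></pre>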
+   */
+
+  @Test
+  public void testInitialSchema() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("b", MinorType.INT)
+        .add("c", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    rootWriter
+        .addRow(10, 100, "fred")
+        .addRow(20, null, "barney")
+        .addRow(30, 300, "wilma");
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+
+    RowSet expected = fixture.rowSetBuilder(schema)
+        .addRow(10, 100, "fred")
+        .addRow(20, null, "barney")
+        .addRow(30, 300, "wilma")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(actual);
+    rsLoader.close();
+  }
+
+  /**
+   * The writer protocol allows a client to write to a row any number of times
+   * before invoking <tt>save()</tt>. In this case, each new value simply
+   * overwrites the previous value. Here, we test the most basic case: a simple,
+   * flat tuple with no arrays. We use a very large Varchar that would, if
+   * overwrite were not working, cause vector overflow.
+   * <p>
+   * The ability to overwrite rows is seldom needed except in one future use
+   * case: writing a row, then applying a filter "in-place" to discard unwanted
+   * rows, without having to send the row downstream.
+   * <p>
+   * Because of this use case, specific rules apply when discarding rows or
+   * overwriting values.
+   * <ul>
+   * <li>Values can be written only once per row. Fixed-width columns happen
+   * to tolerate multiple writes but, because of the way variable-width
+   * columns work, multiple writes to them cause undefined results.</li>
+   * <li>To overwrite a row, call <tt>start()</tt> without calling
+   * <tt>save()</tt> on the previous row. Doing so ignores data for the
+   * previous row and starts a new row in place of the old one.</li>
+   * </ul>
+   * Note that there is no explicit method to discard a row. Instead,
+   * the rule is that a row is not saved until <tt>save()</tt> is called.
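+   * <p>
+   * In sketch form (illustrative only):<pre><code>
+   * rootWriter.start();
+   * aWriter.setInt(1);     // candidate row
+   * rootWriter.start();    // no save(): discard it, restart in place
+   * aWriter.setInt(2);
+   * rootWriter.save();     // only the second version is kept
+   * </code></pre>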
+   */
+
+  @Test
+  public void testOverwriteRow() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    // Can't use the shortcut to populate rows when doing overwrites.
+
+    ScalarWriter aWriter = rootWriter.scalar("a");
+    ScalarWriter bWriter = rootWriter.scalar("b");
+
+    // Write 100,000 rows, overwriting 99% of them. This will cause vector
+    // overflow and data corruption if overwrite does not work; but will happily
+    // produce the correct result if everything works as it should.
+
+    byte value[] = new byte[512];
+    Arrays.fill(value, (byte) 'X');
+    int count = 0;
+    rsLoader.startBatch();
+    while (count < 100_000) {
+      rootWriter.start();
+      count++;
+      aWriter.setInt(count);
+      bWriter.setBytes(value, value.length);
+      if (count % 100 == 0) {
+        rootWriter.save();
+      }
+    }
+
+    // Verify using a reader.
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(count / 100, result.rowCount());
+    RowSetReader reader = result.reader();
+    int rowId = 1;
+    while (reader.next()) {
+      assertEquals(rowId * 100, reader.scalar("a").getInt());
+      assertTrue(Arrays.equals(value, reader.scalar("b").getBytes()));
+      rowId++;
+    }
+
+    result.clear();
+    rsLoader.close();
+  }
+
+  /**
+   * Test that memory is released if the loader is closed with an active
+   * batch (that is, before the batch is harvested.)
+   */
+
+  @Test
+  public void testCloseWithoutHarvest() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setSchema(schema)
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    for (int i = 0; i < 100; i++) {
+      rootWriter.start();
+      rootWriter.scalar("a").setInt(i);
+      rootWriter.scalar("b").setString("b-" + i);
+      rootWriter.save();
+    }
+
+    // Don't harvest the batch. Allocator will complain if the
+    // loader does not release memory.
+
+    rsLoader.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderTorture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderTorture.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderTorture.java
new file mode 100644
index 0000000..33b9826
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderTorture.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ArrayReader;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ScalarElementReader;
+import org.apache.drill.exec.vector.accessor.ScalarReader;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleReader;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.test.LogFixture;
+import org.apache.drill.test.LogFixture.LogFixtureBuilder;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetReader;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Runs a worst-case scenario test that combines aspects of all
+ * previous tests. Run this test only <i>after</i> all other tests
+ * pass. Combined conditions tested:
+ * <ul>
+ * <li>Nested maps and map arrays.</li>
+ * <li>Nullable VarChar (which requires that the offset vector and null-bit
+ * vector be kept in sync.)</li>
+ * <li>Repeated VarChar (which requires that two offset vectors be kept in
+ * sync.)</li>
+ * <li>Null values.</li>
+ * <li>Omitted values.</li>
+ * <li>Skipped rows.</li>
+ * <li>Vector overflow deep in the structure.</li>
+ * <li>Multiple batches.</li>
+ * </ul>
+ * The proposition this test asserts is that, if this test passes, then
+ * most clients will also work, as they generally do not do all these
+ * things in a single query.
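+ * <p>
+ * The rows written have this JSON-like shape (per the schema built in
+ * the test; the s3 array holds large strings to force overflow):<pre><code>
+ * {n0, m1: {n1, m2: [{n2, s2, m3: {n3, s3: [...]}}, ...]}}
+ * </code></pre>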
+ */
+
+public class TestResultSetLoaderTorture extends SubOperatorTest {
+
+  private static class TestSetup {
+    int n1Cycle = 5;
+    int n2Cycle = 7;
+    int s2Cycle = 11;
+    int m2Cycle = 13;
+    int n3Cycle = 17;
+    int s3Cycle = 19;
+    int skipCycle = 23;
+    int nullCycle = 3;
+    int m2Count = 9;
+    int s3Count = 29;
+
+    String s3Value;
+
+    public TestSetup() {
+      byte s3Bytes[] = new byte[512];
+      Arrays.fill(s3Bytes, (byte) 'X');
+      s3Value = new String(s3Bytes, Charsets.UTF_8);
+    }
+  }
+
+  // Write rows, discarding every skip-cycle'th one.
+  // n0 is the row id, so appears in every row.
+  // For n1, n2, n3 and s2, omit selected values and make others null.
+  // For s3, write values large enough to cause overflow; but skip
+  // every s3-cycle'th entry, leaving an empty array.
+
+  private static class BatchWriter {
+
+    TestSetup setup;
+    RowSetLoader rootWriter;
+    ScalarWriter n1Writer;
+    ArrayWriter a2Writer;
+    ScalarWriter n2Writer;
+    ScalarWriter s2Writer;
+    ScalarWriter n3Writer;
+    ScalarWriter s3Writer;
+    int rowId = 0;
+    int innerCount = 0;
+    int writeRowCount = 0;
+    int startPrint = -1;
+    int endPrint = -1;
+    boolean lastRowDiscarded;
+
+    public BatchWriter(TestSetup setup, RowSetLoader rootWriter) {
+      this.setup = setup;
+      this.rootWriter = rootWriter;
+
+      TupleWriter m1Writer = rootWriter.tuple("m1");
+      n1Writer = m1Writer.scalar("n1");
+      a2Writer = m1Writer.array("m2");
+      TupleWriter m2Writer = a2Writer.tuple();
+      n2Writer = m2Writer.scalar("n2");
+      s2Writer = m2Writer.scalar("s2");
+      TupleWriter m3Writer = m2Writer.tuple("m3");
+      n3Writer = m3Writer.scalar("n3");
+      s3Writer = m3Writer.array("s3").scalar();
+    }
+
+    public void writeBatch() {
+
+      // Write until overflow
+
+      writeRowCount = rootWriter.rowCount();
+      //System.out.println("Start count: " + writeRowCount);
+      while (! rootWriter.isFull()) {
+        lastRowDiscarded = false;
+        writeRow();
+        rowId++;
+      }
+//      System.out.println("End of batch: rowId: " + rowId +
+//          ", count: " + writeRowCount +
+//          ", writer count:" + rootWriter.rowCount());
+    }
+
+    private void writeRow() {
+      rootWriter.start();
+
+      // Outer column
+
+      rootWriter.scalar("n0").setInt(rowId);
+      print("n0", rowId);
+
+      // Map 1: non-array
+
+      setInt("n1", n1Writer, rowId, setup.n1Cycle);
+
+      // Map2: an array.
+
+      if (rowId % setup.m2Cycle != 0) {
+        writeM2Array();
+      }
+
+      // Skip some rows
+
+      if (rowId % setup.skipCycle != 0) {
+        rootWriter.save();
+        writeRowCount++;
+      } else {
+        lastRowDiscarded = true;
+//        System.out.println("Skip row ID: " + rowId +
+//            ", count: " + writeRowCount +
+//            ", row set: " + rootWriter.rowCount());
+      }
+      if (rowId >= startPrint &&  rowId <= endPrint) {
+        System.out.println();
+      }
+    }
+
+    private void writeM2Array() {
+      for (int i = 0; i < setup.m2Count; i++) {
+
+        // n2: usual int
+
+        setInt("n2." + i, n2Writer, innerCount, setup.n2Cycle);
+
+        // S2: a nullable Varchar
+
+        if (innerCount % setup.s2Cycle == 0) {
+          // Skip
+        } else if (innerCount % setup.s2Cycle % setup.nullCycle == 0) {
+          s2Writer.setNull();
+          print("s2." + i, null);
+        } else {
+          s2Writer.setString("s2-" + innerCount);
+          print("s2." + i, "s2-" + innerCount);
+        }
+
+        // Map3: a non-repeated map
+
+        // n3: usual int
+
+        setInt("n3." + i, n3Writer, innerCount, setup.n3Cycle);
+
+        // s3: a repeated VarChar
+
+        if (innerCount % setup.s3Cycle != 0) {
+          for (int j = 0; j < setup.s3Count; j++) {
+            s3Writer.setString(setup.s3Value + (innerCount * setup.s3Count + j));
+          }
+          print("s3." + i, setup.s3Count + "x");
+        }
+        innerCount++;
+        a2Writer.save();
+      }
+    }
+
+    public void setInt(String label, ScalarWriter writer, int id, int cycle) {
+      int cycleIndex = id % cycle;
+      if (cycleIndex == 0) {
+        // Skip
+      } else if (cycleIndex % setup.nullCycle == 0) {
+        writer.setNull();
+        print(label, null);
+      } else {
+        writer.setInt(id * cycle);
+        print(label, id * cycle);
+      }
+    }
+
+    public void print(String label, Object value) {
+      if (rowId >= startPrint &&  rowId <= endPrint) {
+        System.out.print(label);
+        System.out.print(" = ");
+        System.out.print(value);
+        System.out.print(" ");
+      }
+    }
+
+    public int rowCount() {
+      return writeRowCount -
+          (lastRowDiscarded ? 0 : 1);
+    }
+  }
+
+  public static class ReadState {
+    int rowId = 0;
+    int innerCount = 0;
+  }
+
+  private static class BatchReader {
+
+    private TestSetup setup;
+    private RowSetReader rootReader;
+    ScalarReader n1Reader;
+    ArrayReader a2Reader;
+    ScalarReader n2Reader;
+    ScalarReader s2Reader;
+    ScalarReader n3Reader;
+    ScalarElementReader s3Reader;
+    ReadState readState;
+
+    public BatchReader(TestSetup setup, RowSetReader reader, ReadState readState) {
+      this.setup = setup;
+      this.rootReader = reader;
+      this.readState = readState;
+
+      TupleReader m1Reader = rootReader.tuple("m1");
+      n1Reader = m1Reader.scalar("n1");
+      a2Reader = m1Reader.array("m2");
+      TupleReader m2Reader = a2Reader.tuple();
+      n2Reader = m2Reader.scalar("n2");
+      s2Reader = m2Reader.scalar("s2");
+      TupleReader m3Reader = m2Reader.tuple("m3");
+      n3Reader = m3Reader.scalar("n3");
+      s3Reader = m3Reader.array("s3").elements();
+    }
+
+    public void verify() {
+      while (rootReader.next()) {
+//        System.out.println(readState.rowId);
+        verifyRow();
+        readState.rowId++;
+      }
+    }
+
+    private void verifyRow() {
+      // Skipped original row? Bump the row id.
+
+      if (readState.rowId % setup.skipCycle == 0) {
+        if (readState.rowId % setup.m2Cycle != 0) {
+          readState.innerCount += setup.m2Count;
+        }
+        readState.rowId++;
+      }
+
+      // Outer column
+
+      assertEquals(readState.rowId, rootReader.scalar("n0").getInt());
+
+      // Map 1: non-array
+
+      checkInt(n1Reader, readState.rowId, setup.n1Cycle);
+
+      // Map2: an array.
+
+      if (readState.rowId % setup.m2Cycle == 0) {
+        assertEquals(0, a2Reader.size());
+      } else {
+        verifyM2Array();
+      }
+    }
+
+    private void verifyM2Array() {
+      for (int i = 0; i < setup.m2Count; i++) {
+        a2Reader.setPosn(i);
+
+        // n2: usual int
+
+        checkInt(n2Reader, readState.innerCount, setup.n2Cycle);
+
+        if (readState.innerCount % setup.s2Cycle == 0) {
+          // Skipped values should be null
+          assertTrue(
+              String.format("Row %d, entry %d", rootReader.rowIndex(), i),
+              s2Reader.isNull());
+        } else if (readState.innerCount % setup.s2Cycle % setup.nullCycle == 0) {
+          assertTrue(s2Reader.isNull());
+        } else {
+          assertEquals("s2-" + readState.innerCount, s2Reader.getString());
+        }
+
+        // Map3: a non-repeated map
+
+        // n3: usual int
+
+        checkInt(n3Reader, readState.innerCount, setup.n3Cycle);
+
+        // s3: a repeated VarChar
+
+        if (readState.innerCount % setup.s3Cycle == 0) {
+          assertEquals(0, s3Reader.size());
+        } else {
+          for (int j = 0; j < setup.s3Count; j++) {
+            assertEquals(setup.s3Value + (readState.innerCount * setup.s3Count + j), s3Reader.getString(j));
+          }
+        }
+        readState.innerCount++;
+      }
+    }
+
+    public void checkInt(ScalarReader reader, int id, int cycle) {
+      if (id % cycle == 0) {
+        // Skipped values should be null
+        assertTrue("id = " + id + " expected null for skipped", reader.isNull());
+      } else if (id % cycle % setup.nullCycle == 0) {
+        assertTrue(reader.isNull());
+      } else {
+        assertEquals(id * cycle, reader.getInt());
+      }
+    }
+  }
+
+  @Test
+  public void tortureTest() {
+    LogFixtureBuilder logBuilder = new LogFixtureBuilder()
+
+        // Enable to get detailed tracing when things go wrong.
+
+//        .logger("org.apache.drill.exec.physical.rowSet", Level.TRACE)
+        ;
+    try (LogFixture logFixture = logBuilder.build()) {
+      doTortureTest();
+    }
+  }
+
+  private void doTortureTest() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("n0", MinorType.INT)
+        .addMap("m1")
+          .addNullable("n1", MinorType.INT)
+          .addMapArray("m2")
+            .addNullable("n2", MinorType.INT)
+            .addNullable("s2", MinorType.VARCHAR)
+            .addMap("m3")
+              .addNullable("n3", MinorType.INT)
+              .addArray("s3", MinorType.VARCHAR)
+              .buildMap()
+            .buildMap()
+          .buildMap()
+        .buildSchema();
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    TestSetup setup = new TestSetup();
+    BatchWriter batchWriter = new BatchWriter(setup, rootWriter);
+
+    int totalRowCount = 0;
+
+    ReadState readState = new ReadState();
+    for (int batchCount = 0; batchCount < 10; batchCount++) {
+      rsLoader.startBatch();
+      batchWriter.writeBatch();
+
+      // Now the hard part. Verify the above batch.
+
+      RowSet result = fixture.wrap(rsLoader.harvest());
+//      result.print();
+
+      // Should have overflowed
+
+      int savedCount = batchWriter.rowCount();
+      assertEquals(savedCount, result.rowCount());
+
+      totalRowCount += savedCount;
+      assertEquals(totalRowCount, rsLoader.totalRowCount());
+      assertEquals(batchCount + 1, rsLoader.batchCount());
+
+      BatchReader reader = new BatchReader(setup, result.reader(), readState);
+      reader.verify();
+      result.clear();
+    }
+
+    // The last batch holds only the final overflow row.
+
+    {
+      rsLoader.startBatch();
+
+      // Use this to visualize a string buffer. There is also a method
+      // to visualize offset vectors. These two are the most pesky vectors
+      // to get right.
+
+//      VectorPrinter.printStrings((VarCharVector) ((NullableVarCharVector) ((AbstractScalarWriter) batchWriter.s2Writer).vector()).getValuesVector(), 0, 8);
+      RowSet result = fixture.wrap(rsLoader.harvest());
+
+      // Use this here, or earlier, when things go amiss and you need
+      // to see what the actual results might be.
+
+//      result.print();
+
+      totalRowCount++;
+      assertEquals(totalRowCount, rsLoader.totalRowCount());
+
+      BatchReader reader = new BatchReader(setup, result.reader(), readState);
+      reader.verify();
+      result.clear();
+    }
+    rsLoader.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetSchemaChange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetSchemaChange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetSchemaChange.java
new file mode 100644
index 0000000..9787189
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetSchemaChange.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetOptions;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetReader;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.Test;
+
+public class TestResultSetSchemaChange extends SubOperatorTest {
+
+  /**
+   * Test the case where the schema changes in the first batch.
+   * Schema changes before the first record are trivial and tested
+   * elsewhere. Here we write some records, then add new columns, as a
+   * JSON reader might do.
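+   * <p>
+   * The mid-batch pattern under test, in sketch form:<pre><code>
+   * // rows written so far contain only column "a", then:
+   * rootWriter.addColumn(SchemaBuilder.columnSchema("b", MinorType.INT, DataMode.OPTIONAL));
+   * // earlier rows read back with b = null (back-filled)
+   * </code></pre>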
+   */
+
+  @Test
+  public void testSchemaChangeFirstBatch() {
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator());
+    RowSetLoader rootWriter = rsLoader.writer();
+    rootWriter.addColumn(SchemaBuilder.columnSchema("a", MinorType.VARCHAR, DataMode.REQUIRED));
+
+    // Create initial rows
+
+    rsLoader.startBatch();
+    int rowCount = 0;
+    for (int i = 0; i < 2;  i++) {
+      rootWriter.start();
+      rowCount++;
+      rootWriter.scalar(0).setString("a_" + rowCount);
+      rootWriter.save();
+    }
+
+    // Add a second column: nullable.
+
+    rootWriter.addColumn(SchemaBuilder.columnSchema("b", MinorType.INT, DataMode.OPTIONAL));
+    for (int i = 0; i < 2;  i++) {
+      rootWriter.start();
+      rowCount++;
+      rootWriter.scalar(0).setString("a_" + rowCount);
+      rootWriter.scalar(1).setInt(rowCount);
+      rootWriter.save();
+    }
+
+    // Add a third column. Use variable-width so that offset
+    // vectors must be back-filled.
+
+    rootWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.VARCHAR, DataMode.OPTIONAL));
+    for (int i = 0; i < 2;  i++) {
+      rootWriter.start();
+      rowCount++;
+      rootWriter.scalar(0).setString("a_" + rowCount);
+      rootWriter.scalar(1).setInt(rowCount);
+      rootWriter.scalar(2).setString("c_" + rowCount);
+      rootWriter.save();
+    }
+
+    // Fourth: a required Varchar. Previous rows are back-filled with empty
+    // strings. Also a required int, back-filled with zeros.
+    // Seldom useful, but it does have to work to prevent vector
+    // corruption if some reader decides to go this route.
+
+    rootWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.REQUIRED));
+    rootWriter.addColumn(SchemaBuilder.columnSchema("e", MinorType.INT,     DataMode.REQUIRED));
+    for (int i = 0; i < 2;  i++) {
+      rootWriter.start();
+      rowCount++;
+      rootWriter.scalar(0).setString("a_" + rowCount);
+      rootWriter.scalar(1).setInt(rowCount);
+      rootWriter.scalar(2).setString("c_" + rowCount);
+      rootWriter.scalar(3).setString("d_" + rowCount);
+      rootWriter.scalar(4).setInt(rowCount * 10);
+      rootWriter.save();
+    }
+
+    // Add an array. Now two offset vectors must be back-filled.
+
+    rootWriter.addColumn(SchemaBuilder.columnSchema("f", MinorType.VARCHAR, DataMode.REPEATED));
+    for (int i = 0; i < 2;  i++) {
+      rootWriter.start();
+      rowCount++;
+      rootWriter.scalar(0).setString("a_" + rowCount);
+      rootWriter.scalar(1).setInt(rowCount);
+      rootWriter.scalar(2).setString("c_" + rowCount);
+      rootWriter.scalar(3).setString("d_" + rowCount);
+      rootWriter.scalar(4).setInt(rowCount * 10);
+      ScalarWriter arrayWriter = rootWriter.column(5).array().scalar();
+      arrayWriter.setString("f_" + rowCount + "-1");
+      arrayWriter.setString("f_" + rowCount + "-2");
+      rootWriter.save();
+    }
+
+    // Harvest the batch and verify.
+
+    RowSet actual = fixture.wrap(rsLoader.harvest());
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .addNullable("b", MinorType.INT)
+        .addNullable("c", MinorType.VARCHAR)
+        .add("d", MinorType.VARCHAR)
+        .add("e", MinorType.INT)
+        .addArray("f", MinorType.VARCHAR)
+        .build();
+    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow("a_1", null, null,   "",       0, new String[] {})
+        .addRow("a_2", null, null,   "",       0, new String[] {})
+        .addRow("a_3",    3, null,   "",       0, new String[] {})
+        .addRow("a_4",    4, null,   "",       0, new String[] {})
+        .addRow("a_5",    5, "c_5",  "",       0, new String[] {})
+        .addRow("a_6",    6, "c_6",  "",       0, new String[] {})
+        .addRow("a_7",    7, "c_7",  "d_7",   70, new String[] {})
+        .addRow("a_8",    8, "c_8",  "d_8",   80, new String[] {})
+        .addRow("a_9",    9, "c_9",  "d_9",   90, new String[] {"f_9-1",  "f_9-2"})
+        .addRow("a_10",  10, "c_10", "d_10", 100, new String[] {"f_10-1", "f_10-2"})
+        .build();
+
+    new RowSetComparison(expected)
+        .verifyAndClearAll(actual);
+    rsLoader.close();
+  }
+
+  /**
+   * Test a schema change on the row that overflows. If the
+   * new column is added after overflow, it will appear as
+   * a schema-change in the following batch. This is fine as
+   * we are essentially time-shifting: pretending that the
+   * overflow row was written in the next batch (which, in
+   * fact, it is: that's what overflow means.)
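+   * <p>
+   * In sketch form (illustrative only):<pre><code>
+   * rootWriter.scalar(0).setBytes(value, len);  // write that overflows
+   * rootWriter.addColumn(colB);                 // rides with the overflow row
+   * rsLoader.harvest();                         // this batch lacks "b"
+   * rsLoader.startBatch();                      // overflow row moves here
+   * rsLoader.harvest();                         // one row, now with "b"
+   * </code></pre>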
+   */
+
+  @Test
+  public void testSchemaChangeWithOverflow() {
+    ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+    rootWriter.addColumn(SchemaBuilder.columnSchema("a", MinorType.VARCHAR, DataMode.REQUIRED));
+
+    rsLoader.startBatch();
+    byte value[] = new byte[512];
+    Arrays.fill(value, (byte) 'X');
+    int count = 0;
+    while (! rootWriter.isFull()) {
+      rootWriter.start();
+      rootWriter.scalar(0).setBytes(value, value.length);
+
+      // Relies on the fact that isFull() becomes true right after
+      // a vector overflows; no need to wait for save().
+
+      if (rootWriter.isFull()) {
+        rootWriter.addColumn(SchemaBuilder.columnSchema("b", MinorType.INT, DataMode.OPTIONAL));
+        rootWriter.scalar(1).setInt(count);
+
+        // Add a Varchar to ensure its offset fiddling is done properly
+
+        rootWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.VARCHAR, DataMode.OPTIONAL));
+        rootWriter.scalar(2).setString("c-" + count);
+
+        // Allow adding a required column at this point.
+        // (Not intuitively obvious that this should work; we back-fill
+        // with zeros.)
+
+        rootWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.INT, DataMode.REQUIRED));
+      }
+      rootWriter.save();
+      count++;
+    }
+
+    // Result should include only the first column.
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .build();
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertTrue(result.batchSchema().isEquivalent(expectedSchema));
+    assertEquals(count - 1, result.rowCount());
+    result.clear();
+    assertEquals(1, rsLoader.schemaVersion());
+
+    // Double check: can still add a required column after
+    // starting the next batch. (No longer in overflow state.)
+
+    rsLoader.startBatch();
+    rootWriter.addColumn(SchemaBuilder.columnSchema("e", MinorType.INT, DataMode.REQUIRED));
+
+    // Next batch should start with the overflow row, including
+    // the column added at the end of the previous batch, after
+    // overflow.
+
+    result = fixture.wrap(rsLoader.harvest());
+    assertEquals(5, rsLoader.schemaVersion());
+    assertEquals(1, result.rowCount());
+    expectedSchema = new SchemaBuilder(expectedSchema)
+        .addNullable("b", MinorType.INT)
+        .addNullable("c", MinorType.VARCHAR)
+        .add("d", MinorType.INT)
+        .add("e", MinorType.INT)
+        .build();
+    assertTrue(result.batchSchema().isEquivalent(expectedSchema));
+    RowSetReader reader = result.reader();
+    reader.next();
+    assertEquals(count - 1, reader.scalar(1).getInt());
+    assertEquals("c-" + (count - 1), reader.scalar(2).getString());
+    assertEquals(0, reader.scalar("d").getInt());
+    assertEquals(0, reader.scalar("e").getInt());
+    result.clear();
+
+    rsLoader.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestTupleSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestTupleSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestTupleSchema.java
new file mode 100644
index 0000000..45c0b55
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestTupleSchema.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.TupleSchema.MapColumnMetadata;
+import org.apache.drill.exec.record.TupleSchema.PrimitiveColumnMetadata;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Test the tuple and column metadata, including extended attributes.
+ */
+
+public class TestTupleSchema extends SubOperatorTest {
+
+  /**
+   * Test a fixed-width, primitive, required column. Includes basic
+   * tests common to all data types. (Basic tests are not repeated for
+   * other types.)
+   */
+
+  @Test
+  public void testRequiredFixedWidthColumn() {
+
+    MaterializedField field = SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.REQUIRED );
+    ColumnMetadata col = TupleSchema.fromField(field);
+
+    // Code may depend on the specific column class
+
+    assertTrue(col instanceof PrimitiveColumnMetadata);
+
+    // Generic checks
+
+    assertEquals(ColumnMetadata.StructureType.PRIMITIVE, col.structureType());
+    assertNull(col.mapSchema());
+    assertSame(field, col.schema());
+    assertEquals(field.getName(), col.name());
+    assertEquals(field.getType(), col.majorType());
+    assertEquals(field.getType().getMinorType(), col.type());
+    assertEquals(field.getDataMode(), col.mode());
+    assertFalse(col.isNullable());
+    assertFalse(col.isArray());
+    assertFalse(col.isVariableWidth());
+    assertFalse(col.isMap());
+    assertFalse(col.isList());
+    assertTrue(col.isEquivalent(col));
+
+    ColumnMetadata col2 = TupleSchema.fromField(field);
+    assertTrue(col.isEquivalent(col2));
+
+    MaterializedField field3 = SchemaBuilder.columnSchema("d", MinorType.INT, DataMode.REQUIRED );
+    ColumnMetadata col3 = TupleSchema.fromField(field3);
+    assertFalse(col.isEquivalent(col3));
+
+    MaterializedField field4 = SchemaBuilder.columnSchema("c", MinorType.BIGINT, DataMode.REQUIRED );
+    ColumnMetadata col4 = TupleSchema.fromField(field4);
+    assertFalse(col.isEquivalent(col4));
+
+    MaterializedField field5 = SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.OPTIONAL );
+    ColumnMetadata col5 = TupleSchema.fromField(field5);
+    assertFalse(col.isEquivalent(col5));
+
+    ColumnMetadata col6 = col.cloneEmpty();
+    assertTrue(col.isEquivalent(col6));
+
+    assertEquals(4, col.expectedWidth());
+    col.setExpectedWidth(10);
+    assertEquals(4, col.expectedWidth());
+
+    assertEquals(1, col.expectedElementCount());
+    col.setExpectedElementCount(2);
+    assertEquals(1, col.expectedElementCount());
+  }
+
+  @Test
+  public void testNullableFixedWidthColumn() {
+
+    MaterializedField field = SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.OPTIONAL );
+    ColumnMetadata col = TupleSchema.fromField(field);
+
+    assertEquals(ColumnMetadata.StructureType.PRIMITIVE, col.structureType());
+    assertTrue(col.isNullable());
+    assertFalse(col.isArray());
+    assertFalse(col.isVariableWidth());
+    assertFalse(col.isMap());
+    assertFalse(col.isList());
+
+    assertEquals(4, col.expectedWidth());
+    col.setExpectedWidth(10);
+    assertEquals(4, col.expectedWidth());
+
+    assertEquals(1, col.expectedElementCount());
+    col.setExpectedElementCount(2);
+    assertEquals(1, col.expectedElementCount());
+  }
+
+  @Test
+  public void testRepeatedFixedWidthColumn() {
+
+    MaterializedField field = SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.REPEATED );
+    ColumnMetadata col = TupleSchema.fromField(field);
+
+    assertFalse(col.isNullable());
+    assertTrue(col.isArray());
+    assertFalse(col.isVariableWidth());
+    assertFalse(col.isMap());
+    assertFalse(col.isList());
+
+    assertEquals(4, col.expectedWidth());
+    col.setExpectedWidth(10);
+    assertEquals(4, col.expectedWidth());
+
+    assertEquals(ColumnMetadata.DEFAULT_ARRAY_SIZE, col.expectedElementCount());
+
+    col.setExpectedElementCount(2);
+    assertEquals(2, col.expectedElementCount());
+
+    col.setExpectedElementCount(0);
+    assertEquals(1, col.expectedElementCount());
+  }
+
+  @Test
+  public void testRequiredVariableWidthColumn() {
+
+    MaterializedField field = SchemaBuilder.columnSchema("c", MinorType.VARCHAR, DataMode.REQUIRED );
+    ColumnMetadata col = TupleSchema.fromField(field);
+
+    assertEquals(ColumnMetadata.StructureType.PRIMITIVE, col.structureType());
+    assertNull(col.mapSchema());
+    assertFalse(col.isNullable());
+    assertFalse(col.isArray());
+    assertTrue(col.isVariableWidth());
+    assertFalse(col.isMap());
+    assertFalse(col.isList());
+
+    // A different precision is a different type.
+
+    MaterializedField field2 = new SchemaBuilder.ColumnBuilder("c", MinorType.VARCHAR)
+        .setMode(DataMode.REQUIRED)
+        .setPrecision(10)
+        .build();
+
+    ColumnMetadata col2 = TupleSchema.fromField(field2);
+    assertFalse(col.isEquivalent(col2));
+
+    assertEquals(50, col.expectedWidth());
+    col.setExpectedWidth(10);
+    assertEquals(10, col.expectedWidth());
+
+    assertEquals(1, col.expectedElementCount());
+    col.setExpectedElementCount(2);
+    assertEquals(1, col.expectedElementCount());
+
+    // If precision is provided, then that is the default width
+
+    col = TupleSchema.fromField(field2);
+    assertEquals(10, col.expectedWidth());
+  }
+
+  @Test
+  public void testNullableVariableWidthColumn() {
+
+    MaterializedField field = SchemaBuilder.columnSchema("c", MinorType.VARCHAR, DataMode.OPTIONAL );
+    ColumnMetadata col = TupleSchema.fromField(field);
+
+    assertTrue(col.isNullable());
+    assertFalse(col.isArray());
+    assertTrue(col.isVariableWidth());
+    assertFalse(col.isMap());
+    assertFalse(col.isList());
+
+    assertEquals(50, col.expectedWidth());
+    col.setExpectedWidth(10);
+    assertEquals(10, col.expectedWidth());
+
+    assertEquals(1, col.expectedElementCount());
+    col.setExpectedElementCount(2);
+    assertEquals(1, col.expectedElementCount());
+  }
+
+  @Test
+  public void testRepeatedVariableWidthColumn() {
+
+    MaterializedField field = SchemaBuilder.columnSchema("c", MinorType.VARCHAR, DataMode.REPEATED );
+    ColumnMetadata col = TupleSchema.fromField(field);
+
+    assertFalse(col.isNullable());
+    assertTrue(col.isArray());
+    assertTrue(col.isVariableWidth());
+    assertFalse(col.isMap());
+    assertFalse(col.isList());
+
+    assertEquals(50, col.expectedWidth());
+    col.setExpectedWidth(10);
+    assertEquals(10, col.expectedWidth());
+
+    assertEquals(ColumnMetadata.DEFAULT_ARRAY_SIZE, col.expectedElementCount());
+
+    col.setExpectedElementCount(2);
+    assertEquals(2, col.expectedElementCount());
+  }
+
+  /**
+   * Tests a map column. Maps can only be required or repeated, not nullable.
+   * (But, the columns in the map can be nullable.)
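+   * <p>
+   * For example (a sketch using the builder seen in the other tests):<pre><code>
+   * new SchemaBuilder()
+   *   .addMap("m")                        // required map
+   *     .addNullable("n", MinorType.INT)  // nullable member is fine
+   *     .buildMap()
+   *   .buildSchema();
+   * </code></pre>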
+   */
+
+  @Test
+  public void testMapColumn() {
+
+    MaterializedField field = SchemaBuilder.columnSchema("m", MinorType.MAP, DataMode.REQUIRED );
+    ColumnMetadata col = TupleSchema.fromField(field);
+
+    assertTrue(col instanceof MapColumnMetadata);
+    assertNotNull(col.mapSchema());
+    assertEquals(0, col.mapSchema().size());
+    assertSame(col, col.mapSchema().parent());
+
+    MapColumnMetadata mapCol = (MapColumnMetadata) col;
+    assertNull(mapCol.parentTuple());
+
+    assertEquals(ColumnMetadata.StructureType.TUPLE, col.structureType());
+    assertFalse(col.isNullable());
+    assertFalse(col.isArray());
+    assertFalse(col.isVariableWidth());
+    assertTrue(col.isMap());
+    assertFalse(col.isList());
+
+    assertEquals(0, col.expectedWidth());
+    col.setExpectedWidth(10);
+    assertEquals(0, col.expectedWidth());
+
+    assertEquals(1, col.expectedElementCount());
+    col.setExpectedElementCount(2);
+    assertEquals(1, col.expectedElementCount());
+  }
+
+  @Test
+  public void testRepeatedMapColumn() {
+
+    MaterializedField field = SchemaBuilder.columnSchema("m", MinorType.MAP, DataMode.REPEATED );
+    ColumnMetadata col = TupleSchema.fromField(field);
+
+    assertTrue(col instanceof MapColumnMetadata);
+    assertNotNull(col.mapSchema());
+    assertEquals(0, col.mapSchema().size());
+
+    assertFalse(col.isNullable());
+    assertTrue(col.isArray());
+    assertFalse(col.isVariableWidth());
+    assertTrue(col.isMap());
+    assertFalse(col.isList());
+
+    assertEquals(0, col.expectedWidth());
+    col.setExpectedWidth(10);
+    assertEquals(0, col.expectedWidth());
+
+    assertEquals(ColumnMetadata.DEFAULT_ARRAY_SIZE, col.expectedElementCount());
+
+    col.setExpectedElementCount(2);
+    assertEquals(2, col.expectedElementCount());
+  }
+
+  // List
+
+  // Repeated list
+
+  /**
+   * Test the basics of an empty root tuple (i.e. row) schema.
+   */
+
+  @Test
+  public void testEmptyRootTuple() {
+
+    TupleMetadata root = new TupleSchema();
+
+    assertEquals(0, root.size());
+    assertTrue(root.isEmpty());
+    assertEquals(-1, root.index("foo"));
+
+    try {
+      root.metadata(0);
+      fail();
+    } catch (IndexOutOfBoundsException e) {
+      // Expected
+    }
+    assertNull(root.metadata("foo"));
+
+    try {
+      root.column(0);
+      fail();
+    } catch (IndexOutOfBoundsException e) {
+      // Expected
+    }
+    assertNull(root.column("foo"));
+
+    try {
+      root.fullName(0);
+      fail();
+    } catch (IndexOutOfBoundsException e) {
+      // Expected
+    }
+
+    // The full name method does not check if the column is actually
+    // in the tuple.
+
+    MaterializedField field = SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.REQUIRED );
+    ColumnMetadata col = TupleSchema.fromField(field);
+    assertEquals("c", root.fullName(col));
+
+    assertTrue(root.isEquivalent(root));
+    assertNull(root.parent());
+    assertTrue(root.toFieldList().isEmpty());
+  }
+
+  /**
+   * Test the basics of a non-empty root tuple (i.e. a row) using a pair
+   * of primitive columns.
+   */
+
+  @Test
+  public void testNonEmptyRootTuple() {
+
+    TupleMetadata root = new TupleSchema();
+
+    MaterializedField fieldA = SchemaBuilder.columnSchema("a", MinorType.INT, DataMode.REQUIRED );
+    ColumnMetadata colA = root.add(fieldA);
+
+    assertEquals(1, root.size());
+    assertFalse(root.isEmpty());
+    assertEquals(0, root.index("a"));
+    assertEquals(-1, root.index("b"));
+
+    assertSame(fieldA, root.column(0));
+    assertSame(fieldA, root.column("a"));
+    assertSame(fieldA, root.column("A"));
+
+    assertSame(colA, root.metadata(0));
+    assertSame(colA, root.metadata("a"));
+
+    assertEquals("a", root.fullName(0));
+    assertEquals("a", root.fullName(colA));
+
+    try {
+      root.add(fieldA);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // Expected
+    }
+
+    MaterializedField fieldB = SchemaBuilder.columnSchema("b", MinorType.VARCHAR, DataMode.OPTIONAL );
+    ColumnMetadata colB = TupleSchema.fromField(fieldB);
+    int indexB = root.addColumn(colB);
+
+    assertEquals(1, indexB);
+    assertEquals(2, root.size());
+    assertFalse(root.isEmpty());
+    assertEquals(indexB, root.index("b"));
+
+    assertSame(fieldB, root.column(1));
+    assertSame(fieldB, root.column("b"));
+
+    assertSame(colB, root.metadata(1));
+    assertSame(colB, root.metadata("b"));
+
+    assertEquals("b", root.fullName(1));
+    assertEquals("b", root.fullName(colB));
+
+    try {
+      root.add(fieldB);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // Expected
+    }
+
+    List<MaterializedField> fieldList = root.toFieldList();
+    assertSame(fieldA, fieldList.get(0));
+    assertSame(fieldB, fieldList.get(1));
+
+    TupleMetadata emptyRoot = new TupleSchema();
+    assertFalse(emptyRoot.isEquivalent(root));
+
+    // Same schema: the tuples are equivalent
+
+    TupleMetadata root3 = new TupleSchema();
+    root3.add(fieldA);
+    root3.addColumn(colB);
+    assertTrue(root3.isEquivalent(root));
+    assertTrue(root.isEquivalent(root3));
+
+    // Same columns, different order. The tuples are not equivalent.
+
+    TupleMetadata root4 = new TupleSchema();
+    root4.addColumn(colB);
+    root4.add(fieldA);
+    assertFalse(root4.isEquivalent(root));
+    assertFalse(root.isEquivalent(root4));
+
+    // A tuple is equivalent to its copy.
+
+    assertTrue(root.isEquivalent(((TupleSchema) root).copy()));
+
+    // And it is equivalent to the round trip to a batch schema.
+
+    BatchSchema batchSchema = ((TupleSchema) root).toBatchSchema(SelectionVectorMode.NONE);
+    assertTrue(root.isEquivalent(TupleSchema.fromFields(batchSchema)));
+  }
+
+  /**
+   * Test a complex map schema of the form:<br>
+   * a.`b.x`.`c.y`.d<br>
+   * in which columns "a", "b.x" and "c.y" are maps, "b.x" and "c.y" are names
+   * that contain dots, and "d" is primitive.
+   */
+
+  @Test
+  public void testMapTuple() {
+
+    TupleMetadata root = new TupleSchema();
+
+    MaterializedField fieldA = SchemaBuilder.columnSchema("a", MinorType.MAP, DataMode.REQUIRED);
+    ColumnMetadata colA = root.add(fieldA);
+    TupleMetadata mapA = colA.mapSchema();
+
+    MaterializedField fieldB = SchemaBuilder.columnSchema("b.x", MinorType.MAP, DataMode.REQUIRED);
+    ColumnMetadata colB = mapA.add(fieldB);
+    TupleMetadata mapB = colB.mapSchema();
+
+    MaterializedField fieldC = SchemaBuilder.columnSchema("c.y", MinorType.MAP, DataMode.REQUIRED);
+    ColumnMetadata colC = mapB.add(fieldC);
+    TupleMetadata mapC = colC.mapSchema();
+
+    MaterializedField fieldD = SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.REQUIRED);
+    ColumnMetadata colD = mapC.add(fieldD);
+
+    MaterializedField fieldE = SchemaBuilder.columnSchema("e", MinorType.INT, DataMode.REQUIRED);
+    ColumnMetadata colE = mapC.add(fieldE);
+
+    assertEquals(1, root.size());
+    assertEquals(1, mapA.size());
+    assertEquals(1, mapB.size());
+    assertEquals(2, mapC.size());
+
+    assertSame(colA, root.metadata("a"));
+    assertSame(colB, mapA.metadata("b.x"));
+    assertSame(colC, mapB.metadata("c.y"));
+    assertSame(colD, mapC.metadata("d"));
+    assertSame(colE, mapC.metadata("e"));
+
+    // The full name contains quoted names if they contain dots.
+    // This name is more for diagnostic than semantic purposes.
+
+    assertEquals("a", root.fullName(0));
+    assertEquals("a.`b.x`", mapA.fullName(0));
+    assertEquals("a.`b.x`.`c.y`", mapB.fullName(0));
+    assertEquals("a.`b.x`.`c.y`.d", mapC.fullName(0));
+    assertEquals("a.`b.x`.`c.y`.e", mapC.fullName(1));
+
+    assertEquals(1, colA.schema().getChildren().size());
+    assertEquals(1, colB.schema().getChildren().size());
+    assertEquals(2, colC.schema().getChildren().size());
+
+    // Yes, it is awful that MaterializedField does not provide indexed
+    // access to its children. That's one reason we have the TupleMetadata
+    // classes.
+
+    assertSame(fieldB, colA.schema().getChildren().iterator().next());
+    assertSame(fieldC, colB.schema().getChildren().iterator().next());
+    Iterator<MaterializedField> iterC = colC.schema().getChildren().iterator();
+    assertSame(fieldD, iterC.next());
+    assertSame(fieldE, iterC.next());
+
+    // Copying should be deep.
+
+    TupleMetadata root2 = ((TupleSchema) root).copy();
+    assertEquals(2, root2.metadata(0).mapSchema().metadata(0).mapSchema().metadata(0).mapSchema().size());
+    assertTrue(root.isEquivalent(root2));
+  }
+}
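
To make the sizing hints exercised above concrete, here is a minimal
sketch (using only the TupleSchema, ColumnMetadata and SchemaBuilder
calls these tests rely on; the column name "tags" is illustrative) of
how a reader might record per-column size hints before allocating
vectors:

    // Build a one-column schema: a repeated VARCHAR.
    TupleMetadata root = new TupleSchema();
    MaterializedField field = SchemaBuilder.columnSchema("tags",
        MinorType.VARCHAR, DataMode.REPEATED);
    ColumnMetadata col = root.add(field);

    // Record hints: ~20 bytes per value, ~4 array elements per row.
    // As the tests above verify, arrays honor both hints.
    col.setExpectedWidth(20);
    col.setExpectedElementCount(4);
    assertEquals(20, col.expectedWidth());
    assertEquals(4, col.expectedElementCount());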

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
deleted file mode 100644
index b17bf18..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.record;
-
-import static org.junit.Assert.*;
-
-import org.apache.drill.categories.VectorTest;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.test.DrillTest;
-import org.apache.drill.test.OperatorFixture;
-import org.apache.drill.test.rowSet.RowSet;
-import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.test.rowSet.RowSetComparison;
-import org.apache.drill.test.rowSet.SchemaBuilder;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category(VectorTest.class)
-public class TestVectorContainer extends DrillTest {
-
-  // TODO: Replace the following with an extension of SubOperatorTest class
-  // once that is available.
-
-  protected static OperatorFixture fixture;
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    fixture = OperatorFixture.standardFixture();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    fixture.close();
-  }
-
-  /**
-   * Test of the ability to merge two schemas and to merge
-   * two vector containers. The merge is "horizontal", like
-   * a row-by-row join. Since each container is a list of
-   * vectors, we just combine the two lists to create the
-   * merged result.
-   */
-  @Test
-  public void testContainerMerge() {
-
-    // Simulated data from a reader
-
-    BatchSchema leftSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .addNullable("b", MinorType.VARCHAR)
-        .build();
-    SingleRowSet left = fixture.rowSetBuilder(leftSchema)
-        .add(10, "fred")
-        .add(20, "barney")
-        .add(30, "wilma")
-        .build();
-
-    // Simulated "implicit" coumns: row number and file name
-
-    BatchSchema rightSchema = new SchemaBuilder()
-        .add("x", MinorType.SMALLINT)
-        .add("y", MinorType.VARCHAR)
-        .build();
-    SingleRowSet right = fixture.rowSetBuilder(rightSchema)
-        .add(1, "foo.txt")
-        .add(2, "bar.txt")
-        .add(3, "dino.txt")
-        .build();
-
-    // The merge batch we expect to see
-
-    BatchSchema expectedSchema = new SchemaBuilder()
-        .add("a", MinorType.INT)
-        .addNullable("b", MinorType.VARCHAR)
-        .add("x", MinorType.SMALLINT)
-        .add("y", MinorType.VARCHAR)
-        .build();
-    SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
-        .add(10, "fred", 1, "foo.txt")
-        .add(20, "barney", 2, "bar.txt")
-        .add(30, "wilma", 3, "dino.txt")
-        .build();
-
-    // Merge containers without selection vector
-
-    RowSet merged = fixture.wrap(
-        left.container().merge(right.container()));
-
-    RowSetComparison comparison = new RowSetComparison(expected);
-    comparison.verify(merged);
-
-    // Merge containers via row set facade
-
-    RowSet mergedRs = left.merge(right);
-    comparison.verifyAndClearAll(mergedRs);
-
-    // Add a selection vector. Merging is forbidden, in the present code,
-    // for batches that have a selection vector.
-
-    SingleRowSet leftIndirect = left.toIndirect();
-    try {
-      leftIndirect.merge(right);
-      fail();
-    } catch (IllegalArgumentException e) {
-      // Expected
-    }
-    leftIndirect.clear();
-    right.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
index 0f8f766..621d288 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
@@ -251,6 +251,7 @@ public class TestValueVector extends ExecTest {
     final DrillBuf newBuf = allocator.buffer(size);
     final DrillBuf writeBuf = newBuf;
     for(final DrillBuf buffer : buffers) {
+      @SuppressWarnings("resource")
       final DrillBuf readBuf = (DrillBuf) buffer.slice();
       final int nBytes = readBuf.readableBytes();
       final byte[] bytes = new byte[nBytes];
@@ -266,6 +267,7 @@ public class TestValueVector extends ExecTest {
     final MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, RepeatedIntHolder.TYPE);
 
     // Create a new value vector.
+    @SuppressWarnings("resource")
     final RepeatedIntVector vector1 = new RepeatedIntVector(field, allocator);
 
     // Populate the vector.
@@ -321,6 +323,7 @@ the interface to load has changed
     final MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, VarCharHolder.TYPE);
 
     // Create a new value vector for 1024 variable length strings.
+    @SuppressWarnings("resource")
     final VarCharVector vector1 = new VarCharVector(field, allocator);
     final VarCharVector.Mutator mutator = vector1.getMutator();
     vector1.allocateNew(1024 * 10, 1024);
@@ -337,7 +340,9 @@ the interface to load has changed
 
     // Combine the backing buffers so we can load them into a new vector.
     final DrillBuf[] buffers1 = vector1.getBuffers(false);
+    @SuppressWarnings("resource")
     final DrillBuf buffer1 = combineBuffers(allocator, buffers1);
+    @SuppressWarnings("resource")
     final VarCharVector vector2 = new VarCharVector(field, allocator);
     vector2.load(vector1.getMetadata(), buffer1);
 
@@ -360,6 +365,7 @@ the interface to load has changed
     final MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, NullableVarCharHolder.TYPE);
 
     // Create a new value vector for 1024 nullable variable length strings.
+    @SuppressWarnings("resource")
     final NullableVarCharVector vector1 = new NullableVarCharVector(field, allocator);
     final NullableVarCharVector.Mutator mutator = vector1.getMutator();
     vector1.allocateNew(1024 * 10, 1024);
@@ -394,7 +400,9 @@ the interface to load has changed
 
     // Combine into a single buffer so we can load it into a new vector.
     final DrillBuf[] buffers1 = vector1.getBuffers(false);
+    @SuppressWarnings("resource")
     final DrillBuf buffer1 = combineBuffers(allocator, buffers1);
+    @SuppressWarnings("resource")
     final NullableVarCharVector vector2 = new NullableVarCharVector(field, allocator);
     vector2.load(vector1.getMetadata(), buffer1);
 
@@ -673,6 +681,7 @@ the interface to load has changed
       }
 
       for (int i = 0; i < valueVectors.length; i++) {
+        @SuppressWarnings("resource")
         final ValueVector vv = valueVectors[i];
         final int vvCapacity = vv.getValueCapacity();
 
@@ -718,6 +727,7 @@ the interface to load has changed
    *
    * @param test test function to execute
    */
+  @SuppressWarnings("resource")
   private void testVectors(VectorVerifier test) throws Exception {
     final MaterializedField[] fields = {
         MaterializedField.create(EMPTY_SCHEMA_PATH, UInt4Holder.TYPE),
@@ -777,6 +787,7 @@ the interface to load has changed
 
   @Test
   public void testVectorCanLoadEmptyBuffer() throws Exception {
+    @SuppressWarnings("resource")
     final DrillBuf empty = allocator.getEmpty();
 
     testVectors(new VectorVerifier() {
@@ -798,6 +809,7 @@ the interface to load has changed
     });
   }
 
+  @SuppressWarnings("resource")
   @Test
   public void testListVectorShouldNotThrowOversizedAllocationException() throws Exception {
     final MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH,

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
index a8eef3c..5af0306 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
index c792233..5ce8e3f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
@@ -103,7 +103,7 @@ public class TestCsv extends ClusterTest {
         .add("c", MinorType.VARCHAR)
         .build();
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .add("10", "foo", "bar")
+        .addRow("10", "foo", "bar")
         .build();
     new RowSetComparison(expected)
       .verifyAndClearAll(actual);
@@ -129,7 +129,7 @@ public class TestCsv extends ClusterTest {
         .add("c_2_2", MinorType.VARCHAR)
         .build();
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .add("10", "foo", "bar", "fourth", "fifth", "sixth")
+        .addRow("10", "foo", "bar", "fourth", "fifth", "sixth")
         .build();
     new RowSetComparison(expected)
       .verifyAndClearAll(actual);
@@ -151,7 +151,7 @@ public class TestCsv extends ClusterTest {
     assertEquals(expectedSchema, actual.batchSchema());
 
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .add("10", "foo", "bar")
+        .addRow("10", "foo", "bar")
         .build();
     new RowSetComparison(expected)
       .verifyAndClearAll(actual);

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java
index 8366b7a..69667a8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java
@@ -123,8 +123,8 @@ public class ExampleTest {
         .build();
 
       final RowSet rowSet = new RowSetBuilder(allocator, schema)
-        .add("1", "kiwi")
-        .add("2", "watermelon")
+        .addRow("1", "kiwi")
+        .addRow("2", "watermelon")
         .build();
 
       new JsonFileBuilder(rowSet).build(tableFile);

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index c03f0b7..a1b8af5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -42,8 +42,12 @@ import org.apache.drill.exec.ops.OperatorStatReceiver;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.TupleSchema;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.testing.ExecutionControls;
@@ -290,21 +294,29 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
   }
 
   public RowSetBuilder rowSetBuilder(BatchSchema schema) {
+    return rowSetBuilder(TupleSchema.fromFields(schema));
+  }
+
+  public RowSetBuilder rowSetBuilder(TupleMetadata schema) {
     return new RowSetBuilder(allocator, schema);
   }
 
   public ExtendableRowSet rowSet(BatchSchema schema) {
-    return new DirectRowSet(allocator, schema);
+    return DirectRowSet.fromSchema(allocator, schema);
+  }
+
+  public ExtendableRowSet rowSet(TupleMetadata schema) {
+    return DirectRowSet.fromSchema(allocator, schema);
   }
 
   public RowSet wrap(VectorContainer container) {
     switch (container.getSchema().getSelectionVectorMode()) {
     case FOUR_BYTE:
-      return new HyperRowSetImpl(allocator(), container, container.getSelectionVector4());
+      return new HyperRowSetImpl(container, container.getSelectionVector4());
     case NONE:
-      return new DirectRowSet(allocator(), container);
+      return DirectRowSet.fromContainer(container);
     case TWO_BYTE:
-      return new IndirectRowSet(allocator(), container);
+      return IndirectRowSet.fromSv2(container, container.getSelectionVector2());
     default:
       throw new IllegalStateException( "Unexpected selection mode" );
     }
@@ -342,4 +354,14 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
   public OperatorContext operatorContext(PhysicalOperator config) {
     return new TestOperatorContext(context, allocator(), config, stats);
   }
+
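+  /**
+   * Wrap a container, with an optional selection vector, in a row set.
+   * A null SV2 requires the container to be in NONE selection mode; a
+   * non-null SV2 requires TWO_BYTE mode.
+   */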
+  public RowSet wrap(VectorContainer container, SelectionVector2 sv2) {
+    if (sv2 == null) {
+      assert container.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE;
+      return DirectRowSet.fromContainer(container);
+    } else {
+      assert container.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE;
+      return IndirectRowSet.fromSv2(container, sv2);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 58f888d..2d1aa9b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -55,7 +55,7 @@ import org.apache.drill.test.BufferingQueryEventListener.QueryEvent;
 import org.apache.drill.test.ClientFixture.StatementParser;
 import org.apache.drill.test.rowSet.DirectRowSet;
 import org.apache.drill.test.rowSet.RowSet;
-import org.apache.drill.test.rowSet.RowSet.RowSetReader;
+import org.apache.drill.test.rowSet.RowSetReader;
 
 import com.google.common.base.Preconditions;
 
@@ -338,7 +338,7 @@ public class QueryBuilder {
       dataBatch.release();
       VectorContainer container = loader.getContainer();
       container.setRecordCount(loader.getRecordCount());
-      return new DirectRowSet(client.allocator(), container);
+      return DirectRowSet.fromContainer(container);
     } catch (SchemaChangeException e) {
       throw new IllegalStateException(e);
     }
@@ -364,7 +364,7 @@ public class QueryBuilder {
     }
     RowSetReader reader = rowSet.reader();
     reader.next();
-    long value = reader.column(0).getLong();
+    long value = reader.scalar(0).getLong();
     rowSet.clear();
     return value;
   }
@@ -385,7 +385,7 @@ public class QueryBuilder {
     }
     RowSetReader reader = rowSet.reader();
     reader.next();
-    int value = reader.column(0).getInt();
+    int value = reader.scalar(0).getInt();
     rowSet.clear();
     return value;
   }
@@ -407,10 +407,10 @@ public class QueryBuilder {
     RowSetReader reader = rowSet.reader();
     reader.next();
     String value;
-    if (reader.column(0).isNull()) {
+    if (reader.scalar(0).isNull()) {
       value = null;
     } else {
-      value = reader.column(0).getString();
+      value = reader.scalar(0).getString();
     }
     rowSet.clear();
     return value;

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
index c329690..c1b9253 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
@@ -93,7 +93,7 @@ public class QueryRowSetIterator implements Iterator<DirectRowSet>, Iterable<Dir
       batch = null;
       VectorContainer container = loader.getContainer();
       container.setRecordCount(loader.getRecordCount());
-      return new DirectRowSet(allocator, container);
+      return DirectRowSet.fromContainer(container);
     } catch (SchemaChangeException e) {
       throw new IllegalStateException(e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractRowSet.java
index 6400a5b..d128e4f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractRowSet.java
@@ -19,12 +19,10 @@ package org.apache.drill.test.rowSet;
 
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.TupleMetadata;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
-import org.apache.drill.exec.vector.accessor.impl.AbstractColumnAccessor.RowIndex;
-import org.apache.drill.exec.vector.accessor.impl.AbstractColumnReader;
-import org.apache.drill.exec.vector.accessor.impl.TupleReaderImpl;
 
 /**
  * Basic implementation of a row set for both the single and multiple
@@ -33,119 +31,36 @@ import org.apache.drill.exec.vector.accessor.impl.TupleReaderImpl;
 
 public abstract class AbstractRowSet implements RowSet {
 
-  /**
-   * Row set index base class used when indexing rows within a row
-   * set for a row set reader. Keeps track of the current position,
-   * which starts before the first row, meaning that the client
-   * must call <tt>next()</tt> to advance to the first row.
-   */
-
-  public static abstract class RowSetIndex implements RowIndex {
-    protected int rowIndex = -1;
-
-    public int position() { return rowIndex; }
-    public abstract boolean next();
-    public abstract int size();
-    public abstract boolean valid();
-    public void set(int index) { rowIndex = index; }
-  }
-
-  /**
-   * Bounded (read-only) version of the row set index. When reading,
-   * the row count is fixed, and set here.
-   */
-
-  public static abstract class BoundedRowIndex extends RowSetIndex {
-
-    protected final int rowCount;
-
-    public BoundedRowIndex(int rowCount) {
-      this.rowCount = rowCount;
-    }
-
-    @Override
-    public boolean next() {
-      if (++rowIndex < rowCount ) {
-        return true;
-      } else {
-        rowIndex--;
-        return false;
-      }
-    }
-
-    @Override
-    public int size() { return rowCount; }
-
-    @Override
-    public boolean valid() { return rowIndex < rowCount; }
-  }
-
-  /**
-   * Reader implementation for a row set.
-   */
-
-  public class RowSetReaderImpl extends TupleReaderImpl implements RowSetReader {
-
-    protected final RowSetIndex index;
-
-    public RowSetReaderImpl(TupleSchema schema, RowSetIndex index, AbstractColumnReader[] readers) {
-      super(schema, readers);
-      this.index = index;
-    }
-
-    @Override
-    public boolean next() { return index.next(); }
-
-    @Override
-    public boolean valid() { return index.valid(); }
-
-    @Override
-    public int index() { return index.position(); }
-
-    @Override
-    public int size() { return index.size(); }
-
-    @Override
-    public int rowIndex() { return index.index(); }
-
-    @Override
-    public int batchIndex() { return index.batch(); }
-
-    @Override
-    public void set(int index) { this.index.set(index); }
-  }
-
-  protected final BufferAllocator allocator;
-  protected final RowSetSchema schema;
-  protected final VectorContainer container;
   protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
+  protected VectorContainer container;
+  protected TupleMetadata schema;
 
-  public AbstractRowSet(BufferAllocator allocator, BatchSchema schema, VectorContainer container) {
-    this.allocator = allocator;
-    this.schema = new RowSetSchema(schema);
+  public AbstractRowSet(VectorContainer container, TupleMetadata schema) {
     this.container = container;
+    this.schema = schema;
   }
 
   @Override
-  public VectorAccessible vectorAccessible() { return container; }
+  public VectorAccessible vectorAccessible() { return container(); }
 
   @Override
   public VectorContainer container() { return container; }
 
   @Override
-  public int rowCount() { return container.getRecordCount(); }
+  public int rowCount() { return container().getRecordCount(); }
 
   @Override
   public void clear() {
+    VectorContainer container = container();
     container.zeroVectors();
     container.setRecordCount(0);
   }
 
   @Override
-  public RowSetSchema schema() { return schema; }
+  public TupleMetadata schema() { return schema; }
 
   @Override
-  public BufferAllocator allocator() { return allocator; }
+  public BufferAllocator allocator() { return container.getAllocator(); }
 
   @Override
   public void print() {
@@ -158,7 +73,5 @@ public abstract class AbstractRowSet implements RowSet {
   }
 
   @Override
-  public BatchSchema batchSchema() {
-    return container.getSchema();
-  }
+  public BatchSchema batchSchema() { return container().getSchema(); }
 }