You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2019/11/14 17:18:54 UTC

[drill] 01/05: DRILL-7442: Create multi-batch row set reader

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 701a3166669a84563a76b48bc8e35e214d20187b
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Sun Nov 10 16:17:49 2019 -0800

    DRILL-7442: Create multi-batch row set reader
    
    Adds a ResultSetReader that works across multiple batches
    in a result set. Reuses the same row set and readers if
    schema is unchanged, creates a new set if the schema changes.
    
    Adds a unit test for the result set reader.
    
    Adds a "rebind" capability to the row set readers to focus
    on new buffers under an existing set of vectors. Used when
    a new batch arrives, if the schema is unchanged.
    
    Extends row set classes to be aware of the BatchAccessor class
    which encapsulates a container and optional selection vector,
    and tracks schema changes.
    
    Moves row set tests into the same package as the row sets.
    (Row set classes were moved a while back, but the tests were
    not moved.)
    
    Renames some BatchAccessor methods.
    
    closes #1897
---
 .../exec/physical/impl/protocol/BatchAccessor.java |  12 +-
 .../physical/impl/protocol/OperatorDriver.java     |   4 +-
 .../impl/protocol/OperatorRecordBatch.java         |  14 +-
 .../impl/protocol/VectorContainerAccessor.java     |  16 +-
 .../drill/exec/physical/impl/scan/ReaderState.java |   6 +-
 .../exec/physical/impl/scan/ScanOperatorExec.java  |   2 +-
 .../exec/physical/resultSet/ResultSetReader.java   |  92 ++++++++++
 .../resultSet/impl/ResultSetReaderImpl.java        | 103 +++++++++++
 .../exec/physical/resultSet/model/ReaderIndex.java |   5 +
 .../drill/exec/physical/rowSet/AbstractRowSet.java |   5 +
 .../apache/drill/exec/physical/rowSet/RowSet.java  |  14 +-
 .../exec/physical/rowSet/RowSetFormatter.java      |  20 +-
 .../drill/exec/physical/rowSet/RowSetReader.java   |  56 +++++-
 .../exec/physical/rowSet/RowSetReaderImpl.java     |  16 +-
 .../apache/drill/exec/physical/rowSet/RowSets.java |  66 +++++++
 .../org/apache/drill/exec/util/BatchPrinter.java   |   6 +-
 .../impl/protocol/TestOperatorRecordBatch.java     |   4 +-
 .../physical/impl/scan/TestFileScanFramework.java  |  32 ++--
 .../physical/impl/scan/TestScanOperExecBasics.java |  12 +-
 .../impl/scan/TestScanOperExecEarlySchema.java     |  34 ++--
 .../impl/scan/TestScanOperExecLateSchema.java      |  40 ++--
 .../impl/scan/TestScanOperExecOuputSchema.java     |  16 +-
 .../impl/scan/TestScanOperExecOverflow.java        |   8 +-
 .../impl/scan/TestScanOperExecSmoothing.java       |  14 +-
 .../impl/TestResultSetLoaderTypeConversion.java    |   4 +-
 .../resultSet/impl/TestResultSetReader.java        | 203 +++++++++++++++++++++
 .../physical/rowSet}/TestColumnConverter.java      |   8 +-
 .../physical/rowSet}/TestDummyWriter.java          |   2 +-
 .../physical/rowSet}/TestFillEmpties.java          |   8 +-
 .../physical/rowSet}/TestFixedWidthWriter.java     |   2 +-
 .../physical/rowSet}/TestHyperVectorReaders.java   |  12 +-
 .../physical/rowSet}/TestIndirectReaders.java      |  14 +-
 .../physical/rowSet}/TestMapAccessors.java         |  10 +-
 .../physical/rowSet}/TestOffsetVectorWriter.java   |   4 +-
 .../rowSet}/TestRepeatedListAccessors.java         |  16 +-
 .../test => exec/physical/rowSet}/TestRowSet.java  |  10 +-
 .../physical/rowSet}/TestScalarAccessors.java      |   9 +-
 .../physical/rowSet}/TestSchemaBuilder.java        |   4 +-
 .../physical/rowSet}/TestVariableWidthWriter.java  |   6 +-
 .../physical/rowSet}/TestVariantAccessors.java     |   8 +-
 .../drill/exec/store/mock/TestMockRowReader.java   |  36 ++--
 .../src/main/java/io/netty/buffer/DrillBuf.java    |  10 +-
 .../exec/vector/accessor/ColumnReaderIndex.java    |   7 +
 .../accessor/reader/AbstractScalarReader.java      |   3 +
 .../accessor/reader/AbstractTupleReader.java       |   7 +
 .../vector/accessor/reader/ArrayReaderImpl.java    |  14 +-
 .../vector/accessor/reader/BaseScalarReader.java   |  24 ++-
 .../exec/vector/accessor/reader/ReaderEvents.java  |   1 +
 .../vector/accessor/reader/UnionReaderImpl.java    |  13 +-
 49 files changed, 813 insertions(+), 219 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/BatchAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/BatchAccessor.java
index b22353f..2418381 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/BatchAccessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/BatchAccessor.java
@@ -36,15 +36,15 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
  */
 
 public interface BatchAccessor {
-  BatchSchema getSchema();
+  BatchSchema schema();
   int schemaVersion();
-  int getRowCount();
-  VectorContainer getOutgoingContainer();
+  int rowCount();
+  VectorContainer container();
   TypedFieldId getValueVectorId(SchemaPath path);
   VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids);
-  WritableBatch getWritableBatch();
-  SelectionVector2 getSelectionVector2();
-  SelectionVector4 getSelectionVector4();
+  WritableBatch writableBatch();
+  SelectionVector2 selectionVector2();
+  SelectionVector4 selectionVector4();
   Iterator<VectorWrapper<?>> iterator();
   void release();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
index 03e6d4e..d599f22 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
@@ -178,7 +178,7 @@ public class OperatorDriver {
 
       // Report schema change.
 
-      batchAccessor.getOutgoingContainer().schemaChanged();
+      batchAccessor.container().schemaChanged();
       state = State.RUN;
       return IterOutcome.OK_NEW_SCHEMA;
     } else {
@@ -207,7 +207,7 @@ public class OperatorDriver {
     // trivial changes within this operator.
 
     if (schemaChanged) {
-      batchAccessor.getOutgoingContainer().schemaChanged();
+      batchAccessor.container().schemaChanged();
     }
     if (state == State.STARTING || schemaChanged) {
       schemaVersion = newVersion;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
index 07f5069..102bb4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
@@ -98,14 +98,14 @@ public class OperatorRecordBatch implements CloseableRecordBatch {
   }
 
   @Override
-  public BatchSchema getSchema() { return batchAccessor.getSchema(); }
+  public BatchSchema getSchema() { return batchAccessor.schema(); }
 
   @Override
-  public int getRecordCount() { return batchAccessor.getRowCount(); }
+  public int getRecordCount() { return batchAccessor.rowCount(); }
 
   @Override
   public VectorContainer getOutgoingContainer() {
-    return batchAccessor.getOutgoingContainer();
+    return batchAccessor.container();
   }
 
   @Override
@@ -120,17 +120,17 @@ public class OperatorRecordBatch implements CloseableRecordBatch {
 
   @Override
   public WritableBatch getWritableBatch() {
-    return batchAccessor.getWritableBatch();
+    return batchAccessor.writableBatch();
   }
 
   @Override
   public SelectionVector2 getSelectionVector2() {
-    return batchAccessor.getSelectionVector2();
+    return batchAccessor.selectionVector2();
   }
 
   @Override
   public SelectionVector4 getSelectionVector4() {
-    return batchAccessor.getSelectionVector4();
+    return batchAccessor.selectionVector4();
   }
 
   @Override
@@ -165,7 +165,7 @@ public class OperatorRecordBatch implements CloseableRecordBatch {
 
   @Override
   public VectorContainer getContainer() {
-    return batchAccessor.getOutgoingContainer();
+    return batchAccessor.container();
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java
index e97c374..bb6670f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java
@@ -40,7 +40,7 @@ public class VectorContainerAccessor implements BatchAccessor {
     }
 
     @Override
-    public SelectionVector2 getSelectionVector2() {
+    public SelectionVector2 selectionVector2() {
       return sv2;
     }
   }
@@ -50,7 +50,7 @@ public class VectorContainerAccessor implements BatchAccessor {
     private SelectionVector4 sv4;
 
     @Override
-    public SelectionVector4 getSelectionVector4() {
+    public SelectionVector4 selectionVector4() {
       return sv4;
     }
   }
@@ -92,7 +92,7 @@ public class VectorContainerAccessor implements BatchAccessor {
   public int batchCount() { return batchCount; }
 
   @Override
-  public BatchSchema getSchema() {
+  public BatchSchema schema() {
     return container == null ? null : container.getSchema();
   }
 
@@ -100,12 +100,12 @@ public class VectorContainerAccessor implements BatchAccessor {
   public int schemaVersion() { return schemaTracker.schemaVersion(); }
 
   @Override
-  public int getRowCount() {
+  public int rowCount() {
     return container == null ? 0 : container.getRecordCount();
   }
 
   @Override
-  public VectorContainer getOutgoingContainer() { return container; }
+  public VectorContainer container() { return container; }
 
   @Override
   public TypedFieldId getValueVectorId(SchemaPath path) {
@@ -118,19 +118,19 @@ public class VectorContainerAccessor implements BatchAccessor {
   }
 
   @Override
-  public WritableBatch getWritableBatch() {
+  public WritableBatch writableBatch() {
     return WritableBatch.get(container);
   }
 
   @Override
-  public SelectionVector2 getSelectionVector2() {
+  public SelectionVector2 selectionVector2() {
     // Throws an exception by default because containers
     // do not support selection vectors.
     return container.getSelectionVector2();
   }
 
   @Override
-  public SelectionVector4 getSelectionVector4() {
+  public SelectionVector4 selectionVector4() {
     // Throws an exception by default because containers
     // do not support selection vectors.
      return container.getSelectionVector4();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
index 866b988..f47eae4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
@@ -335,9 +335,9 @@ class ReaderState {
     // in a dummy container.
 
     assert lookahead == null;
-    lookahead = new VectorContainer(scanOp.context.getAllocator(), scanOp.containerAccessor.getSchema());
+    lookahead = new VectorContainer(scanOp.context.getAllocator(), scanOp.containerAccessor.schema());
     lookahead.setRecordCount(0);
-    lookahead.exchange(scanOp.containerAccessor.getOutgoingContainer());
+    lookahead.exchange(scanOp.containerAccessor.container());
     state = state == State.EOF ? State.LOOK_AHEAD_WITH_EOF : State.LOOK_AHEAD;
     return true;
   }
@@ -357,7 +357,7 @@ class ReaderState {
     case LOOK_AHEAD_WITH_EOF:
       // Use batch previously read.
       assert lookahead != null;
-      lookahead.exchange(scanOp.containerAccessor.getOutgoingContainer());
+      lookahead.exchange(scanOp.containerAccessor.container());
       assert lookahead.getRecordCount() == 0;
       lookahead = null;
       if (state == State.LOOK_AHEAD_WITH_EOF) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
index 53664af..c905286 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
@@ -303,7 +303,7 @@ public class ScanOperatorExec implements OperatorExec {
       // was cleared when closing the reader. Recreate a valid empty
       // batch here to return downstream.
 
-      containerAccessor.getOutgoingContainer().setEmpty();
+      containerAccessor.container().setEmpty();
       state = State.EMPTY;
     } else {
       state = State.END;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetReader.java
new file mode 100644
index 0000000..ae282fd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetReader.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet;
+
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
+
+/**
+ * Iterates over the set of batches in a result set, providing
+ * a row set reader to iterate over the rows within each batch.
+ * Handles schema changes between batches.
+ * <p>
+ * Designed to handle batches arriving from a single upstream
+ * operator. Uses Drill's strict form of schema identity: that
+ * not only must the column definitions match; the vectors must
+ * be identical from one batch to the next. If the vectors differ,
+ * then this class assumes a new schema has occurred, and will
+ * rebuild all the underlying readers, which can be costly.
+ *
+ * <h4>Protocol</h4>
+ * <ol>
+ * <li>Create an instance, passing in a
+ *     {@link BatchAccessor} to hold the batch and optional
+ *     selection vector.</li>
+ * <li>For each incoming batch:
+ *   <ol>
+ *   <li>Call {@link #start()} to attach the batch. The associated
+ *       {@link BatchAccessor} reports if the schema has changed.</li>
+ *   <li>Call {@link #reader()} to obtain a reader.</li>
+ *   <li>Iterate over the batch using the reader.</li>
+ *   <li>Call {@link #release()} to free the memory for the
+ *       incoming batch. Or, call {@link #detach()} to keep
+ *       the batch memory.</li>
+ *   </ol>
+ * <li>Call {@link #close()} after all batches are read.</li>
+ * </ol>
+ */
+public interface ResultSetReader {
+
+  /**
+   * Start tracking a new batch in the associated
+   * vector container.
+   */
+  void start();
+
+  /**
+   * Get the row reader for this batch. The same row reader is
+   * reused from batch to batch while the schema is unchanged;
+   * a new row reader is created when the schema changes.
+   *
+   * @return the row reader to read rows for the current
+   * batch
+   */
+  RowSetReader reader();
+
+  /**
+   * Detach the batch of data from this reader. Does not
+   * release the memory for that batch.
+   */
+  void detach();
+
+  /**
+   * Detach the batch of data from this reader and release
+   * the memory for that batch. Call this method before
+   * loading the underlying vector container with more
+   * data, then call {@link #start()} after new data is
+   * available.
+   */
+  void release();
+
+  /**
+   * Close this reader. Releases any memory still assigned
+   * to any attached batch. Call {@link #detach()} first if
+   * you want to preserve the batch memory.
+   */
+  void close();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetReaderImpl.java
new file mode 100644
index 0000000..eea2d2d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetReaderImpl.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet.impl;
+
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.resultSet.ResultSetReader;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
+import org.apache.drill.exec.physical.rowSet.RowSets;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+public class ResultSetReaderImpl implements ResultSetReader {
+
+  @VisibleForTesting
+  protected enum State {
+      START,
+      BATCH,
+      DETACHED,
+      CLOSED
+  }
+
+  private State state = State.START;
+  private int priorSchemaVersion;
+  private final BatchAccessor batch;
+  private RowSetReader rowSetReader;
+
+  public ResultSetReaderImpl(BatchAccessor batch) {
+    this.batch = batch;
+  }
+
+  @Override
+  public void start() {
+    Preconditions.checkState(state != State.CLOSED, "Reader is closed");
+    Preconditions.checkState(state != State.BATCH,
+        "Call detach/release before starting another batch");
+    Preconditions.checkState(state == State.START ||
+        priorSchemaVersion <= batch.schemaVersion());
+    boolean newSchema = state == State.START ||
+        priorSchemaVersion != batch.schemaVersion();
+    state = State.BATCH;
+
+    // If new schema, discard the old reader (if any), and create
+    // a new one that matches the new schema. If not a new schema,
+    // then the old reader is reused: it points to vectors which
+    // Drill requires be the same vectors as the previous batch,
+    // but with different buffers.
+
+    if (newSchema) {
+      rowSetReader = RowSets.wrap(batch).reader();
+      priorSchemaVersion = batch.schemaVersion();
+    } else {
+      rowSetReader.newBatch();
+    }
+  }
+
+  @Override
+  public RowSetReader reader() {
+    Preconditions.checkState(state == State.BATCH, "Call start() before requesting the reader.");
+    return rowSetReader;
+  }
+
+  @Override
+  public void detach() {
+    if (state != State.START) {
+      Preconditions.checkState(state == State.BATCH || state == State.DETACHED);
+      state = State.DETACHED;
+    }
+  }
+
+  @Override
+  public void release() {
+    if (state != State.START && state != State.DETACHED) {
+      detach();
+      batch.release();
+    }
+  }
+
+  @Override
+  public void close() {
+    if (state != State.CLOSED) {
+      release();
+      state = State.CLOSED;
+    }
+  }
+
+  @VisibleForTesting
+  protected State state() { return state; }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/ReaderIndex.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/ReaderIndex.java
index 9eb927b..cdf3ec9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/ReaderIndex.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/ReaderIndex.java
@@ -54,4 +54,9 @@ public abstract class ReaderIndex implements ColumnReaderIndex {
     position = rowCount;
     return false;
   }
+
+  @Override
+  public boolean hasNext() {
+    return position + 1 < rowCount;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/AbstractRowSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/AbstractRowSet.java
index 95efee5..b1e29ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/AbstractRowSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/AbstractRowSet.java
@@ -65,6 +65,11 @@ public abstract class AbstractRowSet implements RowSet {
   }
 
   @Override
+  public void print() {
+    RowSetFormatter.print(this);
+  }
+
+  @Override
   public long size() {
     throw new UnsupportedOperationException("Current row set implementation does not support providing size information");
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSet.java
index 79f6ac7..bdbe5c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSet.java
@@ -17,17 +17,18 @@
  */
 package org.apache.drill.exec.physical.rowSet;
 
+import java.util.Set;
+
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-
-import java.util.Set;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 
 /**
  * A row set is a collection of rows stored as value vectors. Elsewhere in
@@ -99,6 +100,13 @@ public interface RowSet {
   SelectionVectorMode indirectionType();
 
   /**
+   * Debug-only tool to visualize a row set for inspection.
+   * <b>Do not</b> use this in production code.
+   */
+  @VisibleForTesting
+  void print();
+
+  /**
    * Return the size in memory of this record set, including indirection
    * vectors, null vectors, offset vectors and the entire (used and unused)
    * data vectors.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java
index 9c97ee8..0817eda 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java
@@ -17,10 +17,12 @@
  */
 package org.apache.drill.exec.physical.rowSet;
 
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
 import org.apache.commons.io.output.StringBuilderWriter;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.VectorContainer;
 
 import java.io.IOException;
 import java.io.OutputStreamWriter;
@@ -50,6 +52,14 @@ public class RowSetFormatter {
     new RowSetFormatter(rowSet, new OutputStreamWriter(System.out)).write();
   }
 
+  public static void print(VectorContainer container) {
+    RowSets.wrap(container).print();
+  }
+
+  public static void print(BatchAccessor batch) {
+    RowSets.wrap(batch).print();
+  }
+
   public static String toString(RowSet rowSet) {
     StringBuilderWriter out = new StringBuilderWriter();
     new RowSetFormatter(rowSet, out).write();
@@ -86,6 +96,8 @@ public class RowSetFormatter {
       case TWO_BYTE:
         writer.write(" (row #)");
         break;
+      default:
+        break;
     }
     writer.write(": ");
     TupleMetadata schema = reader.tupleSchema();
@@ -103,20 +115,22 @@ public class RowSetFormatter {
   }
 
   private void writeHeader(Writer writer, RowSetReader reader, SelectionVectorMode selectionMode) throws IOException {
-    writer.write(reader.logicalIndex());
+    writer.write(Integer.toString(reader.logicalIndex()));
     switch (selectionMode) {
       case FOUR_BYTE:
         writer.write(" (");
         writer.write(reader.hyperVectorIndex());
         writer.write(", ");
-        writer.write(reader.offset());
+        writer.write(Integer.toString(reader.offset()));
         writer.write(")");
         break;
       case TWO_BYTE:
         writer.write(" (");
-        writer.write(reader.offset());
+        writer.write(Integer.toString(reader.offset()));
         writer.write(")");
         break;
+      default:
+        break;
     }
     writer.write(": ");
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetReader.java
index 22e756f..e76f8a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetReader.java
@@ -20,7 +20,10 @@ package org.apache.drill.exec.physical.rowSet;
 import org.apache.drill.exec.vector.accessor.TupleReader;
 
 /**
- * Reader for all types of row sets.
+ * Reader for all types of row sets: those with or without
+ * a selection vector. Iterates over "bare" row sets in row
+ * order. Iterates over selection-vector based row sets in
+ * selection vector order.
  */
 
 public interface RowSetReader extends TupleReader {
@@ -31,11 +34,53 @@ public interface RowSetReader extends TupleReader {
    */
   int rowCount();
 
+  /**
+   * Convenience method which reports whether the next call to
+   * {@link #next()} will succeed. Purely optional.
+   *
+   * @return <tt>true</tt> if there is another record to read,
+   * <tt>false</tt> if not
+   */
+  boolean hasNext();
+
+  /**
+   * Advance to the next position. If the underlying row set has
+   * a selection vector, then moves one position in the selection
+   * vector, and to whichever data record is indexed.
+   *
+   * @return <tt>true</tt> if another row is available,
+   * <tt>false</tt> if all rows have been read
+   */
   boolean next();
+
+  /**
+   * Gets the read position within the row set. If the row set has
+   * a selection vector, this is the position in that vector; the
+   * actual record location will likely differ. Use
+   * {@link #offset()} to get the actual row index.
+   *
+   * @return current iteration position
+   */
   int logicalIndex();
+
+  /**
+   * Sets the iteration position. If the row set has a selection
+   * vector, this sets the index within that vector. The index must
+   * be from -1 to the {@link #rowCount()} - 1. Set the value to one
+   * less than the position to be read in the next call to
+   * {@link #next()}. An index of -1 means before the first row.
+   *
+   * @param index the desired index position
+   */
   void setPosition(int index);
 
   /**
+   * Reset the position to before the first row. Convenience method
+   * which is the same as <tt>setPosition(-1)</tt>.
+   */
+  void rewind();
+
+  /**
    * Batch index: 0 for a single batch, batch for the current
    * row is a hyper-batch.
    * @return index of the batch for the current row
@@ -49,4 +94,13 @@ public interface RowSetReader extends TupleReader {
    * @return index of the underlying row
    */
   int offset();
+
+  /**
+   * Bind the reader to a new batch of data. The vectors are
+   * unchanged, but the buffers are different. Assumes the schema
+   * has not changed: the columns and selection vector mode remain
+   * unchanged; only the buffers changed. If the schema changes,
+   * discard this reader and rebuild a new one.
+   */
+  void newBatch();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetReaderImpl.java
index db460f1..b41a97c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetReaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetReaderImpl.java
@@ -59,6 +59,11 @@ public class RowSetReaderImpl extends AbstractTupleReader implements RowSetReade
   }
 
   @Override
+  public boolean hasNext() {
+    return readerIndex.hasNext();
+  }
+
+  @Override
   public int logicalIndex() { return readerIndex.logicalIndex(); }
 
   @Override
@@ -72,7 +77,7 @@ public class RowSetReaderImpl extends AbstractTupleReader implements RowSetReade
 
   @Override
   public void setPosition(int index) {
-    this.readerIndex.set(index);
+    readerIndex.set(index);
     reposition();
   }
 
@@ -81,4 +86,13 @@ public class RowSetReaderImpl extends AbstractTupleReader implements RowSetReade
 
   @Override
   public TupleMetadata tupleSchema() { return schema; }
+
+  @Override
+  public void rewind() { setPosition(-1); }
+
+  @Override
+  public void newBatch() {
+    bindBuffer();
+    rewind();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSets.java
new file mode 100644
index 0000000..e1f53e8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSets.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet;
+
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+
+public class RowSets { // static factory methods that wrap existing batch data in a RowSet
+  /** Wraps a record batch in the row set type selected by its selection vector mode. */
+  public static RowSet wrap(RecordBatch batch) {
+    VectorContainer container = batch.getContainer();
+    switch (container.getSchema().getSelectionVectorMode()) {
+    case FOUR_BYTE:
+      return HyperRowSetImpl.fromContainer(container, batch.getSelectionVector4());
+    case NONE:
+      return DirectRowSet.fromContainer(container);
+    case TWO_BYTE:
+      return IndirectRowSet.fromSv2(container, batch.getSelectionVector2());
+    default:
+      throw new IllegalStateException("Invalid selection mode");
+    }
+  }
+  /** Wraps a bare container; only mode NONE is accepted, SV2 and SV4 are rejected. */
+  public static RowSet wrap(VectorContainer container) {
+    switch (container.getSchema().getSelectionVectorMode()) {
+    case FOUR_BYTE:
+      // This overload receives no SV4; callers must use a batch-based overload.
+      throw new IllegalArgumentException("Build from a batch for SV4");
+    case NONE:
+      return DirectRowSet.fromContainer(container);
+    case TWO_BYTE:
+      // This overload receives no SV2; callers must use a batch-based overload.
+      throw new IllegalArgumentException("Build from a batch for SV2");
+    default:
+      throw new IllegalStateException("Invalid selection mode");
+    }
+  }
+  /** Wraps a batch accessor in the row set type selected by its selection vector mode. */
+  public static RowSet wrap(BatchAccessor batch) {
+    VectorContainer container = batch.container();
+    switch (container.getSchema().getSelectionVectorMode()) {
+    case FOUR_BYTE:
+      return HyperRowSetImpl.fromContainer(container, batch.selectionVector4());
+    case NONE:
+      return DirectRowSet.fromContainer(container);
+    case TWO_BYTE:
+      return IndirectRowSet.fromSv2(container, batch.selectionVector2());
+    default:
+      throw new IllegalStateException("Invalid selection mode");
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
index c582919..3562eaf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
@@ -25,12 +25,14 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.ValueVector;
-
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 /**
- * This is a tool for printing the content of record batches to screen. Used for debugging.
+ * Tool for printing the content of record batches to screen. Used for debugging.
+ *
+ * @see org.apache.drill.exec.physical.rowSet.RowSetFormatter RowSetFormatter
  */
+
 public class BatchPrinter {
   public static void printHyperBatch(VectorAccessible batch, SelectionVector4 sv4) {
     List<String> columns = Lists.newArrayList();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
index 0eab2fb..9986609 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
@@ -116,7 +116,7 @@ public class TestOperatorRecordBatch extends SubOperatorTest {
         return false;
       }
       if (nextCount == schemaChangeAt) {
-        BatchSchemaBuilder newSchema = new BatchSchemaBuilder(batchAccessor.getSchema());
+        BatchSchemaBuilder newSchema = new BatchSchemaBuilder(batchAccessor.schema());
         newSchema.schemaBuilder()
             .add("b", MinorType.VARCHAR);
         VectorContainer newContainer = new VectorContainer(fixture.allocator(), newSchema.build());
@@ -130,7 +130,7 @@ public class TestOperatorRecordBatch extends SubOperatorTest {
 
     @Override
     public void close() {
-      batchAccessor().getOutgoingContainer().clear();
+      batchAccessor().container().clear();
       closeCalled = true;
     }
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
index 33d07d7..0e42f67 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
@@ -305,7 +305,7 @@ public class TestFileScanFramework extends SubOperatorTest {
     assertTrue(scan.buildSchema());
     assertTrue(reader.openCalled);
     assertEquals(1, reader.batchCount);
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     // Create the expected result.
 
@@ -324,19 +324,19 @@ public class TestFileScanFramework extends SubOperatorTest {
         .addRow(30, "fred", MOCK_FILE_FQN, MOCK_FILE_PATH, MOCK_FILE_NAME, MOCK_SUFFIX, MOCK_DIR0, MOCK_DIR1, null)
         .addRow(40, "wilma", MOCK_FILE_FQN, MOCK_FILE_PATH, MOCK_FILE_NAME, MOCK_SUFFIX, MOCK_DIR0, MOCK_DIR1, null)
         .build();
-    assertEquals(expected.batchSchema(), scan.batchAccessor().getSchema());
+    assertEquals(expected.batchSchema(), scan.batchAccessor().schema());
 
     // Next call, return with data.
 
     assertTrue(scan.next());
     RowSetUtilities.verify(expected,
-        fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+        fixture.wrap(scan.batchAccessor().container()));
 
     // EOF
 
     assertFalse(scan.next());
     assertTrue(reader.closeCalled);
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     scanFixture.close();
   }
@@ -379,19 +379,19 @@ public class TestFileScanFramework extends SubOperatorTest {
     // Schema should include implicit columns.
 
     assertTrue(scan.buildSchema());
-    assertEquals(expectedSchema, scan.batchAccessor().getSchema());
+    assertEquals(expectedSchema, scan.batchAccessor().schema());
     scan.batchAccessor().release();
 
     // Read one batch, should contain implicit columns
 
     assertTrue(scan.next());
     RowSetUtilities.verify(expected,
-        fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+        fixture.wrap(scan.batchAccessor().container()));
 
     // EOF
 
     assertFalse(scan.next());
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
     scanFixture.close();
   }
 
@@ -436,19 +436,19 @@ public class TestFileScanFramework extends SubOperatorTest {
     // Schema should include implicit columns.
 
     assertTrue(scan.buildSchema());
-    assertEquals(expectedSchema, scan.batchAccessor().getSchema());
+    assertEquals(expectedSchema, scan.batchAccessor().schema());
     scan.batchAccessor().release();
 
     // Read one batch, should contain implicit columns
 
     assertTrue(scan.next());
     RowSetUtilities.verify(expected,
-        fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+        fixture.wrap(scan.batchAccessor().container()));
 
     // EOF
 
     assertFalse(scan.next());
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
     scanFixture.close();
   }
 
@@ -479,19 +479,19 @@ public class TestFileScanFramework extends SubOperatorTest {
     // Schema should include implicit columns.
 
     assertTrue(scan.buildSchema());
-    assertEquals(expectedSchema, scan.batchAccessor().getSchema());
+    assertEquals(expectedSchema, scan.batchAccessor().schema());
     scan.batchAccessor().release();
 
     // Read one batch, should contain implicit columns
 
     assertTrue(scan.next());
     RowSetUtilities.verify(expected,
-        fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+        fixture.wrap(scan.batchAccessor().container()));
 
     // EOF
 
     assertFalse(scan.next());
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
     scanFixture.close();
   }
 
@@ -551,17 +551,17 @@ public class TestFileScanFramework extends SubOperatorTest {
         .addSingleCol(new Object[] {20})
         .build();
     assertTrue(scan.buildSchema());
-    assertEquals(expectedSchema, scan.batchAccessor().getSchema());
+    assertEquals(expectedSchema, scan.batchAccessor().schema());
     scan.batchAccessor().release();
 
     assertTrue(scan.next());
     RowSetUtilities.verify(expected,
-         fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+         fixture.wrap(scan.batchAccessor().container()));
 
     // EOF
 
     assertFalse(scan.next());
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
     scanFixture.close();
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecBasics.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecBasics.java
index 32a5ec1..2566bcd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecBasics.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecBasics.java
@@ -97,7 +97,7 @@ public class TestScanOperExecBasics extends BaseScanOperatorExecTest {
     }
     assertTrue(reader.openCalled);
 
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
     scanFixture.close();
     assertTrue(reader.closeCalled);
   }
@@ -132,7 +132,7 @@ public class TestScanOperExecBasics extends BaseScanOperatorExecTest {
     }
     assertTrue(reader.openCalled);
 
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
     scanFixture.close();
     assertTrue(reader.closeCalled);
   }
@@ -162,7 +162,7 @@ public class TestScanOperExecBasics extends BaseScanOperatorExecTest {
     }
     assertTrue(reader.openCalled);
 
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
     scanFixture.close();
     assertTrue(reader.closeCalled);
   }
@@ -196,7 +196,7 @@ public class TestScanOperExecBasics extends BaseScanOperatorExecTest {
     }
     assertTrue(reader.openCalled);
 
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
     scanFixture.close();
     assertTrue(reader.closeCalled);
   }
@@ -393,13 +393,13 @@ public class TestScanOperExecBasics extends BaseScanOperatorExecTest {
 
     assertTrue(scan.buildSchema());
     assertTrue(scan.next());
-    VectorContainer container = scan.batchAccessor().getOutgoingContainer();
+    VectorContainer container = scan.batchAccessor().container();
     assertEquals(0, container.getRecordCount());
     assertEquals(2, container.getNumberOfColumns());
 
     assertTrue(reader1.closeCalled);
     assertTrue(reader2.closeCalled);
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
     assertFalse(scan.next());
 
     scanFixture.close();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecEarlySchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecEarlySchema.java
index 034314f..5c6fa80 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecEarlySchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecEarlySchema.java
@@ -76,18 +76,18 @@ public class TestScanOperExecEarlySchema extends BaseScanOperatorExecTest {
 
     assertTrue(scan.buildSchema());
     assertEquals(0, reader.batchCount);
-    assertEquals(expected.batchSchema(), scan.batchAccessor().getSchema());
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(expected.batchSchema(), scan.batchAccessor().schema());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     // Next call, return with data.
 
     assertTrue(scan.next());
-    verifier.verifyAndClearAll(fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+    verifier.verifyAndClearAll(fixture.wrap(scan.batchAccessor().container()));
 
     // EOF
 
     assertFalse(scan.next());
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     // Next again: no-op
 
@@ -120,12 +120,12 @@ public class TestScanOperExecEarlySchema extends BaseScanOperatorExecTest {
     // First batch: return with data.
 
     assertTrue(scan.next());
-    verifier.verifyAndClearAll(fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+    verifier.verifyAndClearAll(fixture.wrap(scan.batchAccessor().container()));
 
     // EOF
 
     assertFalse(scan.next());
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     // Next again: no-op
 
@@ -170,17 +170,17 @@ public class TestScanOperExecEarlySchema extends BaseScanOperatorExecTest {
     // First batch: return schema.
 
     assertTrue(scan.buildSchema());
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     // Next call, return with data.
 
     assertTrue(scan.next());
-    verifier.verifyAndClearAll(fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+    verifier.verifyAndClearAll(fixture.wrap(scan.batchAccessor().container()));
 
     // EOF
 
     assertFalse(scan.next());
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     // Next again: no-op
 
@@ -208,7 +208,7 @@ public class TestScanOperExecEarlySchema extends BaseScanOperatorExecTest {
 
     assertFalse(scan.buildSchema());
     assertTrue(reader.closeCalled);
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     scanFixture.close();
   }
@@ -227,11 +227,11 @@ public class TestScanOperExecEarlySchema extends BaseScanOperatorExecTest {
 
     assertTrue(scan.next());
     assertTrue(reader.closeCalled);
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     RowSetUtilities.verify(
         RowSetBuilder.emptyBatch(fixture.allocator(), expectedSchema()),
-        fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+        fixture.wrap(scan.batchAccessor().container()));
 
     assertFalse(scan.next());
     scanFixture.close();
@@ -267,14 +267,14 @@ public class TestScanOperExecEarlySchema extends BaseScanOperatorExecTest {
     assertTrue(scan.next());
     assertEquals(1, reader1.batchCount);
     assertEquals(1, scan.batchAccessor().schemaVersion());
-    verifyBatch(0, scan.batchAccessor().getOutgoingContainer());
+    verifyBatch(0, scan.batchAccessor().container());
 
     // Third batch.
 
     assertTrue(scan.next());
     assertEquals(2, reader1.batchCount);
     assertEquals(1, scan.batchAccessor().schemaVersion());
-    verifyBatch(20, scan.batchAccessor().getOutgoingContainer());
+    verifyBatch(20, scan.batchAccessor().container());
 
     // Second reader. First batch includes data, no special first-batch
     // handling for the second reader.
@@ -286,20 +286,20 @@ public class TestScanOperExecEarlySchema extends BaseScanOperatorExecTest {
     assertTrue(reader2.openCalled);
     assertEquals(1, reader2.batchCount);
     assertEquals(1, scan.batchAccessor().schemaVersion());
-    verifyBatch(100, scan.batchAccessor().getOutgoingContainer());
+    verifyBatch(100, scan.batchAccessor().container());
 
     // Second batch from second reader.
 
     assertTrue(scan.next());
     assertEquals(2, reader2.batchCount);
     assertEquals(1, scan.batchAccessor().schemaVersion());
-    verifyBatch(120, scan.batchAccessor().getOutgoingContainer());
+    verifyBatch(120, scan.batchAccessor().container());
 
     // EOF
 
     assertFalse(scan.next());
     assertTrue(reader2.closeCalled);
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     scanFixture.close();
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLateSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLateSchema.java
index 32e9fe6..500b343 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLateSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLateSchema.java
@@ -119,24 +119,24 @@ public class TestScanOperExecLateSchema extends BaseScanOperatorExecTest {
     assertTrue(scan.buildSchema());
     assertTrue(reader.openCalled);
     assertEquals(1, reader.batchCount);
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     // Create the expected result.
 
     SingleRowSet expected = makeExpected(20);
     RowSetComparison verifier = new RowSetComparison(expected);
-    assertEquals(expected.batchSchema(), scan.batchAccessor().getSchema());
+    assertEquals(expected.batchSchema(), scan.batchAccessor().schema());
 
     // Next call, return with data.
 
     assertTrue(scan.next());
-    verifier.verifyAndClearAll(fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+    verifier.verifyAndClearAll(fixture.wrap(scan.batchAccessor().container()));
 
     // EOF
 
     assertFalse(scan.next());
     assertTrue(reader.closeCalled);
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     scanFixture.close();
   }
@@ -167,19 +167,19 @@ public class TestScanOperExecLateSchema extends BaseScanOperatorExecTest {
 
     assertTrue(scan.next());
     RowSetUtilities.verify(makeExpected(0),
-        fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+        fixture.wrap(scan.batchAccessor().container()));
 
     // Second batch.
 
     assertTrue(scan.next());
     RowSetUtilities.verify(makeExpected(20),
-        fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+        fixture.wrap(scan.batchAccessor().container()));
 
      // EOF
 
     assertFalse(scan.next());
     assertTrue(reader.closeCalled);
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     scanFixture.close();
   }
@@ -299,24 +299,24 @@ public class TestScanOperExecLateSchema extends BaseScanOperatorExecTest {
     assertTrue(scan.buildSchema());
     assertTrue(reader.openCalled);
     assertEquals(1, reader.batchCount);
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     // Create the expected result.
 
     SingleRowSet expected = makeExpected(20);
     RowSetComparison verifier = new RowSetComparison(expected);
-    assertEquals(expected.batchSchema(), scan.batchAccessor().getSchema());
+    assertEquals(expected.batchSchema(), scan.batchAccessor().schema());
 
     // Next call, return with data.
 
     assertTrue(scan.next());
-    verifier.verifyAndClearAll(fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+    verifier.verifyAndClearAll(fixture.wrap(scan.batchAccessor().container()));
 
     // EOF
 
     assertFalse(scan.next());
     assertTrue(reader.closeCalled);
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     scanFixture.close();
   }
@@ -371,22 +371,22 @@ public class TestScanOperExecLateSchema extends BaseScanOperatorExecTest {
     assertTrue(scan.buildSchema());
     assertTrue(reader.openCalled);
     assertEquals(1, reader.batchCount);
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     SingleRowSet expected = makeExpected();
     RowSetComparison verifier = new RowSetComparison(expected);
-    assertEquals(expected.batchSchema(), scan.batchAccessor().getSchema());
+    assertEquals(expected.batchSchema(), scan.batchAccessor().schema());
 
     // Next call, return with data.
 
     assertTrue(scan.next());
-    verifier.verifyAndClearAll(fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+    verifier.verifyAndClearAll(fixture.wrap(scan.batchAccessor().container()));
 
     // EOF
 
     assertFalse(scan.next());
     assertTrue(reader.closeCalled);
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     scanFixture.close();
   }
@@ -414,8 +414,8 @@ public class TestScanOperExecLateSchema extends BaseScanOperatorExecTest {
 
    assertTrue(scan.buildSchema());
    assertEquals(1, reader.batchCount);
-   assertEquals(expected.batchSchema(), scan.batchAccessor().getSchema());
-   assertEquals(0, scan.batchAccessor().getRowCount());
+   assertEquals(expected.batchSchema(), scan.batchAccessor().schema());
+   assertEquals(0, scan.batchAccessor().rowCount());
    scan.batchAccessor().release();
 
    // Second batch. Returns the "look-ahead" batch returned by
@@ -424,20 +424,20 @@ public class TestScanOperExecLateSchema extends BaseScanOperatorExecTest {
    assertTrue(scan.next());
    assertEquals(1, reader.batchCount);
    RowSetUtilities.verify(expected,
-     fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+     fixture.wrap(scan.batchAccessor().container()));
 
    // Third batch, normal case.
 
    assertTrue(scan.next());
    assertEquals(2, reader.batchCount);
    RowSetUtilities.verify(makeExpected(20),
-     fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+     fixture.wrap(scan.batchAccessor().container()));
 
    // EOF
 
    assertFalse(scan.next());
    assertTrue(reader.closeCalled);
-   assertEquals(0, scan.batchAccessor().getRowCount());
+   assertEquals(0, scan.batchAccessor().rowCount());
 
    scanFixture.close();
  }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOuputSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOuputSchema.java
index 0792833..f9b7340 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOuputSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOuputSchema.java
@@ -130,7 +130,7 @@ public class TestScanOperExecOuputSchema extends BaseScanOperatorExecTest {
       SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
            .build();
       RowSetUtilities.verify(expected,
-          fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+          fixture.wrap(scan.batchAccessor().container()));
     }
 
     // Batch with defaults and null types
@@ -141,7 +141,7 @@ public class TestScanOperExecOuputSchema extends BaseScanOperatorExecTest {
           .addRow(10, "foo", 20L, null)
           .build();
       RowSetUtilities.verify(expected,
-          fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+          fixture.wrap(scan.batchAccessor().container()));
     }
 
     assertFalse(scan.next());
@@ -186,7 +186,7 @@ public class TestScanOperExecOuputSchema extends BaseScanOperatorExecTest {
       SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
            .build();
       RowSetUtilities.verify(expected,
-          fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+          fixture.wrap(scan.batchAccessor().container()));
     }
 
     // Batch with defaults and null types
@@ -197,7 +197,7 @@ public class TestScanOperExecOuputSchema extends BaseScanOperatorExecTest {
           .addRow(10, 20L, 30L, "foo", "bar")
           .build();
       RowSetUtilities.verify(expected,
-          fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+          fixture.wrap(scan.batchAccessor().container()));
     }
 
     assertFalse(scan.next());
@@ -237,7 +237,7 @@ public class TestScanOperExecOuputSchema extends BaseScanOperatorExecTest {
       SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
            .build();
       RowSetUtilities.verify(expected,
-          fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+          fixture.wrap(scan.batchAccessor().container()));
     }
 
     // Batch with defaults and null types
@@ -248,7 +248,7 @@ public class TestScanOperExecOuputSchema extends BaseScanOperatorExecTest {
           .addRow(10, 20L, 30L)
           .build();
       RowSetUtilities.verify(expected,
-          fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+          fixture.wrap(scan.batchAccessor().container()));
     }
 
     assertFalse(scan.next());
@@ -288,7 +288,7 @@ public class TestScanOperExecOuputSchema extends BaseScanOperatorExecTest {
       SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
            .build();
       RowSetUtilities.verify(expected,
-          fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+          fixture.wrap(scan.batchAccessor().container()));
     }
 
     // Batch with defaults and null types
@@ -299,7 +299,7 @@ public class TestScanOperExecOuputSchema extends BaseScanOperatorExecTest {
           .addRow(20L, 30L)
           .build();
       RowSetUtilities.verify(expected,
-          fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+          fixture.wrap(scan.batchAccessor().container()));
     }
 
     assertFalse(scan.next());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOverflow.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOverflow.java
index 584e197..9eb7761 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOverflow.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOverflow.java
@@ -147,7 +147,7 @@ public class TestScanOperExecOverflow extends BaseScanOperatorExecTest {
     assertTrue(scan.next());
     assertEquals(1, reader1.batchCount);
     assertEquals(1, scan.batchAccessor().schemaVersion());
-    int prevRowCount = scan.batchAccessor().getRowCount();
+    int prevRowCount = scan.batchAccessor().rowCount();
     assertEquals(reader1.rowCount - 1, prevRowCount);
     scan.batchAccessor().release();
 
@@ -158,7 +158,7 @@ public class TestScanOperExecOverflow extends BaseScanOperatorExecTest {
     assertTrue(scan.next());
     assertEquals(2, reader1.batchCount);
     assertEquals(1, scan.batchAccessor().schemaVersion());
-    assertEquals(reader1.rowCount - prevRowCount - 1, scan.batchAccessor().getRowCount());
+    assertEquals(reader1.rowCount - prevRowCount - 1, scan.batchAccessor().rowCount());
     scan.batchAccessor().release();
     int prevReaderRowCount = reader1.rowCount;
 
@@ -168,7 +168,7 @@ public class TestScanOperExecOverflow extends BaseScanOperatorExecTest {
     assertTrue(scan.next());
     assertEquals(eofWithData ? 2 : 3, reader1.batchCount);
     assertEquals(1, scan.batchAccessor().schemaVersion());
-    assertEquals(1, scan.batchAccessor().getRowCount());
+    assertEquals(1, scan.batchAccessor().rowCount());
     assertEquals(prevReaderRowCount, reader1.rowCount);
     scan.batchAccessor().release();
 
@@ -188,7 +188,7 @@ public class TestScanOperExecOverflow extends BaseScanOperatorExecTest {
     // EOF
 
     assertFalse(scan.next());
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
     scanFixture.close();
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecSmoothing.java
index cdc8c57..9c7cb65 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecSmoothing.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecSmoothing.java
@@ -171,7 +171,7 @@ public class TestScanOperExecSmoothing extends BaseScanOperatorExecTest {
         .addRow("20", "wilma")
         .build();
     RowSetUtilities.verify(expected,
-      fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+      fixture.wrap(scan.batchAccessor().container()));
 
     // Second batch from second reader.
 
@@ -182,13 +182,13 @@ public class TestScanOperExecSmoothing extends BaseScanOperatorExecTest {
         .addRow("40", "wilma")
         .build();
     RowSetUtilities.verify(expected,
-      fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+      fixture.wrap(scan.batchAccessor().container()));
 
     // EOF
 
     assertFalse(scan.next());
     assertTrue(reader2.closeCalled);
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     scanFixture.close();
   }
@@ -244,7 +244,7 @@ public class TestScanOperExecSmoothing extends BaseScanOperatorExecTest {
 
     assertTrue(scan.next());
     assertEquals(1, scan.batchAccessor().schemaVersion());
-    verifyBatch(0, scan.batchAccessor().getOutgoingContainer());
+    verifyBatch(0, scan.batchAccessor().container());
 
     // Batch from (a) reader 2
     // Due to schema smoothing, b vector type is left unchanged,
@@ -253,19 +253,19 @@ public class TestScanOperExecSmoothing extends BaseScanOperatorExecTest {
     assertTrue(scan.next());
     assertEquals(1, scan.batchAccessor().schemaVersion());
 
-    SingleRowSet expected = fixture.rowSetBuilder(scan.batchAccessor().getSchema())
+    SingleRowSet expected = fixture.rowSetBuilder(scan.batchAccessor().schema())
         .addRow(111, null)
         .addRow(121, null)
         .build();
     RowSetUtilities.verify(expected,
-        fixture.wrap(scan.batchAccessor().getOutgoingContainer()));
+        fixture.wrap(scan.batchAccessor().container()));
 
     // Batch from (a, b) reader 3
     // Recycles b again, back to being a table column.
 
     assertTrue(scan.next());
     assertEquals(1, scan.batchAccessor().schemaVersion());
-    verifyBatch(200, scan.batchAccessor().getOutgoingContainer());
+    verifyBatch(200, scan.batchAccessor().container());
 
     assertFalse(scan.next());
     scanFixture.close();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderTypeConversion.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderTypeConversion.java
index 269a416..2114ac8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderTypeConversion.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderTypeConversion.java
@@ -34,10 +34,10 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.TestColumnConverter;
 import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.exec.physical.rowSet.TestColumnConverter.ConverterFactory;
 import org.apache.drill.test.rowSet.RowSetUtilities;
-import org.apache.drill.test.rowSet.test.TestColumnConverter;
-import org.apache.drill.test.rowSet.test.TestColumnConverter.ConverterFactory;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetReader.java
new file mode 100644
index 0000000..a49f16e
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetReader.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.impl.protocol.VectorContainerAccessor;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.ResultSetReader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.test.SubOperatorTest;
+import org.junit.Test;
+
+public class TestResultSetReader extends SubOperatorTest {
+
+  public static class BatchGenerator { // Produces the input batches used by the tests below
+
+    private enum State { SCHEMA1, SCHEMA2 }; // SCHEMA2 once the "amount" column has been added
+
+    private final ResultSetLoader rsLoader;
+    private final VectorContainerAccessor batch = new VectorContainerAccessor();
+    private State state;
+
+    public BatchGenerator() {
+      TupleMetadata schema1 = new SchemaBuilder() // initial schema: (id INT, name VARCHAR)
+          .add("id", MinorType.INT)
+          .add("name", MinorType.VARCHAR)
+          .build();
+      ResultSetOptions options = new OptionBuilder()
+          .setSchema(schema1)
+          .build();
+      rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+      state = State.SCHEMA1;
+    }
+
+    public void batch1(int start, int end) { // rows start..end (inclusive), first schema only
+      Preconditions.checkState(state == State.SCHEMA1);
+      rsLoader.startBatch();
+      RowSetLoader writer = rsLoader.writer();
+      for (int i = start; i <= end; i++) {
+        writer.start();
+        writer.scalar("id").setInt(i);
+        writer.scalar("name").setString("Row" + i);
+        writer.save();
+      }
+      batch.addBatch(rsLoader.harvest()); // publish the harvested batch through the accessor
+    }
+
+    public void batch2(int start, int end) { // rows start..end (inclusive), with "amount"
+      RowSetLoader writer = rsLoader.writer();
+      if (state == State.SCHEMA1) { // first call: extend the schema before starting the batch
+        ColumnMetadata balCol = MetadataUtils.newScalar("amount", MinorType.INT, DataMode.REQUIRED);
+        writer.addColumn(balCol);
+        state = State.SCHEMA2;
+      }
+      rsLoader.startBatch();
+      for (int i = start; i <= end; i++) {
+        writer.start();
+        writer.scalar("id").setInt(i);
+        writer.scalar("name").setString("Row" + i);
+        writer.scalar("amount").setInt(i * 10);
+        writer.save();
+      }
+      batch.addBatch(rsLoader.harvest()); // publish the harvested batch through the accessor
+    }
+
+    public BatchAccessor batchAccessor() {
+      return batch;
+    }
+
+    public void close() {
+      rsLoader.close();
+    }
+  }
+
+  @Test
+  public void testBasics() {
+    BatchGenerator gen = new BatchGenerator();
+    ResultSetReader rsReader = new ResultSetReaderImpl(gen.batchAccessor());
+
+    // Start state: start() has not been called yet, so reader() must fail
+
+    try {
+      rsReader.reader();
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected: no reader is available before start()
+    }
+
+    // OK to detach and release with no input
+    rsReader.detach();
+    rsReader.release();
+
+    // Make a batch. Verify reader is attached.
+    // (Don't need to do a full reader test, that is already done
+    // elsewhere.)
+
+    gen.batch1(1, 10);
+    rsReader.start();
+    RowSetReader reader1;
+    {
+      RowSetReader reader = rsReader.reader();
+      reader1 = reader;
+      assertTrue(reader.next());
+      assertEquals(1, reader.scalar("id").getInt());
+      assertEquals("Row1", reader.scalar("name").getString());
+    }
+    rsReader.release();
+    try {
+      rsReader.reader();
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected: reader() is not available after release()
+    }
+
+    // Another batch of same schema: the same reader instance is reused
+
+    gen.batch1(11, 20);
+    rsReader.start();
+    {
+      RowSetReader reader = rsReader.reader();
+      assertSame(reader1, reader);
+      reader1 = reader;
+      assertTrue(reader.next());
+      assertEquals(11, reader.scalar("id").getInt());
+      assertEquals("Row11", reader.scalar("name").getString());
+    }
+    rsReader.release();
+
+    // Batch with new schema: a different reader instance is returned
+
+    gen.batch2(21, 30);
+    rsReader.start();
+    {
+      RowSetReader reader = rsReader.reader();
+      assertNotSame(reader1, reader);
+      reader1 = reader;
+      assertTrue(reader.next());
+      assertEquals(21, reader.scalar("id").getInt());
+      assertEquals("Row21", reader.scalar("name").getString());
+      assertEquals(210, reader.scalar("amount").getInt());
+    }
+    rsReader.release();
+
+    rsReader.close();
+  }
+
+  @Test
+  public void testCloseAtStart() {
+    BatchGenerator gen = new BatchGenerator();
+    ResultSetReaderImpl rsReader = new ResultSetReaderImpl(gen.batchAccessor());
+
+    // Close OK in start state, i.e. before any batch has been started
+
+    rsReader.close();
+    assertEquals(ResultSetReaderImpl.State.CLOSED, rsReader.state());
+
+    // Second close OK: closing an already-closed reader must not throw
+
+    rsReader.close();
+  }
+
+  @Test
+  public void testAutoRelease() {
+    BatchGenerator gen = new BatchGenerator();
+    ResultSetReader rsReader = new ResultSetReaderImpl(gen.batchAccessor());
+    gen.batch1(1, 10);
+    rsReader.start();
+
+    // No release() here: if the test fails with open allocators, then the following close() failed.
+
+    rsReader.close();
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestColumnConverter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestColumnConverter.java
similarity index 99%
rename from exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestColumnConverter.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestColumnConverter.java
index ef8d0df..5baf70e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestColumnConverter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestColumnConverter.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.test.rowSet.test;
+package org.apache.drill.exec.physical.rowSet;
 
 import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
 import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
@@ -31,6 +31,7 @@ import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -43,15 +44,12 @@ import org.apache.drill.exec.vector.accessor.convert.StandardConversions;
 import org.apache.drill.exec.vector.accessor.convert.StandardConversions.ConversionDefn;
 import org.apache.drill.exec.vector.accessor.convert.StandardConversions.ConversionType;
 import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.exec.physical.rowSet.RowSet;
-import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Instant;
 import org.joda.time.LocalDate;
 import org.joda.time.LocalTime;
 import org.joda.time.Period;
-import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
-import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestDummyWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestDummyWriter.java
similarity index 99%
rename from exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestDummyWriter.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestDummyWriter.java
index a59b83c..59be129 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestDummyWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestDummyWriter.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.test.rowSet.test;
+package org.apache.drill.exec.physical.rowSet;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestFillEmpties.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestFillEmpties.java
similarity index 98%
rename from exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestFillEmpties.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestFillEmpties.java
index 7b2dbd0..6ccb78a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestFillEmpties.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestFillEmpties.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.test.rowSet.test;
+package org.apache.drill.exec.physical.rowSet;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -27,6 +27,8 @@ import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.rowSet.RowSet.ExtendableRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.accessor.ArrayReader;
@@ -34,11 +36,7 @@ import org.apache.drill.exec.vector.accessor.ScalarReader;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.ValueType;
 import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.exec.physical.rowSet.RowSet.ExtendableRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSetReader;
 import org.apache.drill.test.rowSet.RowSetUtilities;
-import org.apache.drill.exec.physical.rowSet.RowSetWriter;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestFixedWidthWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestFixedWidthWriter.java
similarity index 99%
rename from exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestFixedWidthWriter.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestFixedWidthWriter.java
index f7304e9..c74d526 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestFixedWidthWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestFixedWidthWriter.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.test.rowSet.test;
+package org.apache.drill.exec.physical.rowSet;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestHyperVectorReaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestHyperVectorReaders.java
similarity index 98%
rename from exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestHyperVectorReaders.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestHyperVectorReaders.java
index 928a434..a115215 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestHyperVectorReaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestHyperVectorReaders.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.test.rowSet.test;
+package org.apache.drill.exec.physical.rowSet;
 
 import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
 import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
@@ -27,18 +27,14 @@ import static org.junit.Assert.assertTrue;
 
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet.ExtendableRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet.HyperRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.exec.physical.rowSet.HyperRowSetImpl;
-import org.apache.drill.exec.physical.rowSet.RowSet.ExtendableRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSet.HyperRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
-import org.apache.drill.exec.physical.rowSet.RowSetReader;
 import org.apache.drill.test.rowSet.RowSetUtilities;
-import org.apache.drill.exec.physical.rowSet.RowSetWriter;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestIndirectReaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestIndirectReaders.java
similarity index 96%
rename from exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestIndirectReaders.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestIndirectReaders.java
index 37c1468..887a05b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestIndirectReaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestIndirectReaders.java
@@ -15,10 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.test.rowSet.test;
+package org.apache.drill.exec.physical.rowSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet.ExtendableRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -27,17 +32,10 @@ import org.apache.drill.exec.vector.accessor.ArrayWriter;
 import org.apache.drill.exec.vector.accessor.ScalarReader;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.exec.physical.rowSet.RowSetWriter;
-import org.apache.drill.exec.physical.rowSet.RowSet.ExtendableRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSetReader;
 import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 /**
  * Test reading with an indirection vector (sv2.) This form of
  * indirection vector reorders values within a single batch.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestMapAccessors.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestMapAccessors.java
similarity index 98%
rename from exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestMapAccessors.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestMapAccessors.java
index c171b38..581e164 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestMapAccessors.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestMapAccessors.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.test.rowSet.test;
+package org.apache.drill.exec.physical.rowSet;
 
 import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
 import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
@@ -28,6 +28,7 @@ import java.util.Iterator;
 
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -41,13 +42,8 @@ import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 import org.apache.drill.test.ClientFixture;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.exec.physical.rowSet.RowSet;
-import org.apache.drill.test.rowSet.RowSetUtilities;
-import org.apache.drill.exec.physical.rowSet.RowSetWriter;
-import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.RowSetComparison;
-import org.apache.drill.exec.physical.rowSet.RowSetReader;
+import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestOffsetVectorWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestOffsetVectorWriter.java
similarity index 99%
rename from exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestOffsetVectorWriter.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestOffsetVectorWriter.java
index 91396e6..b396234 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestOffsetVectorWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestOffsetVectorWriter.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.test.rowSet.test;
+package org.apache.drill.exec.physical.rowSet;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.TestFixedWidthWriter.TestIndex;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.vector.UInt4Vector;
@@ -33,7 +34,6 @@ import org.apache.drill.exec.vector.accessor.ValueType;
 import org.apache.drill.exec.vector.accessor.writer.OffsetVectorWriterImpl;
 import org.apache.drill.exec.vector.accessor.writer.WriterEvents.ColumnWriterListener;
 import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.test.rowSet.test.TestFixedWidthWriter.TestIndex;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestRepeatedListAccessors.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRepeatedListAccessors.java
similarity index 98%
rename from exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestRepeatedListAccessors.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRepeatedListAccessors.java
index f400365..9fbaa14 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestRepeatedListAccessors.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRepeatedListAccessors.java
@@ -15,8 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.test.rowSet.test;
+package org.apache.drill.exec.physical.rowSet;
 
+import static org.apache.drill.test.rowSet.RowSetUtilities.objArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.singleObjArray;
 import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -24,17 +26,17 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
-import static org.apache.drill.test.rowSet.RowSetUtilities.objArray;
-import static org.apache.drill.test.rowSet.RowSetUtilities.singleObjArray;
 
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchemaBuilder;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.BatchSchemaBuilder;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.ColumnMetadata.StructureType;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.RepeatedVarCharVector;
@@ -48,15 +50,9 @@ import org.apache.drill.exec.vector.accessor.ScalarReader;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedListVector;
-import org.apache.drill.exec.record.metadata.ColumnMetadata.StructureType;
 import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.exec.physical.rowSet.DirectRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetComparison;
-import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSetReader;
 import org.apache.drill.test.rowSet.RowSetUtilities;
-import org.apache.drill.exec.physical.rowSet.RowSetWriter;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRowSet.java
similarity index 99%
rename from exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestRowSet.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRowSet.java
index 167053f..4f4d0fc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRowSet.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.test.rowSet.test;
+package org.apache.drill.exec.physical.rowSet;
 
 import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
 import static org.apache.drill.test.rowSet.RowSetUtilities.objArray;
@@ -30,9 +30,12 @@ import java.util.Arrays;
 
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet.ExtendableRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VectorOverflowException;
 import org.apache.drill.exec.vector.accessor.ArrayReader;
 import org.apache.drill.exec.vector.accessor.ArrayWriter;
 import org.apache.drill.exec.vector.accessor.ObjectType;
@@ -44,12 +47,7 @@ import org.apache.drill.exec.vector.accessor.ValueType;
 import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.exec.physical.rowSet.DirectRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSet.ExtendableRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSetReader;
 import org.apache.drill.test.rowSet.RowSetUtilities;
-import org.apache.drill.exec.physical.rowSet.RowSetWriter;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestScalarAccessors.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestScalarAccessors.java
similarity index 99%
rename from exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestScalarAccessors.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestScalarAccessors.java
index 55575d1..efe543d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestScalarAccessors.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestScalarAccessors.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.test.rowSet.test;
+package org.apache.drill.exec.physical.rowSet;
 
 import static org.apache.drill.test.rowSet.RowSetUtilities.dec;
 import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
@@ -33,6 +33,7 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.exec.record.SimpleVectorWrapper;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -46,13 +47,7 @@ import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.ValueType;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.exec.physical.rowSet.DirectRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSet;
-import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
-import org.apache.drill.exec.physical.rowSet.RowSetReader;
 import org.apache.drill.test.rowSet.RowSetUtilities;
-import org.apache.drill.exec.physical.rowSet.RowSetWriter;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Instant;
 import org.joda.time.LocalDate;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestSchemaBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestSchemaBuilder.java
similarity index 99%
rename from exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestSchemaBuilder.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestSchemaBuilder.java
index 724bec4..673050a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestSchemaBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestSchemaBuilder.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.test.rowSet.test;
+package org.apache.drill.exec.physical.rowSet;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -23,9 +23,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariableWidthWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestVariableWidthWriter.java
similarity index 98%
rename from exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariableWidthWriter.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestVariableWidthWriter.java
index c40bbd7..69190d2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariableWidthWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestVariableWidthWriter.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.test.rowSet.test;
+package org.apache.drill.exec.physical.rowSet;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.TestFixedWidthWriter.TestIndex;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.vector.VarCharVector;
@@ -31,12 +32,11 @@ import org.apache.drill.exec.vector.accessor.ColumnAccessors.VarCharColumnWriter
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.ValueType;
 import org.apache.drill.exec.vector.accessor.writer.WriterEvents.ColumnWriterListener;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.test.rowSet.test.TestFixedWidthWriter.TestIndex;
 import org.bouncycastle.util.Arrays;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 
 @Category(RowSetTests.class)
 public class TestVariableWidthWriter extends SubOperatorTest {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariantAccessors.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestVariantAccessors.java
similarity index 99%
rename from exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariantAccessors.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestVariantAccessors.java
index 5b343e5..a0dd8ad 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariantAccessors.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestVariantAccessors.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.test.rowSet.test;
+package org.apache.drill.exec.physical.rowSet;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -29,6 +29,8 @@ import java.util.List;
 import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet.ExtendableRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -52,10 +54,6 @@ import org.apache.drill.exec.vector.complex.ListVector;
 import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.UnionVector;
 import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.exec.physical.rowSet.RowSetReader;
-import org.apache.drill.exec.physical.rowSet.RowSetWriter;
-import org.apache.drill.exec.physical.rowSet.RowSet.ExtendableRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockRowReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockRowReader.java
index 160f2e0..cdf0e8c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockRowReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockRowReader.java
@@ -95,15 +95,15 @@ public class TestMockRowReader extends SubOperatorTest {
         .add("b", MinorType.VARCHAR, 10) // Width is reflected in meta-data
         .buildSchema();
     BatchSchema expectedBatchSchema = new BatchSchema(SelectionVectorMode.NONE, expectedSchema.toFieldList());
-    assertTrue(expectedBatchSchema.isEquivalent(scan.batchAccessor().getSchema()));
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertTrue(expectedBatchSchema.isEquivalent(scan.batchAccessor().schema()));
+    assertEquals(0, scan.batchAccessor().rowCount());
     scan.batchAccessor().release();
 
     // Next call, return with data.
 
     assertTrue(scan.next());
-    assertTrue(expectedBatchSchema.isEquivalent(scan.batchAccessor().getSchema()));
-    assertEquals(rowCount, scan.batchAccessor().getRowCount());
+    assertTrue(expectedBatchSchema.isEquivalent(scan.batchAccessor().schema()));
+    assertEquals(rowCount, scan.batchAccessor().rowCount());
     scan.batchAccessor().release();
 
     // EOF
@@ -146,14 +146,14 @@ public class TestMockRowReader extends SubOperatorTest {
         .addNullable("b", MinorType.VARCHAR, 10)
         .build();
     BatchSchema expectedBatchSchema = new BatchSchema(SelectionVectorMode.NONE, expectedSchema.toFieldList());
-    assertTrue(expectedBatchSchema.isEquivalent(scan.batchAccessor().getSchema()));
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertTrue(expectedBatchSchema.isEquivalent(scan.batchAccessor().schema()));
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     // Next call, return with data.
 
     assertTrue(scan.next());
-    assertTrue(expectedBatchSchema.isEquivalent(scan.batchAccessor().getSchema()));
-    assertEquals(rowCount, scan.batchAccessor().getRowCount());
+    assertTrue(expectedBatchSchema.isEquivalent(scan.batchAccessor().schema()));
+    assertEquals(rowCount, scan.batchAccessor().rowCount());
     scan.batchAccessor().release();
 
     // EOF
@@ -195,14 +195,14 @@ public class TestMockRowReader extends SubOperatorTest {
         .add("b", MinorType.VARCHAR, 10)
         .build();
     BatchSchema expectedBatchSchema = new BatchSchema(SelectionVectorMode.NONE, expectedSchema.toFieldList());
-    assertTrue(expectedBatchSchema.isEquivalent(scan.batchAccessor().getSchema()));
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertTrue(expectedBatchSchema.isEquivalent(scan.batchAccessor().schema()));
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     // Next call, return with data.
 
     assertTrue(scan.next());
-    assertTrue(expectedBatchSchema.isEquivalent(scan.batchAccessor().getSchema()));
-    assertEquals(rowCount, scan.batchAccessor().getRowCount());
+    assertTrue(expectedBatchSchema.isEquivalent(scan.batchAccessor().schema()));
+    assertEquals(rowCount, scan.batchAccessor().rowCount());
     scan.batchAccessor().release();
 
     // EOF
@@ -238,16 +238,16 @@ public class TestMockRowReader extends SubOperatorTest {
     // empty first batch.
 
     assertTrue(scan.buildSchema());
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     // Next call, return with data, limited by batch size.
 
     assertTrue(scan.next());
-    assertEquals(batchSize, scan.batchAccessor().getRowCount());
+    assertEquals(batchSize, scan.batchAccessor().rowCount());
     scan.batchAccessor().release();
 
     assertTrue(scan.next());
-    assertEquals(batchSize, scan.batchAccessor().getRowCount());
+    assertEquals(batchSize, scan.batchAccessor().rowCount());
     scan.batchAccessor().release();
 
     // EOF
@@ -283,16 +283,16 @@ public class TestMockRowReader extends SubOperatorTest {
     // empty first batch.
 
     assertTrue(scan.buildSchema());
-    assertEquals(0, scan.batchAccessor().getRowCount());
+    assertEquals(0, scan.batchAccessor().rowCount());
 
     // Next call, return with data, limited by batch size.
 
     int totalRowCount = 0;
     int batchCount = 0;
     while(scan.next()) {
-      assertTrue(scan.batchAccessor().getRowCount() < ValueVector.MAX_ROW_COUNT);
+      assertTrue(scan.batchAccessor().rowCount() < ValueVector.MAX_ROW_COUNT);
       BatchAccessor batchAccessor = scan.batchAccessor();
-      totalRowCount += batchAccessor.getRowCount();
+      totalRowCount += batchAccessor.rowCount();
       batchCount++;
       batchAccessor.release();
     }
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
index 3eacc00..b9d9558 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
@@ -35,7 +35,6 @@ import org.apache.drill.exec.memory.BaseAllocator.Verbosity;
 import org.apache.drill.exec.memory.BoundsChecking;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.BufferManager;
-
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 import io.netty.util.internal.PlatformDependent;
@@ -460,8 +459,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   @Override
   public long getLong(int index) {
     chk(index, 8);
-    final long v = PlatformDependent.getLong(addr(index));
-    return v;
+    return PlatformDependent.getLong(addr(index));
   }
 
   @Override
@@ -487,8 +485,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   @Override
   public int getInt(int index) {
     chk(index, 4);
-    final int v = PlatformDependent.getInt(addr(index));
-    return v;
+    return PlatformDependent.getInt(addr(index));
   }
 
   @Override
@@ -499,8 +496,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   @Override
   public short getShort(int index) {
     chk(index, 2);
-    short v = PlatformDependent.getShort(addr(index));
-    return v;
+    return PlatformDependent.getShort(addr(index));
   }
 
   @Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnReaderIndex.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnReaderIndex.java
index b96b92c..878da91 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnReaderIndex.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnReaderIndex.java
@@ -112,6 +112,13 @@ public interface ColumnReaderIndex {
   boolean next();
 
   /**
+   * Reports if the index has another item.
+   * @return <tt>true</tt> if more rows remain. That is, if a
+   * call to {@link #next()} would return <tt>true</tt>.
+   */
+  boolean hasNext();
+
+  /**
    * Return the number of items that this index indexes: top-level record
    * count for the root index; total element count for nested arrays.
    *
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractScalarReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractScalarReader.java
index 7d8190d..2e729e8 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractScalarReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractScalarReader.java
@@ -83,6 +83,9 @@ public abstract class AbstractScalarReader implements ScalarReader, ReaderEvents
 
     @Override
     public ColumnMetadata schema() { return schema; }
+
+    @Override
+    public void bindBuffer() { }
   }
 
   protected ColumnReaderIndex vectorIndex;
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java
index 9796cb0..65443aa 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java
@@ -88,6 +88,13 @@ public abstract class AbstractTupleReader implements TupleReader, ReaderEvents {
   }
 
   @Override
+  public void bindBuffer() {
+    for (int i = 0; i < readers.length; i++) {
+      readers[i].events().bindBuffer();
+    }
+  }
+
+  @Override
   public NullStateReader nullStateReader() { return nullStateReader; }
 
   @Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java
index bf13c87..6f66869 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java
@@ -31,7 +31,6 @@ import org.apache.drill.exec.vector.accessor.ObjectType;
 import org.apache.drill.exec.vector.accessor.ScalarReader;
 import org.apache.drill.exec.vector.accessor.TupleReader;
 import org.apache.drill.exec.vector.accessor.VariantReader;
-
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 /**
@@ -49,7 +48,7 @@ public class ArrayReaderImpl implements ArrayReader, ReaderEvents {
 
   public static class ArrayObjectReader extends AbstractObjectReader {
 
-    private ArrayReaderImpl arrayReader;
+    private final ArrayReaderImpl arrayReader;
 
     public ArrayObjectReader(ArrayReaderImpl arrayReader) {
        this.arrayReader = arrayReader;
@@ -150,6 +149,11 @@ public class ArrayReaderImpl implements ArrayReader, ReaderEvents {
       return false;
     }
 
+    @Override
+    public boolean hasNext() {
+      return position + 1 < length;
+    }
+
     /**
      * Set the current iterator location to the given index offset.
      *
@@ -367,6 +371,12 @@ public class ArrayReaderImpl implements ArrayReader, ReaderEvents {
     elementIndex.rewind();
   }
 
+
+  @Override
+  public void bindBuffer() {
+    elementReader.events().bindBuffer();
+  }
+
   @Override
   public ObjectReader entry() { return elementReader; }
 
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/BaseScalarReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/BaseScalarReader.java
index 279fb58..a621207 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/BaseScalarReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/BaseScalarReader.java
@@ -53,6 +53,12 @@ public abstract class BaseScalarReader extends AbstractScalarReader {
       super.bindIndex(index);
       offsetsReader.bindIndex(index);
     }
+
+    @Override
+    public void bindBuffer() {
+      super.bindBuffer();
+      offsetsReader.bindBuffer();
+    }
   }
 
   /**
@@ -61,12 +67,20 @@ public abstract class BaseScalarReader extends AbstractScalarReader {
 
   public interface BufferAccessor {
     DrillBuf buffer();
+    void rebind();
   }
 
   private static class SingleVectorBufferAccessor implements BufferAccessor {
-    private final DrillBuf buffer;
+    private final VectorAccessor va;
+    private DrillBuf buffer;
 
     public SingleVectorBufferAccessor(VectorAccessor va) {
+      this.va = va;
+      rebind();
+    }
+
+    @Override
+    public void rebind() {
       BaseDataValueVector vector = va.vector();
       buffer = vector.getBuffer();
     }
@@ -87,6 +101,9 @@ public abstract class BaseScalarReader extends AbstractScalarReader {
       BaseDataValueVector vector = vectorAccessor.vector();
       return vector.getBuffer();
     }
+
+    @Override
+    public void rebind() { }
   }
 
   protected ColumnMetadata schema;
@@ -148,4 +165,9 @@ public abstract class BaseScalarReader extends AbstractScalarReader {
 
   @Override
   public ColumnMetadata schema() { return schema; }
+
+  @Override
+  public void bindBuffer() {
+    bufferAccessor.rebind();
+  }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ReaderEvents.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ReaderEvents.java
index 2f75946..4fba20a 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ReaderEvents.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ReaderEvents.java
@@ -28,4 +28,5 @@ public interface ReaderEvents {
   void bindNullState(NullStateReader nullStateReader);
   NullStateReader nullStateReader();
   void reposition();
+  void bindBuffer();
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java
index 45ced7a..d766bb7 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java
@@ -51,7 +51,7 @@ public class UnionReaderImpl implements VariantReader, ReaderEvents {
 
   public static class UnionObjectReader extends AbstractObjectReader {
 
-    private UnionReaderImpl reader;
+    private final UnionReaderImpl reader;
 
     public UnionObjectReader(UnionReaderImpl reader) {
       this.reader = reader;
@@ -183,6 +183,15 @@ public class UnionReaderImpl implements VariantReader, ReaderEvents {
   }
 
   @Override
+  public void bindBuffer() {
+    for (int i = 0; i < variants.length; i++) {
+      if (variants[i] != null) {
+        variants[i].events().bindBuffer();
+      }
+    }
+  }
+
+  @Override
   public boolean isNull() {
     return nullStateReader.isNull();
   }
@@ -193,7 +202,7 @@ public class UnionReaderImpl implements VariantReader, ReaderEvents {
     if (typeCode == UnionVector.NULL_MARKER) {
       return null;
     }
-    return MinorType.valueOf(typeCode);
+    return MinorType.forNumber(typeCode);
   }
 
   @Override