You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2019/12/17 12:48:35 UTC

[drill] 12/12: DRILL-7486: Refactor row set reader builders

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 5339fc23eb1b177bbc20ed74637e6b11e3ffa803
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Thu Dec 12 17:37:53 2019 -0800

    DRILL-7486: Refactor row set reader builders
    
    Moves reader building code into a shared location, independent
    of the RowSet class. Allows readers to be built from a
    VectorContainer in addition to a row set.
    
    closes #1928
---
 .../physical/resultSet/impl/WriterIndexImpl.java   |  2 +-
 ...stractReaderBuilder.java => ReaderBuilder.java} | 29 ++++++++++++-
 ...eReaderBuilder.java => HyperReaderBuilder.java} | 49 +++++++++++----------
 .../resultSet/model/single/DirectRowIndex.java     |  5 ++-
 ...ReaderBuilder.java => SimpleReaderBuilder.java} | 50 ++++++++++++++++++++--
 .../exec/physical/rowSet/AbstractSingleRowSet.java | 19 ++------
 .../drill/exec/physical/rowSet/DirectRowSet.java   |  2 +-
 .../HyperRowIndex.java}                            | 27 +++++++-----
 .../exec/physical/rowSet/HyperRowSetImpl.java      | 15 ++-----
 .../IndirectRowIndex.java}                         | 24 +++++++----
 .../drill/exec/physical/rowSet/IndirectRowSet.java | 35 +++------------
 .../exec/physical/rowSet/RowSetFormatter.java      |  5 ---
 12 files changed, 148 insertions(+), 114 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/WriterIndexImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/WriterIndexImpl.java
index ce830d7..fe73cbb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/WriterIndexImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/WriterIndexImpl.java
@@ -39,7 +39,7 @@ import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 class WriterIndexImpl implements ColumnWriterIndex {
 
   private final ResultSetLoader rsLoader;
-  private int rowIndex = 0;
+  private int rowIndex;
 
   public WriterIndexImpl(ResultSetLoader rsLoader) {
     this.rsLoader = rsLoader;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/AbstractReaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/ReaderBuilder.java
similarity index 60%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/AbstractReaderBuilder.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/ReaderBuilder.java
index ea8e11b..ae217d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/AbstractReaderBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/ReaderBuilder.java
@@ -18,6 +18,12 @@
 package org.apache.drill.exec.physical.resultSet.model;
 
 import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.resultSet.model.hyper.HyperReaderBuilder;
+import org.apache.drill.exec.physical.resultSet.model.single.SimpleReaderBuilder;
+import org.apache.drill.exec.physical.rowSet.RowSetReaderImpl;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.vector.accessor.reader.AbstractObjectReader;
 import org.apache.drill.exec.vector.accessor.reader.ArrayReaderImpl;
@@ -25,7 +31,27 @@ import org.apache.drill.exec.vector.accessor.reader.BaseScalarReader;
 import org.apache.drill.exec.vector.accessor.reader.ColumnReaderFactory;
 import org.apache.drill.exec.vector.accessor.reader.VectorAccessor;
 
-public abstract class AbstractReaderBuilder {
+public abstract class ReaderBuilder {
+
+  public static RowSetReaderImpl buildReader(BatchAccessor batch) {
+    if (batch.schema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) {
+      try {
+        return HyperReaderBuilder.build(batch);
+      } catch (SchemaChangeException e) {
+
+        // The caller is responsible for ensuring that the hyper-batch
+        // has a consistent schema. If it is possible that the schema is
+        // inconsistent, then call the build() method directory and
+        // "do the right thing", which is pretty much to fail, as a
+        // hyper-batch is a very awkward place to discover an inconsistent
+        // schema.
+
+        throw new IllegalStateException("Hyper-batch contains an inconsistent schema", e);
+      }
+    } else {
+      return SimpleReaderBuilder.build(batch);
+    }
+  }
 
   protected AbstractObjectReader buildScalarReader(VectorAccessor va, ColumnMetadata schema) {
     BaseScalarReader scalarReader = ColumnReaderFactory.buildColumnReader(va);
@@ -41,5 +67,4 @@ public abstract class AbstractReaderBuilder {
       throw new UnsupportedOperationException(mode.toString());
     }
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/hyper/BaseReaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/hyper/HyperReaderBuilder.java
similarity index 85%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/hyper/BaseReaderBuilder.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/hyper/HyperReaderBuilder.java
index c83f018..533d6c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/hyper/BaseReaderBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/hyper/HyperReaderBuilder.java
@@ -23,8 +23,10 @@ import java.util.List;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.physical.resultSet.model.AbstractReaderBuilder;
-import org.apache.drill.exec.physical.resultSet.model.ReaderIndex;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.resultSet.model.ReaderBuilder;
+import org.apache.drill.exec.physical.rowSet.HyperRowIndex;
+import org.apache.drill.exec.physical.rowSet.RowSetReaderImpl;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
@@ -33,7 +35,6 @@ import org.apache.drill.exec.record.metadata.VariantMetadata;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.accessor.ColumnReaderIndex;
-import org.apache.drill.exec.vector.accessor.impl.AccessorUtilities;
 import org.apache.drill.exec.vector.accessor.reader.AbstractObjectReader;
 import org.apache.drill.exec.vector.accessor.reader.ArrayReaderImpl;
 import org.apache.drill.exec.vector.accessor.reader.MapReader;
@@ -71,31 +72,33 @@ import org.apache.drill.exec.vector.accessor.reader.VectorAccessors.BaseHyperVec
  * the outer vector accessor.)
  */
 
-public abstract class BaseReaderBuilder extends AbstractReaderBuilder {
+public class HyperReaderBuilder extends ReaderBuilder {
 
-  /**
-   * Read-only row index into the hyper row set with batch and index
-   * values mapping via an SV4.
-   */
-
-  public static class HyperRowIndex extends ReaderIndex {
+  private static final HyperReaderBuilder INSTANCE = new HyperReaderBuilder();
 
-    private final SelectionVector4 sv4;
+  private HyperReaderBuilder() { }
 
-    public HyperRowIndex(SelectionVector4 sv4) {
-      super(sv4.getCount());
-      this.sv4 = sv4;
-    }
+  public static RowSetReaderImpl build(VectorContainer container, TupleMetadata schema, SelectionVector4 sv4) {
+    HyperRowIndex rowIndex = new HyperRowIndex(sv4);
+    return new RowSetReaderImpl(schema, rowIndex,
+        INSTANCE.buildContainerChildren(container, schema));
+  }
 
-    @Override
-    public int offset() {
-      return AccessorUtilities.sv4Index(sv4.get(position));
-    }
+  /**
+   * Build a hyper-batch reader given a batch accessor.
+   *
+   * @param batch wrapper which provides the container and SV4
+   * @return a row set reader for the hyper-batch
+   * @throws SchemaChangeException if the individual batches have
+   * inconsistent schemas (say, a column in batch 1 is an INT, but in
+   * batch 2 it is a VARCHAR)
+   */
 
-    @Override
-    public int hyperVectorIndex( ) {
-      return AccessorUtilities.sv4Batch(sv4.get(position));
-    }
+  public static RowSetReaderImpl build(BatchAccessor batch) throws SchemaChangeException {
+    VectorContainer container = batch.container();
+    return build(container,
+        new HyperSchemaInference().infer(container),
+        batch.selectionVector4());
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/DirectRowIndex.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/DirectRowIndex.java
index ea64ac1..8fada11 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/DirectRowIndex.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/DirectRowIndex.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.resultSet.model.single;
 
 import org.apache.drill.exec.physical.resultSet.model.ReaderIndex;
+import org.apache.drill.exec.record.VectorContainer;
 
 /**
  * Reader index that points directly to each row in the row set.
@@ -28,8 +29,8 @@ import org.apache.drill.exec.physical.resultSet.model.ReaderIndex;
 
 public class DirectRowIndex extends ReaderIndex {
 
-  public DirectRowIndex(int rowCount) {
-    super(rowCount);
+  public DirectRowIndex(VectorContainer container) {
+    super(container.getRecordCount());
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BaseReaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SimpleReaderBuilder.java
similarity index 81%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BaseReaderBuilder.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SimpleReaderBuilder.java
index 3bbe6ab..0d330d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BaseReaderBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SimpleReaderBuilder.java
@@ -23,10 +23,18 @@ import java.util.List;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.resultSet.model.AbstractReaderBuilder;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
 import org.apache.drill.exec.physical.resultSet.model.MetadataProvider;
+import org.apache.drill.exec.physical.resultSet.model.MetadataProvider.MetadataCreator;
+import org.apache.drill.exec.physical.resultSet.model.MetadataProvider.MetadataRetrieval;
 import org.apache.drill.exec.physical.resultSet.model.MetadataProvider.VectorDescrip;
+import org.apache.drill.exec.physical.resultSet.model.ReaderBuilder;
+import org.apache.drill.exec.physical.resultSet.model.ReaderIndex;
+import org.apache.drill.exec.physical.rowSet.IndirectRowIndex;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetReaderImpl;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.accessor.reader.AbstractObjectReader;
 import org.apache.drill.exec.vector.accessor.reader.AbstractScalarReader;
@@ -46,7 +54,7 @@ import org.apache.drill.exec.vector.complex.UnionVector;
  * <p>
  * Derived classes handle the details of the various kinds of readers.
  * Today there is a single subclass that builds (test-time)
- * {@link org.apache.drill.exec.physical.rowSet.RowSet} objects. The idea, however, is that we may eventually
+ * {@link RowSet} objects. The idea, however, is that we may eventually
  * want to create a "result set reader" for use in internal operators,
  * in parallel to the "result set loader". The result set reader would
  * handle a stream of incoming batches. The extant RowSet class handles
@@ -59,9 +67,43 @@ import org.apache.drill.exec.vector.complex.UnionVector;
  * quite complex.
  */
 
-public abstract class BaseReaderBuilder extends AbstractReaderBuilder {
+public class SimpleReaderBuilder extends ReaderBuilder {
 
-  protected List<AbstractObjectReader> buildContainerChildren(
+  private static final SimpleReaderBuilder INSTANCE = new SimpleReaderBuilder();
+
+  private SimpleReaderBuilder() { }
+
+  public static RowSetReaderImpl build(VectorContainer container,
+      TupleMetadata schema, ReaderIndex rowIndex) {
+    return new RowSetReaderImpl(schema, rowIndex,
+        INSTANCE.buildContainerChildren(container,
+           new MetadataRetrieval(schema)));
+  }
+
+  public static RowSetReaderImpl build(VectorContainer container, ReaderIndex rowIndex) {
+    MetadataCreator mdCreator = new MetadataCreator();
+    List<AbstractObjectReader> children = INSTANCE.buildContainerChildren(container,
+        mdCreator);
+    return new RowSetReaderImpl(mdCreator.tuple(), rowIndex, children);
+  }
+
+  public static RowSetReaderImpl build(BatchAccessor batch) {
+    return SimpleReaderBuilder.build(batch.container(),
+        readerIndex(batch));
+  }
+
+  public static ReaderIndex readerIndex(BatchAccessor batch) {
+    switch (batch.schema().getSelectionVectorMode()) {
+    case TWO_BYTE:
+      return new IndirectRowIndex(batch.selectionVector2());
+    case NONE:
+      return new DirectRowIndex(batch.container());
+    default:
+      throw new UnsupportedOperationException("Cannot use this method for a hyper-batch");
+    }
+  }
+
+  public List<AbstractObjectReader> buildContainerChildren(
       VectorContainer container, MetadataProvider mdProvider) {
     final List<AbstractObjectReader> readers = new ArrayList<>();
     for (int i = 0; i < container.getNumberOfColumns(); i++) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/AbstractSingleRowSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/AbstractSingleRowSet.java
index 398ed33..6d955c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/AbstractSingleRowSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/AbstractSingleRowSet.java
@@ -17,13 +17,12 @@
  */
 package org.apache.drill.exec.physical.rowSet;
 
-import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.physical.resultSet.model.ReaderIndex;
-import org.apache.drill.exec.physical.resultSet.model.MetadataProvider.MetadataRetrieval;
-import org.apache.drill.exec.physical.resultSet.model.single.BaseReaderBuilder;
+import org.apache.drill.exec.physical.resultSet.model.single.SimpleReaderBuilder;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 
 /**
  * Base class for row sets backed by a single record batch.
@@ -31,16 +30,6 @@ import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 
 public abstract class AbstractSingleRowSet extends AbstractRowSet implements SingleRowSet {
 
-  public static class RowSetReaderBuilder extends BaseReaderBuilder {
-
-    public RowSetReader buildReader(AbstractSingleRowSet rowSet, ReaderIndex rowIndex) {
-      TupleMetadata schema = rowSet.schema();
-      return new RowSetReaderImpl(schema, rowIndex,
-          buildContainerChildren(rowSet.container(),
-              new MetadataRetrieval(schema)));
-    }
-  }
-
   protected AbstractSingleRowSet(AbstractSingleRowSet rowSet) {
     super(rowSet.container, rowSet.schema);
   }
@@ -64,6 +53,6 @@ public abstract class AbstractSingleRowSet extends AbstractRowSet implements Sin
    * (non-map) vectors.
    */
   protected RowSetReader buildReader(ReaderIndex rowIndex) {
-    return new RowSetReaderBuilder().buildReader(this, rowIndex);
+    return SimpleReaderBuilder.build(container(), schema, rowIndex);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/DirectRowSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/DirectRowSet.java
index 0164819..67dd7e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/DirectRowSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/DirectRowSet.java
@@ -122,7 +122,7 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS
 
   @Override
   public RowSetReader reader() {
-    return buildReader(new DirectRowIndex(rowCount()));
+    return buildReader(new DirectRowIndex(container));
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/DirectRowIndex.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/HyperRowIndex.java
similarity index 58%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/DirectRowIndex.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/HyperRowIndex.java
index ea64ac1..1e1cd5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/DirectRowIndex.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/HyperRowIndex.java
@@ -15,26 +15,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.physical.resultSet.model.single;
+package org.apache.drill.exec.physical.rowSet;
 
 import org.apache.drill.exec.physical.resultSet.model.ReaderIndex;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.accessor.impl.AccessorUtilities;
 
 /**
- * Reader index that points directly to each row in the row set.
- * This index starts with pointing to the -1st row, so that the
- * reader can require a <tt>next()</tt> for every row, including
- * the first. (This is the JDBC <tt>RecordSet</tt> convention.)
+ * Read-only row index into the hyper row set with batch and index
+ * values mapping via an SV4.
  */
 
-public class DirectRowIndex extends ReaderIndex {
+public class HyperRowIndex extends ReaderIndex {
 
-  public DirectRowIndex(int rowCount) {
-    super(rowCount);
+  private final SelectionVector4 sv4;
+
+  public HyperRowIndex(SelectionVector4 sv4) {
+    super(sv4.getCount());
+    this.sv4 = sv4;
   }
 
   @Override
-  public int offset() { return position; }
+  public int offset() {
+    return AccessorUtilities.sv4Index(sv4.get(position));
+  }
 
   @Override
-  public int hyperVectorIndex() { return 0; }
+  public int hyperVectorIndex( ) {
+    return AccessorUtilities.sv4Batch(sv4.get(position));
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/HyperRowSetImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/HyperRowSetImpl.java
index 31d12cd..0773d5a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/HyperRowSetImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/HyperRowSetImpl.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.resultSet.model.hyper.BaseReaderBuilder;
+import org.apache.drill.exec.physical.resultSet.model.hyper.HyperReaderBuilder;
 import org.apache.drill.exec.physical.resultSet.model.hyper.HyperSchemaInference;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -44,16 +44,6 @@ import org.apache.drill.exec.physical.rowSet.RowSet.HyperRowSet;
 
 public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet {
 
-  public static class RowSetReaderBuilder extends BaseReaderBuilder {
-
-    public RowSetReader buildReader(HyperRowSet rowSet, SelectionVector4 sv4) {
-      TupleMetadata schema = rowSet.schema();
-      HyperRowIndex rowIndex = new HyperRowIndex(sv4);
-      return new RowSetReaderImpl(schema, rowIndex,
-          buildContainerChildren(rowSet.container(), schema));
-    }
-  }
-
   public static class HyperRowSetBuilderImpl implements HyperRowSetBuilder {
 
     private final BufferAllocator allocator;
@@ -95,6 +85,7 @@ public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet {
       for (VectorContainer container : batches) {
         hyperContainer.addBatch(container);
       }
+      hyperContainer.setRecordCount(totalRowCount);
 
       // TODO: This has a bug. If the hyperset has two batches with unions,
       // and the first union contains only VARCHAR, while the second contains
@@ -160,7 +151,7 @@ public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet {
 
   @Override
   public RowSetReader reader() {
-    return new RowSetReaderBuilder().buildReader(this, sv4);
+    return HyperReaderBuilder.build(container(), schema, sv4);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/DirectRowIndex.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/IndirectRowIndex.java
similarity index 57%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/DirectRowIndex.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/IndirectRowIndex.java
index ea64ac1..8aa0770 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/DirectRowIndex.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/IndirectRowIndex.java
@@ -15,25 +15,31 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.physical.resultSet.model.single;
+package org.apache.drill.exec.physical.rowSet;
 
 import org.apache.drill.exec.physical.resultSet.model.ReaderIndex;
+import org.apache.drill.exec.record.selection.SelectionVector2;
 
 /**
- * Reader index that points directly to each row in the row set.
- * This index starts with pointing to the -1st row, so that the
- * reader can require a <tt>next()</tt> for every row, including
- * the first. (This is the JDBC <tt>RecordSet</tt> convention.)
+ * Reader index that points to each row indirectly through the
+ * selection vector. The {@link #offset()} method points to the
+ * actual data row, while the {@link #logicalIndex()} method gives
+ * the position relative to the indirection vector. That is,
+ * the position increases monotonically, but the index jumps
+ * around as specified by the indirection vector.
  */
 
-public class DirectRowIndex extends ReaderIndex {
+public class IndirectRowIndex extends ReaderIndex {
 
-  public DirectRowIndex(int rowCount) {
-    super(rowCount);
+  private final SelectionVector2 sv2;
+
+  public IndirectRowIndex(SelectionVector2 sv2) {
+    super(sv2.getCount());
+    this.sv2 = sv2;
   }
 
   @Override
-  public int offset() { return position; }
+  public int offset() { return sv2.getIndex(position); }
 
   @Override
   public int hyperVectorIndex() { return 0; }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/IndirectRowSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/IndirectRowSet.java
index cf7da3c..17bbf74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/IndirectRowSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/IndirectRowSet.java
@@ -17,18 +17,17 @@
  */
 package org.apache.drill.exec.physical.rowSet;
 
+import java.util.Collections;
+import java.util.Set;
+
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.resultSet.model.ReaderIndex;
 import org.apache.drill.exec.physical.resultSet.model.single.SingleSchemaInference;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 
-import java.util.Collections;
-import java.util.Set;
-
 /**
  * Single row set coupled with an indirection (selection) vector,
  * specifically an SV2.
@@ -36,31 +35,6 @@ import java.util.Set;
 
 public class IndirectRowSet extends AbstractSingleRowSet {
 
-  /**
-   * Reader index that points to each row indirectly through the
-   * selection vector. The {@link #offset()} method points to the
-   * actual data row, while the {@link #logicalIndex()} method gives
-   * the position relative to the indirection vector. That is,
-   * the position increases monotonically, but the index jumps
-   * around as specified by the indirection vector.
-   */
-
-  private static class IndirectRowIndex extends ReaderIndex {
-
-    private final SelectionVector2 sv2;
-
-    public IndirectRowIndex(SelectionVector2 sv2) {
-      super(sv2.getCount());
-      this.sv2 = sv2;
-    }
-
-    @Override
-    public int offset() { return sv2.getIndex(position); }
-
-    @Override
-    public int hyperVectorIndex() { return 0; }
-  }
-
   private final SelectionVector2 sv2;
 
   private IndirectRowSet(VectorContainer container, SelectionVector2 sv2) {
@@ -117,7 +91,8 @@ public class IndirectRowSet extends AbstractSingleRowSet {
 
   @Override
   public RowSetReader reader() {
-    return buildReader(new IndirectRowIndex(getSv2()));
+    IndirectRowIndex index = new IndirectRowIndex(getSv2());
+    return buildReader(index);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java
index 9bf9f44..eeb2236 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java
@@ -25,7 +25,6 @@ import org.apache.commons.io.output.StringBuilderWriter;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 
@@ -61,10 +60,6 @@ public class RowSetFormatter {
     RowSets.wrap(batch).print();
   }
 
-  public static void print(RecordBatch batch) {
-    RowSets.wrap(batch).print();
-  }
-
   public static String toString(RowSet rowSet) {
     StringBuilderWriter out = new StringBuilderWriter();
     new RowSetFormatter(rowSet, out).write();