Posted to commits@drill.apache.org by su...@apache.org on 2017/04/21 23:32:30 UTC

[3/3] drill git commit: DRILL-5323: Test tools for row sets

DRILL-5323: Test tools for row sets

Provide test tools to create, populate and compare row sets

To simplify tests, we need a TestRowSet concept that wraps a
VectorContainer and provides easy ways to:

- Define a schema for the row set.
- Create a set of vectors that implement the schema.
- Populate the row set with test data via code.
- Add an SV2 to the row set.
- Pass the row set to operator components (such as generated code
blocks).
- Examine the contents of a row set.
- Compare the results of the operation with an expected result set.
- Dispose of the underlying direct memory when work is done.

This code builds on that in DRILL-5324 to provide a complete row set
API. See DRILL-5318 for the spec.

Note: this code can be reviewed as-is, but cannot be committed until
after DRILL-5324 is committed: this code has compile-time dependencies
on that code. This PR will be rebased once DRILL-5324 is pulled into
master.

Handles maps and intervals

The row set schema is refined to provide two forms of schema. A
physical schema shows the nested structure of the data with maps
expanding into their contents.

Updates the row set schema builder to easily build a schema with maps.

An access schema shows the row “flattened” to include just scalar
(non-map) columns, with all columns at a single level, with dotted
names identifying nested fields. This form makes for very simple access.
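
To illustrate the flattened form, here is a minimal sketch of how a
nested (physical) schema could be walked depth-first to produce dotted
scalar column names. The class and method names (FlattenSchemaSketch,
Column, flatten) are hypothetical stand-ins invented for this example;
they are not the RowSetSchema classes in this commit, which work with
MaterializedField rather than plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of the Drill code base.
final class FlattenSchemaSketch {

  // A column is either a scalar (no children) or a map with ordered members.
  static final class Column {
    final String name;
    final List<Column> children; // null for scalar columns

    private Column(String name, List<Column> children) {
      this.name = name;
      this.children = children;
    }

    static Column scalar(String name) {
      return new Column(name, null);
    }

    static Column map(String name, Column... members) {
      return new Column(name, Arrays.asList(members));
    }

    boolean isMap() {
      return children != null;
    }
  }

  // Returns the scalar columns in depth-first order, using dotted names
  // for columns nested inside maps. Map columns themselves are omitted.
  static List<String> flatten(List<Column> physical) {
    List<String> out = new ArrayList<>();
    flatten("", physical, out);
    return out;
  }

  private static void flatten(String prefix, List<Column> cols, List<String> out) {
    for (Column col : cols) {
      String fullName = prefix.isEmpty() ? col.name : prefix + "." + col.name;
      if (col.isMap()) {
        flatten(fullName, col.children, out); // expand map members in place
      } else {
        out.add(fullName);
      }
    }
  }

  public static void main(String[] args) {
    List<Column> physical = Arrays.asList(
        Column.scalar("a"),
        Column.map("m", Column.scalar("b"), Column.map("n", Column.scalar("c"))),
        Column.scalar("d"));
    System.out.println(flatten(physical)); // prints [a, m.b, m.n.c, d]
  }
}
```

In this sketch the position of a name in the flattened list plays the
role of the column index that a reader or writer would use.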

Then, provides tools for reading and writing batches with maps by
presenting the flattened view to the row reader and writer.

HyperVectors have a very complex structure for maps. The hyper row set
implementation takes a first crack at mapping that structure into the
standardized row set format.

Also provides a handy way to set an INTERVAL column from an int. There
is no good mapping from an int to an interval, so an arbitrary
convention is used. This convention is not generally useful, but is
very handy for quickly generating test data.

As before, this is a partial PR. The code here still depends on
DRILL-5324 to provide the column accessors needed by the row reader and
writer.

All this code is getting rather complex, so this commit includes a unit
test of the schema and row set code.

Revisions to support arrays

Arrays require a somewhat different API. Refactored to allow arrays to
appear as a field type.

While refactoring, moved interfaces to more logical locations.

Added more comments.

Rejiggered the row set schema to provide both a physical and flattened
(access) schema, both driven from the original batch schema.

Pushed some accessor and writer classes into the accessor layer.

Added tests for arrays.

Also added more comments where needed.

Moved tests to DRILL-5318

The test classes previously here depend on the new “operator fixture”.
To provide a non-cyclic checkin order, moved the tests to the PR with
the fixtures so that this PR is clear of dependencies. The tests were
reviewed in the context of DRILL-5318.

Also pulls in batch sizer support for map fields, which is required by
the tests.

closes #785


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/095a660b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/095a660b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/095a660b

Branch: refs/heads/master
Commit: 095a660b337e3fbdd09c13e220a545617aaf922c
Parents: 381eab6
Author: Paul Rogers <pr...@maprtech.com>
Authored: Tue Mar 14 16:18:24 2017 -0700
Committer: Sudheesh Katkam <su...@apache.org>
Committed: Fri Apr 21 14:51:36 2017 -0700

----------------------------------------------------------------------
 .../physical/impl/spill/RecordBatchSizer.java   |  53 +++-
 .../apache/drill/exec/record/BatchSchema.java   |   5 +-
 .../exec/record/VectorAccessibleUtilities.java  |  51 ++++
 .../drill/test/rowSet/AbstractRowSet.java       | 164 ++++++++++
 .../drill/test/rowSet/AbstractSingleRowSet.java | 217 +++++++++++++
 .../apache/drill/test/rowSet/DirectRowSet.java  | 236 ++++++++++++++
 .../drill/test/rowSet/HyperRowSetImpl.java      | 292 ++++++++++++++++++
 .../drill/test/rowSet/IndirectRowSet.java       | 125 ++++++++
 .../org/apache/drill/test/rowSet/RowSet.java    | 198 ++++++++++++
 .../apache/drill/test/rowSet/RowSetBuilder.java |  85 ++++++
 .../drill/test/rowSet/RowSetComparison.java     | 244 +++++++++++++++
 .../apache/drill/test/rowSet/RowSetPrinter.java | 101 ++++++
 .../apache/drill/test/rowSet/RowSetSchema.java  | 304 +++++++++++++++++++
 .../drill/test/rowSet/RowSetUtilities.java      | 106 +++++++
 .../apache/drill/test/rowSet/SchemaBuilder.java | 142 +++++++++
 .../apache/drill/test/rowSet/package-info.java  |  76 +++++
 16 files changed, 2385 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
index b384e0a..4cb2bae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
@@ -20,14 +20,17 @@ package org.apache.drill.exec.physical.impl.spill;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
 
 /**
  * Given a record batch or vector container, determines the actual memory
@@ -66,16 +69,15 @@ public class RecordBatchSizer {
     public int density;
     public int dataSize;
 
-    @SuppressWarnings("resource")
-    public ColumnSize(VectorWrapper<?> vw) {
-      metadata = vw.getField();
+    public ColumnSize(ValueVector v) {
+      metadata = v.getField();
       stdSize = TypeHelper.getSize(metadata.getType());
 
       // Can't get size estimates if this is an empty batch.
 
-      ValueVector v = vw.getValueVector();
       int rowCount = v.getAccessor().getValueCount();
       if (rowCount == 0) {
+        estSize = stdSize;
         return;
       }
 
@@ -128,7 +130,7 @@ public class RecordBatchSizer {
     }
   }
 
-  List<ColumnSize> columnSizes = new ArrayList<>();
+  private List<ColumnSize> columnSizes = new ArrayList<>();
 
   /**
    * Number of records (rows) in the batch.
@@ -159,7 +161,17 @@ public class RecordBatchSizer {
 
   private int netBatchSize;
 
+  public RecordBatchSizer(RecordBatch batch) {
+    this(batch,
+         (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) ?
+         batch.getSelectionVector2() : null);
+  }
+
   public RecordBatchSizer(VectorAccessible va) {
+    this(va, null);
+  }
+
+  public RecordBatchSizer(VectorAccessible va, SelectionVector2 sv2) {
     rowCount = va.getRecordCount();
     for (VectorWrapper<?> vw : va) {
       measureColumn(vw);
@@ -169,12 +181,9 @@ public class RecordBatchSizer {
       grossRowWidth = roundUp(totalBatchSize, rowCount);
     }
 
-    hasSv2 = va.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE;
-    if (hasSv2) {
-      @SuppressWarnings("resource")
-      SelectionVector2 sv2 = va.getSelectionVector2();
+    if (sv2 != null) {
       sv2Size = sv2.getBuffer(false).capacity();
-      grossRowWidth += sv2Size / rowCount;
+      grossRowWidth += roundUp(sv2Size, rowCount);
       netRowWidth += 2;
     }
 
@@ -200,7 +209,19 @@ public class RecordBatchSizer {
   }
 
   private void measureColumn(VectorWrapper<?> vw) {
-    ColumnSize colSize = new ColumnSize(vw);
+    measureColumn(vw.getValueVector());
+  }
+
+  private void measureColumn(ValueVector v) {
+
+    // Maps consume no size themselves. However, their contained
+    // vectors do consume space, so visit columns recursively.
+
+    if (v.getField().getType().getMinorType() == MinorType.MAP) {
+      expandMap((AbstractMapVector) v);
+      return;
+    }
+    ColumnSize colSize = new ColumnSize(v);
     columnSizes.add(colSize);
 
     stdRowWidth += colSize.stdSize;
@@ -209,6 +230,12 @@ public class RecordBatchSizer {
     netRowWidth += colSize.estSize;
   }
 
+  private void expandMap(AbstractMapVector mapVector) {
+    for (ValueVector vector : mapVector) {
+      measureColumn(vector);
+    }
+  }
+
   public static int roundUp(int num, int denom) {
     if(denom == 0) {
       return 0;
@@ -240,8 +267,10 @@ public class RecordBatchSizer {
     buf.append(rowCount);
     buf.append(", Total size: ");
     buf.append(totalBatchSize);
-    buf.append(", Row width:");
+    buf.append(", Gross row width:");
     buf.append(grossRowWidth);
+    buf.append(", Net row width:");
+    buf.append(netRowWidth);
     buf.append(", Density:");
     buf.append(avgDensity);
     buf.append("}");

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index 3591148..168995d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -28,10 +28,11 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
 
 public class BatchSchema implements Iterable<MaterializedField> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchSchema.class);
-  final SelectionVectorMode selectionVectorMode;
+
+  private final SelectionVectorMode selectionVectorMode;
   private final List<MaterializedField> fields;
 
-  BatchSchema(SelectionVectorMode selectionVector, List<MaterializedField> fields) {
+  public BatchSchema(SelectionVectorMode selectionVector, List<MaterializedField> fields) {
     this.fields = fields;
     this.selectionVectorMode = selectionVector;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java
new file mode 100644
index 0000000..12b9053
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessibleUtilities.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.vector.AllocationHelper;
+
+/**
+ * VectorAccessible is an interface. Yet, several operations are done
+ * on VectorAccessible over and over again. While Java 8 allows static
+ * methods on an interface, Drill uses Java 7, which does not. This
+ * class is a placeholder for common VectorAccessible methods that
+ * can migrate into the interface when Drill upgrades to Java 8.
+ */
+
+public class VectorAccessibleUtilities {
+
+  private VectorAccessibleUtilities() { }
+
+  public static void clear(VectorAccessible va) {
+    for (final VectorWrapper<?> w : va) {
+      w.clear();
+    }
+  }
+
+  public static void setValueCount(VectorAccessible va, int count) {
+    for (VectorWrapper<?> w: va) {
+      w.getValueVector().getMutator().setValueCount(count);
+    }
+  }
+
+  public static void allocateVectors(VectorAccessible va, int targetRecordCount) {
+    for (VectorWrapper<?> w: va) {
+      AllocationHelper.allocateNew(w.getValueVector(), targetRecordCount);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractRowSet.java
new file mode 100644
index 0000000..a32262a
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractRowSet.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
+import org.apache.drill.exec.vector.accessor.impl.AbstractColumnAccessor.RowIndex;
+import org.apache.drill.exec.vector.accessor.impl.AbstractColumnReader;
+import org.apache.drill.exec.vector.accessor.impl.TupleReaderImpl;
+
+/**
+ * Basic implementation of a row set for both the single and multiple
+ * (hyper) varieties, both the fixed and extendible varieties.
+ */
+
+public abstract class AbstractRowSet implements RowSet {
+
+  /**
+   * Row set index base class used when indexing rows within a row
+   * set for a row set reader. Keeps track of the current position,
+   * which starts before the first row, meaning that the client
+   * must call <tt>next()</tt> to advance to the first row.
+   */
+
+  public static abstract class RowSetIndex implements RowIndex {
+    protected int rowIndex = -1;
+
+    public int position() { return rowIndex; }
+    public abstract boolean next();
+    public abstract int size();
+    public abstract boolean valid();
+    public void set(int index) { rowIndex = index; }
+  }
+
+  /**
+   * Bounded (read-only) version of the row set index. When reading,
+   * the row count is fixed, and set here.
+   */
+
+  public static abstract class BoundedRowIndex extends RowSetIndex {
+
+    protected final int rowCount;
+
+    public BoundedRowIndex(int rowCount) {
+      this.rowCount = rowCount;
+    }
+
+    @Override
+    public boolean next() {
+      if (++rowIndex < rowCount ) {
+        return true;
+      } else {
+        rowIndex--;
+        return false;
+      }
+    }
+
+    @Override
+    public int size() { return rowCount; }
+
+    @Override
+    public boolean valid() { return rowIndex < rowCount; }
+  }
+
+  /**
+   * Reader implementation for a row set.
+   */
+
+  public class RowSetReaderImpl extends TupleReaderImpl implements RowSetReader {
+
+    protected final RowSetIndex index;
+
+    public RowSetReaderImpl(TupleSchema schema, RowSetIndex index, AbstractColumnReader[] readers) {
+      super(schema, readers);
+      this.index = index;
+    }
+
+    @Override
+    public boolean next() { return index.next(); }
+
+    @Override
+    public boolean valid() { return index.valid(); }
+
+    @Override
+    public int index() { return index.position(); }
+
+    @Override
+    public int size() { return index.size(); }
+
+    @Override
+    public int rowIndex() { return index.index(); }
+
+    @Override
+    public int batchIndex() { return index.batch(); }
+
+    @Override
+    public void set(int index) { this.index.set(index); }
+  }
+
+  protected final BufferAllocator allocator;
+  protected final RowSetSchema schema;
+  protected final VectorContainer container;
+  protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
+
+  public AbstractRowSet(BufferAllocator allocator, BatchSchema schema, VectorContainer container) {
+    this.allocator = allocator;
+    this.schema = new RowSetSchema(schema);
+    this.container = container;
+  }
+
+  @Override
+  public VectorAccessible vectorAccessible() { return container; }
+
+  @Override
+  public VectorContainer container() { return container; }
+
+  @Override
+  public int rowCount() { return container.getRecordCount(); }
+
+  @Override
+  public void clear() {
+    container.zeroVectors();
+    container.setRecordCount(0);
+  }
+
+  @Override
+  public RowSetSchema schema() { return schema; }
+
+  @Override
+  public BufferAllocator allocator() { return allocator; }
+
+  @Override
+  public void print() {
+    new RowSetPrinter(this).print();
+  }
+
+  @Override
+  public int size() {
+    throw new UnsupportedOperationException("getSize");
+  }
+
+  @Override
+  public BatchSchema batchSchema() {
+    return container.getSchema();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
new file mode 100644
index 0000000..d8176de
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.impl.AbstractColumnReader;
+import org.apache.drill.exec.vector.accessor.impl.ColumnAccessorFactory;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetSchema.FlattenedSchema;
+import org.apache.drill.test.rowSet.RowSetSchema.LogicalColumn;
+import org.apache.drill.test.rowSet.RowSetSchema.PhysicalSchema;
+
+/**
+ * Base class for row sets backed by a single record batch.
+ */
+
+public abstract class AbstractSingleRowSet extends AbstractRowSet implements SingleRowSet {
+
+  /**
+   * Internal helper class to organize a set of value vectors for use by the
+   * row set class. Subclasses either build vectors from a schema, or map an
+   * existing vector container into the row set structure. The row set
+   * structure is based on a flattened structure; all vectors appear in
+   * a single vector array. Maps are set aside in a separate map list.
+   */
+
+  public abstract static class StructureBuilder {
+    protected final PhysicalSchema schema;
+    protected final BufferAllocator allocator;
+    protected final ValueVector[] valueVectors;
+    protected final MapVector[] mapVectors;
+    protected int vectorIndex;
+    protected int mapIndex;
+
+    public StructureBuilder(BufferAllocator allocator, RowSetSchema schema) {
+      this.allocator = allocator;
+      this.schema = schema.physical();
+      FlattenedSchema flatSchema = schema.flatAccess();
+      valueVectors = new ValueVector[flatSchema.count()];
+      if (flatSchema.mapCount() == 0) {
+        mapVectors = null;
+      } else {
+        mapVectors = new MapVector[flatSchema.mapCount()];
+      }
+    }
+  }
+
+  /**
+   * Create a set of value vectors given a schema, then map them into both
+   * the value container and the row set structure.
+   */
+
+  public static class VectorBuilder extends StructureBuilder {
+
+    public VectorBuilder(BufferAllocator allocator, RowSetSchema schema) {
+      super(allocator, schema);
+    }
+
+    public ValueVector[] buildContainer(VectorContainer container) {
+      for (int i = 0; i < schema.count(); i++) {
+        LogicalColumn colSchema = schema.column(i);
+        @SuppressWarnings("resource")
+        ValueVector v = TypeHelper.getNewVector(colSchema.field, allocator, null);
+        container.add(v);
+        if (colSchema.field.getType().getMinorType() == MinorType.MAP) {
+          MapVector mv = (MapVector) v;
+          mapVectors[mapIndex++] = mv;
+          buildMap(mv, colSchema.mapSchema);
+        } else {
+          valueVectors[vectorIndex++] = v;
+        }
+      }
+      container.buildSchema(SelectionVectorMode.NONE);
+      return valueVectors;
+    }
+
+    private void buildMap(MapVector mapVector, PhysicalSchema mapSchema) {
+      for (int i = 0; i < mapSchema.count(); i++) {
+        LogicalColumn colSchema = mapSchema.column(i);
+        MajorType type = colSchema.field.getType();
+        Class<? extends ValueVector> vectorClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode());
+        @SuppressWarnings("resource")
+        ValueVector v = mapVector.addOrGet(colSchema.field.getName(), type, vectorClass);
+        if (type.getMinorType() == MinorType.MAP) {
+          MapVector mv = (MapVector) v;
+          mapVectors[mapIndex++] = mv;
+          buildMap(mv, colSchema.mapSchema);
+        } else {
+          valueVectors[vectorIndex++] = v;
+        }
+      }
+    }
+  }
+
+  /**
+   * Build a row set given an existing vector container. In this case,
+   * the vectors exist and we simply need to pull them out of the container
+   * and maps and put them into the row set arrays.
+   */
+
+  public static class VectorMapper extends StructureBuilder {
+
+    public VectorMapper(BufferAllocator allocator, RowSetSchema schema) {
+      super(allocator, schema);
+    }
+
+    public ValueVector[] mapContainer(VectorContainer container) {
+      for (VectorWrapper<?> w : container) {
+        @SuppressWarnings("resource")
+        ValueVector v = w.getValueVector();
+        if (v.getField().getType().getMinorType() == MinorType.MAP) {
+          MapVector mv = (MapVector) v;
+          mapVectors[mapIndex++] = mv;
+          buildMap(mv);
+        } else {
+          valueVectors[vectorIndex++] = v;
+        }
+      }
+      return valueVectors;
+    }
+
+    private void buildMap(MapVector mapVector) {
+      for (ValueVector v : mapVector) {
+        if (v.getField().getType().getMinorType() == MinorType.MAP) {
+          MapVector mv = (MapVector) v;
+          mapVectors[mapIndex++] = mv;
+          buildMap(mv);
+        } else {
+          valueVectors[vectorIndex++] = v;
+        }
+      }
+    }
+  }
+
+  /**
+   * Flattened representation of value vectors using a depth-first
+   * traversal of maps. Order of vectors here corresponds to the column
+   * indexes used to access columns in a reader or writer.
+   */
+
+  protected final ValueVector[] valueVectors;
+
+  public AbstractSingleRowSet(BufferAllocator allocator, BatchSchema schema) {
+    super(allocator, schema, new VectorContainer());
+    valueVectors = new VectorBuilder(allocator, super.schema).buildContainer(container);
+  }
+
+  public AbstractSingleRowSet(BufferAllocator allocator, VectorContainer container) {
+    super(allocator, container.getSchema(), container);
+    valueVectors = new VectorMapper(allocator, super.schema).mapContainer(container);
+  }
+
+  public AbstractSingleRowSet(AbstractSingleRowSet rowSet) {
+    super(rowSet.allocator, rowSet.schema.batch(), rowSet.container);
+    valueVectors = rowSet.valueVectors;
+  }
+
+  @Override
+  public ValueVector[] vectors() { return valueVectors; }
+
+  @Override
+  public int size() {
+    RecordBatchSizer sizer = new RecordBatchSizer(container);
+    return sizer.actualSize();
+  }
+
+  /**
+   * Internal method to build the set of column readers needed for
+   * this row set. Used when building a row set reader.
+   * @param rowIndex object that points to the current row
+   * @return a row set reader whose column readers are in the same order
+   * as the (non-map) vectors.
+   */
+
+  protected RowSetReader buildReader(RowSetIndex rowIndex) {
+    FlattenedSchema accessSchema = schema().flatAccess();
+    ValueVector[] valueVectors = vectors();
+    AbstractColumnReader[] readers = new AbstractColumnReader[valueVectors.length];
+    for (int i = 0; i < readers.length; i++) {
+      MinorType type = accessSchema.column(i).getType().getMinorType();
+      if (type == MinorType.MAP) {
+        readers[i] = null; // buildMapAccessor(i);
+      } else if (type == MinorType.LIST) {
+        readers[i] = null; // buildListAccessor(i);
+      } else {
+        readers[i] = ColumnAccessorFactory.newReader(valueVectors[i].getField().getType());
+        readers[i].bind(rowIndex, valueVectors[i]);
+      }
+    }
+    return new RowSetReaderImpl(accessSchema, rowIndex, readers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
new file mode 100644
index 0000000..706db27
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.TupleAccessor.TupleSchema;
+import org.apache.drill.exec.vector.accessor.impl.AbstractColumnWriter;
+import org.apache.drill.exec.vector.accessor.impl.ColumnAccessorFactory;
+import org.apache.drill.exec.vector.accessor.impl.TupleWriterImpl;
+import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
+
+/**
+ * Implementation of a single row set with no indirection (selection)
+ * vector.
+ */
+
+public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowSet {
+
+  /**
+   * Reader index that points directly to each row in the row set.
+   * This index starts with pointing to the -1st row, so that the
+   * reader can require a <tt>next()</tt> for every row, including
+   * the first. (This is the JDBC ResultSet convention.)
+   */
+
+  private static class DirectRowIndex extends BoundedRowIndex {
+
+    public DirectRowIndex(int rowCount) {
+      super(rowCount);
+    }
+
+    @Override
+    public int index() { return rowIndex; }
+
+    @Override
+    public int batch() { return 0; }
+  }
+
+  /**
+   * Writer index that points to each row in the row set. The index starts at
+   * the 0th row and advances one row on each increment. This allows writers to
+   * start positioned at the first row. Writes happen in the current row.
+   * Calling <tt>next()</tt> advances to the next position, effectively saving
+   * the current row. The most recent row can be abandoned easily simply by not
+   * calling <tt>next()</tt>. This means that the number of completed rows is
+   * the same as the row index.
+   */
+
+  private static class ExtendableRowIndex extends RowSetIndex {
+
+    private final int maxSize;
+
+    public ExtendableRowIndex(int maxSize) {
+      this.maxSize = maxSize;
+      rowIndex = 0;
+    }
+
+    @Override
+    public int index() { return rowIndex; }
+
+    @Override
+    public boolean next() {
+      if (++rowIndex <= maxSize ) {
+        return true;
+      } else {
+        rowIndex--;
+        return false;
+      }
+    }
+
+    @Override
+    public int size() { return rowIndex; }
+
+    @Override
+    public boolean valid() { return rowIndex < maxSize; }
+
+    @Override
+    public int batch() { return 0; }
+  }
+
+  /**
+   * Implementation of a row set writer. Only available for newly-created,
+   * empty, direct, single row sets. Rewriting is not allowed, nor is writing
+   * to a hyper row set.
+   */
+
+  public class RowSetWriterImpl extends TupleWriterImpl implements RowSetWriter {
+
+    private final ExtendableRowIndex index;
+    private final ExtendableRowSet rowSet;
+
+    protected RowSetWriterImpl(ExtendableRowSet rowSet, TupleSchema schema, ExtendableRowIndex index, AbstractColumnWriter[] writers) {
+      super(schema, writers);
+      this.rowSet = rowSet;
+      this.index = index;
+      start();
+    }
+
+    @Override
+    public void setRow(Object...values) {
+      if (! index.valid()) {
+        throw new IndexOutOfBoundsException("Write past end of row set");
+      }
+      for (int i = 0; i < values.length;  i++) {
+        set(i, values[i]);
+      }
+      save();
+    }
+
+    @Override
+    public boolean valid() { return index.valid(); }
+
+    @Override
+    public int index() { return index.position(); }
+
+    @Override
+    public void save() {
+      index.next();
+      start();
+    }
+
+    @Override
+    public void done() {
+      rowSet.setRowCount(index.size());
+    }
+  }
+
+  public DirectRowSet(BufferAllocator allocator, BatchSchema schema) {
+    super(allocator, schema);
+  }
+
+  public DirectRowSet(BufferAllocator allocator, VectorContainer container) {
+    super(allocator, container);
+  }
+
+  public DirectRowSet(BufferAllocator allocator, VectorAccessible va) {
+    super(allocator, toContainer(va, allocator));
+  }
+
+  private static VectorContainer toContainer(VectorAccessible va, BufferAllocator allocator) {
+    VectorContainer container = VectorContainer.getTransferClone(va, allocator);
+    container.buildSchema(SelectionVectorMode.NONE);
+    container.setRecordCount(va.getRecordCount());
+    return container;
+  }
+
+  @Override
+  public void allocate(int recordCount) {
+    for (final ValueVector v : valueVectors) {
+      AllocationHelper.allocate(v, recordCount, 50, 10);
+    }
+  }
+
+  @Override
+  public void setRowCount(int rowCount) {
+    container.setRecordCount(rowCount);
+    VectorAccessibleUtilities.setValueCount(container, rowCount);
+  }
+
+  @Override
+  public RowSetWriter writer() {
+    return writer(10);
+  }
+
+  @Override
+  public RowSetWriter writer(int initialRowCount) {
+    if (container.hasRecordCount()) {
+      throw new IllegalStateException("Row set already contains data");
+    }
+    allocate(initialRowCount);
+    return buildWriter(new ExtendableRowIndex(Character.MAX_VALUE));
+  }
+
+  /**
+   * Build a row set writer, creating a column writer for each column
+   * based on the column type.
+   *
+   * @param rowIndex the index which points to each row
+   * @return a row set writer that wraps the column writers
+   */
+
+  protected RowSetWriter buildWriter(ExtendableRowIndex rowIndex) {
+    ValueVector[] valueVectors = vectors();
+    AbstractColumnWriter[] writers = new AbstractColumnWriter[valueVectors.length];
+    for (int i = 0; i < writers.length; i++) {
+      writers[i] = ColumnAccessorFactory.newWriter(valueVectors[i].getField().getType());
+      writers[i].bind(rowIndex, valueVectors[i]);
+    }
+    TupleSchema accessSchema = schema().hierarchicalAccess();
+    return new RowSetWriterImpl(this, accessSchema, rowIndex, writers);
+  }
+
+  @Override
+  public RowSetReader reader() {
+    return buildReader(new DirectRowIndex(rowCount()));
+  }
+
+  @Override
+  public boolean isExtendable() { return true; }
+
+  @Override
+  public boolean isWritable() { return true; }
+
+  @Override
+  public SelectionVectorMode indirectionType() { return SelectionVectorMode.NONE; }
+
+  @Override
+  public SingleRowSet toIndirect() {
+    return new IndirectRowSet(this);
+  }
+
+  @Override
+  public SelectionVector2 getSv2() { return null; }
+}
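
The writer-index contract described in the ExtendableRowIndex comment above can be illustrated in isolation. The following is a minimal stand-alone sketch, not part of the commit: the class and method names (WriterIndexSketch, WriterIndex, completedRows) are hypothetical, and only the next()/size()/valid() logic is copied from the diff. It shows that the number of completed rows equals the row index, and that a row written without a following next() is simply not counted.

```java
// Hypothetical stand-alone sketch; not part of the Drill code base.
final class WriterIndexSketch {

  /** Mimics the ExtendableRowIndex logic shown in the diff. */
  static final class WriterIndex {
    private final int maxSize;
    private int rowIndex; // starts at row 0, as in the original

    WriterIndex(int maxSize) { this.maxSize = maxSize; }

    /** Advance to the next row, which "saves" the current one. */
    boolean next() {
      if (++rowIndex <= maxSize) {
        return true;
      }
      rowIndex--; // stay at the limit once the index is full
      return false;
    }

    /** Completed rows: the same value as the row index. */
    int size() { return rowIndex; }

    boolean valid() { return rowIndex < maxSize; }
  }

  /** Attempts to save the given number of rows and reports how many completed. */
  static int completedRows(int maxSize, int attempts) {
    WriterIndex index = new WriterIndex(maxSize);
    for (int i = 0; i < attempts; i++) {
      index.next();
    }
    // Values written to the current row at this point are abandoned
    // unless next() is called again.
    return index.size();
  }

  public static void main(String[] args) {
    System.out.println(completedRows(3, 2));
    System.out.println(completedRows(3, 5));
  }
}
```

Once the limit is reached, further calls to next() return false and leave the count unchanged, which is why setRow() in the diff checks valid() before writing.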

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
new file mode 100644
index 0000000..9df0b23
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.HyperVectorWrapper;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.AccessorUtilities;
+import org.apache.drill.exec.vector.accessor.impl.AbstractColumnReader;
+import org.apache.drill.exec.vector.accessor.impl.AbstractColumnReader.VectorAccessor;
+import org.apache.drill.exec.vector.accessor.impl.ColumnAccessorFactory;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.test.rowSet.RowSet.HyperRowSet;
+import org.apache.drill.test.rowSet.RowSetSchema.FlattenedSchema;
+import org.apache.drill.test.rowSet.RowSetSchema.LogicalColumn;
+import org.apache.drill.test.rowSet.RowSetSchema.PhysicalSchema;
+
+/**
+ * Implements a row set wrapper around a collection of "hyper vectors."
+ * A hyper-vector is a logical vector formed by a series of physical vectors
+ * stacked on top of one another. To make a row set, we have a hyper-vector
+ * for each column. Another way to visualize this is as a "hyper row set":
+ * a stacked collection of single row sets: each column is represented by a
+ * vector per row set, with each vector in a row set having the same number
+ * of rows. An SV4 then provides a uniform index into the rows in the
+ * hyper set. A hyper row set is read-only.
+ */
+
+public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet {
+
+  /**
+   * Read-only row index into the hyper row set with batch and index
+   * values mapping via an SV4.
+   */
+
+  public static class HyperRowIndex extends BoundedRowIndex {
+
+    private final SelectionVector4 sv4;
+
+    public HyperRowIndex(SelectionVector4 sv4) {
+      super(sv4.getCount());
+      this.sv4 = sv4;
+    }
+
+    @Override
+    public int index() {
+      return AccessorUtilities.sv4Index(sv4.get(rowIndex));
+    }
+
+    @Override
+    public int batch( ) {
+      return AccessorUtilities.sv4Batch(sv4.get(rowIndex));
+    }
+  }
+
+  /**
+   * Vector accessor used by the column accessors to obtain the vector for
+   * each column value. That is, position 0 might be batch 4, index 3,
+   * while position 1 might be batch 1, index 7, and so on.
+   */
+
+  public static class HyperVectorAccessor implements VectorAccessor {
+
+    private final HyperRowIndex rowIndex;
+    private final ValueVector[] vectors;
+
+    public HyperVectorAccessor(HyperVectorWrapper<ValueVector> hvw, HyperRowIndex rowIndex) {
+      this.rowIndex = rowIndex;
+      vectors = hvw.getValueVectors();
+    }
+
+    @Override
+    public ValueVector vector() {
+      return vectors[rowIndex.batch()];
+    }
+  }
+
+  /**
+   * Build a hyper row set by restructuring a hyper vector bundle into a uniform
+   * shape. Consider this schema: <pre><code>
+   * { a: 10, b: { c: 20, d: { e: 30 } } }</code></pre>
+   * <p>
+   * The hyper container, with two batches, has this structure:
+   * <table border="1">
+   * <tr><th>Batch</th><th>a</th><th>b</th></tr>
+   * <tr><td>0</td><td>Int vector</td><td>Map Vector(Int vector, Map Vector(Int vector))</td></tr>
+   * <tr><td>1</td><td>Int vector</td><td>Map Vector(Int vector, Map Vector(Int vector))</td></tr>
+   * </table>
+   * <p>
+   * The above table shows that top-level scalar vectors (such as the Int Vector for column
+   * a) appear "end-to-end" as a hyper-vector. Maps also appear end-to-end. But, the
+   * contents of the map (column c) do not appear end-to-end. Instead, they appear as
+   * contents in the map vector. To get to c, one indexes into the map vector, steps inside
+   * the map to find c and indexes to the right row.
+   * <p>
+   * Similarly, the maps for d do not appear end-to-end: one must step to the right batch
+   * in b, then step to d.
+   * <p>
+   * Finally, to get to e, one must step
+   * into the hyper vector for b, then step to the proper batch, step to d, step to e
+   * and finally step to the row within e. This is a very complex, costly indexing scheme
+   * that differs depending on map nesting depth.
+   * <p>
+   * To simplify access, this class restructures the maps to flatten the scalar vectors
+   * into end-to-end hyper vectors. For example, for the above:
+   * <p>
+   * <table border="1">
+   * <tr><th>Batch</th><th>a</th><th>c</th><th>e</th></tr>
+   * <tr><td>0</td><td>Int vector</td><td>Int vector</td><td>Int vector</td></tr>
+   * <tr><td>1</td><td>Int vector</td><td>Int vector</td><td>Int vector</td></tr>
+   * </table>
+   *
+   * The maps are still available as hyper vectors, but separated into map fields.
+   * (Scalar access no longer needs to access the maps.) The result is a uniform
+   * addressing scheme for both top-level and nested vectors.
+   */
+
+  public static class HyperVectorBuilder {
+
+    protected final HyperVectorWrapper<?> valueVectors[];
+    protected final HyperVectorWrapper<AbstractMapVector> mapVectors[];
+    private final List<ValueVector> nestedScalars[];
+    private int vectorIndex;
+    private int mapIndex;
+    private final PhysicalSchema physicalSchema;
+
+    @SuppressWarnings("unchecked")
+    public HyperVectorBuilder(RowSetSchema schema) {
+      physicalSchema = schema.physical();
+      FlattenedSchema flatSchema = schema.flatAccess();
+      valueVectors = new HyperVectorWrapper<?>[schema.hierarchicalAccess().count()];
+      if (flatSchema.mapCount() == 0) {
+        mapVectors = null;
+        nestedScalars = null;
+      } else {
+        mapVectors = (HyperVectorWrapper<AbstractMapVector>[])
+            new HyperVectorWrapper<?>[flatSchema.mapCount()];
+        nestedScalars = new ArrayList[flatSchema.count()];
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    public HyperVectorWrapper<ValueVector>[] mapContainer(VectorContainer container) {
+      int i = 0;
+      for (VectorWrapper<?> w : container) {
+        HyperVectorWrapper<?> hvw = (HyperVectorWrapper<?>) w;
+        if (w.getField().getType().getMinorType() == MinorType.MAP) {
+          HyperVectorWrapper<AbstractMapVector> mw = (HyperVectorWrapper<AbstractMapVector>) hvw;
+          mapVectors[mapIndex++] = mw;
+          buildHyperMap(physicalSchema.column(i).mapSchema(), mw);
+        } else {
+          valueVectors[vectorIndex++] = hvw;
+        }
+        i++;
+      }
+      if (nestedScalars != null) {
+        buildNestedHyperVectors();
+      }
+      return (HyperVectorWrapper<ValueVector>[]) valueVectors;
+    }
+
+    private void buildHyperMap(PhysicalSchema mapSchema, HyperVectorWrapper<AbstractMapVector> mapWrapper) {
+      createHyperVectors(mapSchema);
+      for (AbstractMapVector mapVector : mapWrapper.getValueVectors()) {
+        buildMap(mapSchema, mapVector);
+      }
+    }
+
+    private void buildMap(PhysicalSchema mapSchema, AbstractMapVector mapVector) {
+      for (ValueVector v : mapVector) {
+        LogicalColumn col = mapSchema.column(v.getField().getName());
+        if (col.isMap()) {
+          buildMap(col.mapSchema, (AbstractMapVector) v);
+        } else {
+          nestedScalars[col.accessIndex()].add(v);
+        }
+      }
+    }
+
+    private void createHyperVectors(PhysicalSchema mapSchema) {
+      for (int i = 0; i < mapSchema.count(); i++) {
+        LogicalColumn col = mapSchema.column(i);
+        if (col.isMap()) {
+          createHyperVectors(col.mapSchema);
+        } else {
+          nestedScalars[col.accessIndex()] = new ArrayList<ValueVector>();
+        }
+      }
+    }
+
+    private void buildNestedHyperVectors() {
+      for (int i = 0;  i < nestedScalars.length; i++) {
+        if (nestedScalars[i] == null) {
+          continue;
+        }
+        ValueVector vectors[] = new ValueVector[nestedScalars[i].size()];
+        nestedScalars[i].toArray(vectors);
+        assert valueVectors[i] == null;
+        valueVectors[i] = new HyperVectorWrapper<ValueVector>(vectors[0].getField(), vectors, false);
+      }
+    }
+  }
+
+  /**
+   * Selection vector that indexes into the hyper vectors.
+   */
+  private final SelectionVector4 sv4;
+  /**
+   * Collection of hyper vectors in flattened order: a left-to-right,
+   * depth first ordering of vectors in maps. Order here corresponds to
+   * the order used for column indexes in the row set reader.
+   */
+  private final HyperVectorWrapper<ValueVector> hvw[];
+
+  public HyperRowSetImpl(BufferAllocator allocator, VectorContainer container, SelectionVector4 sv4) {
+    super(allocator, container.getSchema(), container);
+    this.sv4 = sv4;
+    hvw = new HyperVectorBuilder(schema).mapContainer(container);
+  }
+
+  @Override
+  public boolean isExtendable() { return false; }
+
+  @Override
+  public boolean isWritable() { return false; }
+
+  @Override
+  public RowSetWriter writer() {
+    throw new UnsupportedOperationException("Cannot write to a hyper vector");
+  }
+
+  @Override
+  public RowSetReader reader() {
+    return buildReader(new HyperRowIndex(sv4));
+  }
+
+  /**
+   * Internal method to build the set of column readers needed for
+   * this row set. Used when building a row set reader.
+   * @param rowIndex object that points to the current row
+   * @return a row set reader whose column readers are in the same
+   * order as the (non-map) vectors
+   */
+
+  protected RowSetReader buildReader(HyperRowIndex rowIndex) {
+    FlattenedSchema accessSchema = schema().flatAccess();
+    AbstractColumnReader readers[] = new AbstractColumnReader[accessSchema.count()];
+    for (int i = 0; i < readers.length; i++) {
+      MaterializedField field = accessSchema.column(i);
+      readers[i] = ColumnAccessorFactory.newReader(field.getType());
+      HyperVectorWrapper<ValueVector> hvw = getHyperVector(i);
+      readers[i].bind(rowIndex, field, new HyperVectorAccessor(hvw, rowIndex));
+    }
+    return new RowSetReaderImpl(accessSchema, rowIndex, readers);
+  }
+
+  @Override
+  public SelectionVectorMode indirectionType() { return SelectionVectorMode.FOUR_BYTE; }
+
+  @Override
+  public SelectionVector4 getSv4() { return sv4; }
+
+  @Override
+  public HyperVectorWrapper<ValueVector> getHyperVector(int i) { return hvw[i]; }
+
+  @Override
+  public int rowCount() { return sv4.getCount(); }
+}
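
The way HyperRowIndex and HyperVectorAccessor cooperate can be sketched without any Drill classes. This is a hedged illustration only: the AccessorUtilities helpers are not shown in this message, so the sketch assumes the usual convention that an SV4 entry packs the batch number into the upper 16 bits and the row offset into the lower 16 bits. The names Sv4Sketch, pack, batch, index and read are hypothetical, and plain int arrays stand in for value vectors.

```java
// Hypothetical stand-alone sketch; int[] arrays stand in for value vectors.
final class Sv4Sketch {

  /** Assumed SV4 layout: batch in the upper 16 bits, row offset in the lower 16. */
  static int pack(int batch, int index) {
    return (batch << 16) | (index & 0xFFFF);
  }

  /** Batch number of an SV4 entry (role of HyperRowIndex.batch()). */
  static int batch(int entry) {
    return entry >>> 16;
  }

  /** Row offset within the batch (role of HyperRowIndex.index()). */
  static int index(int entry) {
    return entry & 0xFFFF;
  }

  /**
   * Read one column value at a logical position: pick the vector for the
   * batch (role of HyperVectorAccessor.vector()), then index into it.
   */
  static int read(int[][] batches, int[] sv4, int position) {
    int entry = sv4[position];
    return batches[batch(entry)][index(entry)];
  }

  public static void main(String[] args) {
    int[][] batches = { {10, 11, 12}, {20, 21} };
    int[] sv4 = { pack(1, 1), pack(0, 2), pack(1, 0) };
    for (int position = 0; position < sv4.length; position++) {
      System.out.println(read(batches, sv4, position));
    }
  }
}
```

The logical position increases monotonically while the (batch, index) pair jumps around, which is the behavior the HyperVectorAccessor comment describes.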

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
new file mode 100644
index 0000000..f90fbb7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+
+/**
+ * Single row set coupled with an indirection (selection) vector,
+ * specifically an SV2.
+ */
+
+public class IndirectRowSet extends AbstractSingleRowSet {
+
+  /**
+   * Reader index that points to each row indirectly through the
+   * selection vector. The {@link #index()} method points to the
+   * actual data row, while the {@link #position()} method gives
+   * the position relative to the indirection vector. That is,
+   * the position increases monotonically, but the index jumps
+   * around as specified by the indirection vector.
+   */
+
+  private static class IndirectRowIndex extends BoundedRowIndex {
+
+    private final SelectionVector2 sv2;
+
+    public IndirectRowIndex(SelectionVector2 sv2) {
+      super(sv2.getCount());
+      this.sv2 = sv2;
+    }
+
+    @Override
+    public int index() { return sv2.getIndex(rowIndex); }
+
+    @Override
+    public int batch() { return 0; }
+  }
+
+  private final SelectionVector2 sv2;
+
+  public IndirectRowSet(BufferAllocator allocator, VectorContainer container) {
+    this(allocator, container, makeSv2(allocator, container));
+  }
+
+  public IndirectRowSet(BufferAllocator allocator, VectorContainer container, SelectionVector2 sv2) {
+    super(allocator, container);
+    this.sv2 = sv2;
+  }
+
+  private static SelectionVector2 makeSv2(BufferAllocator allocator, VectorContainer container) {
+    int rowCount = container.getRecordCount();
+    SelectionVector2 sv2 = new SelectionVector2(allocator);
+    if (!sv2.allocateNewSafe(rowCount)) {
+      throw new OutOfMemoryException("Unable to allocate sv2 buffer");
+    }
+    for (int i = 0; i < rowCount; i++) {
+      sv2.setIndex(i, (char) i);
+    }
+    sv2.setRecordCount(rowCount);
+    container.buildSchema(SelectionVectorMode.TWO_BYTE);
+    return sv2;
+  }
+
+  public IndirectRowSet(DirectRowSet directRowSet) {
+    super(directRowSet);
+    sv2 = makeSv2(allocator, container);
+  }
+
+  @Override
+  public SelectionVector2 getSv2() { return sv2; }
+
+  @Override
+  public void clear() {
+    super.clear();
+    getSv2().clear();
+  }
+
+  @Override
+  public RowSetWriter writer() {
+    throw new UnsupportedOperationException("Cannot write to an existing row set");
+  }
+
+  @Override
+  public RowSetReader reader() {
+    return buildReader(new IndirectRowIndex(getSv2()));
+  }
+
+  @Override
+  public boolean isExtendable() { return false; }
+
+  @Override
+  public boolean isWritable() { return true; }
+
+  @Override
+  public SelectionVectorMode indirectionType() { return SelectionVectorMode.TWO_BYTE; }
+
+  @Override
+  public SingleRowSet toIndirect() { return this; }
+
+  @Override
+  public int size() {
+    RecordBatchSizer sizer = new RecordBatchSizer(container, sv2);
+    return sizer.actualSize();
+  }
+}
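
The difference between position and index in IndirectRowIndex can be shown with a small stand-alone sketch. It is illustrative only: Sv2Sketch, identitySv2 and readThrough are hypothetical names, a char[] stands in for SelectionVector2 (whose entries are two bytes wide), and an int[] stands in for a value vector.

```java
// Hypothetical stand-alone sketch; char[] stands in for a SelectionVector2.
final class Sv2Sketch {

  /** Builds a 1:1 selection vector, as makeSv2() does in the diff. */
  static char[] identitySv2(int rowCount) {
    char[] sv2 = new char[rowCount];
    for (int i = 0; i < rowCount; i++) {
      sv2[i] = (char) i;
    }
    return sv2;
  }

  /**
   * Reads a column through the selection vector. The loop variable is the
   * position; sv2[position] is the index of the actual data row.
   */
  static int[] readThrough(int[] data, char[] sv2) {
    int[] out = new int[sv2.length];
    for (int position = 0; position < sv2.length; position++) {
      out[position] = data[sv2[position]];
    }
    return out;
  }

  public static void main(String[] args) {
    int[] data = {5, 6, 7};
    System.out.println(java.util.Arrays.toString(readThrough(data, identitySv2(3))));
    System.out.println(java.util.Arrays.toString(readThrough(data, new char[] {2, 0})));
  }
}
```

With the identity vector the reader sees the rows unchanged; an operator under test (a filter or sort, for example) would rewrite the selection vector rather than move the data.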

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
new file mode 100644
index 0000000..d22139c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.HyperVectorWrapper;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ColumnReader;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.TupleReader;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+
+/**
+ * A row set is a collection of rows stored as value vectors. Elsewhere in
+ * Drill we call this a "record batch", but that term has been overloaded to
+ * mean the runtime implementation of an operator...
+ * <p>
+ * A row set encapsulates a set of vectors and provides access to Drill's
+ * various "views" of vectors: {@link VectorContainer},
+ * {@link VectorAccessible}, etc.
+ * <p>
+ * A row set is defined by a {@link RowSetSchema}. For testing purposes, a row
+ * set has a fixed schema; we don't allow changing the set of vectors
+ * dynamically.
+ * <p>
+ * The row set also provides a simple way to write and read records using the
+ * {@link RowSetWriter} and {@link RowSetReader} interfaces. As per Drill
+ * conventions, a row set can be written (once), read many times, and finally
+ * cleared.
+ * <p>
+ * Drill provides a large number of vector (data) types. Each requires a
+ * type-specific way to set data. The row set writer uses a {@link ColumnWriter}
+ * to set each value in a way unique to the specific data type. Similarly, the
+ * row set reader provides a {@link ColumnReader} interface. In both cases,
+ * columns can be accessed by index number (as defined in the schema) or
+ * by name.
+ * <p>
+ * A row set follows a schema. The schema starts as a
+ * {@link BatchSchema}, but is parsed and restructured into a variety of
+ * forms. In the original form, maps contain their value vectors. In the
+ * flattened form, all vectors for all maps (and the top-level tuple) are
+ * collected into a single structure. Since this structure is for testing,
+ * this somewhat-static structure works just fine; we don't need the added
+ * complexity that comes from building the schema and data dynamically.
+ * <p>
+ * Putting this all together, the typical life-cycle flow is:
+ * <ul>
+ * <li>Define the schema using {@link RowSetSchema#builder()}.</li>
+ * <li>Create the row set from the schema.</li>
+ * <li>Populate the row set using a writer from {@link ExtendableRowSet#writer(int)}.</li>
+ * <li>Optionally add a selection vector: {@link SingleRowSet#toIndirect()}.</li>
+ * <li>Process the vector container using the code under test.</li>
+ * <li>Retrieve the results using a reader from {@link #reader()}.</li>
+ * <li>Dispose of vector memory with {@link #clear()}.</li>
+ * </ul>
+ */
+
+public interface RowSet {
+
+  /**
+   * Interface for writing values to a row set. Only available
+   * for newly-created, single, direct row sets. Eventually, if
+   * we want to allow updating a row set, we have to create a
+   * new row set with the updated columns, then merge the new
+   * and old row sets to create a new immutable row set.
+   */
+
+  public interface RowSetWriter extends TupleWriter {
+    void setRow(Object...values);
+    boolean valid();
+    int index();
+    void save();
+    void done();
+  }
+
+  /**
+   * Reader for all types of row sets.
+   */
+
+  public interface RowSetReader extends TupleReader {
+
+    /**
+     * Total number of rows in the row set.
+     * @return total number of rows
+     */
+    int size();
+
+    boolean next();
+    int index();
+    void set(int index);
+
+    /**
+     * Batch index: 0 for a single batch, or the index of the batch that
+     * contains the current row for a hyper-batch.
+     * @return index of the batch for the current row
+     */
+    int batchIndex();
+
+    /**
+     * The index of the underlying row, which may be indexed by an
+     * SV2 or SV4.
+     *
+     * @return index of the underlying row
+     */
+
+    int rowIndex();
+    boolean valid();
+  }
+
+  boolean isExtendable();
+
+  boolean isWritable();
+
+  VectorAccessible vectorAccessible();
+
+  VectorContainer container();
+
+  int rowCount();
+
+  RowSetWriter writer();
+
+  RowSetReader reader();
+
+  void clear();
+
+  RowSetSchema schema();
+
+  BufferAllocator allocator();
+
+  SelectionVectorMode indirectionType();
+
+  void print();
+
+  /**
+   * Return the size in memory of this record set, including indirection
+   * vectors, null vectors, offset vectors and the entire (used and unused)
+   * data vectors.
+   *
+   * @return memory size in bytes
+   */
+
+  int size();
+
+  BatchSchema batchSchema();
+
+  /**
+   * Row set that manages a single batch of rows.
+   */
+
+  public interface SingleRowSet extends RowSet {
+    ValueVector[] vectors();
+    SingleRowSet toIndirect();
+    SelectionVector2 getSv2();
+  }
+
+  /**
+   * Single row set which is empty and allows writing.
+   * Once writing is complete, the row set becomes an
+   * immutable direct row set.
+   */
+
+  public interface ExtendableRowSet extends SingleRowSet {
+    void allocate(int recordCount);
+    void setRowCount(int rowCount);
+    RowSetWriter writer(int initialRowCount);
+  }
+
+  /**
+   * Row set comprised of multiple single row sets, along with
+   * an indirection vector (SV4).
+   */
+
+  public interface HyperRowSet extends RowSet {
+    SelectionVector4 getSv4();
+    HyperVectorWrapper<ValueVector> getHyperVector(int i);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
new file mode 100644
index 0000000..a5b03c8
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+
+/**
+ * Fluent builder to quickly build up a row set (record batch)
+ * programmatically. Starting with an {@link OperatorFixture}:
+ * <pre><code>
+ * OperatorFixture fixture = ...
+ * RowSet rowSet = fixture.rowSetBuilder(batchSchema)
+ *   .add(10, "string", new double[] {10.3, 10.4})
+ *   ...
+ *   .build();</code></pre>
+ */
+
+public final class RowSetBuilder {
+
+  private DirectRowSet rowSet;
+  private RowSetWriter writer;
+  private boolean withSv2;
+
+  public RowSetBuilder(BufferAllocator allocator, BatchSchema schema) {
+    this(allocator, schema, 10);
+  }
+
+  public RowSetBuilder(BufferAllocator allocator, BatchSchema schema, int capacity) {
+    rowSet = new DirectRowSet(allocator, schema);
+    writer = rowSet.writer(capacity);
+  }
+
+  /**
+   * Add a new row using column values passed as variable-length arguments. Expects
+   * map values to be flattened. A schema of (a:int, b:map(c:varchar)) would be
+   * set as <br><tt>add(10, "foo");</tt><br> Values of arrays can be expressed as a Java
+   * array. A schema of (a:int, b:int[]) can be set as<br>
+   * <tt>add(10, new int[] {100, 200});</tt><br>
+   * @param values column values in column index order
+   * @return this builder
+   */
+
+  public RowSetBuilder add(Object...values) {
+    writer.setRow(values);
+    return this;
+  }
+
+  /**
+   * Build the row set with a selection vector 2. The SV2 is
+   * initialized to have a 1:1 index to the rows: SV2 position 0 points
+   * to row 0, SV2 position 1 points to row 1 and so on.
+   *
+   * @return this builder
+   */
+  public RowSetBuilder withSv2() {
+    withSv2 = true;
+    return this;
+  }
+
+  public SingleRowSet build() {
+    writer.done();
+    if (withSv2) {
+      return rowSet.toIndirect();
+    }
+    return rowSet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
new file mode 100644
index 0000000..3ba7471
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.exec.vector.accessor.ArrayReader;
+import org.apache.drill.exec.vector.accessor.ColumnReader;
+import org.apache.drill.test.rowSet.RowSet.RowSetReader;
+import org.bouncycastle.util.Arrays;
+
+/**
+ * For testing, compare the contents of two row sets (record batches)
+ * to verify that they are identical. Supports masks to exclude certain
+ * columns from comparison.
+ */
+
+public class RowSetComparison {
+
+  private RowSet expected;
+  private boolean mask[];
+  private double delta = 0.001;
+  private int offset;
+  private int span = -1;
+
+  public RowSetComparison(RowSet expected) {
+    this.expected = expected;
+    mask = new boolean[expected.schema().hierarchicalAccess().count()];
+    for (int i = 0; i < mask.length; i++) {
+      mask[i] = true;
+    }
+  }
+
+  /**
+   * Mark a specific column as excluded from comparisons.
+   * @param colNo the index of the column to exclude
+   * @return this builder
+   */
+
+  public RowSetComparison exclude(int colNo) {
+    mask[colNo] = false;
+    return this;
+  }
+
+  /**
+   * Specifies a "selection" mask that determines which columns
+   * to compare. Columns marked as "false" are omitted from the
+   * comparison.
+   *
+   * @param flags variable-length list of column flags
+   * @return this builder
+   */
+  public RowSetComparison withMask(Boolean...flags) {
+    for (int i = 0; i < flags.length; i++) {
+      mask[i] = flags[i];
+    }
+    return this;
+  }
+
+  /**
+   * Specify the delta value to use when comparing float or
+   * double values.
+   *
+   * @param delta the delta to use in float and double comparisons
+   * @return this builder
+   */
+  public RowSetComparison withDelta(double delta) {
+    this.delta = delta;
+    return this;
+  }
+
+  /**
+   * Specify an offset into the row sets to start the comparison.
+   * Usually combined with {@link #span(int)}.
+   *
+   * @param offset offset into the row set to start the comparison
+   * @return this builder
+   */
+  public RowSetComparison offset(int offset) {
+    this.offset = offset;
+    return this;
+  }
+
+  /**
+   * Specify a subset of rows to compare. Usually combined
+   * with {@link #offset(int)}.
+   *
+   * @param span the number of rows to compare
+   * @return this builder
+   */
+
+  public RowSetComparison span(int span) {
+    this.span = span;
+    return this;
+  }
+
+  /**
+   * Verify the actual rows using the rules defined in this builder.
+   * @param actual the actual results to verify
+   */
+
+  public void verify(RowSet actual) {
+    int testLength = expected.rowCount() - offset;
+    if (span > -1) {
+      testLength = span;
+    }
+    int dataLength = offset + testLength;
+    assertTrue("Missing expected rows", expected.rowCount() >= dataLength);
+    assertTrue("Missing actual rows", actual.rowCount() >= dataLength);
+    RowSetReader er = expected.reader();
+    RowSetReader ar = actual.reader();
+    for (int i = 0; i < offset; i++) {
+      er.next();
+      ar.next();
+    }
+    for (int i = 0; i < testLength; i++) {
+      er.next();
+      ar.next();
+      verifyRow(er, ar);
+    }
+  }
+
+  /**
+   * Convenience method to verify the actual results, then free memory
+   * for both the expected and actual result sets.
+   * @param actual the actual results to verify
+   */
+
+  public void verifyAndClear(RowSet actual) {
+    try {
+      verify(actual);
+    } finally {
+      expected.clear();
+      actual.clear();
+    }
+  }
+
+  private void verifyRow(RowSetReader er, RowSetReader ar) {
+    for (int i = 0; i < mask.length; i++) {
+      if (! mask[i]) {
+        continue;
+      }
+      ColumnReader ec = er.column(i);
+      ColumnReader ac = ar.column(i);
+      String label = er.index() + ":" + i;
+      assertEquals(label, ec.valueType(), ac.valueType());
+      if (ec.isNull()) {
+        assertTrue(label + " - column not null", ac.isNull());
+        continue;
+      }
+      if (! ec.isNull()) {
+        assertTrue(label + " - column is null", ! ac.isNull());
+      }
+      switch (ec.valueType()) {
+      case BYTES: {
+        byte expected[] = ec.getBytes();
+        byte actual[] = ac.getBytes();
+        assertEquals(label + " - byte lengths differ", expected.length, actual.length);
+        assertTrue(label, Arrays.areEqual(expected, actual));
+        break;
+      }
+      case DOUBLE:
+        assertEquals(label, ec.getDouble(), ac.getDouble(), delta);
+        break;
+      case INTEGER:
+        assertEquals(label, ec.getInt(), ac.getInt());
+        break;
+      case LONG:
+        assertEquals(label, ec.getLong(), ac.getLong());
+        break;
+      case STRING:
+        assertEquals(label, ec.getString(), ac.getString());
+        break;
+      case DECIMAL:
+        assertEquals(label, ec.getDecimal(), ac.getDecimal());
+        break;
+      case PERIOD:
+        assertEquals(label, ec.getPeriod(), ac.getPeriod());
+        break;
+      case ARRAY:
+        verifyArray(label, ec.array(), ac.array());
+        break;
+      default:
+        throw new IllegalStateException("Unexpected type: " + ec.valueType());
+      }
+    }
+  }
+
+  private void verifyArray(String colLabel, ArrayReader ea,
+      ArrayReader aa) {
+    assertEquals(colLabel, ea.valueType(), aa.valueType());
+    assertEquals(colLabel, ea.size(), aa.size());
+    for (int i = 0; i < ea.size(); i++) {
+      String label = colLabel + "[" + i + "]";
+      switch (ea.valueType()) {
+      case ARRAY:
+        throw new IllegalStateException("Arrays of arrays not supported yet");
+      case BYTES: {
+        byte expected[] = ea.getBytes(i);
+        byte actual[] = aa.getBytes(i);
+        assertEquals(label + " - byte lengths differ", expected.length, actual.length);
+        assertTrue(label, Arrays.areEqual(expected, actual));
+        break;
+      }
+      case DOUBLE:
+        assertEquals(label, ea.getDouble(i), aa.getDouble(i), delta);
+        break;
+      case INTEGER:
+        assertEquals(label, ea.getInt(i), aa.getInt(i));
+        break;
+      case LONG:
+        assertEquals(label, ea.getLong(i), aa.getLong(i));
+        break;
+      case STRING:
+        assertEquals(label, ea.getString(i), aa.getString(i));
+        break;
+      case DECIMAL:
+        assertEquals(label, ea.getDecimal(i), aa.getDecimal(i));
+        break;
+      case PERIOD:
+        assertEquals(label, ea.getPeriod(i), aa.getPeriod(i));
+        break;
+      default:
+        throw new IllegalStateException( "Unexpected type: " + ea.valueType());
+      }
+    }
+  }
+}
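
The comparison rules in RowSetComparison (column mask, float delta, offset and span) can be sketched without Drill vectors. The following is a simplified illustration only: RowComparisonSketch is a hypothetical class, rows are plain Object[] values rather than a RowSet, and mismatches are collected as "row:col" labels instead of being raised as JUnit assertion failures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified analogue of RowSetComparison over Object[][] rows.
class RowComparisonSketch {
  private final Object[][] expected;
  private final boolean[] mask;
  private double delta = 0.001;
  private int offset;
  private int span = -1;

  RowComparisonSketch(Object[][] expected, int colCount) {
    this.expected = expected;
    mask = new boolean[colCount];
    Arrays.fill(mask, true);          // compare every column by default
  }

  RowComparisonSketch exclude(int colNo) { mask[colNo] = false; return this; }
  RowComparisonSketch withDelta(double delta) { this.delta = delta; return this; }
  RowComparisonSketch offset(int offset) { this.offset = offset; return this; }
  RowComparisonSketch span(int span) { this.span = span; return this; }

  // Returns the "row:col" labels of all mismatches; an empty list means equal.
  List<String> diff(Object[][] actual) {
    int testLength = span > -1 ? span : expected.length - offset;
    int dataLength = offset + testLength;
    if (expected.length < dataLength || actual.length < dataLength) {
      throw new IllegalArgumentException("Missing rows");
    }
    List<String> mismatches = new ArrayList<>();
    for (int row = offset; row < dataLength; row++) {
      for (int col = 0; col < mask.length; col++) {
        if (mask[col] && !sameValue(expected[row][col], actual[row][col])) {
          mismatches.add(row + ":" + col);
        }
      }
    }
    return mismatches;
  }

  private boolean sameValue(Object e, Object a) {
    if (e == null || a == null) {
      return e == a;                  // both must be null to match
    }
    if (e instanceof Double && a instanceof Double) {
      return Math.abs((Double) e - (Double) a) <= delta;
    }
    if (e instanceof byte[] && a instanceof byte[]) {
      return Arrays.equals((byte[]) e, (byte[]) a);
    }
    return e.equals(a);
  }
}
```

A caller would chain the options in the same builder style as the class above, for example new RowComparisonSketch(expected, 3).exclude(2).offset(1).span(1).diff(actual).
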

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java
new file mode 100644
index 0000000..601abb1
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import java.io.PrintStream;
+
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.vector.accessor.TupleAccessor.TupleSchema;
+import org.apache.drill.test.rowSet.RowSet.RowSetReader;
+
+/**
+ * Print a row set in CSV-like format. Primarily for debugging.
+ */
+
+public class RowSetPrinter {
+  private RowSet rowSet;
+
+  public RowSetPrinter(RowSet rowSet) {
+    this.rowSet = rowSet;
+  }
+
+  public void print() {
+    print(System.out);
+  }
+
+  public void print(PrintStream out) {
+    SelectionVectorMode selectionMode = rowSet.indirectionType();
+    RowSetReader reader = rowSet.reader();
+    int colCount = reader.schema().count();
+    printSchema(out, selectionMode);
+    while (reader.next()) {
+      printHeader(out, reader, selectionMode);
+      for (int i = 0; i < colCount; i++) {
+        if (i > 0) {
+          out.print(", ");
+        }
+        out.print(reader.getAsString(i));
+      }
+      out.println();
+    }
+  }
+
+  private void printSchema(PrintStream out, SelectionVectorMode selectionMode) {
+    out.print("#");
+    switch (selectionMode) {
+    case FOUR_BYTE:
+      out.print(" (batch #, row #)");
+      break;
+    case TWO_BYTE:
+      out.print(" (row #)");
+      break;
+    default:
+      break;
+    }
+    out.print(": ");
+    TupleSchema schema = rowSet.schema().hierarchicalAccess();
+    for (int i = 0; i < schema.count(); i++) {
+      if (i > 0) {
+        out.print(", ");
+      }
+      out.print(schema.column(i).getLastName());
+    }
+    out.println();
+  }
+
+  private void printHeader(PrintStream out, RowSetReader reader, SelectionVectorMode selectionMode) {
+    out.print(reader.index());
+    switch (selectionMode) {
+    case FOUR_BYTE:
+      out.print(" (");
+      out.print(reader.batchIndex());
+      out.print(", ");
+      out.print(reader.rowIndex());
+      out.print(")");
+      break;
+    case TWO_BYTE:
+      out.print(" (");
+      out.print(reader.rowIndex());
+      out.print(")");
+      break;
+    default:
+      break;
+    }
+    out.print(": ");
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java
new file mode 100644
index 0000000..55b5f12
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.vector.accessor.TupleAccessor.TupleSchema;
+import org.apache.drill.exec.record.MaterializedField;
+
+/**
+ * Row set schema presented as a number of distinct "views" for various
+ * purposes:
+ * <ul>
+ * <li>Batch schema: the schema used by a VectorContainer.</li>
+ * <li>Physical schema: the schema expressed as a hierarchy of
+ * tuples with the top tuple representing the row, nested tuples
+ * representing maps.</li>
+ * <li>Access schema: a flattened schema with all scalar columns
+ * at the top level, and with map columns pulled out into a separate
+ * collection. The flattened-scalar view is the one used to write to,
+ * and read from, the row set.</li>
+ * </ul>
+ * Allows easy creation of multiple row sets from the same schema.
+ * Each schema is immutable, which is fine for tests in which we
+ * want known inputs and outputs.
+ */
+
+public class RowSetSchema {
+
+  /**
+   * Logical description of a column. A logical column is a
+   * materialized field. For maps, also includes a logical schema
+   * of the map.
+   */
+
+  public static class LogicalColumn {
+    protected final String fullName;
+    protected final int accessIndex;
+    protected int flatIndex;
+    protected final MaterializedField field;
+
+    /**
+     * Schema of the map. Includes only those fields directly within
+     * the map; does not include fields from nested tuples.
+     */
+
+    protected PhysicalSchema mapSchema;
+
+    public LogicalColumn(String fullName, int accessIndex, MaterializedField field) {
+      this.fullName = fullName;
+      this.accessIndex = accessIndex;
+      this.field = field;
+    }
+
+    private void updateStructure(int index, PhysicalSchema children) {
+      flatIndex = index;
+      mapSchema = children;
+    }
+
+    public int accessIndex() { return accessIndex; }
+    public int flatIndex() { return flatIndex; }
+    public boolean isMap() { return mapSchema != null; }
+    public PhysicalSchema mapSchema() { return mapSchema; }
+    public MaterializedField field() { return field; }
+    public String fullName() { return fullName; }
+  }
+
+  /**
+   * Implementation of a tuple name space. Tuples allow both indexed and
+   * named access to their members.
+   *
+   * @param <T> the type of object representing each column
+   */
+
+  public static class NameSpace<T> {
+    private final Map<String,Integer> nameSpace = new HashMap<>();
+    private final List<T> columns = new ArrayList<>();
+
+    public int add(String key, T value) {
+      int index = columns.size();
+      nameSpace.put(key, index);
+      columns.add(value);
+      return index;
+    }
+
+    public T get(int index) {
+      return columns.get(index);
+    }
+
+    public T get(String key) {
+      int index = getIndex(key);
+      if (index == -1) {
+        return null;
+      }
+      return get(index);
+    }
+
+    public int getIndex(String key) {
+      Integer index = nameSpace.get(key);
+      if (index == null) {
+        return -1;
+      }
+      return index;
+    }
+
+    public int count() { return columns.size(); }
+  }
+
+  /**
+   * Provides a non-flattened, physical view of the schema. The top-level
+   * row includes maps; maps expand to a nested tuple schema. This view
+   * corresponds, more-or-less, to the physical storage of vectors in
+   * a vector accessible or vector container.
+   */
+
+  private static class TupleSchemaImpl implements TupleSchema {
+
+    private NameSpace<LogicalColumn> columns;
+
+    public TupleSchemaImpl(NameSpace<LogicalColumn> ns) {
+      this.columns = ns;
+    }
+
+    @Override
+    public MaterializedField column(int index) {
+      return logicalColumn(index).field();
+    }
+
+    public LogicalColumn logicalColumn(int index) { return columns.get(index); }
+
+    @Override
+    public MaterializedField column(String name) {
+      LogicalColumn col = columns.get(name);
+      return col == null ? null : col.field();
+    }
+
+    @Override
+    public int columnIndex(String name) {
+      return columns.getIndex(name);
+    }
+
+    @Override
+    public int count() { return columns.count(); }
+  }
+
+  /**
+   * Represents the flattened view of the schema used to get and set columns.
+   * Represents a left-to-right, depth-first traversal of the row and map
+   * columns. Holds only materialized vectors (non-maps). For completeness,
+   * provides access to maps also via separate methods, but this is generally
+   * of little use.
+   */
+
+  public static class FlattenedSchema extends TupleSchemaImpl {
+    protected final TupleSchemaImpl maps;
+
+    public FlattenedSchema(NameSpace<LogicalColumn> cols, NameSpace<LogicalColumn> maps) {
+      super(cols);
+      this.maps = new TupleSchemaImpl(maps);
+    }
+
+    public LogicalColumn logicalMap(int index) { return maps.logicalColumn(index); }
+    public MaterializedField map(int index) { return maps.column(index); }
+    public MaterializedField map(String name) { return maps.column(name); }
+    public int mapIndex(String name) { return maps.columnIndex(name); }
+    public int mapCount() { return maps.count(); }
+  }
+
+  /**
+   * Physical schema of a row set showing the logical hierarchy of fields
+   * with map fields as first-class fields. Map members appear as children
+   * under the map, much as they appear in the physical value-vector
+   * implementation.
+   */
+
+  public static class PhysicalSchema {
+    protected final NameSpace<LogicalColumn> schema = new NameSpace<>();
+
+    public LogicalColumn column(int index) {
+      return schema.get(index);
+    }
+
+    public LogicalColumn column(String name) {
+      return schema.get(name);
+    }
+
+    public int count() { return schema.count(); }
+
+    public NameSpace<LogicalColumn> nameSpace() { return schema; }
+  }
+
+  private static class SchemaExpander {
+    private final PhysicalSchema physicalSchema;
+    private final NameSpace<LogicalColumn> cols = new NameSpace<>();
+    private final NameSpace<LogicalColumn> maps = new NameSpace<>();
+
+    public SchemaExpander(BatchSchema schema) {
+      physicalSchema = expand("", schema);
+    }
+
+    private PhysicalSchema expand(String prefix, Iterable<MaterializedField> fields) {
+      PhysicalSchema physical = new PhysicalSchema();
+      for (MaterializedField field : fields) {
+        String name = prefix + field.getName();
+        int index;
+        LogicalColumn colSchema = new LogicalColumn(name, physical.count(), field);
+        physical.schema.add(field.getName(), colSchema);
+        PhysicalSchema children = null;
+        if (field.getType().getMinorType() == MinorType.MAP) {
+          index = maps.add(name, colSchema);
+          children = expand(name + ".", field.getChildren());
+        } else {
+          index = cols.add(name, colSchema);
+        }
+        colSchema.updateStructure(index, children);
+      }
+      return physical;
+    }
+  }
+
+  private final BatchSchema batchSchema;
+  private final TupleSchemaImpl accessSchema;
+  private final FlattenedSchema flatSchema;
+  private final PhysicalSchema physicalSchema;
+
+  public RowSetSchema(BatchSchema schema) {
+    batchSchema = schema;
+    SchemaExpander expander = new SchemaExpander(schema);
+    physicalSchema = expander.physicalSchema;
+    accessSchema = new TupleSchemaImpl(physicalSchema.nameSpace());
+    flatSchema = new FlattenedSchema(expander.cols, expander.maps);
+  }
+
+  /**
+   * A hierarchical schema that includes maps, with maps expanding
+   * to a nested tuple schema. Not used at present; this is intended
+   * to be the basis of non-flattened accessors if we find the need.
+   * @return the hierarchical access schema
+   */
+
+  public TupleSchema hierarchicalAccess() { return accessSchema; }
+
+  /**
+   * A flattened view (left-to-right, depth-first traversal) of the non-map
+   * columns in the row. Used to define the column indexes in the
+   * get methods for row readers and the set methods for row writers.
+   * @return the flattened access schema
+   */
+
+  public FlattenedSchema flatAccess() { return flatSchema; }
+
+  /**
+   * Internal physical schema in hierarchical order. Mostly used to create
+   * the other schemas, but may be of use in special cases. Has the same
+   * structure as the batch schema, but with additional information.
+   * @return a tree-structured physical schema
+   */
+
+  public PhysicalSchema physical() { return physicalSchema; }
+
+  /**
+   * The batch schema used by the Drill runtime. Represents a tree-structured
+   * list of top-level fields, including maps. Maps contain a nested schema.
+   * @return the batch schema used by the Drill runtime
+   */
+
+  public BatchSchema batch() { return batchSchema; }
+
+  /**
+   * Convert this schema to a new batch schema that includes the specified
+   * selection vector mode.
+   * @param svMode selection vector mode for the new schema
+   * @return the new batch schema
+   */
+
+  public BatchSchema toBatchSchema(SelectionVectorMode svMode) {
+    List<MaterializedField> fields = new ArrayList<>();
+    for (MaterializedField field : batchSchema) {
+      fields.add(field);
+    }
+    return new BatchSchema(svMode, fields);
+  }
+}
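
The flattening performed by SchemaExpander can be shown with a small, self-contained sketch. It assumes a hypothetical Field class in place of MaterializedField (a null child list marks a scalar column) and records only names and indexes. As in the code above, scalar columns and maps are indexed in separate name spaces, and nested names are joined with dots.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the left-to-right, depth-first flattening.
class FlattenSketch {

  // Stand-in for a materialized field; children == null means a scalar column.
  static class Field {
    final String name;
    final List<Field> children;

    Field(String name, List<Field> children) {
      this.name = name;
      this.children = children;
    }

    boolean isMap() { return children != null; }
  }

  // Dotted name -> flat index, kept in insertion (traversal) order.
  final Map<String, Integer> cols = new LinkedHashMap<>();
  final Map<String, Integer> maps = new LinkedHashMap<>();

  void expand(String prefix, List<Field> fields) {
    for (Field field : fields) {
      String name = prefix + field.name;
      if (field.isMap()) {
        maps.put(name, maps.size());      // maps get their own index space
        expand(name + ".", field.children);
      } else {
        cols.put(name, cols.size());      // scalars are numbered in visit order
      }
    }
  }
}
```

For a schema (a, b:map(c, d:map(e)), f), this yields the scalar columns a, b.c, b.d.e, f at indexes 0 through 3, and the maps b and b.d at indexes 0 and 1.
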

http://git-wip-us.apache.org/repos/asf/drill/blob/095a660b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
new file mode 100644
index 0000000..261a9c1
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.accessor.AccessorUtilities;
+import org.apache.drill.exec.vector.accessor.ColumnAccessor.ValueType;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+/**
+ * Various utilities useful for working with row sets, especially for testing.
+ */
+
+public class RowSetUtilities {
+
+  private RowSetUtilities() { }
+
+  /**
+   * Reverse a row set by reversing the entries in an SV2. This is a quick
+   * and easy way to reverse the sort order of an expected-value row set.
+   * @param sv2 the SV2 which is reversed in place
+   */
+
+  public static void reverse(SelectionVector2 sv2) {
+    int count = sv2.getCount();
+    for (int i = 0; i < count / 2; i++) {
+      char temp = sv2.getIndex(i);
+      int dest = count - 1 - i;
+      sv2.setIndex(i, sv2.getIndex(dest));
+      sv2.setIndex(dest, temp);
+    }
+  }
+
+  /**
+   * Set a test data value from an int. Uses the type information of the
+   * column to handle interval types. Else, uses the value type of the
+   * accessor. The value set here is purely for testing; the mapping
+   * from ints to intervals has no real meaning.
+   *
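+   * @param rowWriter the row writer that holds the target column
+   * @param index the index of the column to set
+   * @param value the int value to convert and write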
+   */
+
+  public static void setFromInt(RowSetWriter rowWriter, int index, int value) {
+    ColumnWriter writer = rowWriter.column(index);
+    if (writer.valueType() == ValueType.PERIOD) {
+      setPeriodFromInt(writer, rowWriter.schema().column(index).getType().getMinorType(), value);
+    } else {
+      AccessorUtilities.setFromInt(writer, value);
+    }
+  }
+
+  /**
+   * Ad-hoc, test-only method to set a Period from an integer. Periods are made up of
+   * months and milliseconds. There is no mapping from one to the other, so a period
+   * requires at least two numbers. Still, we are given just one (typically from a test
+   * data generator). Use that int value to "spread" some value across the two kinds
+   * of fields. The result has no meaning, but has the same comparison order as the
+   * original ints.
+   *
+   * @param writer column writer for a period column
+   * @param minorType the Drill data type
+   * @param value the integer value to apply
+   */
+
+  public static void setPeriodFromInt(ColumnWriter writer, MinorType minorType,
+      int value) {
+    switch (minorType) {
+    case INTERVAL:
+      writer.setPeriod(Duration.millis(value).toPeriod());
+      break;
+    case INTERVALYEAR:
+      writer.setPeriod(Period.years(value / 12).withMonths(value % 12));
+      break;
+    case INTERVALDAY:
+      int sec = value % 60;
+      value = value / 60;
+      int min = value % 60;
+      value = value / 60;
+      writer.setPeriod(Period.days(value).withMinutes(min).withSeconds(sec));
+      break;
+    default:
+      throw new IllegalArgumentException("Writer is not an interval: " + minorType);
+    }
+  }
+}
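
The arithmetic that setPeriodFromInt() uses to "spread" one int across several period fields can be checked in isolation. The sketch below mirrors the division steps from the INTERVALYEAR and INTERVALDAY cases but leaves out Joda-Time; IntervalSpreadSketch and its method names are hypothetical, and the int arrays simply hold the values that the original code passes to Period.

```java
// Hypothetical helper that reproduces only the integer arithmetic.
class IntervalSpreadSketch {

  // INTERVALYEAR case: returns {years, months}.
  static int[] yearMonth(int value) {
    return new int[] { value / 12, value % 12 };
  }

  // INTERVALDAY case: returns {days, minutes, seconds}. As in the original,
  // the quotient left after two divisions by 60 is used as the day count.
  static int[] dayMinSec(int value) {
    int sec = value % 60;
    value = value / 60;
    int min = value % 60;
    value = value / 60;
    return new int[] { value, min, sec };
  }
}
```

For example, yearMonth(27) gives 2 years and 3 months, and dayMinSec(3725) gives a day field of 1, 2 minutes and 5 seconds.
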