You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/12/21 05:19:42 UTC

[15/15] drill git commit: DRILL-5657: Size-aware vector writer structure

DRILL-5657: Size-aware vector writer structure

- Vector and accessor layer
- Row Set layer
- Tuple and column models
- Revised write-time metadata
- "Result set loader" layer

this closes #914


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/40de8ca4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/40de8ca4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/40de8ca4

Branch: refs/heads/master
Commit: 40de8ca4f47533fa6593d1266403868ae1a2119f
Parents: eb0c403
Author: Paul Rogers <pr...@maprtech.com>
Authored: Thu Aug 17 22:41:30 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Wed Dec 20 21:17:48 2017 -0800

----------------------------------------------------------------------
 .../exec/physical/rowSet/ResultSetLoader.java   |  204 +++
 .../exec/physical/rowSet/ResultVectorCache.java |   33 +
 .../exec/physical/rowSet/RowSetLoader.java      |  153 +++
 .../exec/physical/rowSet/impl/ColumnState.java  |  358 +++++
 .../physical/rowSet/impl/NullProjectionSet.java |   41 +
 .../rowSet/impl/NullResultVectorCacheImpl.java  |   41 +
 .../physical/rowSet/impl/NullVectorState.java   |   52 +
 .../rowSet/impl/NullableVectorState.java        |  108 ++
 .../physical/rowSet/impl/OptionBuilder.java     |  134 ++
 .../rowSet/impl/PrimitiveColumnState.java       |  105 ++
 .../physical/rowSet/impl/ProjectionSet.java     |   48 +
 .../physical/rowSet/impl/ProjectionSetImpl.java |  136 ++
 .../rowSet/impl/RepeatedVectorState.java        |  168 +++
 .../rowSet/impl/ResultSetLoaderImpl.java        |  775 +++++++++++
 .../rowSet/impl/ResultVectorCacheImpl.java      |  186 +++
 .../physical/rowSet/impl/RowSetLoaderImpl.java  |   98 ++
 .../physical/rowSet/impl/SingleVectorState.java |  274 ++++
 .../exec/physical/rowSet/impl/TupleState.java   |  388 ++++++
 .../rowSet/impl/VectorContainerBuilder.java     |  257 ++++
 .../exec/physical/rowSet/impl/VectorState.java  |  102 ++
 .../physical/rowSet/impl/WriterIndexImpl.java   |  100 ++
 .../exec/physical/rowSet/impl/package-info.java |  304 +++++
 .../physical/rowSet/model/BaseTupleModel.java   |  117 ++
 .../physical/rowSet/model/ContainerVisitor.java |  115 ++
 .../physical/rowSet/model/MetadataProvider.java |   93 ++
 .../exec/physical/rowSet/model/ReaderIndex.java |   53 +
 .../physical/rowSet/model/SchemaInference.java  |   61 +
 .../exec/physical/rowSet/model/TupleModel.java  |  117 ++
 .../rowSet/model/hyper/BaseReaderBuilder.java   |  149 +++
 .../rowSet/model/hyper/package-info.java        |   30 +
 .../physical/rowSet/model/package-info.java     |   68 +
 .../rowSet/model/single/BaseReaderBuilder.java  |   89 ++
 .../rowSet/model/single/BaseWriterBuilder.java  |   72 +
 .../model/single/BuildVectorsFromMetadata.java  |   97 ++
 .../rowSet/model/single/VectorAllocator.java    |  112 ++
 .../rowSet/model/single/package-info.java       |   28 +
 .../exec/physical/rowSet/package-info.java      |  193 +++
 .../apache/drill/exec/record/BatchSchema.java   |   42 +-
 .../apache/drill/exec/record/RecordBatch.java   |    3 +-
 .../apache/drill/exec/record/TupleSchema.java   |  534 ++++++++
 .../exec/record/selection/SelectionVector2.java |   20 +-
 .../exec/cache/TestBatchSerialization.java      |   22 +-
 .../exec/physical/impl/TopN/TopNBatchTest.java  |   26 +-
 .../impl/validate/TestBatchValidator.java       |   64 +-
 .../physical/impl/xsort/TestExternalSort.java   |   12 +-
 .../impl/xsort/managed/SortTestUtilities.java   |    8 +-
 .../physical/impl/xsort/managed/TestCopier.java |  146 +-
 .../impl/xsort/managed/TestShortArrays.java     |    8 +-
 .../impl/xsort/managed/TestSortImpl.java        |   46 +-
 .../physical/impl/xsort/managed/TestSorter.java |   38 +-
 .../rowSet/impl/TestResultSetLoaderLimits.java  |  224 ++++
 .../impl/TestResultSetLoaderMapArray.java       |  481 +++++++
 .../rowSet/impl/TestResultSetLoaderMaps.java    |  810 +++++++++++
 .../impl/TestResultSetLoaderOmittedValues.java  |  379 ++++++
 .../impl/TestResultSetLoaderOverflow.java       |  680 ++++++++++
 .../impl/TestResultSetLoaderProjection.java     |  470 +++++++
 .../impl/TestResultSetLoaderProtocol.java       |  586 ++++++++
 .../rowSet/impl/TestResultSetLoaderTorture.java |  453 +++++++
 .../rowSet/impl/TestResultSetSchemaChange.java  |  245 ++++
 .../drill/exec/record/TestTupleSchema.java      |  509 +++++++
 .../drill/exec/record/TestVectorContainer.java  |  127 --
 .../exec/record/vector/TestValueVector.java     |   12 +
 .../apache/drill/exec/sql/TestInfoSchema.java   |    2 +-
 .../exec/store/easy/text/compliant/TestCsv.java |    6 +-
 .../java/org/apache/drill/test/ExampleTest.java |    4 +-
 .../org/apache/drill/test/OperatorFixture.java  |   30 +-
 .../org/apache/drill/test/QueryBuilder.java     |   12 +-
 .../apache/drill/test/QueryRowSetIterator.java  |    2 +-
 .../drill/test/rowSet/AbstractRowSet.java       |  109 +-
 .../drill/test/rowSet/AbstractSingleRowSet.java |  182 +--
 .../apache/drill/test/rowSet/DirectRowSet.java  |  171 +--
 .../drill/test/rowSet/HyperRowSetImpl.java      |  245 +---
 .../drill/test/rowSet/IndirectRowSet.java       |   38 +-
 .../org/apache/drill/test/rowSet/RowSet.java    |   81 +-
 .../apache/drill/test/rowSet/RowSetBuilder.java |   32 +-
 .../drill/test/rowSet/RowSetComparison.java     |  124 +-
 .../apache/drill/test/rowSet/RowSetPrinter.java |   30 +-
 .../apache/drill/test/rowSet/RowSetReader.java  |   54 +
 .../drill/test/rowSet/RowSetReaderImpl.java     |   76 ++
 .../apache/drill/test/rowSet/RowSetSchema.java  |  304 -----
 .../drill/test/rowSet/RowSetUtilities.java      |  101 +-
 .../apache/drill/test/rowSet/RowSetWriter.java  |  119 ++
 .../drill/test/rowSet/RowSetWriterImpl.java     |  155 +++
 .../apache/drill/test/rowSet/SchemaBuilder.java |   87 +-
 .../drill/test/rowSet/file/JsonFileBuilder.java |   35 +-
 .../drill/test/rowSet/test/DummyWriterTest.java |  169 +++
 .../drill/test/rowSet/test/PerformanceTool.java |  296 ++++
 .../drill/test/rowSet/test/RowSetTest.java      |  858 +++++++-----
 .../drill/test/rowSet/test/TestFillEmpties.java |  241 ++++
 .../test/rowSet/test/TestFixedWidthWriter.java  |  444 ++++++
 .../rowSet/test/TestOffsetVectorWriter.java     |  425 ++++++
 .../test/rowSet/test/TestScalarAccessors.java   | 1266 ++++++++++++++++++
 .../rowSet/test/TestVariableWidthWriter.java    |  418 ++++++
 .../drill/test/rowSet/test/VectorPrinter.java   |   72 +
 .../apache/drill/vector/TestFillEmpties.java    |   55 +-
 .../apache/drill/vector/TestVectorLimits.java   |  487 -------
 exec/jdbc-all/pom.xml                           |    2 +-
 .../src/main/java/io/netty/buffer/DrillBuf.java |   70 +-
 .../netty/buffer/PooledByteBufAllocatorL.java   |   62 +-
 .../drill/exec/memory/AllocationManager.java    |   89 +-
 .../main/codegen/templates/ColumnAccessors.java |  383 +++---
 .../codegen/templates/FixedValueVectors.java    |  293 +---
 .../codegen/templates/NullableValueVectors.java |   91 +-
 .../codegen/templates/RepeatedValueVectors.java |   71 +-
 .../src/main/codegen/templates/UnionVector.java |   44 +-
 .../templates/VariableLengthVectors.java        |  216 +--
 .../drill/exec/record/ColumnMetadata.java       |  114 ++
 .../drill/exec/record/MaterializedField.java    |   41 +-
 .../apache/drill/exec/record/TupleMetadata.java |   88 ++
 .../drill/exec/record/TupleNameSpace.java       |   89 ++
 .../drill/exec/vector/AllocationHelper.java     |    2 +-
 .../drill/exec/vector/BaseDataValueVector.java  |   16 +
 .../org/apache/drill/exec/vector/BitVector.java |   52 +-
 .../drill/exec/vector/FixedWidthVector.java     |    7 +-
 .../apache/drill/exec/vector/ObjectVector.java  |   26 +-
 .../drill/exec/vector/UntypedNullVector.java    |   59 +-
 .../apache/drill/exec/vector/ValueVector.java   |   53 +-
 .../drill/exec/vector/VariableWidthVector.java  |    4 +-
 .../apache/drill/exec/vector/VectorUtils.java   |   63 -
 .../apache/drill/exec/vector/ZeroVector.java    |    6 +-
 .../exec/vector/accessor/AccessorUtilities.java |  125 --
 .../drill/exec/vector/accessor/ArrayReader.java |  108 +-
 .../drill/exec/vector/accessor/ArrayWriter.java |   60 +-
 .../exec/vector/accessor/ColumnAccessor.java    |   40 -
 .../exec/vector/accessor/ColumnReader.java      |   64 -
 .../exec/vector/accessor/ColumnReaderIndex.java |   28 +
 .../exec/vector/accessor/ColumnWriter.java      |   45 -
 .../exec/vector/accessor/ColumnWriterIndex.java |   76 ++
 .../exec/vector/accessor/ObjectReader.java      |   60 +
 .../drill/exec/vector/accessor/ObjectType.java  |   28 +
 .../exec/vector/accessor/ObjectWriter.java      |  101 ++
 .../vector/accessor/ScalarElementReader.java    |   65 +
 .../exec/vector/accessor/ScalarReader.java      |   75 ++
 .../exec/vector/accessor/ScalarWriter.java      |   71 +-
 .../exec/vector/accessor/TupleAccessor.java     |   71 -
 .../drill/exec/vector/accessor/TupleReader.java |   36 +-
 .../drill/exec/vector/accessor/TupleWriter.java |  154 ++-
 .../drill/exec/vector/accessor/ValueType.java   |   31 +
 .../accessor/impl/AbstractArrayReader.java      |  128 --
 .../accessor/impl/AbstractArrayWriter.java      |  127 --
 .../accessor/impl/AbstractColumnAccessor.java   |   43 -
 .../accessor/impl/AbstractColumnReader.java     |  126 --
 .../accessor/impl/AbstractColumnWriter.java     |   87 --
 .../accessor/impl/AbstractTupleAccessor.java    |   38 -
 .../vector/accessor/impl/AccessorUtilities.java |   53 +
 .../accessor/impl/ColumnAccessorFactory.java    |  122 --
 .../accessor/impl/HierarchicalFormatter.java    |   38 +
 .../accessor/impl/HierarchicalPrinter.java      |  238 ++++
 .../vector/accessor/impl/TupleReaderImpl.java   |  151 ---
 .../vector/accessor/impl/TupleWriterImpl.java   |  162 ---
 .../exec/vector/accessor/package-info.java      |   79 +-
 .../accessor/reader/AbstractArrayReader.java    |  188 +++
 .../accessor/reader/AbstractObjectReader.java   |   52 +
 .../accessor/reader/AbstractTupleReader.java    |  189 +++
 .../accessor/reader/BaseElementReader.java      |  187 +++
 .../accessor/reader/BaseScalarReader.java       |  189 +++
 .../accessor/reader/ColumnReaderFactory.java    |  109 ++
 .../accessor/reader/ElementReaderIndex.java     |   24 +
 .../reader/FixedWidthElementReaderIndex.java    |   38 +
 .../exec/vector/accessor/reader/MapReader.java  |   43 +
 .../accessor/reader/ObjectArrayReader.java      |  159 +++
 .../accessor/reader/ScalarArrayReader.java      |  102 ++
 .../vector/accessor/reader/VectorAccessor.java  |   26 +
 .../vector/accessor/reader/package-info.java    |   26 +
 .../accessor/writer/AbstractArrayWriter.java    |  348 +++++
 .../writer/AbstractFixedWidthWriter.java        |  258 ++++
 .../accessor/writer/AbstractObjectWriter.java   |   72 +
 .../accessor/writer/AbstractScalarWriter.java   |  126 ++
 .../accessor/writer/AbstractTupleWriter.java    |  450 +++++++
 .../accessor/writer/BaseScalarWriter.java       |  272 ++++
 .../accessor/writer/BaseVarWidthWriter.java     |  157 +++
 .../accessor/writer/ColumnWriterFactory.java    |  196 +++
 .../exec/vector/accessor/writer/MapWriter.java  |  155 +++
 .../accessor/writer/NullableScalarWriter.java   |  190 +++
 .../accessor/writer/ObjectArrayWriter.java      |  143 ++
 .../accessor/writer/OffsetVectorWriter.java     |  283 ++++
 .../accessor/writer/ScalarArrayWriter.java      |  229 ++++
 .../vector/accessor/writer/WriterEvents.java    |  127 ++
 .../accessor/writer/dummy/DummyArrayWriter.java |   96 ++
 .../writer/dummy/DummyScalarWriter.java         |   89 ++
 .../accessor/writer/dummy/package-info.java     |   54 +
 .../vector/accessor/writer/package-info.java    |  151 +++
 .../exec/vector/complex/AbstractMapVector.java  |   13 +-
 .../vector/complex/BaseRepeatedValueVector.java |   13 +-
 .../drill/exec/vector/complex/ListVector.java   |    4 +-
 .../drill/exec/vector/complex/MapVector.java    |   24 +-
 .../exec/vector/complex/RepeatedListVector.java |   20 +-
 .../exec/vector/complex/RepeatedMapVector.java  |   21 +-
 188 files changed, 22717 insertions(+), 4811 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java
new file mode 100644
index 0000000..a4b260b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet;
+
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+
+/**
+ * Builds a result set (series of zero or more row sets) based on a defined
+ * schema which may
+ * evolve (expand) over time. Automatically rolls "overflow" rows over
+ * when a batch fills.
+ * <p>
+ * Many of the methods in this interface verify that the loader is
+ * in the proper state. For example, an exception is thrown if the caller
+ * attempts to save a row before starting a batch. However, the per-column
+ * write methods are checked only through assertions that should be enabled
+ * during testing, but will be disabled during production.
+ *
+ * @see {@link VectorContainerWriter}, the class which this class
+ * replaces
+ */
+
+public interface ResultSetLoader {
+
+  public static final int DEFAULT_ROW_COUNT = BaseValueVector.INITIAL_VALUE_ALLOCATION;
+
+  /**
+   * Current schema version. The version increments by one each time
+   * a column is added.
+   * @return the current schema version
+   */
+
+  int schemaVersion();
+
+  /**
+   * Adjust the number of rows to produce in the next batch. Takes
+   * effect after the next call to {@link #startBatch()}.
+   *
+   * @param count target batch row count
+   */
+
+  void setTargetRowCount(int count);
+
+  /**
+   * The target number of rows per batch produced by this loader (as
+   * configured in the loader options.)
+   *
+   * @return the target row count for batches that this loader produces
+   */
+
+  int targetRowCount();
+
+  /**
+   * The largest vector size produced by this loader (as specified by
+   * the value vector limit.)
+   *
+   * @return the largest vector size. Attempting to extend a vector beyond
+   * this limit causes automatic vector overflow and terminates the
+   * in-flight batch, even if the batch has not yet reached the target
+   * row count
+   */
+
+  int targetVectorSize();
+
+  /**
+   * Total number of batches created. Includes the current batch if
+   * the row count in this batch is non-zero.
+   * @return the number of batches produced including the current
+   * one
+   */
+
+  int batchCount();
+
+  /**
+   * Total number of rows loaded for all previous batches and the
+   * current batch.
+   * @return total row count
+   */
+
+  int totalRowCount();
+
+  /**
+   * Start a new row batch. Valid only when first started, or after the
+   * previous batch has been harvested.
+   */
+
+  void startBatch();
+
+  /**
+   * Writer for the top-level tuple (the entire row). Valid only when
+   * the mutator is actively writing a batch (after <tt>startBatch()</tt>
+   * but before <tt>harvest()</tt>.)
+   *
+   * @return writer for the top-level columns
+   */
+
+  RowSetLoader writer();
+  boolean writeable(); // NOTE(review): undocumented; presumably true while a batch is active and can accept rows -- confirm against the implementation
+
+  /**
+   * Load a row using column values passed as variable-length arguments. Expects
+   * map values to be represented as an array.
+   * A schema of (a:int, b:map(c:varchar)) would be
+   * set as <br><tt>setRow(10, new Object[] {"foo"});</tt><br>
+   * Values of arrays can be expressed as a Java
+   * array. A schema of (a:int, b:int[]) can be set as<br>
+   * <tt>setRow(10, new int[] {100, 200});</tt><br>
+   * Primarily for testing, too slow for production code.
+   * <p>
+   * If the row consists of a single map or list, then the one value will be an
+   * <tt>Object</tt> array, creating an ambiguity. Use <tt>writer().set(0, value);</tt>
+   * in this case.
+   *
+   * @param values column values in column index order
+   * @return this loader
+   */
+
+  ResultSetLoader setRow(Object...values);
+
+  /**
+   * Return the output container, primarily to obtain the schema
+   * and set of vectors. Depending on when this is called, the
+   * data may or may not be populated: call
+   * {@link #harvest()} to obtain the container for a batch.
+   * <p>
+   * This method is useful when the schema is known and fixed.
+   * After declaring the schema, call this method to get the container
+   * that holds the vectors for use in planning projection, etc.
+   * <p>
+   * If the result set schema changes, then a call to this method will
+   * return the latest schema. But, if the schema changes during the
+   * overflow row, then this method will not see those changes until
+   * after harvesting the current batch. (This avoids the appearance
+   * of phantom columns in the output since the new column won't
+   * appear until the next batch.)
+   * <p>
+   * Never count on the data in the container; it may be empty, half
+   * written, or inconsistent. Always call
+   * {@link #harvest()} to obtain the container for a batch.
+   *
+   * @return the output container including schema and value
+   * vectors
+   */
+
+  VectorContainer outputContainer();
+
+  /**
+   * Harvest the current row batch, and reset the mutator
+   * to the start of the next row batch (which may already contain
+   * an overflow row.)
+   * <p>
+   * The schema of the returned container is defined as:
+   * <ul>
+   * <li>The schema as passed in via the loader options, plus</li>
+   * <li>Columns added dynamically during write, minus</li>
+   * <li>Any columns not included in the project list, minus</li>
+   * <li>Any columns added in the overflow row.</li>
+   * </ul>
+   * That is, column order is as defined by the initial schema and column
+   * additions. In particular, the schema order is <b>not</b> defined by
+   * the projection list. (Another mechanism is required to reorder columns
+   * for the actual projection.)
+   *
+   * @return the row batch to send downstream
+   */
+
+  VectorContainer harvest();
+
+  /**
+   * The schema of the harvested batch. Valid until the start of the
+   * next batch.
+   *
+   * @return the extended schema of the harvested batch which includes
+   * any allocation hints used when creating the batch
+   */
+
+  TupleMetadata harvestSchema();
+
+  /**
+   * Called after all rows are returned, whether because no more data is
+   * available, or the caller wishes to cancel the current row batch
+   * and complete.
+   */
+
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultVectorCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultVectorCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultVectorCache.java
new file mode 100644
index 0000000..6e32b5d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultVectorCache.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Interface for a cache that implements "vector persistence" across
+ * multiple result set loaders. Allows a single scan operator to offer
+ * the same set of vectors even when data is read by a set of readers.
+ */
+
+public interface ResultVectorCache {
+  BufferAllocator allocator(); // NOTE(review): presumably the allocator used for the cached vectors -- confirm against implementations
+  ValueVector addOrGet(MaterializedField colSchema); // NOTE(review): presumably returns the cached vector for this column schema, adding one if absent -- confirm
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetLoader.java
new file mode 100644
index 0000000..070e9a9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetLoader.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet;
+
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+
+/**
+ * Interface for writing values to a row set. Only available for newly-created
+ * single row sets.
+ * <p>
+ * Typical usage:
+ *
+ * <pre><code>
+ * void writeABatch() {
+ *   RowSetLoader writer = ...
+ *   while (! writer.isFull()) {
+ *     writer.start();
+ *     writer.scalar(0).setInt(10);
+ *     writer.scalar(1).setString("foo");
+ *     ...
+ *     writer.save();
+ *   }
+ * }</code></pre>
+ * Alternative usage:
+ *
+ * <pre><code>
+ * void writeABatch() {
+ *   RowSetLoader writer = ...
+ *   while (writer.start()) {
+ *     writer.scalar(0).setInt(10);
+ *     writer.scalar(1).setString("foo");
+ *     ...
+ *     writer.save();
+ *   }
+ * }</code></pre>
+ *
+ * The above writes until the batch is full, based on size or vector overflow.
+ * That is, the details of vector overflow are hidden from the code that calls
+ * the writer.
+ */
+
+public interface RowSetLoader extends TupleWriter {
+
+  ResultSetLoader loader();
+
+  /**
+   * Write a row of values, given by Java objects. Object type must match
+   * expected column type. Stops writing, and returns false, if any value causes
+   * vector overflow. Value format:
+   * <ul>
+   * <li>For scalars, the value as a suitable Java type (int or Integer, say,
+   * for <tt>INTEGER</tt> values.)</li>
+   * <li>For scalar arrays, an array of a suitable Java primitive type for
+   * scalars. For example, <tt>int[]</tt> for an <tt>INTEGER</tt> column.</li>
+   * <li>For a Map, an <tt>Object</tt> array with values encoded as above.
+   * (In fact, the list here is the same as the map format.)</li>
+   * <li>For a list (repeated map, list of list), an <tt>Object</tt> array with
+   * values encoded as above. (So, for a repeated map, an outer <tt>Object</tt>
+   * array encodes the array, an inner one encodes the map members.)</li>
+   * </ul>
+   *
+   * @param values
+   *          variable-length argument list of column values
+   */
+
+  RowSetLoader addRow(Object... values);
+
+  /**
+   * Indicates that no more rows fit into the current row batch and that the row
+   * batch should be harvested and sent downstream. Any overflow row is
+   * automatically saved for the next cycle. The value is undefined when a batch
+   * is not active.
+   * <p>
+   * Will be false on the first row, and all subsequent rows until either the
+   * maximum number of rows are written, or a vector overflows. After that, will
+   * return true. The method returns true as soon as any column writer
+   * overflows even in the middle of a row write. That is, this writer does not
+   * automatically handle overflow rows because that added complexity is seldom
+   * needed for tests.
+   *
+   * @return true if the batch is full, false if another row can be written
+   */
+
+  boolean isFull();
+
+  /**
+   * The number of rows in the current row set. Does not count any overflow row
+   * saved for the next batch.
+   *
+   * @return number of rows to be sent downstream
+   */
+
+  int rowCount();
+
+  /**
+   * The index of the current row. Same as the row count except in an overflow
+   * row in which case the row index will revert to zero as soon as any vector
+   * overflows. Note: this means that the index can change between columns in a
+   * single row. Applications usually don't use this index directly; rely on the
+   * writers to write to the proper location.
+   *
+   * @return the current write index
+   */
+
+  int rowIndex();
+
+  /**
+   * Prepare a new row for writing. Call this before each row.
+   * <p>
+   * Handles a very special case: that of discarding the last row written.
+   * A reader can read a row into vectors, then "sniff" the row to check,
+   * for example, against a filter. If the row is not wanted, simply omit
+   * the call to <tt>save()</tt> and the next call to <tt>start()</tt> will
+   * discard the unsaved row.
+   * <p>
+   * Note that the vectors still contain values in the
+   * discarded position; just the various pointers are unset. If
+   * the batch ends before the discarded values are overwritten, the
+   * discarded values just exist at the end of the vector. Since vectors
+   * start with garbage contents, the discarded values are simply a different
+   * kind of garbage. But, if the client writes a new row, then the new
+   * row overwrites the discarded row. This works because we only change
+   * the tail part of a vector; never the internals.
+   *
+   * @return true if another row can be added, false if the batch is full
+   */
+
+  boolean start();
+
+  /**
+   * Saves the current row and moves to the next row. Failing to call this
+   * method effectively abandons the in-flight row; something that may be useful
+   * to recover from partially-written rows that turn out to contain errors.
+   * Done automatically if using <tt>setRow()</tt>.
+   */
+
+  void save();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java
new file mode 100644
index 0000000..f3626d9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import java.util.ArrayList;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.OffsetVectorState;
+import org.apache.drill.exec.physical.rowSet.impl.TupleState.MapState;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.ColumnWriterFactory;
+import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector;
+
+/**
+ * Represents the write-time state for a column including the writer and the (optional)
+ * backing vector. Implements per-column operations such as vector overflow. If a column
+ * is a (possibly repeated) map, then the column state will hold a tuple state.
+ * <p>
+ * If a column is not projected, then the writer exists (to make life easier for the
+ * reader), but there will be no vector backing the writer.
+ * <p>
+ * Different columns need different kinds of vectors: a data vector, possibly an offset
+ * vector, or even a non-existent vector. The {@link VectorState} class abstracts out
+ * these differences.
+ */
+
+public abstract class ColumnState {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnState.class);
+
+  public abstract static class BaseMapColumnState extends ColumnState {
+    protected final MapState mapState;
+
+    // Sets up the common column state, then the tuple state for the members.
+    public BaseMapColumnState(ResultSetLoaderImpl resultSetLoader, AbstractObjectWriter writer,
+         VectorState vectorState, ProjectionSet projectionSet) {
+      super(resultSetLoader, writer, vectorState);
+      this.mapState = new MapState(resultSetLoader, this, projectionSet);
+    }
+
+    @Override
+    public void rollover() {
+      super.rollover();
+      this.mapState.rollover();
+    }
+
+    @Override
+    public void startBatch() {
+      super.startBatch();
+      this.mapState.startBatch();
+    }
+
+    @Override
+    public void harvestWithLookAhead() {
+      super.harvestWithLookAhead();
+      this.mapState.harvestWithLookAhead();
+    }
+
+    @Override
+    public void close() {
+      super.close();
+      this.mapState.close();
+    }
+
+    public MapState mapState() { return this.mapState; }
+  }
+
+  public static class MapColumnState extends BaseMapColumnState {
+
+    // Uses NullVectorState: no vector is managed for the map column itself.
+    public MapColumnState(ResultSetLoaderImpl resultSetLoader,
+        ColumnMetadata columnSchema, ProjectionSet projectionSet) {
+      super(resultSetLoader,
+          ColumnWriterFactory.buildMap(
+              columnSchema, null, new ArrayList<AbstractObjectWriter>()),
+          new NullVectorState(), projectionSet);
+    }
+
+    // The map's members get the same cardinality as the map column itself.
+    @Override
+    public void updateCardinality(int cardinality) {
+      super.updateCardinality(cardinality);
+      this.mapState.updateCardinality(cardinality);
+    }
+  }
+
+  public static class MapArrayColumnState extends BaseMapColumnState {
+
+    // Plain pass-through constructor; build() assembles the arguments.
+    public MapArrayColumnState(ResultSetLoaderImpl resultSetLoader,
+        AbstractObjectWriter writer, VectorState vectorState,
+        ProjectionSet projectionSet) {
+      super(resultSetLoader, writer, vectorState, projectionSet);
+    }
+
+    /**
+     * Factory for a map array column: creates the offset vector, the
+     * map array writer over it, and the vector state that wraps it.
+     *
+     * @param resultSetLoader loader that supplies the allocator
+     * @param columnSchema metadata of the map array column
+     * @param projectionSet projection set handed to the column state
+     * @return the assembled column state
+     */
+    @SuppressWarnings("resource")
+    public static MapArrayColumnState build(ResultSetLoaderImpl resultSetLoader,
+        ColumnMetadata columnSchema, ProjectionSet projectionSet) {
+
+      // Offset vector for the map array, from the loader's allocator.
+      UInt4Vector offsets = new UInt4Vector(
+          BaseRepeatedValueVector.OFFSETS_FIELD, resultSetLoader.allocator());
+
+      // Writer over that offset vector, starting with an empty writer list.
+      AbstractObjectWriter arrayWriter = ColumnWriterFactory.buildMapArray(
+          columnSchema, offsets, new ArrayList<AbstractObjectWriter>());
+
+      // Vector state built from the offset writer, the offset vector and
+      // the writer for the array's entries.
+      VectorState offsetState = new OffsetVectorState(
+          ((AbstractArrayWriter) arrayWriter.array()).offsetWriter(),
+          offsets,
+          (AbstractObjectWriter) arrayWriter.array().entry());
+
+      return new MapArrayColumnState(
+          resultSetLoader, arrayWriter, offsetState, projectionSet);
+    }
+
+    // Scale by the expected elements per array to size the map's members.
+    @Override
+    public void updateCardinality(int cardinality) {
+      super.updateCardinality(cardinality);
+      this.mapState.updateCardinality(cardinality * schema().expectedElementCount());
+    }
+  }
+
+  /**
+   * Columns move through various lifecycle states as identified by this
+   * enum. (Yes, sorry that the term "state" is used in two different ways
+   * here: the variables for a column and the point within the column
+   * lifecycle.)
+   */
+
+  protected enum State {
+
+    /**
+     * Column is in the normal state of writing with no overflow
+     * in effect. Also the state restored by startBatch().
+     */
+
+    NORMAL,
+
+    /**
+     * Entered by rollover(): like NORMAL, but the data has overflowed
+     * and the column's data for the current row appears in the new,
+     * overflow batch. For a client that omits some columns, written
+     * columns will be in OVERFLOW state, unwritten columns in
+     * NORMAL state.
+     */
+
+    OVERFLOW,
+
+    /**
+     * Indicates that the column has data saved in the overflow
+     * batch. Entered from OVERFLOW by harvestWithLookAhead().
+     */
+
+    LOOK_AHEAD,
+
+    /**
+     * Like LOOK_AHEAD, but indicates the special case that the column
+     * was added after overflow (see the constructor), so there is no
+     * vector for the column in the harvested batch.
+     */
+
+    NEW_LOOK_AHEAD
+  }
+
+  protected final ResultSetLoaderImpl resultSetLoader;
+  protected final int addVersion;
+  protected final VectorState vectorState;
+  protected State state;
+  protected AbstractObjectWriter writer;
+
+  /**
+   * Cardinality of the value itself. If this is an array,
+   * then this is the number of arrays. A separate number,
+   * the inner cardinality, is computed as the outer cardinality
+   * times the expected array count (from metadata.) The inner
+   * cardinality is the total number of array items in the
+   * vector.
+   */
+
+  protected int outerCardinality;
+
+  // A column added while an overflow is pending starts in NEW_LOOK_AHEAD.
+  public ColumnState(ResultSetLoaderImpl resultSetLoader,
+      AbstractObjectWriter writer, VectorState vectorState) {
+    this.resultSetLoader = resultSetLoader;
+    this.writer = writer;
+    this.vectorState = vectorState;
+    this.addVersion = resultSetLoader.bumpVersion();
+    this.state = resultSetLoader.hasOverflow() ? State.NEW_LOOK_AHEAD : State.NORMAL;
+  }
+
+  public AbstractObjectWriter writer() { return writer; }
+  public ColumnMetadata schema() { return writer.schema(); }
+  // Backing vector; null when the vector state has none (e.g. NullVectorState).
+  public ValueVector vector() { return vectorState.vector(); }
+
+  // Allocates for the current cardinality; the loader tallies the result.
+  public void allocateVectors() {
+    assert outerCardinality != 0;
+    resultSetLoader.tallyAllocations(vectorState.allocate(outerCardinality));
+  }
+
+  /**
+   * Readies the column for the next row batch. A column with look-ahead
+   * values gets them moved back into its active vector so writing resumes
+   * where it stopped; a column in the normal state gets fresh vectors.
+   * @throws IllegalStateException if called in the OVERFLOW state
+   */
+
+  public void startBatch() {
+    switch (state) {
+    case LOOK_AHEAD:
+
+      // Values for the in-flight row were set aside at harvest time.
+
+      vectorState.startBatchWithLookAhead();
+      break;
+
+    case NEW_LOOK_AHEAD:
+
+      // Added after the overflow: nothing was set aside, nothing to restore.
+
+      break;
+
+    case NORMAL:
+      resultSetLoader.tallyAllocations(vectorState.allocate(outerCardinality));
+      break;
+
+    default:
+      throw new IllegalStateException("Unexpected state: " + state);
+    }
+
+    // Whatever the starting point, normal writing resumes from here.
+    state = State.NORMAL;
+  }
+
+  /**
+   * A column within the row batch overflowed. Prepare to absorb the rest of the
+   * in-flight row by rolling values over to a new vector, saving the complete
+   * vector for later. This column could have a value for the overflow row, or
+   * for some previous row, depending on exactly when and where the overflow
+   * occurs. Fails with a UserException when the writer's vector index is 0.
+   */
+
+  public void rollover() {
+    assert state == State.NORMAL;
+
+    // If the source index is 0, then we could not fit this one
+    // value in the original vector. Nothing will be accomplished by
+    // trying again with an overflow vector. Just fail.
+    //
+    // Note that this is a judgment call. It is possible to allow the
+    // vector to double beyond the limit, but that will require a bit
+    // of thought to get right -- and, of course, completely defeats
+    // the purpose of limiting vector size to avoid memory fragmentation...
+
+    if (resultSetLoader.writerIndex().vectorIndex() == 0) {
+      throw UserException
+        .memoryError("A single column value is larger than the maximum allowed size of 16 MB")
+        .build(logger);
+    }
+
+    // Otherwise, do the roll-over to a look-ahead vector.
+
+    vectorState.rollover(outerCardinality);
+
+    // Remember that we did this overflow processing.
+
+    state = State.OVERFLOW;
+  }
+
+  /**
+   * Called once writing of an overflowed row batch is complete, to ready
+   * the vector for sending downstream: the look-ahead vector is set aside
+   * and the full buffer goes back into the active vector.
+   * Only valid in the OVERFLOW and NEW_LOOK_AHEAD states.
+   */
+
+  public void harvestWithLookAhead() {
+    switch (state) {
+    case OVERFLOW:
+
+      // Swap the original, full buffer and its last write
+      // position back in.
+
+      vectorState.harvestWithLookAhead();
+
+      // From now on the in-flight row's values live in the
+      // backup vector.
+
+      state = State.LOOK_AHEAD;
+      break;
+
+    case NEW_LOOK_AHEAD:
+
+      // Added after overflow: the column has no vector in the
+      // completed batch, hence nothing to save.
+
+      break;
+
+    default:
+      throw new IllegalStateException("Unexpected state: " + state);
+    }
+  }
+
+  public void close() {
+    vectorState.reset(); // closing a column only resets its vector state
+  }
+
+  public void updateCardinality(int cardinality) {
+    outerCardinality = cardinality; // used by later allocate() and rollover() calls
+  }
+
+  // Writes this column's diagnostic state, then the nested vector state.
+  public void dump(HierarchicalFormatter format) {
+    format.startObject(this)
+        .attribute("addVersion", addVersion)
+        .attribute("state", state)
+        .attributeIdentity("writer", writer)
+        .attribute("vectorState");
+
+    vectorState.dump(format);
+    format.endObject();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullProjectionSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullProjectionSet.java
new file mode 100644
index 0000000..2fcc813
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullProjectionSet.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+/**
+ * Projects either all columns (a wildcard such as SELECT * at the root
+ * tuple, or a map that is itself selected) or no columns (a map that
+ * is not projected), as chosen by the constructor flag.
+ */
+
+public class NullProjectionSet implements ProjectionSet {
+
+  // true: every column is projected; false: no column is projected.
+  private final boolean allProjected;
+
+  public NullProjectionSet(boolean allProjected) { this.allProjected = allProjected; }
+
+  @Override
+  public boolean isProjected(String colName) { return allProjected; }
+
+  // A map column gets the same all-or-nothing answer as its parent tuple.
+  @Override
+  public ProjectionSet mapProjection(String colName) {
+    return new NullProjectionSet(allProjected);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullResultVectorCacheImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullResultVectorCacheImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullResultVectorCacheImpl.java
new file mode 100644
index 0000000..930dc30
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullResultVectorCacheImpl.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.ValueVector;
+
+public class NullResultVectorCacheImpl implements ResultVectorCache {
+
+  private final BufferAllocator allocator;
+
+  public NullResultVectorCacheImpl(BufferAllocator allocator) { this.allocator = allocator; }
+
+  @Override
+  public BufferAllocator allocator() { return this.allocator; }
+
+  // No caching is done: every call asks TypeHelper for a new vector, so
+  // repeated requests for the same field return different vectors.
+  @Override
+  public ValueVector addOrGet(MaterializedField colSchema) {
+    return TypeHelper.getNewVector(colSchema, this.allocator, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullVectorState.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullVectorState.java
new file mode 100644
index 0000000..8372758
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullVectorState.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+
+/**
+ * Do-nothing vector state for a map column which has no actual vector
+ * associated with it.
+ */
+
+public class NullVectorState implements VectorState {
+
+  // Lifecycle operations: nothing to allocate, roll over, harvest or reset.
+  @Override public int allocate(int cardinality) { return 0; }
+  @Override public void rollover(int cardinality) { }
+  @Override public void harvestWithLookAhead() { }
+  @Override public void startBatchWithLookAhead() { }
+  @Override public void reset() { }
+  @Override public ValueVector vector() { return null; }
+
+  @Override
+  public void dump(HierarchicalFormatter format) {
+    format.startObject(this).endObject();
+  }
+
+  // Variant that exposes a vector but, like its parent, never manages it.
+  public static class UnmanagedVectorState extends NullVectorState {
+    ValueVector vector;
+
+    public UnmanagedVectorState(ValueVector vector) { this.vector = vector; }
+
+    @Override
+    public ValueVector vector() { return this.vector; }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java
new file mode 100644
index 0000000..bf91032
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.ValuesVectorState;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.NullableVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter;
+import org.apache.drill.exec.vector.accessor.writer.NullableScalarWriter;
+
+public class NullableVectorState implements VectorState {
+
+  // Vector state for the bits vector of a nullable vector.
+  public static class BitsVectorState extends ValuesVectorState {
+
+    public BitsVectorState(ColumnMetadata schema, AbstractScalarWriter writer, ValueVector mainVector) {
+      super(schema, writer, mainVector);
+    }
+
+    @Override
+    public int allocateVector(ValueVector vector, int cardinality) {
+      ((FixedWidthVector) vector).allocateNew(cardinality);
+      return vector.getBufferSize();
+    }
+  }
+
+  private final ColumnMetadata schema;
+  private final NullableScalarWriter writer;
+  private final NullableVector vector;
+  private final ValuesVectorState bitsState;
+  private final ValuesVectorState valuesState;
+
+  // One vector state each for the bits part and the values part.
+  public NullableVectorState(AbstractObjectWriter writer, NullableVector vector) {
+    this.schema = writer.schema();
+    this.vector = vector;
+    this.writer = (NullableScalarWriter) writer.scalar();
+    this.bitsState = new BitsVectorState(
+        this.schema, this.writer.bitsWriter(), vector.getBitsVector());
+    this.valuesState = new ValuesVectorState(
+        this.schema, this.writer.baseWriter(), vector.getValuesVector());
+  }
+
+  @Override
+  public int allocate(int cardinality) {
+    int bitsSize = bitsState.allocate(cardinality);
+    return bitsSize + valuesState.allocate(cardinality);
+  }
+
+  @Override
+  public void rollover(int cardinality) {
+    bitsState.rollover(cardinality);
+    valuesState.rollover(cardinality);
+  }
+
+  @Override
+  public void harvestWithLookAhead() {
+    bitsState.harvestWithLookAhead();
+    valuesState.harvestWithLookAhead();
+  }
+
+  @Override
+  public void startBatchWithLookAhead() {
+    bitsState.startBatchWithLookAhead();
+    valuesState.startBatchWithLookAhead();
+  }
+
+  @Override
+  public void reset() {
+    bitsState.reset();
+    valuesState.reset();
+  }
+
+  @Override
+  public ValueVector vector() { return vector; }
+
+  @Override
+  public void dump(HierarchicalFormatter format) {
+    format.startObject(this)
+        .attribute("schema", schema)
+        .attributeIdentity("writer", writer)
+        .attributeIdentity("vector", vector)
+        .attribute("bitsState");
+    bitsState.dump(format);
+    format.attribute("valuesState");
+    valuesState.dump(format);
+    format.endObject();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java
new file mode 100644
index 0000000..a743052
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import java.util.Collection;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetOptions;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Builder for the options for the row set loader. Reasonable defaults
+ * are provided for all options; use these options for test code or
+ * for clients that don't need special settings.
+ */
+
+public class OptionBuilder {
+  protected int vectorSizeLimit;
+  protected int rowCountLimit;
+  protected Collection<SchemaPath> projection;
+  protected ResultVectorCache vectorCache;
+  protected TupleMetadata schema;
+  protected long maxBatchSize;
+
+  public OptionBuilder() {
+    ResultSetOptions defaults = new ResultSetOptions();
+    vectorSizeLimit = defaults.vectorSizeLimit;
+    rowCountLimit = defaults.rowCountLimit;
+    maxBatchSize = defaults.maxBatchSize;
+  }
+
+  /**
+   * Specify the maximum number of rows per batch. Defaults to
+   * {@link BaseValueVector#INITIAL_VALUE_ALLOCATION}. Batches end either
+   * when this limit is reached, or when a vector overflows, whichever
+   * occurs first. The limit is clamped to the range 1 to
+   * {@link ValueVector#MAX_ROW_COUNT}.
+   *
+   * @param limit the row count limit
+   * @return this builder
+   */
+
+  public OptionBuilder setRowCountLimit(int limit) {
+    int capped = Math.min(limit, ValueVector.MAX_ROW_COUNT);
+    rowCountLimit = Math.max(1, capped);
+    return this;
+  }
+
+  // Sets the maximum batch size in bytes. The value is stored as given;
+  // unlike the row count limit, no range check is applied here.
+  public OptionBuilder setBatchSizeLimit(int bytes) {
+    maxBatchSize = bytes;
+    return this;
+  }
+
+  /**
+   * Record (batch) readers often read a subset of available table columns,
+   * but want to use a writer schema that includes all columns for ease of
+   * writing. (For example, a CSV reader must read all columns, even if the user
+   * wants a subset. The unwanted columns are simply discarded.)
+   * <p>
+   * This option provides a projection list, in the form of schema paths, for
+   * those columns which are to be projected. Only those columns will be
+   * backed by value vectors; non-projected columns will be backed by "null"
+   * writers that discard all values.
+   *
+   * @param projection the list of projected columns
+   * @return this builder
+   */
+
+  public OptionBuilder setProjection(Collection<SchemaPath> projection) {
+    this.projection = projection;
+    return this;
+  }
+
+  /**
+   * Downstream operators require "vector persistence": the same vector
+   * must represent the same column in every batch. For the scan operator,
+   * which creates multiple readers, this can be a challenge. The vector
+   * cache provides a transparent mechanism to enable vector persistence
+   * by returning the same vector for a set of independent readers. By
+   * default, the code uses a "null" cache which creates a new vector on
+   * each request. If a true cache is needed, the caller must provide one
+   * here.
+   */
+
+  public OptionBuilder setVectorCache(ResultVectorCache vectorCache) {
+    this.vectorCache = vectorCache;
+    return this;
+  }
+
+  /**
+   * Clients can use the row set builder in several ways:
+   * <ul>
+   * <li>Provide the schema up front, when known, by using this method to
+   * provide the schema.</li>
+   * <li>Discover the schema on the fly, adding columns during the write
+   * operation. Leave this method unset to start with an empty schema.</li>
+   * <li>A combination of the above.</li>
+   * </ul>
+   * @param schema the initial schema for the loader
+   * @return this builder
+   */
+
+  public OptionBuilder setSchema(TupleMetadata schema) {
+    this.schema = schema;
+    return this;
+  }
+
+  // TODO: No setter for vector length yet: is hard-coded
+  // at present in the value vector.
+
+  public ResultSetOptions build() {
+    return new ResultSetOptions(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/PrimitiveColumnState.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/PrimitiveColumnState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/PrimitiveColumnState.java
new file mode 100644
index 0000000..c97ec18
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/PrimitiveColumnState.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.ValuesVectorState;
+import org.apache.drill.exec.vector.NullableVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+
+/**
+ * Primitive (non-map) column state. Handles all three cardinalities.
+ * Column metadata is hosted on the writer.
+ */
+
+public class PrimitiveColumnState extends ColumnState implements ColumnWriterListener {
+
+  // Binds this state to the writer so that overflow events are routed here.
+  public PrimitiveColumnState(ResultSetLoaderImpl resultSetLoader,
+      AbstractObjectWriter colWriter, VectorState vectorState) {
+    super(resultSetLoader, colWriter, vectorState);
+    writer.bindListener(this);
+  }
+
+  // Column backed by a single values vector. A null vector selects the
+  // do-nothing NullVectorState instead.
+  public static PrimitiveColumnState newPrimitive(
+      ResultSetLoaderImpl resultSetLoader,
+      ValueVector vector,
+      AbstractObjectWriter writer) {
+    VectorState vectorState;
+    if (vector == null) {
+      vectorState = new NullVectorState();
+    } else {
+      vectorState = new ValuesVectorState(writer.schema(),
+          (AbstractScalarWriter) writer.scalar(), vector);
+    }
+    return new PrimitiveColumnState(resultSetLoader, writer, vectorState);
+  }
+
+  // Column backed by a nullable vector; null vector handled as above.
+  public static PrimitiveColumnState newNullablePrimitive(
+      ResultSetLoaderImpl resultSetLoader,
+      ValueVector vector,
+      AbstractObjectWriter writer) {
+    VectorState vectorState;
+    if (vector == null) {
+      vectorState = new NullVectorState();
+    } else {
+      vectorState = new NullableVectorState(writer, (NullableVector) vector);
+    }
+    return new PrimitiveColumnState(resultSetLoader, writer, vectorState);
+  }
+
+  // Column backed by a repeated (array) vector; null vector handled as above.
+  public static PrimitiveColumnState newPrimitiveArray(
+      ResultSetLoaderImpl resultSetLoader,
+      ValueVector vector,
+      AbstractObjectWriter writer) {
+    VectorState vectorState;
+    if (vector == null) {
+      vectorState = new NullVectorState();
+    } else {
+      vectorState = new RepeatedVectorState(writer, (RepeatedValueVector) vector);
+    }
+    return new PrimitiveColumnState(resultSetLoader, writer, vectorState);
+  }
+
+  // Listener callback: the writer reports overflow; pass it on to the loader.
+  @Override
+  public void overflowed(ScalarWriter writer) {
+    resultSetLoader.overflowed();
+  }
+
+  // Uses the base-class dump so primitive columns appear in diagnostic output.
+  @Override
+  public void dump(HierarchicalFormatter format) {
+    super.dump(format);
+  }
+
+  // Listener callback: forwards the expansion request to the loader.
+  @Override
+  public boolean canExpand(ScalarWriter writer, int delta) {
+    return resultSetLoader.canExpand(delta);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSet.java
new file mode 100644
index 0000000..9ea118f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSet.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+/**
+ * Represents the set of columns projected for a tuple (row or map.)
+ * The projected columns might themselves be maps, so this interface
+ * also returns a projection set for such columns.
+ * <p>
+ * Three cases exist, covered by two implementations:
+ * <ul>
+ * <li>Project all ({@link NullProjectionSet}): used for a tuple when
+ * all columns are projected. Example: the root tuple (the row) in
+ * a <tt>SELECT *</tt> query.</li>
+ * <li>Project none (also {@link NullProjectionSet}): used when no
+ * columns are projected from a tuple, such as when a map itself is
+ * not projected, so none of its member columns are projected.</li>
+ * <li>Project some ({@link ProjectionSetImpl}): used in the
+ * <tt>SELECT a, c, e</tt> case, in which the query names the columns to
+ * project (implicitly leaving out others, such as b and d here.)</li>
+ * </ul>
+ * <p>
+ * The result is that each tuple (row and map) has an associated
+ * projection set. Code calls {@link #isProjected(String)} to determine
+ * if a newly added column is wanted (and so should have a backing
+ * vector) or is unwanted (and can just receive a dummy writer), and
+ * {@link #mapProjection(String)} to get the projection set of a map column.
+ */
+
+interface ProjectionSet {
+  boolean isProjected(String colName);
+  ProjectionSet mapProjection(String colName);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSetImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSetImpl.java
new file mode 100644
index 0000000..e17f486
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSetImpl.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.PathSegment.NameSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+
+/**
+ * Represents an explicit projection at some tuple level.
+ * <p>
+ * A column is projected if it is explicitly listed in the selection list.
+ * <p>
+ * If a column is a map, then the projection for the map's columns is based on
+ * two rules:
+ * <ol>
+ * <li>If the projection list includes at least one explicit mention of a map
+ * member, then include only those columns explicitly listed.</li>
+ * <li>If the projection at the parent level lists only the map column itself
+ * (which the projection can't know is a map), then assume this implies all
+ * columns, as if the entry were "map.*".</li>
+ * </ol>
+ * <p>
+ * Examples:<br>
+ * <code>m</code><br>
+ * If m turns out to be a map, project all members of m.<br>
+ * <code>m.a</code><br>
+ * Column m must be a map. Project only column a.<br>
+ * <code>m, m.a</code><br>
+ * Tricky case. We interpret this as projecting only the "a" element of map m.
+ * <p>
+ * The projection set is built from a list of columns, represented as
+ * {@link SchemaPath} objects, provided by the physical plan. The structure of
+ * <tt>SchemaPath</tt> is a bit awkward:
+ * <p>
+ * <ul>
+ * <li><tt>SchemaPath</tt> is a wrapper for a column which directly holds the
+ * <tt>NameSegment</tt> for the top-level column.</li>
+ * <li><tt>NameSegment</tt> holds a name. This can be a top name such as
+ * `a`, or parts of a compound name such as `a`.`b`. Each <tt>NameSegment</tt>
+ * has a "child" that points to the optional following parts of the name.</li>
+ * <li><tt>PathSegment</tt> is the base class for the parts of a name.</li>
+ * <li><tt>ArraySegment</tt> is the other kind of name part and represents
+ * an array index such as the "[1]" in `columns`[1].</li>
+ * </ul>
+ * The parser here consumes only names, this mechanism does not consider
+ * array indexes. As a result, there may be multiple projected columns that
+ * map to the same projection here: `columns`[1] and `columns`[2] both map to
+ * the name `columns`, for example.
+ */
+
+public class ProjectionSetImpl implements ProjectionSet {
+  // Lower-cased projected names, plus nested sets for columns listed with members.
+  Set<String> projection = new HashSet<>();
+  Map<String, ProjectionSetImpl> mapProjections = CaseInsensitiveMap.newHashMap();
+
+  // Normalizes a name for matching. Locale.ROOT (fully qualified so that no new
+  // import is needed) keeps lookups independent of the JVM default locale.
+  private static String key(String colName) {
+    return colName.toLowerCase(java.util.Locale.ROOT);
+  }
+
+  /** Returns true if the column was named in the projection list, ignoring case. */
+  @Override
+  public boolean isProjected(String colName) {
+    return projection.contains(key(colName));
+  }
+
+  /**
+   * Returns the projection set for the members of the named map column. If the
+   * list named no members, they all inherit the projection of the map itself.
+   */
+  @Override
+  public ProjectionSet mapProjection(String colName) {
+    ProjectionSet mapProj = mapProjections.get(key(colName));
+    return mapProj != null ? mapProj : new NullProjectionSet(isProjected(colName));
+  }
+
+  /**
+   * Parse a projection list. The list should consist of column names; any
+   * wildcards should have been processed by the caller. An empty or null
+   * list means everything is projected (that is, an empty list here is
+   * equivalent to a wildcard in the SELECT statement.)
+   *
+   * @param projList the projected columns; may be null or empty
+   * @return the projection set for the tuple described by the list
+   */
+  public static ProjectionSet parse(Collection<SchemaPath> projList) {
+    if (projList == null || projList.isEmpty()) {
+      return new NullProjectionSet(true);
+    }
+    ProjectionSetImpl projSet = new ProjectionSetImpl();
+    for (SchemaPath col : projList) {
+      projSet.addSegment(col.getRootSegment());
+    }
+    return projSet;
+  }
+
+  // Marks the segment's name as projected at this level and, when a member
+  // name follows it, records that member in the column's nested projection.
+  private void addSegment(NameSegment rootSegment) {
+    String rootKey = key(rootSegment.getPath());
+    projection.add(rootKey);
+    PathSegment child = rootSegment.getChild();
+    if (child == null || child.isArray()) {
+      return; // End of the path, or an [x] array suffix, which is ignored.
+    }
+    ProjectionSetImpl map = mapProjections.get(rootKey);
+    if (map == null) {
+      map = new ProjectionSetImpl();
+      mapProjections.put(rootKey, map);
+    }
+    map.addSegment((NameSegment) child);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java
new file mode 100644
index 0000000..98b6beb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.OffsetVectorState;
+import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.ValuesVectorState;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+
+/**
+ * Vector state for a scalar array (repeated scalar) vector. Manages both the
+ * offsets vector and data vector during overflow and other operations.
+ */
+
+public class RepeatedVectorState implements VectorState {
+  private final ColumnMetadata schema;
+  private final AbstractArrayWriter arrayWriter;
+  private final RepeatedValueVector vector;
+  private final OffsetVectorState offsetsState;
+  private final ValuesVectorState valuesState;
+
+  public RepeatedVectorState(AbstractObjectWriter writer, RepeatedValueVector vector) {
+    this.schema = writer.schema();
+
+    // Retain the repeated vector itself; vector() returns it.
+
+    this.vector = vector;
+
+    // Create the values state using the value (data) portion of the repeated
+    // vector, and the scalar (value) portion of the array writer.
+
+    arrayWriter = (AbstractArrayWriter) writer.array();
+    AbstractScalarWriter colWriter = (AbstractScalarWriter) arrayWriter.scalar();
+    valuesState = new ValuesVectorState(schema, colWriter, vector.getDataVector());
+
+    // Create the offsets state with the offset vector portion of the repeated
+    // vector, the array writer's offset writer, and the array's entry writer.
+
+    offsetsState = new OffsetVectorState(arrayWriter.offsetWriter(),
+        vector.getOffsetVector(),
+        (AbstractObjectWriter) arrayWriter.entry());
+  }
+
+  @Override
+  public ValueVector vector() { return vector; }
+
+  @Override
+  public int allocate(int cardinality) {
+    return offsetsState.allocate(cardinality) +
+           valuesState.allocate(childCardinality(cardinality));
+  }
+
+  private int childCardinality(int cardinality) {
+    return cardinality * schema.expectedElementCount();
+  }
+
+  /**
+   * The column is an array of scalars. We need to roll over both the column
+   * values and the offsets that point to those values. The offsets state is
+   * rolled over with the given cardinality; the values state with that
+   * cardinality multiplied by the schema's expected element count.
+   * <p>
+   * Data structure:
+   * <pre><code>
+   * RepeatedVectorState (this class)
+   * +- OffsetVectorState
+   * .  +- OffsetVectorWriter (A)
+   * .  +- Offset vector (B)
+   * .  +- Backup (e.g. look-ahead) offset vector
+   * +- ValuesVectorState
+   * .  +- Scalar (element) writer (C)
+   * .  +- Data (elements) vector (D)
+   * .  +- Backup elements vector
+   * +- Array Writer
+   * .  +- ColumnWriterIndex (for array as a whole)
+   * .  +- OffsetVectorWriter (A)
+   * .  .  +- Offset vector (B)
+   * .  +- ArrayElementWriterIndex
+   * .  +- ScalarWriter (C)
+   * .  .  +- ArrayElementWriterIndex
+   * .  .  +- Scalar vector (D)
+   * </code></pre>
+   * <p>
+   * The top group of objects point into the writer objects in the second
+   * group. Letters in parens show the connections.
+   * <p>
+   * To perform the roll-over, we must:
+   * <ul>
+   * <li>Copy values from the current vectors to a set of new, look-ahead
+   * vectors.</li>
+   * <li>Swap buffers between the main and "backup" vectors, effectively
+   * moving the "full" batch to the sidelines, putting the look-ahead vectors
+   * into play in order to finish writing the current row.</li>
+   * <li>Update the writers to point to the look-ahead buffers, including
+   * the initial set of data copied into those vectors.</li>
+   * <li>Update the vector indexes to point to the next write positions
+   * after the values copied during roll-over.</li>
+   * </ul>
+   *
+   * @param cardinality the number of outer elements to create in the look-ahead
+   * vector
+   */
+
+  @Override
+  public void rollover(int cardinality) {
+
+    // Roll over the values (data) vector first, then the offsets vector.
+    // NOTE(review): the values-before-offsets order looks deliberate;
+    // confirm against the two state classes before reordering.
+
+    valuesState.rollover(childCardinality(cardinality));
+    offsetsState.rollover(cardinality);
+  }
+
+  @Override
+  public void harvestWithLookAhead() {
+    offsetsState.harvestWithLookAhead();
+    valuesState.harvestWithLookAhead();
+  }
+
+  @Override
+  public void startBatchWithLookAhead() {
+    offsetsState.startBatchWithLookAhead();
+    valuesState.startBatchWithLookAhead();
+  }
+
+  @Override
+  public void reset() {
+    offsetsState.reset();
+    valuesState.reset();
+  }
+
+  @Override
+  public void dump(HierarchicalFormatter format) {
+    format
+      .startObject(this)
+      .attribute("schema", schema)
+      .attributeIdentity("writer", arrayWriter)
+      .attributeIdentity("vector", vector)
+      .attribute("offsetsState");
+    offsetsState.dump(format);
+    format
+      .attribute("valuesState");
+    valuesState.dump(format);
+    format
+      .endObject();
+  }
+}