You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/12/21 05:19:42 UTC
[15/15] drill git commit: DRILL-5657: Size-aware vector writer
structure
DRILL-5657: Size-aware vector writer structure
- Vector and accessor layer
- Row Set layer
- Tuple and column models
- Revised write-time metadata
- "Result set loader" layer
this closes #914
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/40de8ca4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/40de8ca4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/40de8ca4
Branch: refs/heads/master
Commit: 40de8ca4f47533fa6593d1266403868ae1a2119f
Parents: eb0c403
Author: Paul Rogers <pr...@maprtech.com>
Authored: Thu Aug 17 22:41:30 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Wed Dec 20 21:17:48 2017 -0800
----------------------------------------------------------------------
.../exec/physical/rowSet/ResultSetLoader.java | 204 +++
.../exec/physical/rowSet/ResultVectorCache.java | 33 +
.../exec/physical/rowSet/RowSetLoader.java | 153 +++
.../exec/physical/rowSet/impl/ColumnState.java | 358 +++++
.../physical/rowSet/impl/NullProjectionSet.java | 41 +
.../rowSet/impl/NullResultVectorCacheImpl.java | 41 +
.../physical/rowSet/impl/NullVectorState.java | 52 +
.../rowSet/impl/NullableVectorState.java | 108 ++
.../physical/rowSet/impl/OptionBuilder.java | 134 ++
.../rowSet/impl/PrimitiveColumnState.java | 105 ++
.../physical/rowSet/impl/ProjectionSet.java | 48 +
.../physical/rowSet/impl/ProjectionSetImpl.java | 136 ++
.../rowSet/impl/RepeatedVectorState.java | 168 +++
.../rowSet/impl/ResultSetLoaderImpl.java | 775 +++++++++++
.../rowSet/impl/ResultVectorCacheImpl.java | 186 +++
.../physical/rowSet/impl/RowSetLoaderImpl.java | 98 ++
.../physical/rowSet/impl/SingleVectorState.java | 274 ++++
.../exec/physical/rowSet/impl/TupleState.java | 388 ++++++
.../rowSet/impl/VectorContainerBuilder.java | 257 ++++
.../exec/physical/rowSet/impl/VectorState.java | 102 ++
.../physical/rowSet/impl/WriterIndexImpl.java | 100 ++
.../exec/physical/rowSet/impl/package-info.java | 304 +++++
.../physical/rowSet/model/BaseTupleModel.java | 117 ++
.../physical/rowSet/model/ContainerVisitor.java | 115 ++
.../physical/rowSet/model/MetadataProvider.java | 93 ++
.../exec/physical/rowSet/model/ReaderIndex.java | 53 +
.../physical/rowSet/model/SchemaInference.java | 61 +
.../exec/physical/rowSet/model/TupleModel.java | 117 ++
.../rowSet/model/hyper/BaseReaderBuilder.java | 149 +++
.../rowSet/model/hyper/package-info.java | 30 +
.../physical/rowSet/model/package-info.java | 68 +
.../rowSet/model/single/BaseReaderBuilder.java | 89 ++
.../rowSet/model/single/BaseWriterBuilder.java | 72 +
.../model/single/BuildVectorsFromMetadata.java | 97 ++
.../rowSet/model/single/VectorAllocator.java | 112 ++
.../rowSet/model/single/package-info.java | 28 +
.../exec/physical/rowSet/package-info.java | 193 +++
.../apache/drill/exec/record/BatchSchema.java | 42 +-
.../apache/drill/exec/record/RecordBatch.java | 3 +-
.../apache/drill/exec/record/TupleSchema.java | 534 ++++++++
.../exec/record/selection/SelectionVector2.java | 20 +-
.../exec/cache/TestBatchSerialization.java | 22 +-
.../exec/physical/impl/TopN/TopNBatchTest.java | 26 +-
.../impl/validate/TestBatchValidator.java | 64 +-
.../physical/impl/xsort/TestExternalSort.java | 12 +-
.../impl/xsort/managed/SortTestUtilities.java | 8 +-
.../physical/impl/xsort/managed/TestCopier.java | 146 +-
.../impl/xsort/managed/TestShortArrays.java | 8 +-
.../impl/xsort/managed/TestSortImpl.java | 46 +-
.../physical/impl/xsort/managed/TestSorter.java | 38 +-
.../rowSet/impl/TestResultSetLoaderLimits.java | 224 ++++
.../impl/TestResultSetLoaderMapArray.java | 481 +++++++
.../rowSet/impl/TestResultSetLoaderMaps.java | 810 +++++++++++
.../impl/TestResultSetLoaderOmittedValues.java | 379 ++++++
.../impl/TestResultSetLoaderOverflow.java | 680 ++++++++++
.../impl/TestResultSetLoaderProjection.java | 470 +++++++
.../impl/TestResultSetLoaderProtocol.java | 586 ++++++++
.../rowSet/impl/TestResultSetLoaderTorture.java | 453 +++++++
.../rowSet/impl/TestResultSetSchemaChange.java | 245 ++++
.../drill/exec/record/TestTupleSchema.java | 509 +++++++
.../drill/exec/record/TestVectorContainer.java | 127 --
.../exec/record/vector/TestValueVector.java | 12 +
.../apache/drill/exec/sql/TestInfoSchema.java | 2 +-
.../exec/store/easy/text/compliant/TestCsv.java | 6 +-
.../java/org/apache/drill/test/ExampleTest.java | 4 +-
.../org/apache/drill/test/OperatorFixture.java | 30 +-
.../org/apache/drill/test/QueryBuilder.java | 12 +-
.../apache/drill/test/QueryRowSetIterator.java | 2 +-
.../drill/test/rowSet/AbstractRowSet.java | 109 +-
.../drill/test/rowSet/AbstractSingleRowSet.java | 182 +--
.../apache/drill/test/rowSet/DirectRowSet.java | 171 +--
.../drill/test/rowSet/HyperRowSetImpl.java | 245 +---
.../drill/test/rowSet/IndirectRowSet.java | 38 +-
.../org/apache/drill/test/rowSet/RowSet.java | 81 +-
.../apache/drill/test/rowSet/RowSetBuilder.java | 32 +-
.../drill/test/rowSet/RowSetComparison.java | 124 +-
.../apache/drill/test/rowSet/RowSetPrinter.java | 30 +-
.../apache/drill/test/rowSet/RowSetReader.java | 54 +
.../drill/test/rowSet/RowSetReaderImpl.java | 76 ++
.../apache/drill/test/rowSet/RowSetSchema.java | 304 -----
.../drill/test/rowSet/RowSetUtilities.java | 101 +-
.../apache/drill/test/rowSet/RowSetWriter.java | 119 ++
.../drill/test/rowSet/RowSetWriterImpl.java | 155 +++
.../apache/drill/test/rowSet/SchemaBuilder.java | 87 +-
.../drill/test/rowSet/file/JsonFileBuilder.java | 35 +-
.../drill/test/rowSet/test/DummyWriterTest.java | 169 +++
.../drill/test/rowSet/test/PerformanceTool.java | 296 ++++
.../drill/test/rowSet/test/RowSetTest.java | 858 +++++++-----
.../drill/test/rowSet/test/TestFillEmpties.java | 241 ++++
.../test/rowSet/test/TestFixedWidthWriter.java | 444 ++++++
.../rowSet/test/TestOffsetVectorWriter.java | 425 ++++++
.../test/rowSet/test/TestScalarAccessors.java | 1266 ++++++++++++++++++
.../rowSet/test/TestVariableWidthWriter.java | 418 ++++++
.../drill/test/rowSet/test/VectorPrinter.java | 72 +
.../apache/drill/vector/TestFillEmpties.java | 55 +-
.../apache/drill/vector/TestVectorLimits.java | 487 -------
exec/jdbc-all/pom.xml | 2 +-
.../src/main/java/io/netty/buffer/DrillBuf.java | 70 +-
.../netty/buffer/PooledByteBufAllocatorL.java | 62 +-
.../drill/exec/memory/AllocationManager.java | 89 +-
.../main/codegen/templates/ColumnAccessors.java | 383 +++---
.../codegen/templates/FixedValueVectors.java | 293 +---
.../codegen/templates/NullableValueVectors.java | 91 +-
.../codegen/templates/RepeatedValueVectors.java | 71 +-
.../src/main/codegen/templates/UnionVector.java | 44 +-
.../templates/VariableLengthVectors.java | 216 +--
.../drill/exec/record/ColumnMetadata.java | 114 ++
.../drill/exec/record/MaterializedField.java | 41 +-
.../apache/drill/exec/record/TupleMetadata.java | 88 ++
.../drill/exec/record/TupleNameSpace.java | 89 ++
.../drill/exec/vector/AllocationHelper.java | 2 +-
.../drill/exec/vector/BaseDataValueVector.java | 16 +
.../org/apache/drill/exec/vector/BitVector.java | 52 +-
.../drill/exec/vector/FixedWidthVector.java | 7 +-
.../apache/drill/exec/vector/ObjectVector.java | 26 +-
.../drill/exec/vector/UntypedNullVector.java | 59 +-
.../apache/drill/exec/vector/ValueVector.java | 53 +-
.../drill/exec/vector/VariableWidthVector.java | 4 +-
.../apache/drill/exec/vector/VectorUtils.java | 63 -
.../apache/drill/exec/vector/ZeroVector.java | 6 +-
.../exec/vector/accessor/AccessorUtilities.java | 125 --
.../drill/exec/vector/accessor/ArrayReader.java | 108 +-
.../drill/exec/vector/accessor/ArrayWriter.java | 60 +-
.../exec/vector/accessor/ColumnAccessor.java | 40 -
.../exec/vector/accessor/ColumnReader.java | 64 -
.../exec/vector/accessor/ColumnReaderIndex.java | 28 +
.../exec/vector/accessor/ColumnWriter.java | 45 -
.../exec/vector/accessor/ColumnWriterIndex.java | 76 ++
.../exec/vector/accessor/ObjectReader.java | 60 +
.../drill/exec/vector/accessor/ObjectType.java | 28 +
.../exec/vector/accessor/ObjectWriter.java | 101 ++
.../vector/accessor/ScalarElementReader.java | 65 +
.../exec/vector/accessor/ScalarReader.java | 75 ++
.../exec/vector/accessor/ScalarWriter.java | 71 +-
.../exec/vector/accessor/TupleAccessor.java | 71 -
.../drill/exec/vector/accessor/TupleReader.java | 36 +-
.../drill/exec/vector/accessor/TupleWriter.java | 154 ++-
.../drill/exec/vector/accessor/ValueType.java | 31 +
.../accessor/impl/AbstractArrayReader.java | 128 --
.../accessor/impl/AbstractArrayWriter.java | 127 --
.../accessor/impl/AbstractColumnAccessor.java | 43 -
.../accessor/impl/AbstractColumnReader.java | 126 --
.../accessor/impl/AbstractColumnWriter.java | 87 --
.../accessor/impl/AbstractTupleAccessor.java | 38 -
.../vector/accessor/impl/AccessorUtilities.java | 53 +
.../accessor/impl/ColumnAccessorFactory.java | 122 --
.../accessor/impl/HierarchicalFormatter.java | 38 +
.../accessor/impl/HierarchicalPrinter.java | 238 ++++
.../vector/accessor/impl/TupleReaderImpl.java | 151 ---
.../vector/accessor/impl/TupleWriterImpl.java | 162 ---
.../exec/vector/accessor/package-info.java | 79 +-
.../accessor/reader/AbstractArrayReader.java | 188 +++
.../accessor/reader/AbstractObjectReader.java | 52 +
.../accessor/reader/AbstractTupleReader.java | 189 +++
.../accessor/reader/BaseElementReader.java | 187 +++
.../accessor/reader/BaseScalarReader.java | 189 +++
.../accessor/reader/ColumnReaderFactory.java | 109 ++
.../accessor/reader/ElementReaderIndex.java | 24 +
.../reader/FixedWidthElementReaderIndex.java | 38 +
.../exec/vector/accessor/reader/MapReader.java | 43 +
.../accessor/reader/ObjectArrayReader.java | 159 +++
.../accessor/reader/ScalarArrayReader.java | 102 ++
.../vector/accessor/reader/VectorAccessor.java | 26 +
.../vector/accessor/reader/package-info.java | 26 +
.../accessor/writer/AbstractArrayWriter.java | 348 +++++
.../writer/AbstractFixedWidthWriter.java | 258 ++++
.../accessor/writer/AbstractObjectWriter.java | 72 +
.../accessor/writer/AbstractScalarWriter.java | 126 ++
.../accessor/writer/AbstractTupleWriter.java | 450 +++++++
.../accessor/writer/BaseScalarWriter.java | 272 ++++
.../accessor/writer/BaseVarWidthWriter.java | 157 +++
.../accessor/writer/ColumnWriterFactory.java | 196 +++
.../exec/vector/accessor/writer/MapWriter.java | 155 +++
.../accessor/writer/NullableScalarWriter.java | 190 +++
.../accessor/writer/ObjectArrayWriter.java | 143 ++
.../accessor/writer/OffsetVectorWriter.java | 283 ++++
.../accessor/writer/ScalarArrayWriter.java | 229 ++++
.../vector/accessor/writer/WriterEvents.java | 127 ++
.../accessor/writer/dummy/DummyArrayWriter.java | 96 ++
.../writer/dummy/DummyScalarWriter.java | 89 ++
.../accessor/writer/dummy/package-info.java | 54 +
.../vector/accessor/writer/package-info.java | 151 +++
.../exec/vector/complex/AbstractMapVector.java | 13 +-
.../vector/complex/BaseRepeatedValueVector.java | 13 +-
.../drill/exec/vector/complex/ListVector.java | 4 +-
.../drill/exec/vector/complex/MapVector.java | 24 +-
.../exec/vector/complex/RepeatedListVector.java | 20 +-
.../exec/vector/complex/RepeatedMapVector.java | 21 +-
188 files changed, 22717 insertions(+), 4811 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java
new file mode 100644
index 0000000..a4b260b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet;
+
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+
+/**
+ * Builds a result set (a series of zero or more row sets) based on a defined
+ * schema which may evolve (expand) over time. Automatically rolls "overflow"
+ * rows over when a batch fills.
+ * <p>
+ * Many of the methods in this interface verify that the loader is
+ * in the proper state. For example, an exception is thrown if the caller
+ * attempts to save a row before starting a batch. However, the per-column
+ * write methods are checked only through assertions that should be enabled
+ * during testing, but will be disabled during production.
+ * <p>
+ * This class replaces {@link VectorContainerWriter}.
+ * <p>
+ * {@link #DEFAULT_ROW_COUNT} is defined as
+ * <tt>BaseValueVector.INITIAL_VALUE_ALLOCATION</tt>; presumably it is the
+ * target row count used when none is configured (confirm against the
+ * loader options).
+ * <p>
+ * NOTE(review): {@link #writeable()} carries no documentation of its own;
+ * presumably it reports whether the loader can currently accept writes.
+ * Confirm against the implementation.
+ *
+ * @see VectorContainerWriter
+ */
+
+public interface ResultSetLoader {
+
+ public static final int DEFAULT_ROW_COUNT = BaseValueVector.INITIAL_VALUE_ALLOCATION;
+
+ /**
+ * Current schema version. The version increments by one each time
+ * a column is added.
+ *
+ * @return the current schema version
+ */
+
+ int schemaVersion();
+
+ /**
+ * Adjust the number of rows to produce in the next batch. Takes
+ * effect after the next call to {@link #startBatch()}.
+ *
+ * @param count target batch row count
+ */
+
+ void setTargetRowCount(int count);
+
+ /**
+ * The number of rows produced by this loader (as configured in the loader
+ * options.)
+ *
+ * @return the target row count for batches that this loader produces
+ */
+
+ int targetRowCount();
+
+ /**
+ * The largest vector size produced by this loader (as specified by
+ * the value vector limit.)
+ *
+ * @return the largest vector size. Attempting to extend a vector beyond
+ * this limit causes automatic vector overflow and terminates the
+ * in-flight batch, even if the batch has not yet reached the target
+ * row count
+ */
+
+ int targetVectorSize();
+
+ /**
+ * Total number of batches created. Includes the current batch if
+ * the row count in this batch is non-zero.
+ *
+ * @return the number of batches produced including the current
+ * one
+ */
+
+ int batchCount();
+
+ /**
+ * Total number of rows loaded for all previous batches and the
+ * current batch.
+ *
+ * @return total row count
+ */
+
+ int totalRowCount();
+
+ /**
+ * Start a new row batch. Valid only when first started, or after the
+ * previous batch has been harvested.
+ */
+
+ void startBatch();
+
+ /**
+ * Writer for the top-level tuple (the entire row). Valid only when
+ * the mutator is actively writing a batch (after <tt>startBatch()</tt>
+ * but before <tt>harvest()</tt>.)
+ *
+ * @return writer for the top-level columns
+ */
+
+ RowSetLoader writer();
+ boolean writeable();
+
+ /**
+ * Load a row using column values passed as variable-length arguments. Expects
+ * map values to be represented as an array.
+ * A schema of (a:int, b:map(c:varchar)) would be
+ * set as <br><tt>setRow(10, new Object[] {"foo"});</tt><br>
+ * Values of arrays can be expressed as a Java
+ * array. A schema of (a:int, b:int[]) can be set as<br>
+ * <tt>setRow(10, new int[] {100, 200});</tt><br>
+ * Primarily for testing, too slow for production code.
+ * <p>
+ * If the row consists of a single map or list, then the one value will be an
+ * <tt>Object</tt> array, creating an ambiguity. Use <tt>writer().set(0, value);</tt>
+ * in this case.
+ *
+ * @param values column values in column index order
+ * @return this loader
+ */
+
+ ResultSetLoader setRow(Object...values);
+
+ /**
+ * Return the output container, primarily to obtain the schema
+ * and set of vectors. Depending on when this is called, the
+ * data may or may not be populated: call
+ * {@link #harvest()} to obtain the container for a batch.
+ * <p>
+ * This method is useful when the schema is known and fixed.
+ * After declaring the schema, call this method to get the container
+ * that holds the vectors for use in planning projection, etc.
+ * <p>
+ * If the result set schema changes, then a call to this method will
+ * return the latest schema. But, if the schema changes during the
+ * overflow row, then this method will not see those changes until
+ * after harvesting the current batch. (This avoids the appearance
+ * of phantom columns in the output since the new column won't
+ * appear until the next batch.)
+ * <p>
+ * Never count on the data in the container; it may be empty, half
+ * written, or inconsistent. Always call
+ * {@link #harvest()} to obtain the container for a batch.
+ *
+ * @return the output container including schema and value
+ * vectors
+ */
+
+ VectorContainer outputContainer();
+
+ /**
+ * Harvest the current row batch, and reset the mutator
+ * to the start of the next row batch (which may already contain
+ * an overflow row.)
+ * <p>
+ * The schema of the returned container is defined as:
+ * <ul>
+ * <li>The schema as passed in via the loader options, plus</li>
+ * <li>Columns added dynamically during write, minus</li>
+ * <li>Any columns not included in the project list, minus</li>
+ * <li>Any columns added in the overflow row.</li>
+ * </ul>
+ * That is, column order is as defined by the initial schema and column
+ * additions. In particular, the schema order is <b>not</b> defined by
+ * the projection list. (Another mechanism is required to reorder columns
+ * for the actual projection.)
+ *
+ * @return the row batch to send downstream
+ */
+
+ VectorContainer harvest();
+
+ /**
+ * The schema of the harvested batch. Valid until the start of the
+ * next batch.
+ *
+ * @return the extended schema of the harvested batch which includes
+ * any allocation hints used when creating the batch
+ */
+
+ TupleMetadata harvestSchema();
+
+ /**
+ * Called after all rows are returned, whether because no more data is
+ * available, or the caller wishes to cancel the current row batch
+ * and complete.
+ */
+
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultVectorCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultVectorCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultVectorCache.java
new file mode 100644
index 0000000..6e32b5d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultVectorCache.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Interface for a cache that implements "vector persistence" across
+ * multiple result set loaders. Allows a single scan operator to offer
+ * the same set of vectors even when data is read by a set of readers.
+ * <p>
+ * Members:
+ * <ul>
+ * <li>{@link #allocator()} returns a <tt>BufferAllocator</tt>; presumably
+ * the allocator used to create the cached vectors (confirm against the
+ * implementation).</li>
+ * <li>{@link #addOrGet(MaterializedField)} takes a column schema and
+ * returns a <tt>ValueVector</tt>; judging by the name, it returns the
+ * cached vector for that column, creating and caching one when none
+ * exists yet (confirm against the implementation).</li>
+ * </ul>
+ */
+
+public interface ResultVectorCache {
+ BufferAllocator allocator();
+ ValueVector addOrGet(MaterializedField colSchema);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetLoader.java
new file mode 100644
index 0000000..070e9a9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetLoader.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet;
+
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+
+/**
+ * Interface for writing values to a row set. Only available for newly-created
+ * single row sets.
+ * <p>
+ * Typical usage:
+ *
+ * <pre><code>
+ * void writeABatch() {
+ * RowSetLoader writer = ...
+ * while (! writer.isFull()) {
+ * writer.start();
+ * writer.scalar(0).setInt(10);
+ * writer.scalar(1).setString("foo");
+ * ...
+ * writer.save();
+ * }
+ * }</code></pre>
+ * Alternative usage:
+ *
+ * <pre><code>
+ * void writeABatch() {
+ * RowSetLoader writer = ...
+ * while (writer.start()) {
+ * writer.scalar(0).setInt(10);
+ * writer.scalar(1).setString("foo");
+ * ...
+ * writer.save();
+ * }
+ * }</code></pre>
+ *
+ * The above writes until the batch is full, based on size or vector overflow.
+ * That is, the details of vector overflow are hidden from the code that calls
+ * the writer.
+ * <p>
+ * {@link #loader()} returns a {@link ResultSetLoader}; presumably the loader
+ * that owns this writer (confirm against the implementation).
+ */
+
+public interface RowSetLoader extends TupleWriter {
+
+ ResultSetLoader loader();
+
+ /**
+ * Write a row of values, given by Java objects. Object type must match
+ * expected column type. Stops writing, and returns false, if any value causes
+ * vector overflow. Value format:
+ * <ul>
+ * <li>For scalars, the value as a suitable Java type (int or Integer, say,
+ * for <tt>INTEGER</tt> values.)</li>
+ * <li>For scalar arrays, an array of a suitable Java primitive type for
+ * scalars. For example, <tt>int[]</tt> for an <tt>INTEGER</tt> column.</li>
+ * <li>For a Map, an <tt>Object</tt> array with values encoded as above.
+ * (In fact, the list here is the same as the map format.)</li>
+ * <li>For a list (repeated map, list of list), an <tt>Object</tt> array with
+ * values encoded as above. (So, for a repeated map, an outer <tt>Object</tt>
+ * array encodes the array, an inner one encodes the map members.)</li>
+ * </ul>
+ * NOTE(review): the text above mentions returning false, but the declared
+ * return type is <tt>RowSetLoader</tt>; confirm the overflow behavior
+ * against the implementation.
+ *
+ * @param values
+ * variable-length argument list of column values
+ * @return a <tt>RowSetLoader</tt>; presumably this writer, to allow call
+ * chaining (confirm against the implementation)
+ */
+
+ RowSetLoader addRow(Object... values);
+
+ /**
+ * Indicates that no more rows fit into the current row batch and that the row
+ * batch should be harvested and sent downstream. Any overflow row is
+ * automatically saved for the next cycle. The value is undefined when a batch
+ * is not active.
+ * <p>
+ * Will be false on the first row, and all subsequent rows until either the
+ * maximum number of rows are written, or a vector overflows. After that, will
+ * return true. The method returns true as soon as any column writer
+ * overflows even in the middle of a row write. That is, this writer does not
+ * automatically handle overflow rows because that added complexity is seldom
+ * needed for tests.
+ * <p>
+ * NOTE(review): the last sentence above appears to conflict with the
+ * statement that any overflow row is automatically saved for the next cycle;
+ * confirm which applies.
+ *
+ * @return true if the batch is full and no further row can be written,
+ * false if another row can be written
+ */
+
+ boolean isFull();
+
+ /**
+ * The number of rows in the current row set. Does not count any overflow row
+ * saved for the next batch.
+ *
+ * @return number of rows to be sent downstream
+ */
+
+ int rowCount();
+
+ /**
+ * The index of the current row. Same as the row count except in an overflow
+ * row in which case the row index will revert to zero as soon as any vector
+ * overflows. Note: this means that the index can change between columns in a
+ * single row. Applications usually don't use this index directly; rely on the
+ * writers to write to the proper location.
+ *
+ * @return the current write index
+ */
+
+ int rowIndex();
+
+ /**
+ * Prepare a new row for writing. Call this before each row.
+ * <p>
+ * Handles a very special case: that of discarding the last row written.
+ * A reader can read a row into vectors, then "sniff" the row to check,
+ * for example, against a filter. If the row is not wanted, simply omit
+ * the call to <tt>save()</tt> and the next call to <tt>start()</tt> will
+ * discard the unsaved row.
+ * <p>
+ * Note that the vectors still contain values in the
+ * discarded position; just the various pointers are unset. If
+ * the batch ends before the discarded values are overwritten, the
+ * discarded values just exist at the end of the vector. Since vectors
+ * start with garbage contents, the discarded values are simply a different
+ * kind of garbage. But, if the client writes a new row, then the new
+ * row overwrites the discarded row. This works because we only change
+ * the tail part of a vector; never the internals.
+ *
+ * @return true if another row can be added, false if the batch is full
+ */
+
+ boolean start();
+
+ /**
+ * Saves the current row and moves to the next row. Failing to call this
+ * method effectively abandons the in-flight row; something that may be useful
+ * to recover from partially-written rows that turn out to contain errors.
+ * Done automatically if using <tt>setRow()</tt>.
+ */
+
+ void save();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java
new file mode 100644
index 0000000..f3626d9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import java.util.ArrayList;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.OffsetVectorState;
+import org.apache.drill.exec.physical.rowSet.impl.TupleState.MapState;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.ColumnWriterFactory;
+import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector;
+
+/**
+ * Represents the write-time state for a column including the writer and the (optional)
+ * backing vector. Implements per-column operations such as vector overflow. If a column
+ * is a (possibly repeated) map, then the column state will hold a tuple state.
+ * <p>
+ * If a column is not projected, then the writer exists (to make life easier for the
+ * reader), but there will be no vector backing the writer.
+ * <p>
+ * Different columns need different kinds of vectors: a data vector, possibly an offset
+ * vector, or even a non-existent vector. The {@link VectorState} class abstracts out
+ * these differences.
+ */
+
+public abstract class ColumnState {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnState.class);
+
+ public static abstract class BaseMapColumnState extends ColumnState {
+ protected final MapState mapState;
+
+ public BaseMapColumnState(ResultSetLoaderImpl resultSetLoader,
+ AbstractObjectWriter writer, VectorState vectorState,
+ ProjectionSet projectionSet) {
+ super(resultSetLoader, writer, vectorState);
+ mapState = new MapState(resultSetLoader, this, projectionSet);
+ }
+
+ @Override
+ public void rollover() {
+ super.rollover();
+ mapState.rollover();
+ }
+
+ @Override
+ public void startBatch() {
+ super.startBatch();
+ mapState.startBatch();
+ }
+
+ @Override
+ public void harvestWithLookAhead() {
+ super.harvestWithLookAhead();
+ mapState.harvestWithLookAhead();
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ mapState.close();
+ }
+
+ public MapState mapState() { return mapState; }
+ }
+
+ public static class MapColumnState extends BaseMapColumnState {
+
+ public MapColumnState(ResultSetLoaderImpl resultSetLoader,
+ ColumnMetadata columnSchema,
+ ProjectionSet projectionSet) {
+ super(resultSetLoader,
+ ColumnWriterFactory.buildMap(columnSchema, null,
+ new ArrayList<AbstractObjectWriter>()),
+ new NullVectorState(),
+ projectionSet);
+ }
+
+ @Override
+ public void updateCardinality(int cardinality) {
+ super.updateCardinality(cardinality);
+ mapState.updateCardinality(cardinality);
+ }
+ }
+
+ public static class MapArrayColumnState extends BaseMapColumnState {
+
+ public MapArrayColumnState(ResultSetLoaderImpl resultSetLoader,
+ AbstractObjectWriter writer,
+ VectorState vectorState,
+ ProjectionSet projectionSet) {
+ super(resultSetLoader, writer,
+ vectorState,
+ projectionSet);
+ }
+
+ @SuppressWarnings("resource")
+ public static MapArrayColumnState build(ResultSetLoaderImpl resultSetLoader,
+ ColumnMetadata columnSchema,
+ ProjectionSet projectionSet) {
+
+ // Create the map's offset vector.
+
+ UInt4Vector offsetVector = new UInt4Vector(
+ BaseRepeatedValueVector.OFFSETS_FIELD,
+ resultSetLoader.allocator());
+
+ // Create the writer using the offset vector
+
+ AbstractObjectWriter writer = ColumnWriterFactory.buildMapArray(
+ columnSchema, offsetVector,
+ new ArrayList<AbstractObjectWriter>());
+
+ // Wrap the offset vector in a vector state
+
+ VectorState vectorState = new OffsetVectorState(
+ ((AbstractArrayWriter) writer.array()).offsetWriter(),
+ offsetVector,
+ (AbstractObjectWriter) writer.array().entry());
+
+ // Assemble it all into the column state.
+
+ return new MapArrayColumnState(resultSetLoader,
+ writer, vectorState, projectionSet);
+ }
+
+ @Override
+ public void updateCardinality(int cardinality) {
+ super.updateCardinality(cardinality);
+ int childCardinality = cardinality * schema().expectedElementCount();
+ mapState.updateCardinality(childCardinality);
+ }
+ }
+
+ /**
+ * Columns move through various lifecycle states as identified by this
+ * enum. (Yes, sorry that the term "state" is used in two different ways
+ * here: the variables for a column and the point within the column
+ * lifecycle.)
+ */
+
+ protected enum State {
+
+ /**
+ * Column is in the normal state of writing with no overflow
+ * in effect.
+ */
+
+ NORMAL,
+
+ /**
+ * Like NORMAL, but means that the data has overflowed and the
+ * column's data for the current row appears in the new,
+ * overflow batch. For a client that omits some columns, written
+ * columns will be in OVERFLOW state, unwritten columns in
+ * NORMAL state.
+ */
+
+ OVERFLOW,
+
+ /**
+ * Indicates that the column has data saved
+ * in the overflow batch.
+ */
+
+ LOOK_AHEAD,
+
+ /**
+ * Like LOOK_AHEAD, but indicates the special case that the column
+ * was added after overflow, so there is no vector for the column
+ * in the harvested batch.
+ */
+
+ NEW_LOOK_AHEAD
+ }
+
+ protected final ResultSetLoaderImpl resultSetLoader;
+ protected final int addVersion;
+ protected final VectorState vectorState;
+ protected State state;
+ protected AbstractObjectWriter writer;
+
+ /**
+ * Cardinality of the value itself. If this is an array,
+ * then this is the number of arrays. A separate number,
+ * the inner cardinality, is computed as the outer cardinality
+ * times the expected array count (from metadata.) The inner
+ * cardinality is the total number of array items in the
+ * vector.
+ */
+
+ protected int outerCardinality;
+
+ public ColumnState(ResultSetLoaderImpl resultSetLoader,
+ AbstractObjectWriter writer, VectorState vectorState) {
+ this.resultSetLoader = resultSetLoader;
+ this.vectorState = vectorState;
+ this.addVersion = resultSetLoader.bumpVersion();
+ state = resultSetLoader.hasOverflow() ?
+ State.NEW_LOOK_AHEAD : State.NORMAL;
+ this.writer = writer;
+ }
+
+ public AbstractObjectWriter writer() { return writer; }
+ public ColumnMetadata schema() { return writer.schema(); }
+
+ public ValueVector vector() { return vectorState.vector(); }
+
+ public void allocateVectors() {
+ assert outerCardinality != 0;
+ resultSetLoader.tallyAllocations(
+ vectorState.allocate(outerCardinality));
+ }
+
+ /**
+ * Prepare the column for a new row batch after overflow on the previous
+ * batch. Restore the look-ahead buffer to the
+ * active vector so we start writing where we left off.
+ */
+
+ public void startBatch() {
+ switch (state) {
+ case NORMAL:
+ resultSetLoader.tallyAllocations(vectorState.allocate(outerCardinality));
+ break;
+
+ case NEW_LOOK_AHEAD:
+
+ // Column is new, was not exchanged with backup vector
+
+ break;
+
+ case LOOK_AHEAD:
+
+ // Restore the look-ahead values to the main vector.
+
+ vectorState.startBatchWithLookAhead();
+ break;
+
+ default:
+ throw new IllegalStateException("Unexpected state: " + state);
+ }
+
+ // In all cases, we are back to normal writing.
+
+ state = State.NORMAL;
+ }
+
+ /**
+ * A column within the row batch overflowed. Prepare to absorb the rest of the
+ * in-flight row by rolling values over to a new vector, saving the complete
+ * vector for later. This column could have a value for the overflow row, or
+ * for some previous row, depending on exactly when and where the overflow
+ * occurs.
+ */
+
+ public void rollover() {
+ assert state == State.NORMAL;
+
+ // If the source index is 0, then we could not fit this one
+ // value in the original vector. Nothing will be accomplished by
+ // trying again with an overflow vector. Just fail.
+ //
+ // Note that this is a judgment call. It is possible to allow the
+ // vector to double beyond the limit, but that will require a bit
+ // of thought to get right -- and, of course, completely defeats
+ // the purpose of limiting vector size to avoid memory fragmentation...
+
+ if (resultSetLoader.writerIndex().vectorIndex() == 0) {
+ throw UserException
+ .memoryError("A single column value is larger than the maximum allowed size of 16 MB")
+ .build(logger);
+ }
+
+ // Otherwise, do the roll-over to a look-ahead vector.
+
+ vectorState.rollover(outerCardinality);
+
+ // Remember that we did this overflow processing.
+
+ state = State.OVERFLOW;
+ }
+
+ /**
+ * Writing of a row batch is complete. Prepare the vector for harvesting
+ * to send downstream. If this batch encountered overflow, set aside the
+ * look-ahead vector and put the full vector buffer back into the active
+ * vector.
+ */
+
+ public void harvestWithLookAhead() {
+ switch (state) {
+ case NEW_LOOK_AHEAD:
+
+ // If added after overflow, no data to save from the complete
+ // batch: the vector does not appear in the completed batch.
+
+ break;
+
+ case OVERFLOW:
+
+ // Otherwise, restore the original, full buffer and
+ // last write position.
+
+ vectorState.harvestWithLookAhead();
+
+ // Remember that we have look-ahead values stashed away in the
+ // backup vector.
+
+ state = State.LOOK_AHEAD;
+ break;
+
+ default:
+ throw new IllegalStateException("Unexpected state: " + state);
+ }
+ }
+
+ public void close() {
+ vectorState.reset();
+ }
+
+ public void updateCardinality(int cardinality) {
+ outerCardinality = cardinality;
+ }
+
+ public void dump(HierarchicalFormatter format) {
+ format
+ .startObject(this)
+ .attribute("addVersion", addVersion)
+ .attribute("state", state)
+ .attributeIdentity("writer", writer)
+ .attribute("vectorState")
+ ;
+ vectorState.dump(format);
+ format.endObject();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullProjectionSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullProjectionSet.java
new file mode 100644
index 0000000..2fcc813
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullProjectionSet.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+/**
+ * Uniform projection: every column is projected (<tt>SELECT *</tt> at the
+ * root tuple, or a selected map) or none is (an unprojected map). The
+ * same all-or-nothing choice is handed down to any nested map.
+ */
+
+public class NullProjectionSet implements ProjectionSet {
+
+ private final boolean allProjected; // fixed at construction; answers every column query
+
+ public NullProjectionSet(boolean allProjected) {
+ this.allProjected = allProjected;
+ }
+
+ @Override
+ public boolean isProjected(String colName) { return allProjected; }
+
+ @Override
+ public ProjectionSet mapProjection(String colName) {
+ return new NullProjectionSet(allProjected);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullResultVectorCacheImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullResultVectorCacheImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullResultVectorCacheImpl.java
new file mode 100644
index 0000000..930dc30
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullResultVectorCacheImpl.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.ValueVector;
+
+public class NullResultVectorCacheImpl implements ResultVectorCache {
+
+ private final BufferAllocator allocator;
+
+ public NullResultVectorCacheImpl(BufferAllocator allocator) {
+ this.allocator = allocator;
+ }
+
+ @Override
+ public BufferAllocator allocator() { return allocator; }
+
+ @Override
+ public ValueVector addOrGet(MaterializedField colSchema) {
+ return TypeHelper.getNewVector(colSchema, allocator, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullVectorState.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullVectorState.java
new file mode 100644
index 0000000..8372758
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullVectorState.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+
+/**
+ * Do-nothing vector state for a map column which has no actual vector
+ * associated with it.
+ */
+
+public class NullVectorState implements VectorState {
+
+ @Override public int allocate(int cardinality) { return 0; }
+ @Override public void rollover(int cardinality) { }
+ @Override public void harvestWithLookAhead() { }
+ @Override public void startBatchWithLookAhead() { }
+ @Override public void reset() { }
+ @Override public ValueVector vector() { return null; }
+
+ public static class UnmanagedVectorState extends NullVectorState {
+ ValueVector vector;
+
+ public UnmanagedVectorState(ValueVector vector) {
+ this.vector = vector;
+ }
+
+ @Override
+ public ValueVector vector() { return vector; }
+ }
+
+ @Override
+ public void dump(HierarchicalFormatter format) {
+ format.startObject(this).endObject();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java
new file mode 100644
index 0000000..bf91032
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.ValuesVectorState;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.NullableVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter;
+import org.apache.drill.exec.vector.accessor.writer.NullableScalarWriter;
+
+public class NullableVectorState implements VectorState {
+
+ public static class BitsVectorState extends ValuesVectorState {
+
+ public BitsVectorState(ColumnMetadata schema, AbstractScalarWriter writer, ValueVector mainVector) {
+ super(schema, writer, mainVector);
+ }
+
+ @Override
+ public int allocateVector(ValueVector vector, int cardinality) {
+ ((FixedWidthVector) vector).allocateNew(cardinality);
+ return vector.getBufferSize();
+ }
+ }
+
+ private final ColumnMetadata schema;
+ private final NullableScalarWriter writer;
+ private final NullableVector vector;
+ private final ValuesVectorState bitsState;
+ private final ValuesVectorState valuesState;
+
+ public NullableVectorState(AbstractObjectWriter writer, NullableVector vector) {
+ this.schema = writer.schema();
+ this.vector = vector;
+
+ this.writer = (NullableScalarWriter) writer.scalar();
+ bitsState = new BitsVectorState(schema, this.writer.bitsWriter(), vector.getBitsVector());
+ valuesState = new ValuesVectorState(schema, this.writer.baseWriter(), vector.getValuesVector());
+ }
+
+ @Override
+ public int allocate(int cardinality) {
+ return bitsState.allocate(cardinality) +
+ valuesState.allocate(cardinality);
+ }
+
+ @Override
+ public void rollover(int cardinality) {
+ bitsState.rollover(cardinality);
+ valuesState.rollover(cardinality);
+ }
+
+ @Override
+ public void harvestWithLookAhead() {
+ bitsState.harvestWithLookAhead();
+ valuesState.harvestWithLookAhead();
+ }
+
+ @Override
+ public void startBatchWithLookAhead() {
+ bitsState.startBatchWithLookAhead();
+ valuesState.startBatchWithLookAhead();
+ }
+
+ @Override
+ public void reset() {
+ bitsState.reset();
+ valuesState.reset();
+ }
+
+ @Override
+ public ValueVector vector() { return vector; }
+
+ @Override
+ public void dump(HierarchicalFormatter format) {
+ format
+ .startObject(this)
+ .attribute("schema", schema)
+ .attributeIdentity("writer", writer)
+ .attributeIdentity("vector", vector)
+ .attribute("bitsState");
+ bitsState.dump(format);
+ format
+ .attribute("valuesState");
+ valuesState.dump(format);
+ format
+ .endObject();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java
new file mode 100644
index 0000000..a743052
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import java.util.Collection;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetOptions;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Builder for the options for the row set loader. Reasonable defaults
+ * are provided for all options; use these options for test code or
+ * for clients that don't need special settings.
+ */
+
+public class OptionBuilder {
+ protected int vectorSizeLimit;
+ protected int rowCountLimit;
+ protected Collection<SchemaPath> projection;
+ protected ResultVectorCache vectorCache;
+ protected TupleMetadata schema;
+ protected long maxBatchSize;
+
+ public OptionBuilder() {
+ ResultSetOptions options = new ResultSetOptions();
+ vectorSizeLimit = options.vectorSizeLimit;
+ rowCountLimit = options.rowCountLimit;
+ maxBatchSize = options.maxBatchSize;
+ }
+
+ /**
+ * Specify the maximum number of rows per batch. Defaults to
+ * {@link BaseValueVector#INITIAL_VALUE_ALLOCATION}. Batches end either
+ * when this limit is reached, or when a vector overflows, whichever
+ * occurs first. The limit is capped at
+ * {@link ValueVector#MAX_ROW_COUNT}.
+ *
+ * @param limit the row count limit
+ * @return this builder
+ */
+
+ public OptionBuilder setRowCountLimit(int limit) {
+ rowCountLimit = Math.max(1,
+ Math.min(limit, ValueVector.MAX_ROW_COUNT));
+ return this;
+ }
+
+ public OptionBuilder setBatchSizeLimit(int bytes) {
+ maxBatchSize = bytes;
+ return this;
+ }
+
+ /**
+ * Record (batch) readers often read a subset of available table columns,
+ * but want to use a writer schema that includes all columns for ease of
+ * writing. (For example, a CSV reader must read all columns, even if the user
+ * wants a subset. The unwanted columns are simply discarded.)
+ * <p>
+ * This option provides a projection list, in the form of column names, for
+ * those columns which are to be projected. Only those columns will be
+ * backed by value vectors; non-projected columns will be backed by "null"
+ * writers that discard all values.
+ *
+ * @param projection the list of projected columns
+ * @return this builder
+ */
+
+ // TODO: Use SchemaPath in place of strings.
+
+ public OptionBuilder setProjection(Collection<SchemaPath> projection) {
+ this.projection = projection;
+ return this;
+ }
+
+ /**
+ * Downstream operators require "vector persistence": the same vector
+ * must represent the same column in every batch. For the scan operator,
+ * which creates multiple readers, this can be a challenge. The vector
+ * cache provides a transparent mechanism to enable vector persistence
+ * by returning the same vector for a set of independent readers. By
+ * default, the code uses a "null" cache which creates a new vector on
+ * each request. If a true cache is needed, the caller must provide one
+ * here.
+ */
+
+ public OptionBuilder setVectorCache(ResultVectorCache vectorCache) {
+ this.vectorCache = vectorCache;
+ return this;
+ }
+
+ /**
+ * Clients can use the row set builder in several ways:
+ * <ul>
+ * <li>Provide the schema up front, when known, by using this method to
+ * provide the schema.</li>
+ * <li>Discover the schema on the fly, adding columns during the write
+ * operation. Leave this method unset to start with an empty schema.</li>
+ * <li>A combination of the above.</li>
+ * </ul>
+ * @param schema the initial schema for the loader
+ * @return this builder
+ */
+
+ public OptionBuilder setSchema(TupleMetadata schema) {
+ this.schema = schema;
+ return this;
+ }
+
+ // TODO: No setter for vector length yet: is hard-coded
+ // at present in the value vector.
+
+ public ResultSetOptions build() {
+ return new ResultSetOptions(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/PrimitiveColumnState.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/PrimitiveColumnState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/PrimitiveColumnState.java
new file mode 100644
index 0000000..c97ec18
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/PrimitiveColumnState.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.ValuesVectorState;
+import org.apache.drill.exec.vector.NullableVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+
+/**
+ * Primitive (non-map) column state. Handles all three cardinalities.
+ * Column metadata is hosted on the writer.
+ */
+
+public class PrimitiveColumnState extends ColumnState implements ColumnWriterListener {
+
+ public PrimitiveColumnState(ResultSetLoaderImpl resultSetLoader,
+ AbstractObjectWriter colWriter,
+ VectorState vectorState) {
+ super(resultSetLoader, colWriter, vectorState);
+ writer.bindListener(this);
+ }
+
+ public static PrimitiveColumnState newPrimitive(
+ ResultSetLoaderImpl resultSetLoader,
+ ValueVector vector,
+ AbstractObjectWriter writer) {
+ VectorState vectorState;
+ if (vector == null) {
+ vectorState = new NullVectorState();
+ } else {
+ vectorState = new ValuesVectorState(
+ writer.schema(),
+ (AbstractScalarWriter) writer.scalar(),
+ vector);
+ }
+ return new PrimitiveColumnState(resultSetLoader, writer,
+ vectorState);
+ }
+
+ public static PrimitiveColumnState newNullablePrimitive(
+ ResultSetLoaderImpl resultSetLoader,
+ ValueVector vector,
+ AbstractObjectWriter writer) {
+ VectorState vectorState;
+ if (vector == null) {
+ vectorState = new NullVectorState();
+ } else {
+ vectorState = new NullableVectorState(
+ writer,
+ (NullableVector) vector);
+ }
+ return new PrimitiveColumnState(resultSetLoader, writer,
+ vectorState);
+ }
+
+ public static PrimitiveColumnState newPrimitiveArray(
+ ResultSetLoaderImpl resultSetLoader,
+ ValueVector vector,
+ AbstractObjectWriter writer) {
+ VectorState vectorState;
+ if (vector == null) {
+ vectorState = new NullVectorState();
+ } else {
+ vectorState = new RepeatedVectorState(writer, (RepeatedValueVector) vector);
+ }
+ return new PrimitiveColumnState(resultSetLoader, writer,
+ vectorState);
+ }
+
+ @Override
+ public void overflowed(ScalarWriter writer) {
+ resultSetLoader.overflowed();
+ }
+
+ @Override
+ public void dump(HierarchicalFormatter format) {
+ super.dump(format); // the empty stub hid ColumnState's dump of addVersion, state, writer and vector state
+ }
+
+ @Override
+ public boolean canExpand(ScalarWriter writer, int delta) {
+ return resultSetLoader.canExpand(delta);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSet.java
new file mode 100644
index 0000000..9ea118f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSet.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+/**
+ * Represents the set of columns projected for a tuple (row or map.)
+ * The projected columns might themselves be tuples (maps), so this
+ * interface also returns a projection set for such columns.
+ * <p>
+ * Three implementations exist:
+ * <ul>
+ * <li>Project all ({@link NullProjectionSet}): used for a tuple when
+ * all columns are projected. Example: the root tuple (the row) in
+ * a <tt>SELECT *</tt> query.</li>
+ * <li>Project none (also {@link NullProjectionSet}): used when no
+ * columns are projected from a tuple, such as when a map itself is
+ * not projected, so none of its member columns are projected.</li>
+ * <li>Project some ({@link ProjectionSetImpl}): used in the
+ * <tt>SELECT a, c, e</tt> case in which the query identifies which
+ * columns to project (implicitly leaving out others, such as b and
+ * d in our example.)</li>
+ * </ul>
+ * <p>
+ * The result is that each tuple (row and map) has an associated
+ * projection set which the code can query to determine if a newly
+ * added column is wanted (and so should have a backing vector) or
+ * is unwanted (and can just receive a dummy writer.)
+ */
+
+interface ProjectionSet {
+ /**
+  * Reports whether the named column is projected at this tuple level.
+  *
+  * @param colName name of the column to check
+  * @return true if the column is projected, false otherwise
+  */
+ boolean isProjected(String colName);
+
+ /**
+  * Returns the projection set that applies to the members of the
+  * named map column.
+  *
+  * @param colName name of the map column
+  * @return the projection set for that column's members
+  */
+ ProjectionSet mapProjection(String colName);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSetImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSetImpl.java
new file mode 100644
index 0000000..e17f486
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSetImpl.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.PathSegment.NameSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+
+/**
+ * Represents an explicit projection at some tuple level.
+ * <p>
+ * A column is projected if it is explicitly listed in the selection list.
+ * Names are matched without regard to case: every name is folded to lower
+ * case with {@link Locale#ROOT} so that matching does not depend on the
+ * JVM's default locale.
+ * <p>
+ * If a column is a map, then the projection for the map's columns is based on
+ * two rules:
+ * <ol>
+ * <li>If the projection list includes at least one explicit mention of a map
+ * member, then include only those columns explicitly listed.</li>
+ * <li>If the projection at the parent level lists only the map column itself
+ * (which the projection can't know is a map), then assume this implies all
+ * columns, as if the entry were "map.*".</li>
+ * </ol>
+ * <p>
+ * Examples:<br>
+ * <code>m</code><br>
+ * If m turns out to be a map, project all members of m.<br>
+ * <code>m.a</code><br>
+ * Column m must be a map. Project only column a.<br>
+ * <code>m, m.a</code><br>
+ * Tricky case. We interpret this as projecting only the "a" element of map m.
+ * <p>
+ * The projection set is built from a list of columns, represented as
+ * {@link SchemaPath} objects, provided by the physical plan. The structure of
+ * <tt>SchemaPath</tt> is a bit awkward:
+ * <p>
+ * <ul>
+ * <li><tt>SchemaPath</tt> is a wrapper for a column which directly holds the
+ * <tt>NameSegment</tt> for the top-level column.</li>
+ * <li><tt>NameSegment</tt> holds a name. This can be a top name such as
+ * `a`, or parts of a compound name such as `a`.`b`. Each <tt>NameSegment</tt>
+ * has a "child" that points to the optional following parts of the name.</li>
+ * <li><tt>PathSegment</tt> is the base class for the parts of a name.</li>
+ * <li><tt>ArraySegment</tt> is the other kind of name part and represents
+ * an array index such as the "[1]" in `columns`[1].</li>
+ * </ul>
+ * The parser here consumes only names, this mechanism does not consider
+ * array indexes. As a result, there may be multiple projected columns that
+ * map to the same projection here: `columns`[1] and `columns`[2] both map to
+ * the name `columns`, for example.
+ */
+public class ProjectionSetImpl implements ProjectionSet {
+
+  /** Lower-cased names of the columns projected at this tuple level. */
+  Set<String> projection = new HashSet<>();
+
+  /**
+   * Projection sets for columns that had at least one member explicitly
+   * listed, keyed by the lower-cased column name.
+   */
+  Map<String, ProjectionSetImpl> mapProjections = CaseInsensitiveMap
+      .newHashMap();
+
+  /**
+   * Folds a column name to the canonical key used by this class.
+   * <p>
+   * Uses {@link Locale#ROOT} rather than the default locale: with the
+   * no-argument <tt>toLowerCase()</tt>, a Turkish default locale folds
+   * "ID" to a dotless-i form that does not equal "id", so a projected
+   * column could fail to match.
+   *
+   * @param name column name as given by the caller or the plan
+   * @return the locale-independent lower-case form of the name
+   */
+  private static String toKey(String name) {
+    return name.toLowerCase(Locale.ROOT);
+  }
+
+  /**
+   * Reports whether the named column was listed in the projection.
+   *
+   * @param colName name of the column to check; matched case-insensitively
+   * @return true if the column was listed, false otherwise
+   */
+  @Override
+  public boolean isProjected(String colName) {
+    return projection.contains(toKey(colName));
+  }
+
+  /**
+   * Returns the projection set for the members of the named map column.
+   *
+   * @param colName name of the map column; matched case-insensitively
+   * @return the explicit member projection if any member of the column was
+   * listed; otherwise a {@link NullProjectionSet} built from whether the
+   * column itself is projected
+   */
+  @Override
+  public ProjectionSet mapProjection(String colName) {
+    ProjectionSet mapProj = mapProjections.get(toKey(colName));
+    if (mapProj != null) {
+      return mapProj;
+    }
+
+    // No explicit information for the map. Members inherit the
+    // same projection as the map itself.
+
+    return new NullProjectionSet(isProjected(colName));
+  }
+
+  /**
+   * Parse a projection list. The list should consist of a list of column
+   * names; any wildcards should have been processed by the caller. An
+   * empty or null list means everything is projected (that is, an
+   * empty list here is equivalent to a wildcard in the SELECT
+   * statement.)
+   *
+   * @param projList the projected columns; may be null or empty
+   * @return a {@link NullProjectionSet} created with <tt>true</tt> when the
+   * list is null or empty; otherwise a <tt>ProjectionSetImpl</tt> populated
+   * from the root segment of every listed column
+   */
+  public static ProjectionSet parse(Collection<SchemaPath> projList) {
+    if (projList == null || projList.isEmpty()) {
+      return new NullProjectionSet(true);
+    }
+    ProjectionSetImpl projSet = new ProjectionSetImpl();
+    for (SchemaPath col : projList) {
+      projSet.addSegment(col.getRootSegment());
+    }
+    return projSet;
+  }
+
+  /**
+   * Records one name segment at this level and, if it is followed by
+   * another name, records that child in the nested projection for this
+   * column (creating the nested projection on first use).
+   *
+   * @param rootSegment the name segment to add at this level
+   */
+  private void addSegment(NameSegment rootSegment) {
+    String rootKey = toKey(rootSegment.getPath());
+    projection.add(rootKey);
+    PathSegment child = rootSegment.getChild();
+    if (child == null) {
+      return;
+    }
+    if (child.isArray()) {
+      // Ignore the [x] array suffix.
+      return;
+    }
+    ProjectionSetImpl map = mapProjections.get(rootKey);
+    if (map == null) {
+      map = new ProjectionSetImpl();
+      mapProjections.put(rootKey, map);
+    }
+    // The child is not an array segment (checked above), so it is
+    // treated as a name segment.
+    map.addSegment((NameSegment) child);
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java
new file mode 100644
index 0000000..98b6beb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.OffsetVectorState;
+import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.ValuesVectorState;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+
+/**
+ * Vector state for a scalar array (repeated scalar) vector. Manages both the
+ * offsets vector and data vector during overflow and other operations.
+ * Each operation is delegated to a pair of child states: one for the
+ * offsets vector and one for the values (data) vector.
+ */
+
+public class RepeatedVectorState implements VectorState {
+ // Column metadata taken from the writer; supplies the expected element count.
+ private final ColumnMetadata schema;
+ // Array writer obtained from the object writer passed to the constructor.
+ private final AbstractArrayWriter arrayWriter;
+ // The repeated vector as a whole (offsets plus data).
+ private final RepeatedValueVector vector;
+ // State for the offset vector portion of the repeated vector.
+ private final OffsetVectorState offsetsState;
+ // State for the data (values) vector portion of the repeated vector.
+ private final ValuesVectorState valuesState;
+
+ /**
+  * Builds the offsets and values states from the parts of the given
+  * writer and repeated vector.
+  *
+  * @param writer object writer for the column; its schema, array writer,
+  * element scalar writer, offset writer and entry writer are used here
+  * @param vector the repeated vector whose data and offset vectors are
+  * wrapped by the child states
+  */
+ public RepeatedVectorState(AbstractObjectWriter writer, RepeatedValueVector vector) {
+ this.schema = writer.schema();
+
+ // Get the repeated vector
+
+ this.vector = vector;
+
+ // Create the values state using the value (data) portion of the repeated
+ // vector, and the scalar (value) portion of the array writer.
+
+ arrayWriter = (AbstractArrayWriter) writer.array();
+ AbstractScalarWriter colWriter = (AbstractScalarWriter) arrayWriter.scalar();
+ valuesState = new ValuesVectorState(schema, colWriter, vector.getDataVector());
+
+ // Create the offsets state with the offset vector portion of the repeated
+ // vector, and the offset writer portion of the array writer.
+
+ offsetsState = new OffsetVectorState(arrayWriter.offsetWriter(),
+ vector.getOffsetVector(),
+ (AbstractObjectWriter) arrayWriter.entry());
+ }
+
+ /**
+  * @return the repeated vector passed to the constructor
+  */
+ @Override
+ public ValueVector vector() { return vector; }
+
+ /**
+  * Allocates the offsets state for <tt>cardinality</tt> entries and the
+  * values state for the corresponding expected number of elements.
+  *
+  * @param cardinality the number of outer (array) entries
+  * @return the sum of the values returned by the two child allocations
+  */
+ @Override
+ public int allocate(int cardinality) {
+ return offsetsState.allocate(cardinality) +
+ valuesState.allocate(childCardinality(cardinality));
+ }
+
+ /**
+  * Scales an outer cardinality by the schema's expected element count.
+  *
+  * @param cardinality the number of outer (array) entries
+  * @return <tt>cardinality</tt> multiplied by
+  * <tt>schema.expectedElementCount()</tt>
+  */
+ private int childCardinality(int cardinality) {
+ return cardinality * schema.expectedElementCount();
+ }
+
+ /**
+  * The column is a scalar or an array of scalars. We need to roll over both the column
+  * values and the offsets that point to those values. The index provided is
+  * the index into the offset vector. We use that to obtain the index of the
+  * values to roll-over.
+  * <p>
+  * NOTE(review): this method itself receives only a cardinality; the index
+  * handling described above presumably happens inside the offsets and
+  * values states -- confirm.
+  * <p>
+  * Data structure:
+  * <p><pre><code>
+  * RepeatedVectorState (this class)
+  * +- OffsetVectorState
+  * . +- OffsetVectorWriter (A)
+  * . +- Offset vector (B)
+  * . +- Backup (e.g. look-ahead) offset vector
+  * +- ValuesVectorState
+  * . +- Scalar (element) writer (C)
+  * . +- Data (elements) vector (D)
+  * . +- Backup elements vector
+  * +- Array Writer
+  * . +- ColumnWriterIndex (for array as a whole)
+  * . +- OffsetVectorWriter (A)
+  * . . +- Offset vector (B)
+  * . +- ArrayElementWriterIndex
+  * . +- ScalarWriter (C)
+  * . . +- ArrayElementWriterIndex
+  * . . +- Scalar vector (D)
+  * </code></pre>
+  * <p>
+  * The top group of objects point into the writer objects in the second
+  * group. Letters in parens show the connections.
+  * <p>
+  * To perform the roll-over, we must:
+  * <ul>
+  * <li>Copy values from the current vectors to a set of new, look-ahead
+  * vectors.</li>
+  * <li>Swap buffers between the main and "backup" vectors, effectively
+  * moving the "full" batch to the sidelines, putting the look-ahead vectors
+  * into play in order to finish writing the current row.</li>
+  * <li>Update the writers to point to the look-ahead buffers, including
+  * the initial set of data copied into those vectors.</li>
+  * <li>Update the vector indexes to point to the next write positions
+  * after the values copied during roll-over.</li>
+  * </ul>
+  *
+  * @param cardinality the number of outer elements to create in the look-ahead
+  * vector
+  */
+
+ @Override
+ public void rollover(int cardinality) {
+
+ // Roll over the values (data) state first, sized by the expected
+ // number of elements, and then the offsets state.
+ // NOTE(review): presumably this order matters -- confirm before
+ // reordering the two calls.
+
+ valuesState.rollover(childCardinality(cardinality));
+ offsetsState.rollover(cardinality);
+ }
+
+ /**
+  * Forwards the harvest-with-look-ahead step to the offsets state and
+  * then to the values state.
+  */
+ @Override
+ public void harvestWithLookAhead() {
+ offsetsState.harvestWithLookAhead();
+ valuesState.harvestWithLookAhead();
+ }
+
+ /**
+  * Forwards the start-batch-with-look-ahead step to the offsets state and
+  * then to the values state.
+  */
+ @Override
+ public void startBatchWithLookAhead() {
+ offsetsState.startBatchWithLookAhead();
+ valuesState.startBatchWithLookAhead();
+ }
+
+ /**
+  * Resets the offsets state and then the values state.
+  */
+ @Override
+ public void reset() {
+ offsetsState.reset();
+ valuesState.reset();
+ }
+
+ /**
+  * Writes this object to the formatter: the schema, the identities of the
+  * array writer and the vector, followed by nested dumps of the offsets
+  * state and the values state.
+  *
+  * @param format the formatter that receives the dump
+  */
+ @Override
+ public void dump(HierarchicalFormatter format) {
+ format
+ .startObject(this)
+ .attribute("schema", schema)
+ .attributeIdentity("writer", arrayWriter)
+ .attributeIdentity("vector", vector)
+ .attribute("offsetsState");
+ offsetsState.dump(format);
+ format
+ .attribute("valuesState");
+ valuesState.dump(format);
+ format
+ .endObject();
+ }
+}