You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/12/21 05:19:41 UTC
[14/15] drill git commit: DRILL-5657: Size-aware vector writer
structure
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java
new file mode 100644
index 0000000..b875e7e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java
@@ -0,0 +1,775 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import java.util.Collection;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.TupleState.RowState;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+
+/**
+ * Implementation of the result set loader.
+ *
+ * @see ResultSetLoader
+ */
+
+public class ResultSetLoaderImpl implements ResultSetLoader {
+
+  /**
+   * Read-only set of options for the result set loader. Values are
+   * fixed at construction; use {@link OptionBuilder} to assemble a
+   * customized set.
+   */
+
+  public static class ResultSetOptions {
+
+    // Maximum size, in bytes, of any single vector buffer.
+    public final int vectorSizeLimit;
+
+    // Maximum number of rows per batch.
+    public final int rowCountLimit;
+
+    // Optional vector cache shared across readers; null when the caller
+    // provided none (the loader then falls back to a no-op cache).
+    public final ResultVectorCache vectorCache;
+
+    // Optional set of projected columns; null when no projection was given.
+    public final Collection<SchemaPath> projection;
+
+    // Optional up-front schema; null to discover columns incrementally.
+    public final TupleMetadata schema;
+
+    // Total byte limit for one batch; a value <= 0 means "no limit"
+    // (see canExpand()).
+    public final long maxBatchSize;
+
+    /**
+     * Default options: maximum vector buffer size, default row count,
+     * no projection, no cache, no schema, no batch size limit.
+     */
+
+    public ResultSetOptions() {
+      vectorSizeLimit = ValueVector.MAX_BUFFER_SIZE;
+      rowCountLimit = DEFAULT_ROW_COUNT;
+      projection = null;
+      vectorCache = null;
+      schema = null;
+      maxBatchSize = -1;
+    }
+
+    /**
+     * Copies the values accumulated by the given builder.
+     *
+     * @param builder source of the option values
+     */
+
+    public ResultSetOptions(OptionBuilder builder) {
+      this.vectorSizeLimit = builder.vectorSizeLimit;
+      this.rowCountLimit = builder.rowCountLimit;
+      this.projection = builder.projection;
+      this.vectorCache = builder.vectorCache;
+      this.schema = builder.schema;
+      this.maxBatchSize = builder.maxBatchSize;
+    }
+
+    /**
+     * Writes a diagnostic summary of these options to the given
+     * formatter. Note that only a subset of the options (the size
+     * and row limits, and the projection) is reported.
+     *
+     * @param format destination formatter
+     */
+
+    public void dump(HierarchicalFormatter format) {
+      format
+        .startObject(this)
+        .attribute("vectorSizeLimit", vectorSizeLimit)
+        .attribute("rowCountLimit", rowCountLimit)
+        .attribute("projection", projection)
+        .endObject();
+    }
+  }
+
+  /**
+   * Internal writer state machine. Transitions are driven by
+   * {@code startBatch()}, {@code saveRow()}, vector overflow events,
+   * {@code harvest()} and {@code close()}.
+   */
+
+  private enum State {
+    /**
+     * Before the first batch.
+     */
+
+    START,
+
+    /**
+     * Writing to a batch normally.
+     */
+
+    ACTIVE,
+
+    /**
+     * Batch overflowed a vector while writing. Can continue
+     * to write to a temporary "overflow" batch until the
+     * end of the current row.
+     */
+
+    OVERFLOW,
+
+    /**
+     * Temporary state, held only while the overflow machinery runs,
+     * to avoid triggering a second batch-size overflow while the
+     * first one is still being handled.
+     */
+
+    IN_OVERFLOW,
+
+    /**
+     * Batch is full due to reaching the row count limit
+     * when saving a row.
+     * No more writes allowed until harvesting the current batch.
+     */
+
+    FULL_BATCH,
+
+    /**
+     * Current batch was harvested: data is gone. No lookahead
+     * batch exists.
+     */
+
+    HARVESTED,
+
+    /**
+     * Current batch was harvested and its data is gone. However,
+     * overflow occurred during that batch and the data exists
+     * in the overflow vectors.
+     * <p>
+     * This state needs special consideration. The column writer
+     * structure maintains its state (offsets, etc.) from the OVERFLOW
+     * state, but the buffers currently in the vectors are from the
+     * complete batch. <b>No writes can be done in this state!</b>
+     * The writer state does not match the data in the buffers.
+     * The code here does what it can to catch this state. But, if
+     * some client tries to write to a column writer in this state,
+     * bad things will happen. Doing so is invalid (the write is outside
+     * of a batch), so this is not a terrible restriction.
+     * <p>
+     * Said another way, the current writer state is invalid with respect
+     * to the active buffers, but only if the writers try to act on the
+     * buffers. Since the writers won't do so, this temporary state is
+     * fine. The correct buffers are restored once a new batch is started
+     * and the state moves to ACTIVE.
+     */
+
+    LOOK_AHEAD,
+
+    /**
+     * Mutator is closed: no more operations are allowed.
+     */
+
+    CLOSED
+  }
+
+  // Class-scoped SLF4J logger shared with helper classes in this package.
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResultSetLoaderImpl.class);
+
+  /**
+   * Options provided to this loader.
+   */
+
+  private final ResultSetOptions options;
+
+  /**
+   * Allocator for vectors created by this loader.
+   */
+
+  final BufferAllocator allocator;
+
+  /**
+   * Internal structure used to work with the vectors (real or dummy) used
+   * by this loader.
+   */
+
+  final RowState rootState;
+
+  /**
+   * Top-level writer index that steps through the rows as they are written.
+   * When an overflow batch is in effect, indexes into that batch instead.
+   * Since a batch is really a tree of tuples, in which some branches of
+   * the tree are arrays, the root indexes here feeds into array indexes
+   * within the writer structure that points to the current position within
+   * an array column.
+   */
+
+  private final WriterIndexImpl writerIndex;
+
+  /**
+   * The row-level writer for stepping through rows as they are written,
+   * and for accessing top-level columns.
+   */
+
+  private final RowSetLoaderImpl rootWriter;
+
+  /**
+   * Vector cache for this loader.
+   *
+   * @see OptionBuilder#setVectorCache()
+   */
+
+  private final ResultVectorCache vectorCache;
+
+  /**
+   * Tracks the state of the row set loader. Handling vector overflow requires
+   * careful stepping through a variety of states as the write proceeds.
+   */
+
+  private State state = State.START;
+
+  /**
+   * Track the current schema as seen by the writer. Each addition of a column
+   * anywhere in the schema causes the active schema version to increase by one.
+   * This allows very easy checks for schema changes: save the prior version number
+   * and compare it against the current version number.
+   */
+
+  private int activeSchemaVersion;
+
+  /**
+   * Track the current schema as seen by the consumer of the batches that this
+   * loader produces. The harvest schema version can be behind the active schema
+   * version in the case in which new columns are added to the overflow row.
+   * Since the overflow row won't be visible to the harvested batch, that batch
+   * sees the schema as it existed at a prior version: the harvest schema
+   * version.
+   */
+
+  private int harvestSchemaVersion;
+
+  /**
+   * Builds the harvest vector container that includes only the columns that
+   * are included in the harvest schema version. That is, it excludes columns
+   * added while writing the overflow row. Created lazily on first harvest.
+   */
+
+  private VectorContainerBuilder containerBuilder;
+
+  /**
+   * Counts the batches harvested (sent downstream) from this loader. Does
+   * not include the current, in-flight batch.
+   */
+
+  private int harvestBatchCount;
+
+  /**
+   * Counts the rows included in previously-harvested batches. Does not
+   * include the number of rows in the current batch.
+   */
+
+  private int previousRowCount;
+
+  /**
+   * Number of rows in the harvest batch. If an overflow batch is in effect,
+   * then this is the number of rows in the "main" batch before the overflow;
+   * that is the number of rows in the batch that will be harvested. If no
+   * overflow row is in effect, then this number is undefined (and should be
+   * zero.)
+   */
+
+  private int pendingRowCount;
+
+  /**
+   * The number of rows per batch. Starts with the configured amount. Can be
+   * adjusted between batches, perhaps based on the actual observed size of
+   * input data.
+   */
+
+  private int targetRowCount;
+
+  /**
+   * Total bytes allocated to the current batch.
+   */
+
+  protected int accumulatedBatchSize;
+
+  // Parsed form of the projection list; decides which columns get real
+  // vectors and which get dummies.
+
+  protected final ProjectionSet projectionSet;
+
+  /**
+   * Builds the loader with the given options.
+   *
+   * @param allocator allocator for all vectors created by this loader
+   * @param options loader configuration
+   */
+
+  public ResultSetLoaderImpl(BufferAllocator allocator, ResultSetOptions options) {
+    this.allocator = allocator;
+    this.options = options;
+    targetRowCount = options.rowCountLimit;
+    writerIndex = new WriterIndexImpl(this);
+
+    // Fall back to a no-op cache when the caller did not provide one.
+
+    if (options.vectorCache == null) {
+      vectorCache = new NullResultVectorCacheImpl(allocator);
+    } else {
+      vectorCache = options.vectorCache;
+    }
+
+    // If projection, build the projection map.
+
+    projectionSet = ProjectionSetImpl.parse(options.projection);
+
+    // Build the row set model depending on whether a schema is provided.
+
+    rootState = new RowState(this);
+    rootWriter = rootState.rootWriter();
+
+    // If no schema, columns will be added incrementally as they
+    // are discovered. Start with an empty model.
+
+    if (options.schema != null) {
+
+      // Schema provided. Populate a model (and create vectors) for the
+      // provided schema. The schema can be extended later, but normally
+      // won't be if known up front.
+      //
+      // Use parameterized logging so the schema is rendered to a string
+      // only when debug logging is actually enabled.
+
+      logger.debug("Schema: {}", options.schema);
+      rootState.buildSchema(options.schema);
+    }
+  }
+
+  /**
+   * Builds the loader with default options.
+   *
+   * @param allocator allocator for all vectors created by this loader
+   */
+
+  public ResultSetLoaderImpl(BufferAllocator allocator) {
+    this(allocator, new ResultSetOptions());
+  }
+
+  // Propagates the current target row count to the per-column cardinality
+  // estimates used when allocating vectors.
+
+  private void updateCardinality() {
+    rootState.updateCardinality(targetRowCount());
+  }
+
+  public BufferAllocator allocator() { return allocator; }
+
+  /**
+   * Increments the active schema version in response to a column being
+   * added somewhere in the schema. Between batches (or before the first
+   * one) the published (harvest) version advances in step; while a batch
+   * is being written it must lag, because the row in progress may
+   * overflow, hiding the new column until a later batch.
+   *
+   * @return the new active schema version
+   */
+
+  protected int bumpVersion() {
+    activeSchemaVersion++;
+    boolean betweenBatches =
+        state == State.HARVESTED ||
+        state == State.START ||
+        state == State.LOOK_AHEAD;
+    if (betweenBatches) {
+      harvestSchemaVersion = activeSchemaVersion;
+    }
+    return activeSchemaVersion;
+  }
+
+  @Override
+  public int schemaVersion() { return harvestSchemaVersion; }
+
+  /**
+   * Starts a new batch, either from scratch (at the start, or after a
+   * normal harvest) or by restoring the look-ahead (overflow) batch
+   * saved from the previous one. Leaves the loader in the ACTIVE state.
+   *
+   * @throws IllegalStateException if a batch is already in progress
+   */
+
+  @Override
+  public void startBatch() {
+    switch (state) {
+    case HARVESTED:
+    case START:
+      logger.trace("Start batch");
+      accumulatedBatchSize = 0;
+      updateCardinality();
+      rootState.startBatch();
+      checkInitialAllocation();
+
+      // The previous batch ended without overflow, so start
+      // a new batch, and reset the write index to 0.
+
+      writerIndex.reset();
+      rootWriter.startWrite();
+      break;
+
+    case LOOK_AHEAD:
+
+      // A row overflowed so keep the writer index at its current value
+      // as it points to the second row in the overflow batch. However,
+      // the last write position of each writer must be restored on
+      // a column-by-column basis, which is done by the visitor.
+
+      logger.trace("Start batch after overflow");
+      rootState.startBatch();
+
+      // Note: no need to do anything with the writers; they were left
+      // pointing to the correct positions in the look-ahead batch.
+      // The above simply puts the look-ahead vectors back "under"
+      // the writers.
+
+      break;
+
+    default:
+      throw new IllegalStateException("Unexpected state: " + state);
+    }
+
+    // Update the visible schema with any pending overflow batch
+    // updates.
+
+    harvestSchemaVersion = activeSchemaVersion;
+    pendingRowCount = 0;
+    state = State.ACTIVE;
+  }
+
+  /**
+   * Returns the top-level (row) writer.
+   *
+   * @return the row writer
+   * @throws IllegalStateException if the loader has been closed
+   */
+
+  @Override
+  public RowSetLoader writer() {
+    if (state != State.CLOSED) {
+      return rootWriter;
+    }
+    throw new IllegalStateException("Unexpected state: " + state);
+  }
+
+  /**
+   * Convenience method that writes one complete row in a single call:
+   * start the row, set all column values, then save it.
+   *
+   * @param values one value per top-level column
+   * @return this loader, for call chaining
+   */
+
+  @Override
+  public ResultSetLoader setRow(Object... values) {
+    startRow();
+    writer().setTuple(values);
+    saveRow();
+    return this;
+  }
+
+  /**
+   * Called before writing a new row. Implementation of
+   * {@link RowSetLoader#start()}.
+   *
+   * @throws IllegalStateException if no batch is active
+   */
+
+  protected void startRow() {
+    switch (state) {
+    case ACTIVE:
+
+      // Update the visible schema with any pending overflow batch
+      // updates.
+
+      harvestSchemaVersion = activeSchemaVersion;
+      rootWriter.startRow();
+      break;
+    default:
+      throw new IllegalStateException("Unexpected state: " + state);
+    }
+  }
+
+  /**
+   * Finalize the current row. Implementation of
+   * {@link RowSetLoader#save()}.
+   *
+   * @throws IllegalStateException if no row is in progress
+   */
+
+  protected void saveRow() {
+    switch (state) {
+    case ACTIVE:
+      rootWriter.endArrayValue();
+      rootWriter.saveRow();
+
+      // Advancing the index may fail: the batch has then reached
+      // its configured row count limit.
+
+      if (! writerIndex.next()) {
+        state = State.FULL_BATCH;
+      }
+
+      // No overflow row. Advertise the schema version to the client.
+
+      harvestSchemaVersion = activeSchemaVersion;
+      break;
+
+    case OVERFLOW:
+
+      // End the value of the look-ahead row in the look-ahead vectors.
+
+      rootWriter.endArrayValue();
+      rootWriter.saveRow();
+
+      // Advance the writer index relative to the look-ahead batch.
+
+      writerIndex.next();
+
+      // Stay in the overflow state. Doing so will cause the writer
+      // to report that it is full.
+      //
+      // Also, do not change the harvest schema version. We will
+      // expose to the downstream operators the schema in effect
+      // at the start of the row. Columns added within the row won't
+      // appear until the next batch.
+
+      break;
+
+    default:
+      throw new IllegalStateException("Unexpected state: " + state);
+    }
+  }
+
+  /**
+   * Implementation of {@link RowSetLoader#isFull()}.
+   *
+   * @return true if the batch is full (reached vector capacity or the
+   * row count limit), false if more rows can be added
+   */
+
+  protected boolean isFull() {
+    if (state == State.ACTIVE) {
+      return ! writerIndex.valid();
+    }
+    return state == State.OVERFLOW || state == State.FULL_BATCH;
+  }
+
+  /**
+   * Reports whether rows can currently be written: only while a batch
+   * is in progress, including the overflow row.
+   */
+
+  @Override
+  public boolean writeable() {
+    return state == State.OVERFLOW || state == State.ACTIVE;
+  }
+
+  // A batch is "active" from startBatch() until harvest(), which
+  // includes the full-batch and overflow conditions.
+
+  private boolean isBatchActive() {
+    return state == State.FULL_BATCH ||
+           state == State.ACTIVE ||
+           state == State.OVERFLOW;
+  }
+
+  /**
+   * Implementation for {@link RowSetLoader#rowCount()}.
+   *
+   * @return the number of rows to be sent downstream for this
+   * batch. Does not include the overflow row.
+   */
+
+  protected int rowCount() {
+    if (state == State.ACTIVE || state == State.FULL_BATCH) {
+      return writerIndex.size();
+    }
+    if (state == State.OVERFLOW) {
+      return pendingRowCount;
+    }
+    return 0;
+  }
+
+  protected WriterIndexImpl writerIndex() { return writerIndex; }
+
+  /**
+   * Sets the row count target used for subsequent batches; clamped
+   * to a minimum of one row.
+   */
+
+  @Override
+  public void setTargetRowCount(int rowCount) {
+    targetRowCount = rowCount < 1 ? 1 : rowCount;
+  }
+
+  @Override
+  public int targetRowCount() { return targetRowCount; }
+
+  @Override
+  public int targetVectorSize() { return options.vectorSizeLimit; }
+
+  /**
+   * Handles a vector-overflow event: completes the current batch at the
+   * prior row, rolls the in-progress row over into a fresh look-ahead
+   * batch, and re-points the writers at it so the row can finish.
+   *
+   * @throws UserException if a single value cannot fit in an empty
+   * vector (second-order overflow)
+   * @throws IllegalStateException if no batch is active
+   */
+
+  protected void overflowed() {
+    logger.trace("Vector overflow");
+
+    // If we see overflow when we are already handling overflow, it means
+    // that a single value is too large to fit into an entire vector.
+    // Fail the query.
+    //
+    // Note that this is a judgment call. It is possible to allow the
+    // vector to double beyond the limit, but that will require a bit
+    // of thought to get right -- and, of course, completely defeats
+    // the purpose of limiting vector size to avoid memory fragmentation...
+    //
+    // Individual columns handle the case in which overflow occurs on the
+    // first row of the main batch. This check handles the pathological case
+    // in which we successfully overflowed, but then another column
+    // overflowed during the overflow row -- that indicates that that one
+    // column can't fit in an empty vector. That is, this check is for a
+    // second-order overflow.
+    //
+    // NOTE(review): the message hard-codes "16 MB"; the actual cap is the
+    // configured vector size limit -- confirm they always agree.
+
+    if (state == State.OVERFLOW) {
+      throw UserException
+        .memoryError("A single column value is larger than the maximum allowed size of 16 MB")
+        .build(logger);
+    }
+    if (state != State.ACTIVE) {
+      throw new IllegalStateException("Unexpected state: " + state);
+    }
+    state = State.IN_OVERFLOW;
+
+    // Preserve the number of rows in the now-complete batch.
+
+    pendingRowCount = writerIndex.vectorIndex();
+
+    // Roll-over will allocate new vectors. Update with the latest
+    // array cardinality.
+
+    updateCardinality();
+
+    // Wrap up the completed rows into a batch. Sets
+    // vector value counts. The rollover data still exists so
+    // it can be moved, but it is now past the recorded
+    // end of the vectors (though, obviously, not past the
+    // physical end.)
+
+    rootWriter.preRollover();
+
+    // Roll over vector values.
+
+    accumulatedBatchSize = 0;
+    rootState.rollover();
+
+    // Adjust writer state to match the new vector values. This is
+    // surprisingly easy if we not that the current row is shifted to
+    // the 0 position in the new vector, so we just shift all offsets
+    // downward by the current row position at each repeat level.
+
+    rootWriter.postRollover();
+
+    // The writer index is reset back to 0. Because of the above roll-over
+    // processing, some vectors may now already have values in the 0 slot.
+    // However, the vector that triggered overflow has not yet written to
+    // the current record, and so will now write to position 0. After the
+    // completion of the row, all 0-position values should be written (or
+    // at least those provided by the client.)
+    //
+    // For arrays, the writer might have written a set of values
+    // (v1, v2, v3), and v4 might have triggered the overflow. In this case,
+    // the array values have been moved, offset vectors adjusted, the
+    // element writer adjusted, so that v4 will be written to index 3
+    // to produce (v1, v2, v3, v4, v5, ...) in the look-ahead vector.
+
+    writerIndex.rollover();
+    checkInitialAllocation();
+
+    // Remember that overflow is in effect.
+
+    state = State.OVERFLOW;
+  }
+
+  // True while the in-progress row lives in the look-ahead batch.
+
+  protected boolean hasOverflow() { return state == State.OVERFLOW; }
+
+  /**
+   * Wraps up the current batch and returns it as a container ready to
+   * send downstream. Handles both the normal case and the case in which
+   * an overflow row must be held back for the next batch.
+   *
+   * @return the completed batch with its record count set
+   * @throws IllegalStateException if no batch is in progress
+   */
+
+  @Override
+  public VectorContainer harvest() {
+    final int rowCount;
+    if (state == State.ACTIVE || state == State.FULL_BATCH) {
+      rowCount = harvestNormalBatch();
+      logger.trace("Harvesting {} rows", rowCount);
+    } else if (state == State.OVERFLOW) {
+      rowCount = harvestOverflowBatch();
+      logger.trace("Harvesting {} rows after overflow", rowCount);
+    } else {
+      throw new IllegalStateException("Unexpected state: " + state);
+    }
+
+    // Build the output container and record the row count.
+
+    VectorContainer container = outputContainer();
+    container.setRecordCount(rowCount);
+
+    // Finalize: update counts, set state.
+
+    harvestBatchCount++;
+    previousRowCount += rowCount;
+    return container;
+  }
+
+  // Wraps up a batch that completed without overflow: final fill-in,
+  // value counts, publish the schema, mark the batch consumed.
+
+  private int harvestNormalBatch() {
+    rootWriter.endBatch();
+    harvestSchemaVersion = activeSchemaVersion;
+    state = State.HARVESTED;
+    return writerIndex.size();
+  }
+
+  // Wraps up a batch whose last row overflowed: the overflow row stays
+  // behind in the look-ahead vectors for the next batch.
+
+  private int harvestOverflowBatch() {
+    rootState.harvestWithLookAhead();
+    state = State.LOOK_AHEAD;
+    return pendingRowCount;
+  }
+
+  /**
+   * Returns the output container for the current harvest schema
+   * version, creating the container builder lazily on first use.
+   */
+
+  @Override
+  public VectorContainer outputContainer() {
+    // Build the output container.
+
+    if (containerBuilder == null) {
+      containerBuilder = new VectorContainerBuilder(this);
+    }
+    containerBuilder.update(harvestSchemaVersion);
+    return containerBuilder.container();
+  }
+
+  /**
+   * Returns the schema of the harvested batch.
+   * <p>
+   * NOTE(review): assumes {@link #outputContainer()} (directly or via
+   * {@link #harvest()}) has run at least once; otherwise
+   * containerBuilder is null and this throws NPE -- confirm callers
+   * respect that ordering.
+   */
+
+  @Override
+  public TupleMetadata harvestSchema() {
+    return containerBuilder.schema();
+  }
+
+  /**
+   * Releases the loader's vectors and marks it closed. Idempotent.
+   */
+
+  @Override
+  public void close() {
+    if (state == State.CLOSED) {
+      return;
+    }
+    rootState.close();
+
+    // Do not close the vector cache; the caller owns that and
+    // will, presumably, reuse those vectors for another writer.
+
+    state = State.CLOSED;
+  }
+
+  /**
+   * Number of batches produced, counting the in-progress batch only
+   * if it holds at least one row.
+   */
+
+  @Override
+  public int batchCount() {
+    return harvestBatchCount + (rowCount() == 0 ? 0 : 1);
+  }
+
+  /**
+   * Total rows written so far: harvested rows plus, when a batch is
+   * active, the rows (and any pending overflow rows) not yet harvested.
+   */
+
+  @Override
+  public int totalRowCount() {
+    int total = previousRowCount;
+    if (isBatchActive()) {
+      total += pendingRowCount + writerIndex.size();
+    }
+    return total;
+  }
+
+  public ResultVectorCache vectorCache() { return vectorCache; }
+  public RowState rootState() { return rootState; }
+
+  /**
+   * Return whether a vector within the current batch can expand. Limits
+   * are enforced only if a limit was provided in the options; a limit
+   * of zero or less means "no limit". Expansion is always permitted
+   * while an overflow is in progress, since the current row must be
+   * allowed to complete.
+   *
+   * @param delta increase in vector size
+   * @return true if the vector can expand, false if an overflow
+   * event should occur
+   */
+
+  public boolean canExpand(int delta) {
+
+    // Note: the delta is accumulated even when the answer is "no";
+    // the overflow path then resets accumulatedBatchSize.
+
+    accumulatedBatchSize += delta;
+    return state == State.IN_OVERFLOW ||
+        options.maxBatchSize <= 0 ||
+        accumulatedBatchSize <= options.maxBatchSize;
+  }
+
+  /**
+   * Accumulate the initial vector allocation sizes.
+   *
+   * @param allocationBytes number of bytes allocated to a vector
+   * in the batch setup step
+   */
+
+  public void tallyAllocations(int allocationBytes) {
+    accumulatedBatchSize += allocationBytes;
+  }
+
+  /**
+   * Log and check the initial vector allocation. If a batch size
+   * limit is set, warn if the initial allocation exceeds the limit.
+   * This will occur if the target row count is incorrect for the
+   * data size.
+   */
+
+  private void checkInitialAllocation() {
+
+    // Use the same "no limit" convention as canExpand(): a limit of
+    // zero or less means the batch size is unbounded. (Testing only
+    // maxBatchSize < 0 here treated zero as a real limit, producing
+    // a spurious warning on every batch.)
+
+    if (options.maxBatchSize <= 0) {
+      logger.debug("Initial vector allocation: {}, no batch limit specified",
+          accumulatedBatchSize);
+    } else if (accumulatedBatchSize > options.maxBatchSize) {
+      logger.warn("Initial vector allocation: {}, but batch size limit is: {}",
+          accumulatedBatchSize, options.maxBatchSize);
+    } else {
+      logger.debug("Initial vector allocation: {}, batch size limit: {}",
+          accumulatedBatchSize, options.maxBatchSize);
+    }
+  }
+
+  /**
+   * Writes the loader's internal state -- options, write position,
+   * state machine, schema versions, row counts, the row set model and
+   * the writer tree -- to the given formatter for debugging.
+   *
+   * @param format destination formatter
+   */
+
+  public void dump(HierarchicalFormatter format) {
+    format
+      .startObject(this)
+      .attribute("options");
+    options.dump(format);
+    format
+      .attribute("index", writerIndex.vectorIndex())
+      .attribute("state", state)
+      .attribute("activeSchemaVersion", activeSchemaVersion)
+      .attribute("harvestSchemaVersion", harvestSchemaVersion)
+      .attribute("pendingRowCount", pendingRowCount)
+      .attribute("targetRowCount", targetRowCount)
+      ;
+    format.attribute("root");
+    rootState.dump(format);
+    format.attribute("rootWriter");
+    rootWriter.dump(format);
+    format.endObject();
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultVectorCacheImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultVectorCacheImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultVectorCacheImpl.java
new file mode 100644
index 0000000..c7288b2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultVectorCacheImpl.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Manages an inventory of value vectors used across row batch readers.
+ * Drill semantics for batches is complex. Each operator logically returns
+ * a batch of records on each call of the Drill Volcano iterator protocol
+ * <tt>next()</tt> operation. However, the batches "returned" are not
+ * separate objects. Instead, Drill enforces the following semantics:
+ * <ul>
+ * <li>If a <tt>next()</tt> call returns <tt>OK</tt> then the set of vectors
+ * in the "returned" batch must be identical to those in the prior batch. Not
+ * just the same type; they must be the same <tt>ValueVector</tt> objects.
+ * (The buffers within the vectors will be different.)</li>
+ * <li>If the set of vectors changes in any way (add a vector, remove a
+ * vector, change the type of a vector), then the <tt>next()</tt> call
+ * <b>must</b> return <tt>OK_NEW_SCHEMA</tt>.</ul>
+ * </ul>
+ * These rules create interesting constraints for the scan operator.
+ * Conceptually, each batch is distinct. But, it must share vectors. The
+ * {@link ResultSetLoader} class handles this by managing the set of vectors
+ * used by a single reader.
+ * <p>
+ * Readers are independent: each may read a distinct schema (as in JSON.)
+ * Yet, the Drill protocol requires minimizing spurious <tt>OK_NEW_SCHEMA</tt>
+ * events. As a result, two readers run by the same scan operator must
+ * share the same set of vectors, despite the fact that they may have
+ * different schemas and thus different <tt>ResultSetLoader</tt>s.
+ * <p>
+ * The purpose of this inventory is to persist vectors across readers, even
+ * when, say, reader B does not use a vector that reader A created.
+ * <p>
+ * The semantics supported by this class include:
+ * <ul>
+ * <li>Ability to "pre-declare" columns based on columns that appear in
+ * an explicit select list. This ensures that the columns are known (but
+ * not their types).</li>
+ * <li>Ability to reuse a vector across readers if the column retains the same
+ * name and type (minor type and mode.)</li>
+ * <li>Ability to flush unused vectors for readers with changing schemas
+ * if a schema change occurs.</li>
+ * <li>Support schema "hysteresis"; that is, the a "sticky" schema that
+ * minimizes spurious changes. Once a vector is declared, it can be included
+ * in all subsequent batches (provided the column is nullable or an array.)</li>
+ * </ul>
+ */
+public class ResultVectorCacheImpl implements ResultVectorCache {
+
+  /**
+   * State of a projected vector. At first all we have is a name.
+   * Later, we'll discover the type.
+   */
+
+  private static class VectorState {
+
+    // Column name, fixed at creation.
+    protected final String name;
+
+    // Cached vector; null until a type is discovered via addOrGet().
+    protected ValueVector vector;
+
+    // True once the current reader has requested this vector during
+    // the current batch cycle; reset by newBatch().
+    protected boolean touched;
+
+    public VectorState(String name) {
+      this.name = name;
+    }
+
+    /**
+     * Reports whether the cached vector exists and matches the
+     * requested column type exactly, and so can be reused.
+     */
+
+    public boolean satisfies(MaterializedField colSchema) {
+      if (vector == null) {
+        return false;
+      }
+      MaterializedField vectorSchema = vector.getField();
+      return vectorSchema.getType().equals(colSchema.getType());
+    }
+  }
+
+  private final BufferAllocator allocator;
+
+  // Vector states indexed by column name.
+  private final Map<String, VectorState> vectors = new HashMap<>();
+
+  public ResultVectorCacheImpl(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  @Override
+  public BufferAllocator allocator() { return allocator; }
+
+  /**
+   * Pre-declares the named columns (from an explicit select list) so
+   * they are known to the cache before their types are discovered.
+   *
+   * @param selected column names from the select list
+   */
+
+  public void predefine(List<String> selected) {
+    for (String colName : selected) {
+      addVector(colName);
+    }
+  }
+
+  // Registers a new, type-less entry for the given column name.
+
+  private VectorState addVector(String colName) {
+    VectorState vs = new VectorState(colName);
+    vectors.put(vs.name, vs);
+    return vs;
+  }
+
+  /**
+   * Marks the start of a new reader/batch cycle: clears the per-batch
+   * "touched" flags so that vectors unused by the new reader can later
+   * be identified.
+   */
+
+  public void newBatch() {
+    for (VectorState vs : vectors.values()) {
+      vs.touched = false;
+    }
+  }
+
+  /**
+   * Discards cache entries not used since the last {@link #newBatch()}.
+   * <p>
+   * NOTE(review): the removed entries' vectors are not closed here --
+   * presumably ownership passed downstream with a prior batch; verify,
+   * otherwise this leaks vector memory.
+   */
+
+  public void trimUnused() {
+    // Collect first, then remove, to avoid mutating the map while
+    // iterating over its values.
+    List<VectorState> unused = new ArrayList<>();
+    for (VectorState vs : vectors.values()) {
+      if (! vs.touched) {
+        unused.add(vs);
+      }
+    }
+    if (unused.isEmpty()) {
+      return;
+    }
+    for (VectorState vs : unused) {
+      vectors.remove(vs.name);
+    }
+  }
+
+  /**
+   * Returns a vector for the given column, reusing the cached vector
+   * when the name and type match exactly; otherwise creates (and
+   * caches) a new vector, closing any prior vector of a different type.
+   *
+   * @param colSchema name and type of the requested column
+   * @return the cached or newly-created vector
+   */
+
+  @Override
+  public ValueVector addOrGet(MaterializedField colSchema) {
+    VectorState vs = vectors.get(colSchema.getName());
+
+    // If the vector is found, and is of the right type, reuse it.
+    // Mark it as touched so that trimUnused() does not discard a
+    // vector the current reader is actively using.
+
+    if (vs != null && vs.satisfies(colSchema)) {
+      vs.touched = true;
+      return vs.vector;
+    }
+
+    // If no vector, this is a late schema. Create the vector.
+
+    if (vs == null) {
+      vs = addVector(colSchema.getName());
+
+    // Else, if the vector changed type, close the old one.
+
+    } else if (vs.vector != null) {
+      vs.vector.close();
+      vs.vector = null;
+    }
+
+    // Create the new vector.
+
+    vs.touched = true;
+    vs.vector = TypeHelper.getNewVector(colSchema, allocator, null);
+    return vs.vector;
+  }
+
+  /**
+   * Returns the type of the cached vector for the given column, or
+   * null if the column is unknown or its type not yet discovered.
+   */
+
+  public MajorType getType(String name) {
+    VectorState vs = vectors.get(name);
+    if (vs == null || vs.vector == null) {
+      return null;
+    }
+    return vs.vector.getField().getType();
+  }
+
+  /**
+   * Releases all cached vectors and clears the cache. Entries created
+   * by {@link #predefine(List)} may never have received a vector, so
+   * guard against a null vector to avoid an NPE here.
+   */
+
+  public void close() {
+    for (VectorState vs : vectors.values()) {
+      if (vs.vector != null) {
+        vs.vector.close();
+      }
+    }
+    vectors.clear();
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RowSetLoaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RowSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RowSetLoaderImpl.java
new file mode 100644
index 0000000..ec61ae7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RowSetLoaderImpl.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import java.util.ArrayList;
+
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter;
+
+/**
+ * Implementation of the row set loader. Provides row-level operations, leaving the
+ * result set loader to provide batch-level operations. However, all control
+ * operations are actually delegated to the result set loader, which handles
+ * the details of working with overflow rows.
+ */
+
+public class RowSetLoaderImpl extends AbstractTupleWriter implements RowSetLoader {
+
+ /** Parent result set loader; owns all batch-level operations. */
+ private final ResultSetLoaderImpl resultSetLoader;
+
+ protected RowSetLoaderImpl(ResultSetLoaderImpl rsLoader, TupleMetadata schema) {
+ super(schema, new ArrayList<AbstractObjectWriter>());
+ resultSetLoader = rsLoader;
+ bindIndex(rsLoader.writerIndex());
+ }
+
+ @Override
+ public ResultSetLoader loader() { return resultSetLoader; }
+
+ /**
+ * Convenience method: start a row, write the given values, then
+ * save the row.
+ *
+ * @throws IllegalStateException if the current batch is already full
+ */
+ @Override
+ public RowSetLoader addRow(Object...values) {
+ if (!start()) {
+ throw new IllegalStateException("Batch is full.");
+ }
+ setObject(values);
+ save();
+ return this;
+ }
+
+ @Override
+ public int rowIndex() { return resultSetLoader.writerIndex().vectorIndex(); }
+
+ @Override
+ public void save() { resultSetLoader.saveRow(); }
+
+ /**
+ * Begin (or restart) the current row. Returns false, leaving state
+ * unchanged, when the batch has reached its limit. If a row is already
+ * in flight, rewind to the start of that row; otherwise advance the
+ * loader to the next row position.
+ *
+ * @return true if the caller may write the row, false if the batch is full
+ */
+ @Override
+ public boolean start() {
+ if (resultSetLoader.isFull()) {
+
+ // Full batch? Return false.
+
+ return false;
+ }
+ if (state == State.IN_ROW) {
+
+ // Already in a row? Rewind to the start of the row.
+
+ restartRow();
+ } else {
+
+ // Otherwise, advance to the next row.
+
+ resultSetLoader.startRow();
+ }
+ return true;
+ }
+
+ /**
+ * Finish the batch: discard any partially-written row, then flush
+ * writer state into the vectors.
+ */
+ public void endBatch() {
+ if (state == State.IN_ROW) {
+ restartRow();
+ state = State.IN_WRITE;
+ }
+ endWrite();
+ }
+
+ @Override
+ public boolean isFull() { return resultSetLoader.isFull(); }
+
+ @Override
+ public int rowCount() { return resultSetLoader.rowCount(); }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java
new file mode 100644
index 0000000..f6bc5f3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter;
+import org.apache.drill.exec.vector.accessor.writer.OffsetVectorWriter;
+
+/**
+ * Base class for a single vector. Handles the bulk of work for that vector.
+ * Subclasses are specialized for offset vectors or values vectors.
+ * (The "single vector" name contrasts with classes that manage compound
+ * vectors, such as a data and offsets vector.)
+ */
+
+public abstract class SingleVectorState implements VectorState {
+
+ /**
+ * State for a scalar value vector. The vector might be for a simple (non-array)
+ * vector, or might be the payload part of a scalar array (repeated scalar)
+ * vector.
+ */
+
+ public static class ValuesVectorState extends SingleVectorState {
+
+ private final ColumnMetadata schema;
+
+ public ValuesVectorState(ColumnMetadata schema, AbstractScalarWriter writer, ValueVector mainVector) {
+ super(writer, mainVector);
+ this.schema = schema;
+ }
+
+ @Override
+ public int allocateVector(ValueVector vector, int cardinality) {
+ if (schema.isVariableWidth()) {
+
+ // Cap the allocated size to the maximum.
+
+ int size = (int) Math.min(ValueVector.MAX_BUFFER_SIZE, (long) cardinality * schema.expectedWidth());
+ ((VariableWidthVector) vector).allocateNew(size, cardinality);
+ } else {
+ ((FixedWidthVector) vector).allocateNew(cardinality);
+ }
+ return vector.getBufferSize();
+ }
+
+ @Override
+ protected void copyOverflow(int sourceStartIndex, int sourceEndIndex) {
+ int newIndex = 0;
+ ResultSetLoaderImpl.logger.trace("Vector {} of type {}: copy {} values from {} to {}",
+ mainVector.getField().toString(),
+ mainVector.getClass().getSimpleName(),
+ Math.max(0, sourceEndIndex - sourceStartIndex + 1),
+ sourceStartIndex, newIndex);
+
+ // Copy overflow values from the full vector to the new
+ // look-ahead vector. Uses vector-level operations for convenience.
+ // These aren't very efficient, but overflow does not happen very
+ // often.
+
+ for (int src = sourceStartIndex; src <= sourceEndIndex; src++, newIndex++) {
+ mainVector.copyEntry(newIndex, backupVector, src);
+ }
+ }
+ }
+
+ /**
+ * Special case for an offset vector. Offset vectors are managed like any other
+ * vector with respect to overflow and allocation. This means that the loader
+ * classes avoid the use of the RepeatedVector class methods, instead working
+ * with the offsets vector (here) or the values vector to allow the needed
+ * fine control over overflow operations.
+ */
+
+ public static class OffsetVectorState extends SingleVectorState {
+
+ // Writer for the child (data) vector the offsets point into; used to
+ // obtain the row-start offset during overflow copying.
+ private final AbstractObjectWriter childWriter;
+
+ public OffsetVectorState(AbstractScalarWriter writer, ValueVector mainVector,
+ AbstractObjectWriter childWriter) {
+ super(writer, mainVector);
+ this.childWriter = childWriter;
+ }
+
+ @Override
+ public int allocateVector(ValueVector toAlloc, int cardinality) {
+ ((UInt4Vector) toAlloc).allocateNew(cardinality);
+ return toAlloc.getBufferSize();
+ }
+
+ public int rowStartOffset() {
+ return ((OffsetVectorWriter) writer).rowStartOffset();
+ }
+
+ @Override
+ protected void copyOverflow(int sourceStartIndex, int sourceEndIndex) {
+
+ if (sourceStartIndex > sourceEndIndex) {
+ return;
+ }
+
+ // This is an offset vector. The data to copy is one greater
+ // than the row index.
+
+ sourceStartIndex++;
+ sourceEndIndex++;
+
+ // Copy overflow values from the full vector to the new
+ // look-ahead vector. Since this is an offset vector, values must
+ // be adjusted as they move across.
+ //
+ // Indexing can be confusing. Offset vectors have values offset
+ // from their row by one position. The offset vector position for
+ // row i has the start value for row i. The offset vector position for
+ // i+1 has the start of the next value. The difference between the
+ // two is the element length. As a result, the offset vector always has
+ // one more value than the number of rows, and position 0 is always 0.
+ //
+ // The index passed in here is that of the row that overflowed. That
+ // offset vector position contains the offset of the start of the data
+ // for the current row. We must subtract that offset from each copied
+ // value to adjust the offset for the destination.
+
+ UInt4Vector.Accessor sourceAccessor = ((UInt4Vector) backupVector).getAccessor();
+ UInt4Vector.Mutator destMutator = ((UInt4Vector) mainVector).getMutator();
+ int offset = childWriter.events().writerIndex().rowStartIndex();
+ int newIndex = 1;
+ ResultSetLoaderImpl.logger.trace("Offset vector: copy {} values from {} to {} with offset {}",
+ Math.max(0, sourceEndIndex - sourceStartIndex + 1),
+ sourceStartIndex, newIndex, offset);
+ assert offset == sourceAccessor.get(sourceStartIndex - 1);
+
+ // Position zero is special and will be filled in by the writer
+ // later.
+
+ for (int src = sourceStartIndex; src <= sourceEndIndex; src++, newIndex++) {
+ destMutator.set(newIndex, sourceAccessor.get(src) - offset);
+ }
+// VectorPrinter.printOffsets((UInt4Vector) backupVector, sourceStartIndex - 1, sourceEndIndex - sourceStartIndex + 3);
+// VectorPrinter.printOffsets((UInt4Vector) mainVector, 0, newIndex);
+ }
+ }
+
+ // Writer bound to the main vector; the binding survives rollover because
+ // rollover exchanges vector buffers rather than vectors.
+ protected final AbstractScalarWriter writer;
+ // Vector presented to the writer; its buffers may be swapped on overflow.
+ protected final ValueVector mainVector;
+ // Lazily-created vector that holds the completed batch during overflow.
+ protected ValueVector backupVector;
+
+ public SingleVectorState(AbstractScalarWriter writer, ValueVector mainVector) {
+ this.writer = writer;
+ this.mainVector = mainVector;
+ }
+
+ @Override
+ public ValueVector vector() { return mainVector; }
+
+ @Override
+ public int allocate(int cardinality) {
+ return allocateVector(mainVector, cardinality);
+ }
+
+ /**
+ * Allocate the given vector for the given expected value count.
+ * Subclasses specialize this for values vectors vs. offset vectors.
+ *
+ * @param vector vector to allocate (main or backup)
+ * @param cardinality expected number of values
+ * @return the allocated buffer size in bytes
+ */
+
+ protected abstract int allocateVector(ValueVector vector, int cardinality);
+
+ /**
+ * A column within the row batch overflowed. Prepare to absorb the rest of
+ * the in-flight row by rolling values over to a new vector, saving the
+ * complete vector for later. This column could have a value for the overflow
+ * row, or for some previous row, depending on exactly when and where the
+ * overflow occurs.
+ *
+ * @param cardinality number of values to allocate in the new look-ahead
+ * vector; must be positive. The range of values to move is taken from the
+ * writer itself: from the current row-start index through the last write
+ * index. If the vector is an array, that range spans all elements of the
+ * overflow row, and multiple elements may need to move
+ */
+
+ @Override
+ public void rollover(int cardinality) {
+
+ int sourceStartIndex = writer.writerIndex().rowStartIndex();
+
+ // Remember the last write index for the original vector.
+ // This tells us the end of the set of values to move, while the
+ // sourceStartIndex above tells us the start.
+
+ int sourceEndIndex = writer.lastWriteIndex();
+
+ // Switch buffers between the backup vector and the writer's output
+ // vector. Done this way because writers are bound to vectors and
+ // we wish to keep the binding.
+
+ if (backupVector == null) {
+ backupVector = TypeHelper.getNewVector(mainVector.getField(), mainVector.getAllocator(), null);
+ }
+ assert cardinality > 0;
+ allocateVector(backupVector, cardinality);
+ mainVector.exchange(backupVector);
+
+ // Copy overflow values from the full vector to the new
+ // look-ahead vector.
+
+ copyOverflow(sourceStartIndex, sourceEndIndex);
+
+ // At this point, the writer is positioned to write to the look-ahead
+ // vector at the position after the copied values. The original vector
+ // is saved along with a last write position that is no greater than
+ // the retained values.
+ }
+
+ /**
+ * Copy the given range of values (inclusive on both ends) from the
+ * backup (full) vector into the new look-ahead vector. Called after
+ * {@link #rollover(int)} has exchanged the two vectors' buffers.
+ */
+
+ protected abstract void copyOverflow(int sourceStartIndex, int sourceEndIndex);
+
+ /**
+ * Exchange the data from the backup vector and the main vector, putting
+ * the completed buffers back into the main vectors, and stashing the
+ * overflow buffers away in the backup vector.
+ * Restore the main vector's last write position.
+ */
+
+ @Override
+ public void harvestWithLookAhead() {
+ mainVector.exchange(backupVector);
+ }
+
+ /**
+ * The previous full batch has been sent downstream and the client is
+ * now ready to start writing to the next batch. Initialize that new batch
+ * with the look-ahead values saved during overflow of the previous batch.
+ */
+
+ @Override
+ public void startBatchWithLookAhead() {
+ mainVector.exchange(backupVector);
+ backupVector.clear();
+ }
+
+ @Override
+ public void reset() {
+ mainVector.clear();
+ if (backupVector != null) {
+ backupVector.clear();
+ }
+ }
+
+ @Override
+ public void dump(HierarchicalFormatter format) {
+ format
+ .startObject(this)
+ .attributeIdentity("writer", writer)
+ .attributeIdentity("mainVector", mainVector)
+ .attributeIdentity("backupVector", backupVector)
+ .endObject();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java
new file mode 100644
index 0000000..de41ee4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.exec.physical.rowSet.impl.ColumnState.BaseMapColumnState;
+import org.apache.drill.exec.physical.rowSet.impl.ColumnState.MapArrayColumnState;
+import org.apache.drill.exec.physical.rowSet.impl.ColumnState.MapColumnState;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.TupleSchema;
+import org.apache.drill.exec.record.TupleSchema.AbstractColumnMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.ObjectWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter;
+import org.apache.drill.exec.vector.accessor.writer.ColumnWriterFactory;
+
+/**
+ * Represents the loader state for a tuple: a row or a map. This is "state" in
+ * the sense of variables that are carried along with each tuple. Handles
+ * write-time issues such as defining new columns, allocating memory, handling
+ * overflow, assembling the output version of the map, and so on. Each
+ * row and map in the result set has a tuple state instance associated
+ * with it.
+ * <p>
+ * Here, by "tuple" we mean a container of vectors, each of which holds
+ * a variety of values. So, the "tuple" here is structural, not a specific
+ * set of values, but rather the collection of vectors that hold tuple
+ * values.
+ */
+
+public abstract class TupleState implements TupleWriterListener {
+
+ /**
+ * Handles the details of the top-level tuple, the data row itself.
+ * Note that by "row" we mean the set of vectors that define the
+ * set of rows.
+ */
+
+ public static class RowState extends TupleState {
+
+ /**
+ * The row-level writer for stepping through rows as they are written,
+ * and for accessing top-level columns.
+ */
+
+ private final RowSetLoaderImpl writer;
+
+ public RowState(ResultSetLoaderImpl rsLoader) {
+ super(rsLoader, rsLoader.projectionSet);
+ writer = new RowSetLoaderImpl(rsLoader, schema);
+ writer.bindListener(this);
+ }
+
+ public RowSetLoaderImpl rootWriter() { return writer; }
+
+ @Override
+ public AbstractTupleWriter writer() { return writer; }
+
+ @Override
+ public int innerCardinality() { return resultSetLoader.targetRowCount();}
+ }
+
+ /**
+ * Represents a tuple defined as a Drill map: single or repeated. Note that
+ * the map vector does not exist here; it is assembled only when "harvesting"
+ * a batch. This design supports the obscure case in which a new column
+ * is added during an overflow row, so exists within this abstraction,
+ * but is not published to the map that makes up the output.
+ */
+
+ public static class MapState extends TupleState {
+
+ protected final BaseMapColumnState mapColumnState;
+ protected int outerCardinality;
+
+ public MapState(ResultSetLoaderImpl rsLoader,
+ BaseMapColumnState mapColumnState,
+ ProjectionSet projectionSet) {
+ super(rsLoader, projectionSet);
+ this.mapColumnState = mapColumnState;
+ mapColumnState.writer().bindListener(this);
+ }
+
+ /**
+ * Return the tuple writer for the map. If this is a single
+ * map, then it is the writer itself. If this is a map array,
+ * then the tuple is nested inside the array.
+ */
+
+ @Override
+ public AbstractTupleWriter writer() {
+ AbstractObjectWriter objWriter = mapColumnState.writer();
+ TupleWriter tupleWriter;
+ if (objWriter.type() == ObjectType.ARRAY) {
+ tupleWriter = objWriter.array().tuple();
+ } else {
+ tupleWriter = objWriter.tuple();
+ }
+ return (AbstractTupleWriter) tupleWriter;
+ }
+
+ /**
+ * In order to allocate the correct-sized vectors, the map must know
+ * its member cardinality: the number of elements in each row. This
+ * is 1 for a single map, but may be any number for a map array. Then,
+ * this value is recursively pushed downward to compute the cardinality
+ * of lists of maps that contains lists of maps, and so on.
+ */
+
+ @Override
+ public void updateCardinality(int outerCardinality) {
+ this.outerCardinality = outerCardinality;
+ super.updateCardinality(outerCardinality);
+ }
+
+ @Override
+ public int innerCardinality() {
+ return outerCardinality * mapColumnState.schema().expectedElementCount();
+ }
+
+ @Override
+ public void dump(HierarchicalFormatter format) {
+ format
+ .startObject(this)
+ .attribute("column", mapColumnState.schema().name())
+ .attribute("cardinality", outerCardinality)
+ .endObject();
+ }
+ }
+
+ protected final ResultSetLoaderImpl resultSetLoader;
+ protected final List<ColumnState> columns = new ArrayList<>();
+ protected final TupleSchema schema = new TupleSchema();
+ protected final ProjectionSet projectionSet;
+
+ protected TupleState(ResultSetLoaderImpl rsLoader, ProjectionSet projectionSet) {
+ this.resultSetLoader = rsLoader;
+ this.projectionSet = projectionSet;
+ }
+
+ /**
+ * Cardinality of values within this tuple: the target row count for the
+ * root tuple, or the outer cardinality times the expected element count
+ * for a map.
+ *
+ * @return expected number of values per batch for this tuple's columns
+ */
+
+ public abstract int innerCardinality();
+
+ /**
+ * Returns an ordered set of the columns which make up the tuple.
+ * Column order is the same as that defined by the map's schema,
+ * to allow indexed access. New columns always appear at the end
+ * of the list to preserve indexes.
+ *
+ * @return ordered list of column states for the columns within
+ * this tuple
+ */
+
+ public List<ColumnState> columns() { return columns; }
+
+ public TupleMetadata schema() { return writer().schema(); }
+
+ public abstract AbstractTupleWriter writer();
+
+ /**
+ * Add a column described as a materialized field. Converts the field to
+ * column metadata, then delegates to the metadata-based overload.
+ */
+
+ @Override
+ public ObjectWriter addColumn(TupleWriter tupleWriter, MaterializedField column) {
+ return addColumn(tupleWriter, TupleSchema.fromField(column));
+ }
+
+ @Override
+ public ObjectWriter addColumn(TupleWriter tupleWriter, ColumnMetadata columnSchema) {
+
+ // Verify name is not a (possibly case insensitive) duplicate.
+
+ TupleMetadata tupleSchema = schema();
+ String colName = columnSchema.name();
+ if (tupleSchema.column(colName) != null) {
+ throw new IllegalArgumentException("Duplicate column: " + colName);
+ }
+
+ return addColumn(columnSchema);
+ }
+
+ /**
+ * Implementation of the work to add a new column to this tuple given a
+ * schema description of the column.
+ *
+ * @param columnSchema schema of the column
+ * @return writer for the new column
+ */
+
+ private AbstractObjectWriter addColumn(ColumnMetadata columnSchema) {
+
+ // Indicate projection in the metadata.
+
+ ((AbstractColumnMetadata) columnSchema).setProjected(
+ projectionSet.isProjected(columnSchema.name()));
+
+ // Build the column
+
+ ColumnState colState;
+ if (columnSchema.isMap()) {
+ colState = buildMap(columnSchema);
+ } else {
+ colState = buildPrimitive(columnSchema);
+ }
+ columns.add(colState);
+ colState.updateCardinality(innerCardinality());
+ colState.allocateVectors();
+ return colState.writer();
+ }
+
+ /**
+ * Build a primitive column. Check if the column is projected. If not,
+ * allocate a dummy writer for the column. If projected, then allocate
+ * a vector, a writer, and the column state which binds the two together
+ * and manages the column.
+ *
+ * @param columnSchema schema of the new primitive column
+ * @return column state for the new column
+ */
+
+ @SuppressWarnings("resource")
+ private ColumnState buildPrimitive(ColumnMetadata columnSchema) {
+ ValueVector vector;
+ if (columnSchema.isProjected()) {
+
+ // Create the vector for the column.
+
+ vector = resultSetLoader.vectorCache().addOrGet(columnSchema.schema());
+ } else {
+
+ // Column is not projected. No materialized backing for the column.
+
+ vector = null;
+ }
+
+ // Create the writer. Will be returned to the tuple writer.
+
+ AbstractObjectWriter colWriter = ColumnWriterFactory.buildColumnWriter(columnSchema, vector);
+
+ if (columnSchema.isArray()) {
+ return PrimitiveColumnState.newPrimitiveArray(resultSetLoader, vector, colWriter);
+ } else {
+ return PrimitiveColumnState.newPrimitive(resultSetLoader, vector, colWriter);
+ }
+ }
+
+ /**
+ * Build a new map (single or repeated) column. No map vector is created
+ * here, instead we create a tuple state to hold the columns, and defer the
+ * map vector (or vector container) until harvest time.
+ *
+ * @param columnSchema description of the map column
+ * @return column state for the map column
+ */
+
+ private ColumnState buildMap(ColumnMetadata columnSchema) {
+
+ // When dynamically adding columns, must add the (empty)
+ // map by itself, then add columns to the map via separate
+ // calls.
+
+ assert columnSchema.isMap();
+ assert columnSchema.mapSchema().size() == 0;
+
+ // Create the writer. Will be returned to the tuple writer.
+
+ ProjectionSet childProjection = projectionSet.mapProjection(columnSchema.name());
+ if (columnSchema.isArray()) {
+ return MapArrayColumnState.build(resultSetLoader,
+ columnSchema,
+ childProjection);
+ } else {
+ return new MapColumnState(resultSetLoader,
+ columnSchema,
+ childProjection);
+ }
+ }
+
+ /**
+ * When creating a schema up front, provide the schema of the desired tuple,
+ * then build vectors and writers to match. Allows up-front schema definition
+ * in addition to on-the-fly schema creation handled elsewhere.
+ *
+ * @param schema desired tuple schema to be materialized
+ */
+
+ public void buildSchema(TupleMetadata schema) {
+ for (int i = 0; i < schema.size(); i++) {
+ ColumnMetadata colSchema = schema.metadata(i);
+ AbstractObjectWriter colWriter;
+ if (colSchema.isMap()) {
+ // Add the map column as empty, then recursively populate its
+ // members so each nesting level gets its own tuple state.
+ colWriter = addColumn(colSchema.cloneEmpty());
+ BaseMapColumnState mapColState = (BaseMapColumnState) columns.get(columns.size() - 1);
+ mapColState.mapState().buildSchema(colSchema.mapSchema());
+ } else {
+ colWriter = addColumn(colSchema);
+ }
+ writer().addColumnWriter(colWriter);
+ }
+ }
+
+ /**
+ * Propagate a new cardinality (expected row or element count) to each
+ * column so vector allocations can be sized appropriately.
+ *
+ * @param cardinality expected number of values for this tuple
+ */
+
+ public void updateCardinality(int cardinality) {
+ for (ColumnState colState : columns) {
+ colState.updateCardinality(cardinality);
+ }
+ }
+
+ /**
+ * A column within the row batch overflowed. Prepare to absorb the rest of the
+ * in-flight row by rolling values over to a new vector, saving the complete
+ * vector for later. This column could have a value for the overflow row, or
+ * for some previous row, depending on exactly when and where the overflow
+ * occurs.
+ */
+
+ public void rollover() {
+ for (ColumnState colState : columns) {
+ colState.rollover();
+ }
+ }
+
+ /**
+ * Writing of a row batch is complete, and an overflow occurred. Prepare the
+ * vector for harvesting to send downstream. Set aside the look-ahead vector
+ * and put the full vector buffer back into the active vector.
+ */
+
+ public void harvestWithLookAhead() {
+ for (ColumnState colState : columns) {
+ colState.harvestWithLookAhead();
+ }
+ }
+
+ /**
+ * Start a new batch by shifting the overflow buffers back into the main
+ * write vectors and updating the writers.
+ */
+
+ public void startBatch() {
+ for (ColumnState colState : columns) {
+ colState.startBatch();
+ }
+ }
+
+ /**
+ * Clean up state (such as backup vectors) associated with the state
+ * for each vector.
+ */
+
+ public void close() {
+ for (ColumnState colState : columns) {
+ colState.close();
+ }
+ }
+
+ public void dump(HierarchicalFormatter format) {
+ format
+ .startObject(this)
+ .attributeArray("columns");
+ for (int i = 0; i < columns.size(); i++) {
+ format.element(i);
+ columns.get(i).dump(format);
+ }
+ format
+ .endArray()
+ .endObject();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorContainerBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorContainerBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorContainerBuilder.java
new file mode 100644
index 0000000..faa68cb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorContainerBuilder.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import java.util.List;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.rowSet.impl.ColumnState.BaseMapColumnState;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.TupleSchema;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+
+/**
+ * Builds the harvest vector container that includes only the columns that
+ * are included in the harvest schema version. That is, it excludes columns
+ * added while writing an overflow row.
+ * <p>
+ * Because a Drill row is actually a hierarchy, walks the internal hierarchy
+ * and builds a corresponding output hierarchy.
+ * <ul>
+ * <li>The root node is the row itself (vector container),</li>
+ * <li>Internal nodes are maps (structures),</li>
+ * <li>Leaf nodes are primitive vectors (which may be arrays).</li>
+ * </ul>
+ * The basic algorithm is to identify the version of the output schema,
+ * then add any new columns added up to that version. This object maintains
+ * the output container across batches, meaning that updates are incremental:
+ * we need only add columns that are new since the last update. And, those new
+ * columns will always appear directly after all existing columns in the row
+ * or in a map.
+ * <p>
+ * A special case occurs when columns are added in the overflow row. These
+ * columns <i>do not</i> appear in the output container for the main part
+ * of the batch; instead they appear in the <i>next</i> output container
+ * that includes the overflow row.
+ * <p>
+ * Since the container here may contain a subset of the internal columns, an
+ * interesting case occurs for maps. The maps in the output container are
+ * <b>not</b> the same as those used internally. Since a map column can contain
+ * either one list of columns or another, the internal and external maps must
+ * differ. The set of child vectors (except for child maps) are shared.
+ */
+
+public class VectorContainerBuilder {
+
+ /**
+ * Drill vector containers and maps are both tuples, but they irritatingly
+ * have completely different APIs for working with their child vectors.
+ * This class acts as a proxy to wrap the two APIs to provide a common
+ * view for the use of the container builder.
+ */
+
+ public static abstract class TupleProxy {
+ protected TupleMetadata schema;
+
+ public TupleProxy(TupleMetadata schema) {
+ this.schema = schema;
+ }
+
+ protected abstract int size();
+ protected abstract ValueVector vector(int index);
+ protected abstract void add(ValueVector vector);
+
+ protected TupleProxy mapProxy(int index) {
+ return new MapProxy(
+ schema.metadata(index).mapSchema(),
+ (AbstractMapVector) vector(index));
+ }
+ }
+
+ /**
+ * Proxy wrapper class for a vector container.
+ */
+
+ protected static class ContainerProxy extends TupleProxy {
+
+ private final VectorContainer container;
+
+ protected ContainerProxy(TupleMetadata schema, VectorContainer container) {
+ super(schema);
+ this.container = container;
+ }
+
+ @Override
+ protected int size() { return container.getNumberOfColumns(); }
+
+ @Override
+ protected ValueVector vector(int index) {
+ // Containers hold vector wrappers; unwrap to the raw vector.
+ return container.getValueVector(index).getValueVector();
+ }
+
+ @Override
+ protected void add(ValueVector vector) { container.add(vector); }
+ }
+
+ /**
+ * Proxy wrapper for a map container.
+ */
+
+ protected static class MapProxy extends TupleProxy {
+
+ private final AbstractMapVector mapVector;
+
+ protected MapProxy(TupleMetadata schema, AbstractMapVector mapVector) {
+ super(schema);
+ this.mapVector = mapVector;
+ }
+
+ @Override
+ protected int size() { return mapVector.size(); }
+
+ @Override
+ protected ValueVector vector(int index) {
+ return mapVector.getChildByOrdinal(index);
+ }
+
+ @Override
+ protected void add(ValueVector vector) {
+ // Map children are keyed by name; register under the field name.
+ mapVector.putChild(vector.getField().getName(), vector);
+ }
+ }
+
+ private final ResultSetLoaderImpl resultSetLoader;
+ private int outputSchemaVersion = -1;
+ private TupleMetadata schema;
+ private VectorContainer container;
+
+ public VectorContainerBuilder(ResultSetLoaderImpl rsLoader) {
+ this.resultSetLoader = rsLoader;
+ container = new VectorContainer(rsLoader.allocator);
+ schema = new TupleSchema();
+ }
+
+ public void update(int targetVersion) {
+ if (outputSchemaVersion >= targetVersion) {
+ return;
+ }
+ outputSchemaVersion = targetVersion;
+ updateTuple(resultSetLoader.rootState(), new ContainerProxy(schema, container));
+ container.buildSchema(SelectionVectorMode.NONE);
+ }
+
+ public VectorContainer container() { return container; }
+
+ public int outputSchemaVersion() { return outputSchemaVersion; }
+
+ public BufferAllocator allocator() {
+ return resultSetLoader.allocator();
+ }
+
+ private void updateTuple(TupleState sourceModel, TupleProxy destProxy) {
+ int prevCount = destProxy.size();
+ List<ColumnState> cols = sourceModel.columns();
+ int currentCount = cols.size();
+
+ // Scan any existing maps for column additions
+
+ for (int i = 0; i < prevCount; i++) {
+ ColumnState colState = cols.get(i);
+ if (! colState.schema().isProjected()) {
+ continue;
+ }
+ if (colState.schema().isMap()) {
+ updateTuple((TupleState) ((BaseMapColumnState) colState).mapState(), destProxy.mapProxy(i));
+ }
+ }
+
+ // Add new columns, which may be maps
+
+ for (int i = prevCount; i < currentCount; i++) {
+ ColumnState colState = cols.get(i);
+ if (! colState.schema().isProjected()) {
+ continue;
+ }
+
+ // If the column was added after the output schema version cutoff,
+ // skip that column for now.
+
+ if (colState.addVersion > outputSchemaVersion) {
+ break;
+ }
+ if (colState.schema().isMap()) {
+ buildMap(destProxy, (BaseMapColumnState) colState);
+ } else {
+ destProxy.add(colState.vector());
+ destProxy.schema.addColumn(colState.schema());
+ assert destProxy.size() == destProxy.schema.size();
+ }
+ }
+ }
+
+ @SuppressWarnings("resource")
+ private void buildMap(TupleProxy parentTuple, BaseMapColumnState colModel) {
+
+ // Creating the map vector will create its contained vectors if we
+ // give it a materialized field with children. So, instead pass a clone
+ // without children so we can add them.
+
+ ColumnMetadata mapColSchema = colModel.schema().cloneEmpty();
+
+ // Don't get the map vector from the vector cache. Map vectors may
+ // have content that varies from batch to batch. Only the leaf
+ // vectors can be cached.
+
+ AbstractMapVector mapVector;
+ if (mapColSchema.isArray()) {
+
+ // A repeated map shares an offset vector with the internal
+ // repeated map.
+
+ UInt4Vector offsets = (UInt4Vector) colModel.vector();
+ mapVector = new RepeatedMapVector(mapColSchema.schema(), offsets, null);
+ } else {
+ mapVector = new MapVector(mapColSchema.schema(), allocator(), null);
+ }
+
+ // Add the map vector and schema to the parent tuple
+
+ parentTuple.add(mapVector);
+ int index = parentTuple.schema.addColumn(mapColSchema);
+ assert parentTuple.size() == parentTuple.size();
+
+ // Update the tuple, which will add the new columns in the map
+
+ updateTuple(colModel.mapState(), parentTuple.mapProxy(index));
+ }
+
+ public TupleMetadata schema() { return schema; }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorState.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorState.java
new file mode 100644
index 0000000..4a1c698
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorState.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+
+/**
+ * Handles batch and overflow operation for a (possibly compound) vector.
+ * <p>
+ * The data model is the following:
+ * <ul>
+ * <li>Column model<ul>
+ * <li>Value vector itself</li>
+ * <li>Column writer</li>
+ * <li>Column schema</li>
+ * <li>Column coordinator (this class)</li>
+ * </ul></li></ul>
+ * The vector state coordinates events between the result set loader
+ * on the one side and the vectors, writers and schema on the other.
+ * For example:
+ * <pre><code>
+ * Result Set Vector
+ * Loader <--> State <--> Vectors
+ * </code></pre>
+ * Events from the row set loader deal with allocation, roll-over,
+ * harvesting completed batches and so on. Events from the writer,
+ * via the tuple model deal with adding columns and column
+ * overflow.
+ */
+
public interface VectorState {

  /**
   * Allocate a new vector with the number of elements given. If the vector
   * is an array, then the cardinality given is the number of arrays.
   *
   * @param cardinality number of elements desired in the allocated
   * vector
   * @return the number of bytes allocated
   */

  int allocate(int cardinality);

  /**
   * A vector has overflowed. Create a new look-ahead vector of the given
   * cardinality, then copy the overflow values from the main vector to the
   * look-ahead vector. (Returns nothing; the writer index is adjusted by
   * the caller as part of the overall roll-over protocol.)
   *
   * @param cardinality the number of elements in the new vector. If this
   * vector is an array, then this is the number of arrays
   */

  void rollover(int cardinality);

  /**
   * A batch is being harvested after an overflow. Put the full batch
   * back into the main vector so it can be harvested.
   */

  void harvestWithLookAhead();

  /**
   * A new batch is starting while a look-ahead vector exists. Move
   * the look-ahead buffers into the main vector to prepare for writing
   * the rest of the batch.
   */

  void startBatchWithLookAhead();

  /**
   * Clear the vector(s) associated with this state.
   */

  void reset();

  /**
   * Underlying vector: the one presented to the consumer of the
   * result set loader.
   *
   * @return the externally-visible value vector managed by this state
   */

  ValueVector vector();

  /**
   * Writes this state's internal structure to the given formatter,
   * for debugging.
   */

  void dump(HierarchicalFormatter format);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java
new file mode 100644
index 0000000..2158dd1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.rowSet.impl;
+
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
+
+/**
+ * Writer index that points to each row in the row set. The index starts at
+ * the 0th row and advances one row on each increment. This allows writers to
+ * start positioned at the first row. Writes happen in the current row.
+ * Calling <tt>next()</tt> advances to the next position, effectively saving
+ * the current row. The most recent row can be abandoned easily simply by not
+ * calling <tt>next()</tt>. This means that the number of completed rows is
+ * the same as the row index.
+ * <p>
+ * The writer index enforces the row count limit for a new batch. The
+ * limit is set by the result set loader and can vary from batch to batch
+ * if the client chooses in order to adjust the row count based on actual
+ * data size.
+ */
+
+class WriterIndexImpl implements ColumnWriterIndex {
+
+ private final ResultSetLoader rsLoader;
+ private int rowIndex = 0;
+
+ public WriterIndexImpl(ResultSetLoader rsLoader) {
+ this.rsLoader = rsLoader;
+ }
+
+ @Override
+ public int vectorIndex() { return rowIndex; }
+
+ @Override
+ public int rowStartIndex() { return rowIndex; }
+
+ public boolean next() {
+ if (++rowIndex < rsLoader.targetRowCount()) {
+ return true;
+ } else {
+ // Should not call next() again once batch is full.
+ rowIndex = rsLoader.targetRowCount();
+ return false;
+ }
+ }
+
+ public int size() {
+
+ // The index always points to the next slot past the
+ // end of valid rows.
+
+ return rowIndex;
+ }
+
+ public boolean valid() { return rowIndex < rsLoader.targetRowCount(); }
+
+ @Override
+ public void rollover() {
+
+ // The top level index always rolls over to 0 --
+ // the first row position in the new vectors.
+
+ reset();
+ }
+
+ public void reset() { rowIndex = 0; }
+
+ @Override
+ public void nextElement() { }
+
+ @Override
+ public ColumnWriterIndex outerIndex() { return null; }
+
+ @Override
+ public String toString() {
+ return new StringBuilder()
+ .append("[")
+ .append(getClass().getSimpleName())
+ .append(" rowIndex = ")
+ .append(rowIndex)
+ .append("]")
+ .toString();
+ }
+}