Posted to commits@drill.apache.org by pr...@apache.org on 2017/12/21 05:19:36 UTC
[09/15] drill git commit: DRILL-5657: Size-aware vector writer structure
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
index 030f95a..e1e18dc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java
@@ -17,23 +17,13 @@
*/
package org.apache.drill.test.rowSet;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.physical.rowSet.model.ReaderIndex;
+import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.MetadataRetrieval;
+import org.apache.drill.exec.physical.rowSet.model.single.BaseReaderBuilder;
+import org.apache.drill.exec.record.TupleMetadata;
import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.accessor.impl.AbstractColumnReader;
-import org.apache.drill.exec.vector.accessor.impl.ColumnAccessorFactory;
-import org.apache.drill.exec.vector.complex.MapVector;
import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.test.rowSet.RowSetSchema.FlattenedSchema;
-import org.apache.drill.test.rowSet.RowSetSchema.LogicalColumn;
-import org.apache.drill.test.rowSet.RowSetSchema.PhysicalSchema;
/**
* Base class for row sets backed by a single record batch.
@@ -41,151 +31,27 @@ import org.apache.drill.test.rowSet.RowSetSchema.PhysicalSchema;
public abstract class AbstractSingleRowSet extends AbstractRowSet implements SingleRowSet {
- /**
- * Internal helper class to organize a set of value vectors for use by the
- * row set class. Subclasses either build vectors from a schema, or map an
- * existing vector container into the row set structure. The row set
- * structure is based on a flattened structure; all vectors appear in
- * a single vector array. Maps are set aside in a separate map list.
- */
-
- public abstract static class StructureBuilder {
- protected final PhysicalSchema schema;
- protected final BufferAllocator allocator;
- protected final ValueVector[] valueVectors;
- protected final MapVector[] mapVectors;
- protected int vectorIndex;
- protected int mapIndex;
-
- public StructureBuilder(BufferAllocator allocator, RowSetSchema schema) {
- this.allocator = allocator;
- this.schema = schema.physical();
- FlattenedSchema flatSchema = schema.flatAccess();
- valueVectors = new ValueVector[flatSchema.count()];
- if (flatSchema.mapCount() == 0) {
- mapVectors = null;
- } else {
- mapVectors = new MapVector[flatSchema.mapCount()];
- }
- }
- }
-
- /**
- * Create a set of value vectors given a schema, then map them into both
- * the value container and the row set structure.
- */
-
- public static class VectorBuilder extends StructureBuilder {
-
- public VectorBuilder(BufferAllocator allocator, RowSetSchema schema) {
- super(allocator, schema);
- }
-
- public ValueVector[] buildContainer(VectorContainer container) {
- for (int i = 0; i < schema.count(); i++) {
- LogicalColumn colSchema = schema.column(i);
- @SuppressWarnings("resource")
- ValueVector v = TypeHelper.getNewVector(colSchema.field, allocator, null);
- container.add(v);
- if (colSchema.field.getType().getMinorType() == MinorType.MAP) {
- MapVector mv = (MapVector) v;
- mapVectors[mapIndex++] = mv;
- buildMap(mv, colSchema.mapSchema);
- } else {
- valueVectors[vectorIndex++] = v;
- }
- }
- container.buildSchema(SelectionVectorMode.NONE);
- return valueVectors;
- }
-
- private void buildMap(MapVector mapVector, PhysicalSchema mapSchema) {
- for (int i = 0; i < mapSchema.count(); i++) {
- LogicalColumn colSchema = mapSchema.column(i);
- MajorType type = colSchema.field.getType();
- Class<? extends ValueVector> vectorClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode());
- @SuppressWarnings("resource")
- ValueVector v = mapVector.addOrGet(colSchema.field.getName(), type, vectorClass);
- if (type.getMinorType() == MinorType.MAP) {
- MapVector mv = (MapVector) v;
- mapVectors[mapIndex++] = mv;
- buildMap(mv, colSchema.mapSchema);
- } else {
- valueVectors[vectorIndex++] = v;
- }
- }
- }
- }
-
- /**
- * Build a row set given an existing vector container. In this case,
- * the vectors exist and we simply need to pull them out of the container
- * and maps and put them into the row set arrays.
- */
-
- public static class VectorMapper extends StructureBuilder {
-
- public VectorMapper(BufferAllocator allocator, RowSetSchema schema) {
- super(allocator, schema);
- }
+ public static class RowSetReaderBuilder extends BaseReaderBuilder {
- public ValueVector[] mapContainer(VectorContainer container) {
- for (VectorWrapper<?> w : container) {
- @SuppressWarnings("resource")
- ValueVector v = w.getValueVector();
- if (v.getField().getType().getMinorType() == MinorType.MAP) {
- MapVector mv = (MapVector) v;
- mapVectors[mapIndex++] = mv;
- buildMap(mv);
- } else {
- valueVectors[vectorIndex++] = v;
- }
- }
- return valueVectors;
+ public RowSetReader buildReader(AbstractSingleRowSet rowSet, ReaderIndex rowIndex) {
+ TupleMetadata schema = rowSet.schema();
+ return new RowSetReaderImpl(schema, rowIndex,
+ buildContainerChildren(rowSet.container(),
+ new MetadataRetrieval(schema)));
}
-
- private void buildMap(MapVector mapVector) {
- for (ValueVector v : mapVector) {
- if (v.getField().getType().getMinorType() == MinorType.MAP) {
- MapVector mv = (MapVector) v;
- mapVectors[mapIndex++] = mv;
- buildMap(mv);
- } else {
- valueVectors[vectorIndex++] = v;
- }
- }
- }
- }
-
- /**
- * Flattened representation of value vectors using a depth-first
- * traversal of maps. Order of vectors here correspond to the column
- * indexes used to access columns in a reader or writer.
- */
-
- protected final ValueVector[] valueVectors;
-
- public AbstractSingleRowSet(BufferAllocator allocator, BatchSchema schema) {
- super(allocator, schema, new VectorContainer());
- valueVectors = new VectorBuilder(allocator, super.schema).buildContainer(container);
- }
-
- public AbstractSingleRowSet(BufferAllocator allocator, VectorContainer container) {
- super(allocator, container.getSchema(), container);
- valueVectors = new VectorMapper(allocator, super.schema).mapContainer(container);
}
public AbstractSingleRowSet(AbstractSingleRowSet rowSet) {
- super(rowSet.allocator, rowSet.schema.batch(), rowSet.container);
- valueVectors = rowSet.valueVectors;
+ super(rowSet.container, rowSet.schema);
}
- @Override
- public ValueVector[] vectors() { return valueVectors; }
+ public AbstractSingleRowSet(VectorContainer container, TupleMetadata schema) {
+ super(container, schema);
+ }
@Override
public long size() {
- RecordBatchSizer sizer = new RecordBatchSizer(container);
+ RecordBatchSizer sizer = new RecordBatchSizer(container());
return sizer.actualSize();
}
@@ -197,21 +63,7 @@ public abstract class AbstractSingleRowSet extends AbstractRowSet implements Sin
* (non-map) vectors.
*/
- protected RowSetReader buildReader(RowSetIndex rowIndex) {
- FlattenedSchema accessSchema = schema().flatAccess();
- ValueVector[] valueVectors = vectors();
- AbstractColumnReader[] readers = new AbstractColumnReader[valueVectors.length];
- for (int i = 0; i < readers.length; i++) {
- MinorType type = accessSchema.column(i).getType().getMinorType();
- if (type == MinorType.MAP) {
- readers[i] = null; // buildMapAccessor(i);
- } else if (type == MinorType.LIST) {
- readers[i] = null; // buildListAccessor(i);
- } else {
- readers[i] = ColumnAccessorFactory.newReader(valueVectors[i].getField().getType());
- readers[i].bind(rowIndex, valueVectors[i]);
- }
- }
- return new RowSetReaderImpl(accessSchema, rowIndex, readers);
+ protected RowSetReader buildReader(ReaderIndex rowIndex) {
+ return new RowSetReaderBuilder().buildReader(this, rowIndex);
}
}
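Note (illustrative, not part of the patch): after this change a single row set no longer exposes a flattened ValueVector[] array; readers are built on demand from the container and its TupleMetadata via RowSetReaderBuilder. A minimal sketch of the resulting test-side usage, assuming the scalar(...) accessors of the new TupleReader API:

    RowSetReader reader = rowSet.reader();   // delegates to RowSetReaderBuilder
    while (reader.next()) {
      ScalarReader a = reader.scalar(0);     // column by index, per the row set schema
      if (!a.isNull()) {
        System.out.println(a.getInt());      // typed access through the new accessors
      }
    }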
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
index 29a1702..5972f05 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java
@@ -18,19 +18,21 @@
package org.apache.drill.test.rowSet;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.rowSet.model.ReaderIndex;
+import org.apache.drill.exec.physical.rowSet.model.SchemaInference;
+import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.MetadataRetrieval;
+import org.apache.drill.exec.physical.rowSet.model.single.BaseWriterBuilder;
+import org.apache.drill.exec.physical.rowSet.model.single.BuildVectorsFromMetadata;
+import org.apache.drill.exec.physical.rowSet.model.single.VectorAllocator;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.TupleSchema;
import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.accessor.TupleAccessor.TupleSchema;
-import org.apache.drill.exec.vector.accessor.impl.AbstractColumnWriter;
-import org.apache.drill.exec.vector.accessor.impl.ColumnAccessorFactory;
-import org.apache.drill.exec.vector.accessor.impl.TupleWriterImpl;
import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
+import org.apache.drill.test.rowSet.RowSetWriterImpl.WriterIndexImpl;
/**
* Implementation of a single row set with no indirection (selection)
@@ -46,118 +48,54 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS
* the first. (This is the JDBC RecordSet convention.)
*/
- private static class DirectRowIndex extends BoundedRowIndex {
+ private static class DirectRowIndex extends ReaderIndex {
public DirectRowIndex(int rowCount) {
super(rowCount);
}
@Override
- public int index() { return rowIndex; }
+ public int vectorIndex() { return rowIndex; }
@Override
- public int batch() { return 0; }
+ public int batchIndex() { return 0; }
}
- /**
- * Writer index that points to each row in the row set. The index starts at
- * the 0th row and advances one row on each increment. This allows writers to
- * start positioned at the first row. Writes happen in the current row.
- * Calling <tt>next()</tt> advances to the next position, effectively saving
- * the current row. The most recent row can be abandoned easily simply by not
- * calling <tt>next()</tt>. This means that the number of completed rows is
- * the same as the row index.
- */
-
- private static class ExtendableRowIndex extends RowSetIndex {
-
- private final int maxSize;
-
- public ExtendableRowIndex(int maxSize) {
- this.maxSize = maxSize;
- rowIndex = 0;
- }
+ public static class RowSetWriterBuilder extends BaseWriterBuilder {
- @Override
- public int index() { return rowIndex; }
-
- @Override
- public boolean next() {
- if (++rowIndex <= maxSize ) {
- return true;
- } else {
- rowIndex--;
- return false;
- }
+ public RowSetWriter buildWriter(DirectRowSet rowSet) {
+ WriterIndexImpl index = new WriterIndexImpl();
+ TupleMetadata schema = rowSet.schema();
+ RowSetWriterImpl writer = new RowSetWriterImpl(rowSet, schema, index,
+ buildContainerChildren(rowSet.container(),
+ new MetadataRetrieval(schema)));
+ return writer;
}
-
- @Override
- public int size() { return rowIndex; }
-
- @Override
- public boolean valid() { return rowIndex < maxSize; }
-
- @Override
- public int batch() { return 0; }
}
- /**
- * Implementation of a row set writer. Only available for newly-created,
- * empty, direct, single row sets. Rewriting is not allowed, nor is writing
- * to a hyper row set.
- */
-
- public class RowSetWriterImpl extends TupleWriterImpl implements RowSetWriter {
-
- private final ExtendableRowIndex index;
- private final ExtendableRowSet rowSet;
-
- protected RowSetWriterImpl(ExtendableRowSet rowSet, TupleSchema schema, ExtendableRowIndex index, AbstractColumnWriter[] writers) {
- super(schema, writers);
- this.rowSet = rowSet;
- this.index = index;
- start();
- }
-
- @Override
- public void setRow(Object...values) {
- if (! index.valid()) {
- throw new IndexOutOfBoundsException("Write past end of row set");
- }
- for (int i = 0; i < values.length; i++) {
- set(i, values[i]);
- }
- save();
- }
-
- @Override
- public boolean valid() { return index.valid(); }
-
- @Override
- public int index() { return index.position(); }
+ private DirectRowSet(VectorContainer container, TupleMetadata schema) {
+ super(container, schema);
+ }
- @Override
- public void save() {
- index.next();
- start();
- }
+ public DirectRowSet(AbstractSingleRowSet from) {
+ super(from);
+ }
- @Override
- public void done() {
- rowSet.setRowCount(index.size());
- }
+ public static DirectRowSet fromSchema(BufferAllocator allocator, BatchSchema schema) {
+ return fromSchema(allocator, TupleSchema.fromFields(schema));
}
- public DirectRowSet(BufferAllocator allocator, BatchSchema schema) {
- super(allocator, schema);
+ public static DirectRowSet fromSchema(BufferAllocator allocator, TupleMetadata schema) {
+ BuildVectorsFromMetadata builder = new BuildVectorsFromMetadata(allocator);
+ return new DirectRowSet(builder.build(schema), schema);
}
- public DirectRowSet(BufferAllocator allocator, VectorContainer container) {
- super(allocator, container);
+ public static DirectRowSet fromContainer(VectorContainer container) {
+ return new DirectRowSet(container, new SchemaInference().infer(container));
}
- public DirectRowSet(BufferAllocator allocator, VectorAccessible va) {
- super(allocator, toContainer(va, allocator));
+ public static DirectRowSet fromVectorAccessible(BufferAllocator allocator, VectorAccessible va) {
+ return fromContainer(toContainer(va, allocator));
}
private static VectorContainer toContainer(VectorAccessible va, BufferAllocator allocator) {
@@ -168,16 +106,8 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS
}
@Override
- public void allocate(int recordCount) {
- for (final ValueVector v : valueVectors) {
- AllocationHelper.allocate(v, recordCount, 50, 10);
- }
- }
-
- @Override
- public void setRowCount(int rowCount) {
- container.setRecordCount(rowCount);
- VectorAccessibleUtilities.setValueCount(container, rowCount);
+ public void allocate(int rowCount) {
+ new VectorAllocator(container()).allocate(rowCount, schema());
}
@Override
@@ -187,29 +117,11 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS
@Override
public RowSetWriter writer(int initialRowCount) {
- if (container.hasRecordCount()) {
+ if (container().hasRecordCount()) {
throw new IllegalStateException("Row set already contains data");
}
allocate(initialRowCount);
- return buildWriter(new ExtendableRowIndex(Character.MAX_VALUE));
- }
-
- /**
- * Build writer objects for each column based on the column type.
- *
- * @param rowIndex the index which points to each row
- * @return an array of writers
- */
-
- protected RowSetWriter buildWriter(ExtendableRowIndex rowIndex) {
- ValueVector[] valueVectors = vectors();
- AbstractColumnWriter[] writers = new AbstractColumnWriter[valueVectors.length];
- for (int i = 0; i < writers.length; i++) {
- writers[i] = ColumnAccessorFactory.newWriter(valueVectors[i].getField().getType());
- writers[i].bind(rowIndex, valueVectors[i]);
- }
- TupleSchema accessSchema = schema().hierarchicalAccess();
- return new RowSetWriterImpl(this, accessSchema, rowIndex, writers);
+ return new RowSetWriterBuilder().buildWriter(this);
}
@Override
@@ -233,9 +145,4 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS
@Override
public SelectionVector2 getSv2() { return null; }
-
- @Override
- public RowSet merge(RowSet other) {
- return new DirectRowSet(allocator, container().merge(other.container()));
- }
}
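Usage note (a sketch, not patch content): the public constructors are replaced by static factory methods, and writing goes through the writer returned by writer(). The column names and the test SchemaBuilder are illustrative assumptions:

    // Before: new DirectRowSet(allocator, schema)
    BatchSchema batchSchema = new SchemaBuilder()
        .add("a", MinorType.INT)
        .add("b", MinorType.VARCHAR)
        .build();
    DirectRowSet rowSet = DirectRowSet.fromSchema(allocator, batchSchema);
    RowSetWriter writer = rowSet.writer(100);   // allocates vectors, then builds writers
    writer.setRow(10, "fred");
    writer.setRow(20, "barney");
    SingleRowSet result = writer.done();        // seals the batch and sets the row count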
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
index afc2e6e..8a3db9f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
@@ -17,27 +17,14 @@
*/
package org.apache.drill.test.rowSet;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.MetadataRetrieval;
+import org.apache.drill.exec.physical.rowSet.model.SchemaInference;
+import org.apache.drill.exec.physical.rowSet.model.hyper.BaseReaderBuilder;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.HyperVectorWrapper;
-import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TupleMetadata;
import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.accessor.AccessorUtilities;
-import org.apache.drill.exec.vector.accessor.impl.AbstractColumnReader;
-import org.apache.drill.exec.vector.accessor.impl.AbstractColumnReader.VectorAccessor;
-import org.apache.drill.exec.vector.accessor.impl.ColumnAccessorFactory;
-import org.apache.drill.exec.vector.complex.AbstractMapVector;
import org.apache.drill.test.rowSet.RowSet.HyperRowSet;
-import org.apache.drill.test.rowSet.RowSetSchema.FlattenedSchema;
-import org.apache.drill.test.rowSet.RowSetSchema.LogicalColumn;
-import org.apache.drill.test.rowSet.RowSetSchema.PhysicalSchema;
/**
* Implements a row set wrapper around a collection of "hyper vectors."
@@ -52,176 +39,14 @@ import org.apache.drill.test.rowSet.RowSetSchema.PhysicalSchema;
public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet {
- /**
- * Read-only row index into the hyper row set with batch and index
- * values mapping via an SV4.
- */
-
- public static class HyperRowIndex extends BoundedRowIndex {
-
- private final SelectionVector4 sv4;
-
- public HyperRowIndex(SelectionVector4 sv4) {
- super(sv4.getCount());
- this.sv4 = sv4;
- }
-
- @Override
- public int index() {
- return AccessorUtilities.sv4Index(sv4.get(rowIndex));
- }
-
- @Override
- public int batch( ) {
- return AccessorUtilities.sv4Batch(sv4.get(rowIndex));
- }
- }
-
- /**
- * Vector accessor used by the column accessors to obtain the vector for
- * each column value. That is, position 0 might be batch 4, index 3,
- * while position 1 might be batch 1, index 7, and so on.
- */
-
- public static class HyperVectorAccessor implements VectorAccessor {
+ public static class RowSetReaderBuilder extends BaseReaderBuilder {
- private final HyperRowIndex rowIndex;
- private final ValueVector[] vectors;
-
- public HyperVectorAccessor(HyperVectorWrapper<ValueVector> hvw, HyperRowIndex rowIndex) {
- this.rowIndex = rowIndex;
- vectors = hvw.getValueVectors();
- }
-
- @Override
- public ValueVector vector() {
- return vectors[rowIndex.batch()];
- }
- }
-
- /**
- * Build a hyper row set by restructuring a hyper vector bundle into a uniform
- * shape. Consider this schema: <pre><code>
- * { a: 10, b: { c: 20, d: { e: 30 } } }</code></pre>
- * <p>
- * The hyper container, with two batches, has this structure:
- * <table border="1">
- * <tr><th>Batch</th><th>a</th><th>b</th></tr>
- * <tr><td>0</td><td>Int vector</td><td>Map Vector(Int vector, Map Vector(Int vector))</td></th>
- * <tr><td>1</td><td>Int vector</td><td>Map Vector(Int vector, Map Vector(Int vector))</td></th>
- * </table>
- * <p>
- * The above table shows that top-level scalar vectors (such as the Int Vector for column
- * a) appear "end-to-end" as a hyper-vector. Maps also appear end-to-end. But, the
- * contents of the map (column c) do not appear end-to-end. Instead, they appear as
- * contents in the map vector. To get to c, one indexes into the map vector, steps inside
- * the map to find c and indexes to the right row.
- * <p>
- * Similarly, the maps for d do not appear end-to-end, one must step to the right batch
- * in b, then step to d.
- * <p>
- * Finally, to get to e, one must step
- * into the hyper vector for b, then steps to the proper batch, steps to d, step to e
- * and finally step to the row within e. This is a very complex, costly indexing scheme
- * that differs depending on map nesting depth.
- * <p>
- * To simplify access, this class restructures the maps to flatten the scalar vectors
- * into end-to-end hyper vectors. For example, for the above:
- * <p>
- * <table border="1">
- * <tr><th>Batch</th><th>a</th><th>c</th><th>d</th></tr>
- * <tr><td>0</td><td>Int vector</td><td>Int vector</td><td>Int vector</td></th>
- * <tr><td>1</td><td>Int vector</td><td>Int vector</td><td>Int vector</td></th>
- * </table>
- *
- * The maps are still available as hyper vectors, but separated into map fields.
- * (Scalar access no longer needs to access the maps.) The result is a uniform
- * addressing scheme for both top-level and nested vectors.
- */
-
- public static class HyperVectorBuilder {
-
- protected final HyperVectorWrapper<?> valueVectors[];
- protected final HyperVectorWrapper<AbstractMapVector> mapVectors[];
- private final List<ValueVector> nestedScalars[];
- private int vectorIndex;
- private int mapIndex;
- private final PhysicalSchema physicalSchema;
-
- @SuppressWarnings("unchecked")
- public HyperVectorBuilder(RowSetSchema schema) {
- physicalSchema = schema.physical();
- FlattenedSchema flatSchema = schema.flatAccess();
- valueVectors = new HyperVectorWrapper<?>[schema.hierarchicalAccess().count()];
- if (flatSchema.mapCount() == 0) {
- mapVectors = null;
- nestedScalars = null;
- } else {
- mapVectors = (HyperVectorWrapper<AbstractMapVector>[])
- new HyperVectorWrapper<?>[flatSchema.mapCount()];
- nestedScalars = new ArrayList[flatSchema.count()];
- }
- }
-
- @SuppressWarnings("unchecked")
- public HyperVectorWrapper<ValueVector>[] mapContainer(VectorContainer container) {
- int i = 0;
- for (VectorWrapper<?> w : container) {
- HyperVectorWrapper<?> hvw = (HyperVectorWrapper<?>) w;
- if (w.getField().getType().getMinorType() == MinorType.MAP) {
- HyperVectorWrapper<AbstractMapVector> mw = (HyperVectorWrapper<AbstractMapVector>) hvw;
- mapVectors[mapIndex++] = mw;
- buildHyperMap(physicalSchema.column(i).mapSchema(), mw);
- } else {
- valueVectors[vectorIndex++] = hvw;
- }
- i++;
- }
- if (nestedScalars != null) {
- buildNestedHyperVectors();
- }
- return (HyperVectorWrapper<ValueVector>[]) valueVectors;
- }
-
- private void buildHyperMap(PhysicalSchema mapSchema, HyperVectorWrapper<AbstractMapVector> mapWrapper) {
- createHyperVectors(mapSchema);
- for (AbstractMapVector mapVector : mapWrapper.getValueVectors()) {
- buildMap(mapSchema, mapVector);
- }
- }
-
- private void buildMap(PhysicalSchema mapSchema, AbstractMapVector mapVector) {
- for (ValueVector v : mapVector) {
- LogicalColumn col = mapSchema.column(v.getField().getName());
- if (col.isMap()) {
- buildMap(col.mapSchema, (AbstractMapVector) v);
- } else {
- nestedScalars[col.accessIndex()].add(v);
- }
- }
- }
-
- private void createHyperVectors(PhysicalSchema mapSchema) {
- for (int i = 0; i < mapSchema.count(); i++) {
- LogicalColumn col = mapSchema.column(i);
- if (col.isMap()) {
- createHyperVectors(col.mapSchema);
- } else {
- nestedScalars[col.accessIndex()] = new ArrayList<ValueVector>();
- }
- }
- }
-
- private void buildNestedHyperVectors() {
- for (int i = 0; i < nestedScalars.length; i++) {
- if (nestedScalars[i] == null) {
- continue;
- }
- ValueVector vectors[] = new ValueVector[nestedScalars[i].size()];
- nestedScalars[i].toArray(vectors);
- assert valueVectors[i] == null;
- valueVectors[i] = new HyperVectorWrapper<ValueVector>(vectors[0].getField(), vectors, false);
- }
+ public RowSetReader buildReader(HyperRowSet rowSet, SelectionVector4 sv4) {
+ TupleMetadata schema = rowSet.schema();
+ HyperRowIndex rowIndex = new HyperRowIndex(sv4);
+ return new RowSetReaderImpl(schema, rowIndex,
+ buildContainerChildren(rowSet.container(),
+ new MetadataRetrieval(schema)));
}
}
@@ -231,18 +56,9 @@ public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet {
private final SelectionVector4 sv4;
- /**
- * Collection of hyper vectors in flattened order: a left-to-right,
- * depth first ordering of vectors in maps. Order here corresponds to
- * the order used for column indexes in the row set reader.
- */
-
- private final HyperVectorWrapper<ValueVector> hvw[];
-
- public HyperRowSetImpl(BufferAllocator allocator, VectorContainer container, SelectionVector4 sv4) {
- super(allocator, container.getSchema(), container);
+ public HyperRowSetImpl(VectorContainer container, SelectionVector4 sv4) {
+ super(container, new SchemaInference().infer(container));
this.sv4 = sv4;
- hvw = new HyperVectorBuilder(schema).mapContainer(container);
}
@Override
@@ -252,33 +68,8 @@ public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet {
public boolean isWritable() { return false; }
@Override
- public RowSetWriter writer() {
- throw new UnsupportedOperationException("Cannot write to a hyper vector");
- }
-
- @Override
public RowSetReader reader() {
- return buildReader(new HyperRowIndex(sv4));
- }
-
- /**
- * Internal method to build the set of column readers needed for
- * this row set. Used when building a row set reader.
- * @param rowIndex object that points to the current row
- * @return an array of column readers: in the same order as the
- * (non-map) vectors.
- */
-
- protected RowSetReader buildReader(HyperRowIndex rowIndex) {
- FlattenedSchema accessSchema = schema().flatAccess();
- AbstractColumnReader readers[] = new AbstractColumnReader[accessSchema.count()];
- for (int i = 0; i < readers.length; i++) {
- MaterializedField field = accessSchema.column(i);
- readers[i] = ColumnAccessorFactory.newReader(field.getType());
- HyperVectorWrapper<ValueVector> hvw = getHyperVector(i);
- readers[i].bind(rowIndex, field, new HyperVectorAccessor(hvw, rowIndex));
- }
- return new RowSetReaderImpl(accessSchema, rowIndex, readers);
+ return new RowSetReaderBuilder().buildReader(this, sv4);
}
@Override
@@ -288,13 +79,5 @@ public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet {
public SelectionVector4 getSv4() { return sv4; }
@Override
- public HyperVectorWrapper<ValueVector> getHyperVector(int i) { return hvw[i]; }
-
- @Override
public int rowCount() { return sv4.getCount(); }
-
- @Override
- public RowSet merge(RowSet other) {
- return new HyperRowSetImpl(allocator, container().merge(other.container()), sv4);
- }
}
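Usage note (a sketch under the patch's new signatures): the hyper row set now infers its metadata from the container instead of taking an allocator and a pre-built schema, and readers resolve the target batch through the SV4:

    // Before: new HyperRowSetImpl(allocator, container, sv4)
    HyperRowSet hyperSet = new HyperRowSetImpl(container, sv4);
    RowSetReader reader = hyperSet.reader();
    while (reader.next()) {
      int batch = reader.batchIndex();   // batch holding the current row, per the SV4
      int row = reader.rowIndex();       // row within that batch
    }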
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
index 1914705..e729bba 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
@@ -20,6 +20,8 @@ package org.apache.drill.test.rowSet;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
+import org.apache.drill.exec.physical.rowSet.model.ReaderIndex;
+import org.apache.drill.exec.physical.rowSet.model.SchemaInference;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -33,14 +35,14 @@ public class IndirectRowSet extends AbstractSingleRowSet {
/**
* Reader index that points to each row indirectly through the
- * selection vector. The {@link #index()} method points to the
+ * selection vector. The {@link #vectorIndex()} method points to the
* actual data row, while the {@link #position()} method gives
* the position relative to the indirection vector. That is,
* the position increases monotonically, but the index jumps
* around as specified by the indirection vector.
*/
- private static class IndirectRowIndex extends BoundedRowIndex {
+ private static class IndirectRowIndex extends ReaderIndex {
private final SelectionVector2 sv2;
@@ -50,21 +52,25 @@ public class IndirectRowSet extends AbstractSingleRowSet {
}
@Override
- public int index() { return sv2.getIndex(rowIndex); }
+ public int vectorIndex() { return sv2.getIndex(rowIndex); }
@Override
- public int batch() { return 0; }
+ public int batchIndex() { return 0; }
}
private final SelectionVector2 sv2;
- public IndirectRowSet(BufferAllocator allocator, VectorContainer container) {
- this(allocator, container, makeSv2(allocator, container));
+ private IndirectRowSet(VectorContainer container, SelectionVector2 sv2) {
+ super(container, new SchemaInference().infer(container));
+ this.sv2 = sv2;
}
- public IndirectRowSet(BufferAllocator allocator, VectorContainer container, SelectionVector2 sv2) {
- super(allocator, container);
- this.sv2 = sv2;
+ public static IndirectRowSet fromContainer(VectorContainer container) {
+ return new IndirectRowSet(container, makeSv2(container.getAllocator(), container));
+ }
+
+ public static IndirectRowSet fromSv2(VectorContainer container, SelectionVector2 sv2) {
+ return new IndirectRowSet(container, sv2);
}
private static SelectionVector2 makeSv2(BufferAllocator allocator, VectorContainer container) {
@@ -83,7 +89,7 @@ public class IndirectRowSet extends AbstractSingleRowSet {
public IndirectRowSet(DirectRowSet directRowSet) {
super(directRowSet);
- sv2 = makeSv2(allocator, container);
+ sv2 = makeSv2(allocator(), container());
}
@Override
@@ -96,11 +102,6 @@ public class IndirectRowSet extends AbstractSingleRowSet {
}
@Override
- public RowSetWriter writer() {
- throw new UnsupportedOperationException("Cannot write to an existing row set");
- }
-
- @Override
public RowSetReader reader() {
return buildReader(new IndirectRowIndex(getSv2()));
}
@@ -119,12 +120,7 @@ public class IndirectRowSet extends AbstractSingleRowSet {
@Override
public long size() {
- RecordBatchSizer sizer = new RecordBatchSizer(container, sv2);
+ RecordBatchSizer sizer = new RecordBatchSizer(container(), sv2);
return sizer.actualSize();
}
-
- @Override
- public RowSet merge(RowSet other) {
- return new IndirectRowSet(allocator, container().merge(other.container()), sv2);
- }
}
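The same constructor-to-factory pattern applies here; a brief sketch (the container is assumed to come from a test fixture and to carry its own allocator):

    // Before: new IndirectRowSet(allocator, container)
    IndirectRowSet rowSet = IndirectRowSet.fromContainer(container);   // builds an SV2
    long memory = rowSet.size();   // RecordBatchSizer now accounts for the SV2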
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
index 474508c..f2435de 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java
@@ -20,25 +20,24 @@ package org.apache.drill.test.rowSet;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.HyperVectorWrapper;
+import org.apache.drill.exec.record.TupleMetadata;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.accessor.ColumnReader;
-import org.apache.drill.exec.vector.accessor.ColumnWriter;
-import org.apache.drill.exec.vector.accessor.TupleReader;
-import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.accessor.ScalarReader;
+import org.apache.parquet.column.ColumnWriter;
/**
* A row set is a collection of rows stored as value vectors. Elsewhere in
* Drill we call this a "record batch", but that term has been overloaded to
- * mean the runtime implementation of an operator...
+ * mean the runtime implementation of an operator.
* <p>
* A row set encapsulates a set of vectors and provides access to Drill's
* various "views" of vectors: {@link VectorContainer},
- * {@link VectorAccessible}, etc.
+ * {@link VectorAccessible}, etc. The row set wraps a {@link TupleModel}
+ * which holds the vectors and column metadata. This form is optimized
+ * for easy use in testing; use other implementations for production code.
* <p>
* A row set is defined by a {@link RowSetSchema}. For testing purposes, a row
* set has a fixed schema; we don't allow changing the set of vectors
@@ -52,7 +51,7 @@ import org.apache.drill.exec.vector.accessor.TupleWriter;
* Drill provides a large number of vector (data) types. Each requires a
* type-specific way to set data. The row set writer uses a {@link ColumnWriter}
* to set each value in a way unique to the specific data type. Similarly, the
- * row set reader provides a {@link ColumnReader} interface. In both cases,
+ * row set reader provides a {@link ScalarReader} interface. In both cases,
* columns can be accessed by index number (as defined in the schema) or
* by name.
* <p>
@@ -78,54 +77,6 @@ import org.apache.drill.exec.vector.accessor.TupleWriter;
public interface RowSet {
- /**
- * Interface for writing values to a row set. Only available
- * for newly-created, single, direct row sets. Eventually, if
- * we want to allow updating a row set, we have to create a
- * new row set with the updated columns, then merge the new
- * and old row sets to create a new immutable row set.
- */
- interface RowSetWriter extends TupleWriter {
- void setRow(Object...values);
- boolean valid();
- int index();
- void save();
- void done();
- }
-
- /**
- * Reader for all types of row sets.
- */
- interface RowSetReader extends TupleReader {
-
- /**
- * Total number of rows in the row set.
- * @return total number of rows
- */
- int size();
-
- boolean next();
- int index();
- void set(int index);
-
- /**
- * Batch index: 0 for a single batch, batch for the current
- * row is a hyper-batch.
- * @return index of the batch for the current row
- */
- int batchIndex();
-
- /**
- * The index of the underlying row which may be indexed by an
- * Sv2 or Sv4.
- *
- * @return
- */
-
- int rowIndex();
- boolean valid();
- }
-
boolean isExtendable();
boolean isWritable();
@@ -136,13 +87,11 @@ public interface RowSet {
int rowCount();
- RowSetWriter writer();
-
RowSetReader reader();
void clear();
- RowSetSchema schema();
+ TupleMetadata schema();
BufferAllocator allocator();
@@ -157,17 +106,16 @@ public interface RowSet {
*
* @return memory size in bytes
*/
- long size();
- RowSet merge(RowSet other);
+ long size();
BatchSchema batchSchema();
/**
* Row set that manages a single batch of rows.
*/
- interface SingleRowSet extends RowSet {
- ValueVector[] vectors();
+
+ public interface SingleRowSet extends RowSet {
SingleRowSet toIndirect();
SelectionVector2 getSv2();
}
@@ -177,9 +125,10 @@ public interface RowSet {
* Once writing is complete, the row set becomes an
* immutable direct row set.
*/
+
interface ExtendableRowSet extends SingleRowSet {
void allocate(int recordCount);
- void setRowCount(int rowCount);
+ RowSetWriter writer();
RowSetWriter writer(int initialRowCount);
}
@@ -187,8 +136,8 @@ public interface RowSet {
* Row set comprised of multiple single row sets, along with
* an indirection vector (SV4).
*/
+
interface HyperRowSet extends RowSet {
SelectionVector4 getSv4();
- HyperVectorWrapper<ValueVector> getHyperVector(int i);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
index 6f9a8d9..7b1554c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
@@ -19,7 +19,10 @@ package org.apache.drill.test.rowSet;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.TupleSchema;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.test.OperatorFixture;
import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
/**
@@ -40,14 +43,20 @@ public final class RowSetBuilder {
private boolean withSv2;
public RowSetBuilder(BufferAllocator allocator, BatchSchema schema) {
+ this(allocator, TupleSchema.fromFields(schema), 10);
+ }
+
+ public RowSetBuilder(BufferAllocator allocator, TupleMetadata schema) {
this(allocator, schema, 10);
}
- public RowSetBuilder(BufferAllocator allocator, BatchSchema schema, int capacity) {
- rowSet = new DirectRowSet(allocator, schema);
+ public RowSetBuilder(BufferAllocator allocator, TupleMetadata schema, int capacity) {
+ rowSet = DirectRowSet.fromSchema(allocator, schema);
writer = rowSet.writer(capacity);
}
+ public TupleWriter writer() { return writer; }
+
/**
* Add a new row using column values passed as variable-length arguments. Expects
* map values to be flattened. a schema of (a:int, b:map(c:varchar)) would be>
@@ -56,17 +65,18 @@ public final class RowSetBuilder {
* <tt>add(10, new int[] {100, 200});</tt><br>
* @param values column values in column index order
* @return this builder
- * @see {@link #addSingleCol(Object)} to create a row of a single column when
- * the value to <tt>add()</tt> is ambiguous
+ * @throws IllegalStateException if the batch, or any vector in the batch,
+ * becomes full. This method is designed to be used in tests where we will
+ * seldom create a full vector of data.
*/
- public RowSetBuilder add(Object...values) {
+ public RowSetBuilder addRow(Object...values) {
writer.setRow(values);
return this;
}
/**
- * The {@link #add(Object...)} method uses Java variable-length arguments to
+ * The {@link #addRow(Object...)} method uses Java variable-length arguments to
* pass a row of values. But, when the row consists of a single array, Java
* gets confused: is that an array for variable-arguments or is it the value
* of the first argument? This method clearly states that the single value
@@ -93,7 +103,7 @@ public final class RowSetBuilder {
*/
public RowSetBuilder addSingleCol(Object value) {
- return add(new Object[] { value });
+ return addRow(new Object[] { value });
}
/**
@@ -110,10 +120,10 @@ public final class RowSetBuilder {
}
public SingleRowSet build() {
- writer.done();
+ SingleRowSet result = writer.done();
if (withSv2) {
- return rowSet.toIndirect();
+ return result.toIndirect();
}
- return rowSet;
+ return result;
}
}
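A usage sketch of the renamed builder methods (values are illustrative; the withSv2() toggle is assumed from the existing builder):

    SingleRowSet rowSet = new RowSetBuilder(allocator, schema)
        .addRow(10, "fred")      // was add(10, "fred")
        .addRow(20, "wilma")
        .withSv2()               // optional: wrap the result in an indirection vector
        .build();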
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
index 6e72923..1cae64f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
@@ -21,8 +21,10 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.drill.exec.vector.accessor.ArrayReader;
-import org.apache.drill.exec.vector.accessor.ColumnReader;
-import org.apache.drill.test.rowSet.RowSet.RowSetReader;
+import org.apache.drill.exec.vector.accessor.ObjectReader;
+import org.apache.drill.exec.vector.accessor.ScalarElementReader;
+import org.apache.drill.exec.vector.accessor.ScalarReader;
+import org.apache.drill.exec.vector.accessor.TupleReader;
import org.bouncycastle.util.Arrays;
import java.util.Comparator;
@@ -31,19 +33,48 @@ import java.util.Comparator;
* For testing, compare the contents of two row sets (record batches)
* to verify that they are identical. Supports masks to exclude certain
* columns from comparison.
+ * <p>
+ * Drill rows are analogous to JSON documents: they can have scalars,
+ * arrays and maps, with maps and lists holding maps, arrays and scalars.
+ * This class walks the row structure tree to compare each structure
+ * of two row sets checking counts, types and values to ensure that the
+ * "actual" result set (result of a test) matches the "expected" result
+ * set.
+ * <p>
+ * This class acts as an example of how to use the suite of reader
+ * abstractions.
*/
public class RowSetComparison {
+ /**
+ * Row set with the expected outcome of a test. This is the "golden"
+ * copy defined in the test itself.
+ */
private RowSet expected;
+ /**
+ * Some tests wish to ignore certain (top-level) columns. If a
+ * mask is provided, then only those columns with a <tt>true</tt>
+ * will be verified.
+ */
private boolean mask[];
+ /**
+ * Floats and doubles do not compare exactly. This delta is used
+ * by JUnit for such comparisons.
+ */
private double delta = 0.001;
+ /**
+ * Tests can skip the first n rows.
+ */
private int offset;
private int span = -1;
public RowSetComparison(RowSet expected) {
this.expected = expected;
- mask = new boolean[expected.schema().hierarchicalAccess().count()];
+
+ // TODO: The mask only works at the top level presently
+
+ mask = new boolean[expected.schema().size()];
for (int i = 0; i < mask.length; i++) {
mask[i] = true;
}
@@ -134,7 +165,8 @@ public class RowSetComparison {
for (int i = 0; i < testLength; i++) {
er.next();
ar.next();
- verifyRow(er, ar);
+ String label = Integer.toString(er.index() + 1);
+ verifyRow(label, er, ar);
}
}
@@ -167,22 +199,50 @@ public class RowSetComparison {
}
}
- private void verifyRow(RowSetReader er, RowSetReader ar) {
+ private void verifyRow(String label, TupleReader er, TupleReader ar) {
+ String prefix = label + ":";
for (int i = 0; i < mask.length; i++) {
if (! mask[i]) {
continue;
}
- ColumnReader ec = er.column(i);
- ColumnReader ac = ar.column(i);
- String label = (er.index() + 1) + ":" + i;
- assertEquals(label, ec.valueType(), ac.valueType());
- if (ec.isNull()) {
- assertTrue(label + " - column not null", ac.isNull());
- continue;
- }
- if (! ec.isNull()) {
- assertTrue(label + " - column is null", ! ac.isNull());
- }
+ verifyColumn(prefix + i, er.column(i), ar.column(i));
+ }
+ }
+
+ private void verifyColumn(String label, ObjectReader ec, ObjectReader ac) {
+ assertEquals(label, ec.type(), ac.type());
+ switch (ec.type()) {
+ case ARRAY:
+ verifyArray(label, ec.array(), ac.array());
+ break;
+ case SCALAR:
+ verifyScalar(label, ec.scalar(), ac.scalar());
+ break;
+ case TUPLE:
+ verifyTuple(label, ec.tuple(), ac.tuple());
+ break;
+ default:
+ throw new IllegalStateException( "Unexpected type: " + ec.type());
+ }
+ }
+
+ private void verifyTuple(String label, TupleReader er, TupleReader ar) {
+ assertEquals(label + " - tuple count", er.columnCount(), ar.columnCount());
+ String prefix = label + ":";
+ for (int i = 0; i < er.columnCount(); i++) {
+ verifyColumn(prefix + i, er.column(i), ar.column(i));
+ }
+ }
+
+ private void verifyScalar(String label, ScalarReader ec, ScalarReader ac) {
+ assertEquals(label + " - value type", ec.valueType(), ac.valueType());
+ if (ec.isNull()) {
+ assertTrue(label + " - column not null", ac.isNull());
+ return;
+ }
+ if (! ec.isNull()) {
+ assertTrue(label + " - column is null", ! ac.isNull());
+ }
switch (ec.valueType()) {
case BYTES: {
byte expected[] = ac.getBytes();
@@ -209,24 +269,42 @@ public class RowSetComparison {
case PERIOD:
assertEquals(label, ec.getPeriod(), ac.getPeriod());
break;
- case ARRAY:
- verifyArray(label, ec.array(), ac.array());
- break;
default:
throw new IllegalStateException( "Unexpected type: " + ec.valueType());
- }
}
}
- private void verifyArray(String colLabel, ArrayReader ea,
+ private void verifyArray(String label, ArrayReader ea,
ArrayReader aa) {
+ assertEquals(label, ea.entryType(), aa.entryType());
+ assertEquals(label, ea.size(), aa.size());
+ switch (ea.entryType()) {
+ case ARRAY:
+ throw new UnsupportedOperationException();
+ case SCALAR:
+ verifyScalarArray(label, ea.elements(), aa.elements());
+ break;
+ case TUPLE:
+ verifyTupleArray(label, ea, aa);
+ break;
+ default:
+ throw new IllegalStateException( "Unexpected type: " + ea.entryType());
+ }
+ }
+
+ private void verifyTupleArray(String label, ArrayReader ea, ArrayReader aa) {
+ for (int i = 0; i < ea.size(); i++) {
+ verifyTuple(label + "[" + i + "]", ea.tuple(i), aa.tuple(i));
+ }
+ }
+
+ private void verifyScalarArray(String colLabel, ScalarElementReader ea,
+ ScalarElementReader aa) {
assertEquals(colLabel, ea.valueType(), aa.valueType());
assertEquals(colLabel, ea.size(), aa.size());
for (int i = 0; i < ea.size(); i++) {
String label = colLabel + "[" + i + "]";
switch (ea.valueType()) {
- case ARRAY:
- throw new IllegalStateException("Arrays of arrays not supported yet");
case BYTES: {
byte expected[] = ea.getBytes(i);
byte actual[] = aa.getBytes(i);
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java
index 42a7e63..e730987 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetPrinter.java
@@ -20,8 +20,8 @@ package org.apache.drill.test.rowSet;
import java.io.PrintStream;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.vector.accessor.TupleAccessor.TupleSchema;
-import org.apache.drill.test.rowSet.RowSet.RowSetReader;
+import org.apache.drill.exec.record.ColumnMetadata;
+import org.apache.drill.exec.record.TupleMetadata;
/**
* Print a row set in CSV-like format. Primarily for debugging.
@@ -41,21 +41,21 @@ public class RowSetPrinter {
public void print(PrintStream out) {
SelectionVectorMode selectionMode = rowSet.indirectionType();
RowSetReader reader = rowSet.reader();
- int colCount = reader.schema().count();
- printSchema(out, selectionMode);
+ int colCount = reader.schema().size();
+ printSchema(out, selectionMode, reader);
while (reader.next()) {
printHeader(out, reader, selectionMode);
for (int i = 0; i < colCount; i++) {
if (i > 0) {
out.print(", ");
}
- out.print(reader.getAsString(i));
+ out.print(reader.column(i).getAsString());
}
out.println();
}
}
- private void printSchema(PrintStream out, SelectionVectorMode selectionMode) {
+ private void printSchema(PrintStream out, SelectionVectorMode selectionMode, RowSetReader reader) {
out.print("#");
switch (selectionMode) {
case FOUR_BYTE:
@@ -68,14 +68,24 @@ public class RowSetPrinter {
break;
}
out.print(": ");
- TupleSchema schema = rowSet.schema().hierarchicalAccess();
- for (int i = 0; i < schema.count(); i++) {
+ TupleMetadata schema = reader.schema();
+ printTupleSchema(out, schema);
+ out.println();
+ }
+
+ private void printTupleSchema(PrintStream out, TupleMetadata schema) {
+ for (int i = 0; i < schema.size(); i++) {
if (i > 0) {
out.print(", ");
}
- out.print(schema.column(i).getName());
+ ColumnMetadata colSchema = schema.metadata(i);
+ out.print(colSchema.name());
+ if (colSchema.isMap()) {
+ out.print("(");
+ printTupleSchema(out, colSchema.mapSchema());
+ out.print(")");
+ }
}
- out.println();
}
private void printHeader(PrintStream out, RowSetReader reader, SelectionVectorMode selectionMode) {
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReader.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReader.java
new file mode 100644
index 0000000..3e27529
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReader.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.exec.vector.accessor.TupleReader;
+
+/**
+ * Reader for all types of row sets.
+ */
+
+public interface RowSetReader extends TupleReader {
+
+ /**
+ * Total number of rows in the row set.
+ * @return total number of rows
+ */
+ int rowCount();
+
+ boolean next();
+ int index();
+ void set(int index);
+
+ /**
+ * Batch index: 0 for a single batch, batch for the current
+ * row is a hyper-batch.
+ * @return index of the batch for the current row
+ */
+ int batchIndex();
+
+ /**
+ * The index of the underlying row which may be indexed by an
+ * Sv2 or Sv4.
+ *
+ * @return index of the underlying data row
+ */
+
+ int rowIndex();
+ boolean valid();
+}
\ No newline at end of file
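A brief positioning sketch for this interface (illustrative; the distinction between index() and rowIndex() matters only when an SV2 or SV4 is present):

    RowSetReader reader = rowSet.reader();
    reader.set(5);                      // jump directly to the sixth row
    int pos = reader.index();           // position in read order
    int physical = reader.rowIndex();   // underlying vector row, after SV2/SV4 mapping
    int batch = reader.batchIndex();    // 0 unless reading a hyper-batch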
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReaderImpl.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReaderImpl.java
new file mode 100644
index 0000000..2bae085
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetReaderImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.rowSet.model.ReaderIndex;
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.reader.AbstractObjectReader;
+import org.apache.drill.exec.vector.accessor.reader.AbstractTupleReader;
+
+/**
+ * Reader implementation for a row set.
+ */
+
+public class RowSetReaderImpl extends AbstractTupleReader implements RowSetReader {
+
+ protected final ReaderIndex readerIndex;
+
+ public RowSetReaderImpl(TupleMetadata schema, ReaderIndex index, AbstractObjectReader[] readers) {
+ super(schema, readers);
+ this.readerIndex = index;
+ bindIndex(index);
+ }
+
+ public RowSetReaderImpl(TupleMetadata schema, ReaderIndex index,
+ List<AbstractObjectReader> readers) {
+ this(schema, index,
+ readers.toArray(new AbstractObjectReader[readers.size()]));
+ }
+
+ @Override
+ public boolean next() {
+ if (! readerIndex.next()) {
+ return false;
+ }
+ reposition();
+ return true;
+ }
+
+ @Override
+ public boolean valid() { return readerIndex.valid(); }
+
+ @Override
+ public int index() { return readerIndex.position(); }
+
+ @Override
+ public int rowCount() { return readerIndex.size(); }
+
+ @Override
+ public int rowIndex() { return readerIndex.vectorIndex(); }
+
+ @Override
+ public int batchIndex() { return readerIndex.batchIndex(); }
+
+ @Override
+ public void set(int index) {
+ this.readerIndex.set(index);
+ reposition();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java
deleted file mode 100644
index 55b5f12..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetSchema.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.test.rowSet;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.vector.accessor.TupleAccessor.TupleSchema;
-import org.apache.drill.exec.record.MaterializedField;
-
-/**
- * Row set schema presented as a number of distinct "views" for various
- * purposes:
- * <ul>
- * <li>Batch schema: the schema used by a VectorContainer.</li>
- * <li>Physical schema: the schema expressed as a hierarchy of
- * tuples with the top tuple representing the row, nested tuples
- * representing maps.</li>
- * <li>Access schema: a flattened schema with all scalar columns
- * at the top level, and with map columns pulled out into a separate
- * collection. The flattened-scalar view is the one used to write to,
- * and read from, the row set.</li>
- * </ul>
- * Allows easy creation of multiple row sets from the same schema.
- * Each schema is immutable, which is fine for tests in which we
- * want known inputs and outputs.
- */
-
-public class RowSetSchema {
-
- /**
- * Logical description of a column. A logical column is a
- * materialized field. For maps, also includes a logical schema
- * of the map.
- */
-
- public static class LogicalColumn {
- protected final String fullName;
- protected final int accessIndex;
- protected int flatIndex;
- protected final MaterializedField field;
-
- /**
- * Schema of the map. Includes only those fields directly within
- * the map; does not include fields from nested tuples.
- */
-
- protected PhysicalSchema mapSchema;
-
- public LogicalColumn(String fullName, int accessIndex, MaterializedField field) {
- this.fullName = fullName;
- this.accessIndex = accessIndex;
- this.field = field;
- }
-
- private void updateStructure(int index, PhysicalSchema children) {
- flatIndex = index;
- mapSchema = children;
- }
-
- public int accessIndex() { return accessIndex; }
- public int flatIndex() { return flatIndex; }
- public boolean isMap() { return mapSchema != null; }
- public PhysicalSchema mapSchema() { return mapSchema; }
- public MaterializedField field() { return field; }
- public String fullName() { return fullName; }
- }
-
- /**
- * Implementation of a tuple name space. Tuples allow both indexed and
- * named access to their members.
- *
- * @param <T> the type of object representing each column
- */
-
- public static class NameSpace<T> {
- private final Map<String,Integer> nameSpace = new HashMap<>();
- private final List<T> columns = new ArrayList<>();
-
- public int add(String key, T value) {
- int index = columns.size();
- nameSpace.put(key, index);
- columns.add(value);
- return index;
- }
-
- public T get(int index) {
- return columns.get(index);
- }
-
- public T get(String key) {
- int index = getIndex(key);
- if (index == -1) {
- return null;
- }
- return get(index);
- }
-
- public int getIndex(String key) {
- Integer index = nameSpace.get(key);
- if (index == null) {
- return -1;
- }
- return index;
- }
-
- public int count() { return columns.size(); }
- }
-
- /**
- * Provides a non-flattened, physical view of the schema. The top-level
- * row includes maps, maps expand to a nested tuple schema. This view
- * corresponds, more-or-less, to the physical storage of vectors in
- * a vector accessible or vector container.
- */
-
- private static class TupleSchemaImpl implements TupleSchema {
-
- private NameSpace<LogicalColumn> columns;
-
- public TupleSchemaImpl(NameSpace<LogicalColumn> ns) {
- this.columns = ns;
- }
-
- @Override
- public MaterializedField column(int index) {
- return logicalColumn(index).field();
- }
-
- public LogicalColumn logicalColumn(int index) { return columns.get(index); }
-
- @Override
- public MaterializedField column(String name) {
- LogicalColumn col = columns.get(name);
- return col == null ? null : col.field();
- }
-
- @Override
- public int columnIndex(String name) {
- return columns.getIndex(name);
- }
-
- @Override
- public int count() { return columns.count(); }
- }
-
- /**
- * Represents the flattened view of the schema used to get and set columns.
- * Represents a left-to-right, depth-first traversal of the row and map
- * columns. Holds only materialized vectors (non-maps). For completeness,
- * provides access to maps also via separate methods, but this is generally
- * of little use.
- */
-
- public static class FlattenedSchema extends TupleSchemaImpl {
- protected final TupleSchemaImpl maps;
-
- public FlattenedSchema(NameSpace<LogicalColumn> cols, NameSpace<LogicalColumn> maps) {
- super(cols);
- this.maps = new TupleSchemaImpl(maps);
- }
-
- public LogicalColumn logicalMap(int index) { return maps.logicalColumn(index); }
- public MaterializedField map(int index) { return maps.column(index); }
- public MaterializedField map(String name) { return maps.column(name); }
- public int mapIndex(String name) { return maps.columnIndex(name); }
- public int mapCount() { return maps.count(); }
- }
-
- /**
- * Physical schema of a row set showing the logical hierarchy of fields
- * with map fields as first-class fields. Map members appear as children
- * under the map, much as they appear in the physical value-vector
- * implementation.
- */
-
- public static class PhysicalSchema {
- protected final NameSpace<LogicalColumn> schema = new NameSpace<>();
-
- public LogicalColumn column(int index) {
- return schema.get(index);
- }
-
- public LogicalColumn column(String name) {
- return schema.get(name);
- }
-
- public int count() { return schema.count(); }
-
- public NameSpace<LogicalColumn> nameSpace() { return schema; }
- }
-
- private static class SchemaExpander {
- private final PhysicalSchema physicalSchema;
- private final NameSpace<LogicalColumn> cols = new NameSpace<>();
- private final NameSpace<LogicalColumn> maps = new NameSpace<>();
-
- public SchemaExpander(BatchSchema schema) {
- physicalSchema = expand("", schema);
- }
-
- private PhysicalSchema expand(String prefix, Iterable<MaterializedField> fields) {
- PhysicalSchema physical = new PhysicalSchema();
- for (MaterializedField field : fields) {
- String name = prefix + field.getName();
- int index;
- LogicalColumn colSchema = new LogicalColumn(name, physical.count(), field);
- physical.schema.add(field.getName(), colSchema);
- PhysicalSchema children = null;
- if (field.getType().getMinorType() == MinorType.MAP) {
- index = maps.add(name, colSchema);
- children = expand(name + ".", field.getChildren());
- } else {
- index = cols.add(name, colSchema);
- }
- colSchema.updateStructure(index, children);
- }
- return physical;
- }
- }
-
- private final BatchSchema batchSchema;
- private final TupleSchemaImpl accessSchema;
- private final FlattenedSchema flatSchema;
- private final PhysicalSchema physicalSchema;
-
- public RowSetSchema(BatchSchema schema) {
- batchSchema = schema;
- SchemaExpander expander = new SchemaExpander(schema);
- physicalSchema = expander.physicalSchema;
- accessSchema = new TupleSchemaImpl(physicalSchema.nameSpace());
- flatSchema = new FlattenedSchema(expander.cols, expander.maps);
- }
-
- /**
- * A hierarchical schema that includes maps, with maps expanding
- * to a nested tuple schema. Not used at present; this is intended
- * to be the bases of non-flattened accessors if we find the need.
- * @return the hierarchical access schema
- */
-
- public TupleSchema hierarchicalAccess() { return accessSchema; }
-
- /**
- * A flattened (left-to-right, depth-first traversal) of the non-map
- * columns in the row. Used to define the column indexes in the
- * get methods for row readers and the set methods for row writers.
- * @return the flattened access schema
- */
-
- public FlattenedSchema flatAccess() { return flatSchema; }
-
- /**
- * Internal physical schema in hierarchical order. Mostly used to create
- * the other schemas, but may be of use in special cases. Has the same
- * structure as the batch schema, but with additional information.
- * @return a tree-structured physical schema
- */
-
- public PhysicalSchema physical() { return physicalSchema; }
-
- /**
- * The batch schema used by the Drill runtime. Represents a tree-structured
- * list of top-level fields, including maps. Maps contain a nested schema.
- * @return the batch schema used by the Drill runtime
- */
-
- public BatchSchema batch() { return batchSchema; }
-
- /**
- * Convert this schema to a new batch schema that includes the specified
- * selection vector mode.
- * @param svMode selection vector mode for the new schema
- * @return the new batch schema
- */
-
- public BatchSchema toBatchSchema(SelectionVectorMode svMode) {
- List<MaterializedField> fields = new ArrayList<>();
- for (MaterializedField field : batchSchema) {
- fields.add(field);
- }
- return new BatchSchema(svMode, fields);
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
index 261a9c1..32b61ca 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
@@ -17,12 +17,18 @@
*/
package org.apache.drill.test.rowSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.math.BigDecimal;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.vector.accessor.AccessorUtilities;
-import org.apache.drill.exec.vector.accessor.ColumnAccessor.ValueType;
-import org.apache.drill.exec.vector.accessor.ColumnWriter;
-import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.ValueType;
+import org.bouncycastle.util.Arrays;
import org.joda.time.Duration;
import org.joda.time.Period;
@@ -62,11 +68,42 @@ public class RowSetUtilities {
*/
public static void setFromInt(RowSetWriter rowWriter, int index, int value) {
- ColumnWriter writer = rowWriter.column(index);
- if (writer.valueType() == ValueType.PERIOD) {
- setPeriodFromInt(writer, rowWriter.schema().column(index).getType().getMinorType(), value);
- } else {
- AccessorUtilities.setFromInt(writer, value);
+ ScalarWriter writer = rowWriter.scalar(index);
+ MaterializedField field = rowWriter.schema().column(index);
+ writer.setObject(testDataFromInt(writer.valueType(), field.getType(), value));
+ }
+
+ public static Object testDataFromInt(ValueType valueType, MajorType dataType, int value) {
+ switch (valueType) {
+ case BYTES:
+ return Integer.toHexString(value).getBytes();
+ case DOUBLE:
+ return (double) value;
+ case INTEGER:
+ switch (dataType.getMinorType()) {
+ case BIT:
+ return value & 0x01;
+ case SMALLINT:
+ return value % 32768;
+ case UINT2:
+ return value & 0xFFFF;
+ case TINYINT:
+ return value % 128;
+ case UINT1:
+ return value & 0xFF;
+ default:
+ return value;
+ }
+ case LONG:
+ return (long) value;
+ case STRING:
+ return Integer.toString(value);
+ case DECIMAL:
+ return BigDecimal.valueOf(value, dataType.getScale());
+ case PERIOD:
+ return periodFromInt(dataType.getMinorType(), value);
+ default:
+ throw new IllegalStateException("Unknown writer type: " + valueType);
}
}
@@ -81,26 +118,56 @@ public class RowSetUtilities {
* @param writer column writer for a period column
* @param minorType the Drill data type
* @param value the integer value to apply
+ * @return the period value corresponding to the given int
*/
- public static void setPeriodFromInt(ColumnWriter writer, MinorType minorType,
- int value) {
+ public static Period periodFromInt(MinorType minorType, int value) {
switch (minorType) {
case INTERVAL:
- writer.setPeriod(Duration.millis(value).toPeriod());
- break;
+ return Duration.millis(value).toPeriod();
case INTERVALYEAR:
- writer.setPeriod(Period.years(value / 12).withMonths(value % 12));
- break;
+ return Period.years(value / 12).withMonths(value % 12);
case INTERVALDAY:
int sec = value % 60;
value = value / 60;
int min = value % 60;
value = value / 60;
- writer.setPeriod(Period.days(value).withMinutes(min).withSeconds(sec));
- break;
+ return Period.days(value).withMinutes(min).withSeconds(sec);
default:
throw new IllegalArgumentException("Writer is not an interval: " + minorType);
}
}
+
+ public static void assertEqualValues(ValueType type, Object expectedObj, Object actualObj) {
+ assertEqualValues(type.toString(), type, expectedObj, actualObj);
+ }
+
+ public static void assertEqualValues(String msg, ValueType type, Object expectedObj, Object actualObj) {
+ switch (type) {
+ case BYTES: {
+ byte expected[] = (byte[]) expectedObj;
+ byte actual[] = (byte[]) actualObj;
+ assertEquals(msg + " - byte lengths differ", expected.length, actual.length);
+ assertTrue(msg, Arrays.areEqual(expected, actual));
+ break;
+ }
+ case DOUBLE:
+ assertEquals(msg, (double) expectedObj, (double) actualObj, 0.0001);
+ break;
+ case INTEGER:
+ case LONG:
+ case STRING:
+ case DECIMAL:
+ assertEquals(msg, expectedObj, actualObj);
+ break;
+ case PERIOD: {
+ Period expected = (Period) expectedObj;
+ Period actual = (Period) actualObj;
+ assertEquals(msg, expected.normalizedStandard(), actual.normalizedStandard());
+ break;
+ }
+ default:
+      throw new IllegalStateException("Unexpected type: " + type);
+ }
+ }
}
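
A short sketch of how the new utilities fit together (hedged: imports from
org.apache.drill.common.types, the accessor ValueType, and Joda-Time are elided,
and the Types.required() helper plus the specific types chosen are assumptions
made for illustration, not part of this patch):

    static void exerciseUtilities() {
      // The int 42 becomes the String "42" for a VARCHAR column.
      MajorType varcharType = Types.required(MinorType.VARCHAR);
      Object expected = RowSetUtilities.testDataFromInt(ValueType.STRING, varcharType, 42);
      RowSetUtilities.assertEqualValues(ValueType.STRING, "42", expected);

      // INTERVALDAY encodes the int as seconds, then minutes, then days
      // (there is no hours field): 3725 -> 1 day, 2 minutes, 5 seconds.
      Period p = RowSetUtilities.periodFromInt(MinorType.INTERVALDAY, 3725);
      RowSetUtilities.assertEqualValues(ValueType.PERIOD,
          Period.days(1).withMinutes(2).withSeconds(5), p);
    }
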
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriter.java
new file mode 100644
index 0000000..874c0e1
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+
+/**
+ * Interface for writing values to a row set. Only available
+ * for newly-created, single, direct row sets. Eventually, if
+ * we want to allow updating a row set, we have to create a
+ * new row set with the updated columns, then merge the new
+ * and old row sets to create a new immutable row set.
+ * <p>
+ * Typical usage:
+ * <pre><code>
+ * void writeABatch() {
+ * RowSetWriter writer = ...
+ * while (! writer.isFull()) {
+ * writer.scalar(0).setInt(10);
+ * writer.scalar(1).setString("foo");
+ * ...
+ * writer.save();
+ * }
+ * }</code></pre>
+ * The above writes until the batch is full, based on size. If values
+ * are large enough to potentially cause vector overflow, do the
+ * following instead:
+ * <pre><code>
+ * void writeABatch() {
+ * RowSetWriter writer = ...
+ * while (! writer.isFull()) {
+ *     writer.scalar(0).setInt(10);
+ *     try {
+ *       writer.scalar(1).setString("foo");
+ * } catch (VectorOverflowException e) { break; }
+ * ...
+ * writer.save();
+ * }
+ * // Do something with the partially-written last row.
+ * }</code></pre>
+ * <p>
+ * This writer is for testing, so no provision is available to handle a
+ * partial last row. (Elsewhere in Drill there are classes that handle that case.)
+ */
+
+public interface RowSetWriter extends TupleWriter {
+
+ /**
+ * Write a row of values, given by Java objects. Object type must
+   * match the expected column type. Stops writing if any value
+   * causes vector overflow. Value format:
+ * <ul>
+ * <li>For scalars, the value as a suitable Java type (int or
+ * Integer, say, for <tt>INTEGER</tt> values.)</li>
+ * <li>For scalar arrays, an array of a suitable Java primitive type
+ * for scalars. For example, <tt>int[]</tt> for an <tt>INTEGER</tt>
+ * column.</li>
+   * <li>For a Map, an <tt>Object</tt> array with values encoded as above.
+   * (In fact, the top-level row value list uses the same format as a map.)</li>
+ * <li>For a list (repeated map, list of list), an <tt>Object</tt>
+   * array with values encoded as above. (So, for a repeated map, an outer
+   * <tt>Object</tt> array encodes the array, and an inner one encodes each
+   * map's members.)</li>
+ * </ul>
+ *
+ * @param values variable-length argument list of column values
+ */
+
+ void setRow(Object...values);
+
+ /**
+   * Indicates whether the batch is full: no further rows can be
+   * written. Will be false on the first row, and on all subsequent
+   * rows, until either the maximum number of rows is written
+   * or a vector overflows. After that, will return true. The
+   * method returns true as soon as any column writer overflows,
+   * even in the middle of a row write. That is, this writer
+   * does not automatically handle overflow rows because that
+   * added complexity is seldom needed for tests.
+   *
+   * @return true if the batch is full and no further rows can be
+   * written, false if the current row can still be written
+ */
+
+ boolean isFull();
+ int rowIndex();
+
+ /**
+ * Saves the current row and moves to the next row.
+ * Done automatically if using <tt>setRow()</tt>.
+ */
+
+ void save();
+
+ /**
+ * Finish writing and finalize the row set being
+ * written.
+ * @return the completed, read-only row set without a
+ * selection vector
+ */
+
+ SingleRowSet done();
+}
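
To make the object-encoding rules for setRow() concrete, a hedged sketch
(the four-column schema -- INT, VARCHAR, repeated INT, and a map of
(INT, VARCHAR) -- is an assumption for illustration only):

    void writeSampleRows(RowSetWriter writer) {
      // Scalars as boxed values, a scalar array as int[], a map as Object[].
      writer.setRow(10, "fred", new int[] {1, 2, 3}, new Object[] {100, "x"});
      writer.setRow(20, "barney", new int[] {4, 5}, new Object[] {200, "y"});
      // setRow() saves each row, so no explicit save() call is needed here.
    }
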
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java
new file mode 100644
index 0000000..074842d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test.rowSet;
+
+import java.util.List;
+
+import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
+import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter;
+import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+
+/**
+ * Implementation of a row set writer. Only available for newly-created,
+ * empty, direct, single row sets. Rewriting is not allowed, nor is writing
+ * to a hyper row set.
+ */
+
+public class RowSetWriterImpl extends AbstractTupleWriter implements RowSetWriter {
+
+ /**
+ * Writer index that points to each row in the row set. The index starts at
+ * the 0th row and advances one row on each increment. This allows writers to
+ * start positioned at the first row. Writes happen in the current row.
+ * Calling <tt>next()</tt> advances to the next position, effectively saving
+ * the current row. The most recent row can be abandoned easily simply by not
+ * calling <tt>next()</tt>. This means that the number of completed rows is
+ * the same as the row index.
+ */
+
+ static class WriterIndexImpl implements ColumnWriterIndex {
+
+ public enum State { OK, VECTOR_OVERFLOW, END_OF_BATCH }
+
+ private int rowIndex = 0;
+ private State state = State.OK;
+
+ @Override
+ public final int vectorIndex() { return rowIndex; }
+
+ public final boolean next() {
+ if (++rowIndex < ValueVector.MAX_ROW_COUNT) {
+ return true;
+ }
+ // Should not call next() again once batch is full.
+ assert rowIndex == ValueVector.MAX_ROW_COUNT;
+ rowIndex = ValueVector.MAX_ROW_COUNT;
+ state = state == State.OK ? State.END_OF_BATCH : state;
+ return false;
+ }
+
+ public int size() {
+ // The index always points to the next slot past the
+ // end of valid rows.
+ return rowIndex;
+ }
+
+ public boolean valid() { return state == State.OK; }
+
+ public boolean hasOverflow() { return state == State.VECTOR_OVERFLOW; }
+
+ @Override
+ public final void nextElement() { }
+
+ @Override
+ public void rollover() {
+ throw new UnsupportedOperationException("Rollover not supported in the row set writer.");
+ }
+
+ @Override
+ public int rowStartIndex() { return rowIndex; }
+
+ @Override
+ public ColumnWriterIndex outerIndex() { return null; }
+
+ @Override
+ public String toString() {
+ return new StringBuilder()
+ .append("[")
+ .append(getClass().getSimpleName())
+ .append(" state = ")
+ .append(state)
+ .append(", rowIndex = ")
+ .append(rowIndex)
+ .append("]")
+ .toString();
+ }
+ }
+
+ private final WriterIndexImpl writerIndex;
+ private final ExtendableRowSet rowSet;
+
+ protected RowSetWriterImpl(ExtendableRowSet rowSet, TupleMetadata schema, WriterIndexImpl index, List<AbstractObjectWriter> writers) {
+ super(schema, writers);
+ this.rowSet = rowSet;
+ this.writerIndex = index;
+ bindIndex(index);
+ startWrite();
+ startRow();
+ }
+
+ @Override
+ public void setRow(Object...values) {
+ setObject(values);
+ save();
+ }
+
+ @Override
+ public int rowIndex() { return writerIndex.vectorIndex(); }
+
+ @Override
+ public void save() {
+ endArrayValue();
+ saveRow();
+
+ // For convenience, start a new row after each save.
+ // The last (unused) row is abandoned when the batch is full.
+
+ if (writerIndex.next()) {
+ startRow();
+ }
+ }
+
+ @Override
+  public boolean isFull() { return ! writerIndex.valid(); }
+
+ @Override
+ public SingleRowSet done() {
+ endWrite();
+ rowSet.container().setRecordCount(writerIndex.vectorIndex());
+ return rowSet;
+ }
+
+ @Override
+ public int lastWriteIndex() {
+ return writerIndex.vectorIndex();
+ }
+}
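
A hedged end-to-end sketch of the writer implementation above (the
ExtendableRowSet.writer() factory and the two-column schema are assumptions;
only the isFull()/save()/done() calls are defined by this patch):

    RowSet.SingleRowSet writeBatch(RowSet.ExtendableRowSet rowSet) {
      RowSetWriter writer = rowSet.writer();
      int i = 0;
      // isFull() becomes true when the row-count limit is reached or a
      // vector overflows; any unfinished last row is simply abandoned.
      while (! writer.isFull() && i < 100) {
        writer.scalar(0).setInt(i);
        writer.scalar(1).setString("row " + i);
        writer.save();
        i++;
      }
      // done() finalizes the vectors and sets the container record count.
      return writer.done();
    }
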