Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/12 04:48:30 UTC
[08/10] Add support for RepeatedMapVector, MapVector and RepeatedListVector.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 5f26054..039445b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -62,9 +62,9 @@ public abstract class HashAggTemplate implements HashAggregator {
private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
-
+
private static final boolean EXTRA_DEBUG_1 = false;
- private static final boolean EXTRA_DEBUG_2 = false;
+ private static final boolean EXTRA_DEBUG_2 = false;
private static final String TOO_BIG_ERROR = "Couldn't add value to an empty batch. This likely means that a single value is too long for a varlen field.";
private boolean first = true;
private boolean newSchema = false;
@@ -88,7 +88,7 @@ public abstract class HashAggTemplate implements HashAggregator {
List<VectorAllocator> wsAllocators = Lists.newArrayList(); // allocators for the workspace vectors
ErrorCollector collector = new ErrorCollectorImpl();
-
+
private MaterializedField[] materializedValueFields;
private boolean allFlushed = false;
@@ -102,13 +102,13 @@ public abstract class HashAggTemplate implements HashAggregator {
aggrValuesContainer = new VectorContainer();
ValueVector vector ;
-
- for(int i = 0; i < materializedValueFields.length; i++) {
+
+ for(int i = 0; i < materializedValueFields.length; i++) {
MaterializedField outputField = materializedValueFields[i];
// Create a type-specific ValueVector for this value
vector = TypeHelper.getNewVector(outputField, allocator) ;
VectorAllocator.getAllocator(vector, 50 /* avg. width */).alloc(HashTable.BATCH_SIZE) ;
-
+
aggrValuesContainer.add(vector) ;
}
@@ -124,8 +124,8 @@ public abstract class HashAggTemplate implements HashAggregator {
setupInterior(incoming, outgoing, aggrValuesContainer);
}
- private boolean outputValues() {
- for (int i = 0; i <= maxOccupiedIdx; i++) {
+ private boolean outputValues() {
+ for (int i = 0; i <= maxOccupiedIdx; i++) {
if (outputRecordValues(i, outputCount) ) {
if (EXTRA_DEBUG_2) logger.debug("Outputting values to {}", outputCount) ;
outputCount++;
@@ -139,7 +139,7 @@ public abstract class HashAggTemplate implements HashAggregator {
private void clear() {
aggrValuesContainer.clear();
}
-
+
// Code-generated methods (implemented in HashAggBatch)
@RuntimeOverridden
@@ -155,19 +155,19 @@ public abstract class HashAggTemplate implements HashAggregator {
@Override
public void setup(HashAggregate hashAggrConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incoming, RecordBatch outgoing,
- LogicalExpression[] valueExprs,
+ LogicalExpression[] valueExprs,
List<TypedFieldId> valueFieldIds,
TypedFieldId[] groupByOutFieldIds,
- VectorAllocator[] keyAllocators, VectorAllocator[] valueAllocators)
+ VectorAllocator[] keyAllocators, VectorAllocator[] valueAllocators)
throws SchemaChangeException, ClassTransformationException, IOException {
-
+
if (valueExprs == null || valueFieldIds == null) {
throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables.");
}
if (valueFieldIds.size() < valueExprs.length) {
throw new IllegalArgumentException("Wrong number of workspace variables.");
}
-
+
this.context = context;
this.allocator = allocator;
this.incoming = incoming;
@@ -175,11 +175,11 @@ public abstract class HashAggTemplate implements HashAggregator {
this.keyAllocators = keyAllocators;
this.valueAllocators = valueAllocators;
this.outgoing = outgoing;
-
+
this.hashAggrConfig = hashAggrConfig;
- // currently, hash aggregation is only applicable if there are group-by expressions.
- // For non-grouped (a.k.a Plain) aggregations that don't involve DISTINCT, there is no
+ // currently, hash aggregation is only applicable if there are group-by expressions.
+ // For non-grouped (a.k.a Plain) aggregations that don't involve DISTINCT, there is no
// need to create hash table. However, for plain aggregations with DISTINCT ..
// e.g SELECT COUNT(DISTINCT a1) FROM t1 ;
// we need to build a hash table on the aggregation column a1.
@@ -188,14 +188,14 @@ public abstract class HashAggTemplate implements HashAggregator {
throw new IllegalArgumentException("Currently, hash aggregation is only applicable if there are group-by expressions.");
}
- this.htIdxHolder = new IntHolder();
+ this.htIdxHolder = new IntHolder();
materializedValueFields = new MaterializedField[valueFieldIds.size()];
if (valueFieldIds.size() > 0) {
int i = 0;
- FieldReference ref = new FieldReference("dummy", ExpressionPosition.UNKNOWN, valueFieldIds.get(0).getType());
+ FieldReference ref = new FieldReference("dummy", ExpressionPosition.UNKNOWN, valueFieldIds.get(0).getIntermediateType());
for (TypedFieldId id : valueFieldIds) {
- materializedValueFields[i++] = MaterializedField.create(ref, id.getType());
+ materializedValueFields[i++] = MaterializedField.create(ref, id.getIntermediateType());
}
}
@@ -203,7 +203,7 @@ public abstract class HashAggTemplate implements HashAggregator {
this.htable = ht.createAndSetupHashTable(groupByOutFieldIds) ;
batchHolders = new ArrayList<BatchHolder>();
- addBatchHolder();
+ addBatchHolder();
doSetup(incoming);
}
@@ -211,21 +211,21 @@ public abstract class HashAggTemplate implements HashAggregator {
@Override
public AggOutcome doWork() {
try{
- // Note: Keeping the outer and inner try blocks here to maintain some similarity with
- // StreamingAggregate which does somethings conditionally in the outer try block.
+ // Note: Keeping the outer and inner try blocks here to maintain some similarity with
+ // StreamingAggregate which does somethings conditionally in the outer try block.
// In the future HashAggregate may also need to perform some actions conditionally
- // in the outer try block.
+ // in the outer try block.
outside: while(true) {
// loop through existing records, aggregating the values as necessary.
if (EXTRA_DEBUG_1) logger.debug ("Starting outer loop of doWork()...");
for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
if(EXTRA_DEBUG_2) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
- checkGroupAndAggrValues(currentIndex);
+ checkGroupAndAggrValues(currentIndex);
}
if (EXTRA_DEBUG_1) logger.debug("Processed {} records", underlyingIndex);
-
+
try{
while(true){
@@ -239,10 +239,10 @@ public abstract class HashAggTemplate implements HashAggregator {
case NOT_YET:
this.outcome = out;
return AggOutcome.RETURN_OUTCOME;
-
+
case OK_NEW_SCHEMA:
if(EXTRA_DEBUG_1) logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
- newSchema = true;
+ newSchema = true;
this.cleanup();
// TODO: new schema case needs to be handled appropriately
return AggOutcome.UPDATE_AGGREGATOR;
@@ -254,20 +254,20 @@ public abstract class HashAggTemplate implements HashAggregator {
} else {
checkGroupAndAggrValues(currentIndex);
incIndex();
-
+
if(EXTRA_DEBUG_1) logger.debug("Continuing outside loop");
continue outside;
}
case NONE:
outcome = out;
- outputKeysAndValues() ;
-
+ outputKeysAndValues() ;
+
// cleanup my internal state since there is nothing more to return
this.cleanup();
// cleanup incoming batch since output of aggregation does not need
// any references to the incoming
-
+
incoming.cleanup();
return setOkAndReturn();
@@ -294,7 +294,7 @@ public abstract class HashAggTemplate implements HashAggregator {
// now otherwise downstream operators will break.
// TODO: allow outputting arbitrarily large number of records in batches
assert (numGroupedRecords < Character.MAX_VALUE);
-
+
for (VectorAllocator a : keyAllocators) {
if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} records.", a, numGroupedRecords);
a.alloc(numGroupedRecords);
@@ -320,14 +320,14 @@ public abstract class HashAggTemplate implements HashAggregator {
public void cleanup(){
htable.clear();
htable = null;
- htIdxHolder = null;
+ htIdxHolder = null;
materializedValueFields = null;
for (BatchHolder bh : batchHolders) {
bh.clear();
}
batchHolders.clear();
- batchHolders = null;
+ batchHolders = null;
}
private AggOutcome tooBigFailure(){
@@ -335,7 +335,7 @@ public abstract class HashAggTemplate implements HashAggregator {
this.outcome = IterOutcome.STOP;
return AggOutcome.CLEANUP_AND_RETURN;
}
-
+
private final AggOutcome setOkAndReturn(){
if(first){
this.outcome = IterOutcome.OK_NEW_SCHEMA;
@@ -356,20 +356,20 @@ public abstract class HashAggTemplate implements HashAggregator {
}
currentIndex = getVectorIndex(underlyingIndex);
}
-
+
private final void resetIndex(){
underlyingIndex = -1;
incIndex();
}
private void addBatchHolder() {
- BatchHolder bh = new BatchHolder();
+ BatchHolder bh = new BatchHolder();
batchHolders.add(bh);
if (EXTRA_DEBUG_1) logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
int batchIdx = batchHolders.size() - 1;
- bh.setup(batchIdx);
+ bh.setup(batchIdx);
}
private boolean outputKeysAndValues() {
@@ -392,20 +392,20 @@ public abstract class HashAggTemplate implements HashAggregator {
return allFlushed;
}
- // Check if a group is present in the hash table; if not, insert it in the hash table.
- // The htIdxHolder contains the index of the group in the hash table container; this same
- // index is also used for the aggregation values maintained by the hash aggregate.
+ // Check if a group is present in the hash table; if not, insert it in the hash table.
+ // The htIdxHolder contains the index of the group in the hash table container; this same
+ // index is also used for the aggregation values maintained by the hash aggregate.
private boolean checkGroupAndAggrValues(int incomingRowIdx) {
if (incomingRowIdx < 0) {
throw new IllegalArgumentException("Invalid incoming row index.");
}
- /** for debugging
+ /** for debugging
Object tmp = (incoming).getValueAccessorById(0, BigIntVector.class).getValueVector();
BigIntVector vv0 = null;
BigIntHolder holder = null;
- if (tmp != null) {
+ if (tmp != null) {
vv0 = ((BigIntVector) tmp);
holder = new BigIntHolder();
holder.value = vv0.getAccessor().get(incomingRowIdx) ;
@@ -432,7 +432,7 @@ public abstract class HashAggTemplate implements HashAggregator {
// logger.debug("group-by key = {} already present at hash table index = {}", holder.value, currentIdx) ;
//}
- }
+ }
else if (putStatus == HashTable.PutStatus.KEY_ADDED) {
if (EXTRA_DEBUG_2) logger.debug("Group-by key was added to hash table, inserting new aggregate values") ;
@@ -441,17 +441,17 @@ public abstract class HashAggTemplate implements HashAggregator {
// logger.debug("group-by key = {} added at hash table index = {}", holder.value, currentIdx) ;
//}
}
-
+
if (bh.updateAggrValues(incomingRowIdx, idxWithinBatch)) {
numGroupedRecords++;
return true;
}
-
- }
+
+ }
return false;
}
-
+
// Code-generated methods (implemented in HashAggBatch)
public abstract void doSetup(@Named("incoming") RecordBatch incoming);
public abstract int getVectorIndex(@Named("recordIndex") int recordIndex);
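Aside from the whitespace cleanup that makes up most of this hunk, the functional change in HashAggTemplate is that the workspace value fields are now materialized from each TypedFieldId's intermediate type. A minimal sketch of the patched loop, using only names that appear above (the enclosing setup() and valueFieldIds are assumed to be in scope):

    // getIntermediateType() replaces the old getType() call; the resulting fields are
    // used later to create the per-group workspace value vectors of the hash aggregate.
    materializedValueFields = new MaterializedField[valueFieldIds.size()];
    if (valueFieldIds.size() > 0) {
      int i = 0;
      FieldReference ref = new FieldReference("dummy", ExpressionPosition.UNKNOWN,
          valueFieldIds.get(0).getIntermediateType());
      for (TypedFieldId id : valueFieldIds) {
        materializedValueFields[i++] = MaterializedField.create(ref, id.getIntermediateType());
      }
    }
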
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
index 34845b3..3e6def1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
@@ -28,12 +28,12 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
public class InternalBatch implements Iterable<VectorWrapper<?>>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InternalBatch.class);
-
+
private final VectorContainer container;
private final BatchSchema schema;
private final SelectionVector2 sv2;
private final SelectionVector4 sv4;
-
+
public InternalBatch(RecordBatch incoming){
switch(incoming.getSchema().getSelectionVectorMode()){
case FOUR_BYTE:
@@ -42,7 +42,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
break;
case TWO_BYTE:
this.sv4 = null;
- this.sv2 = incoming.getSelectionVector2().clone();
+ this.sv2 = incoming.getSelectionVector2().clone();
break;
default:
this.sv4 = null;
@@ -74,9 +74,9 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
if(sv4 != null) sv4.clear();
container.clear();
}
-
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz){
- return container.getValueAccessorById(fieldId, clazz);
+
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int[] fieldIds){
+ return container.getValueAccessorById(clazz, fieldIds);
}
-
+
}
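The one non-whitespace change here is the accessor signature: getValueAccessorById now takes the expected vector class first and an array of field ids, so a single lookup can later address a column nested inside a map vector. A rough before/after sketch for a caller, assuming an InternalBatch named batch, a top-level field id fieldId and a BigIntVector column (all illustrative):

    // old call shape: batch.getValueAccessorById(fieldId, BigIntVector.class)
    // new call shape: class first, then an id path; a one-element array means a
    // top-level field, longer arrays would walk down into nested map vectors.
    VectorWrapper<?> wrapper = batch.getValueAccessorById(BigIntVector.class, new int[]{fieldId});
    BigIntVector vector = (BigIntVector) wrapper.getValueVector();
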
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 883052a..72d0462 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -348,8 +348,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
container.add(v);
allocators.add(RemovingRecordBatch.getAllocator4(v));
- JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), fieldId, true));
- JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), fieldId, false));
+ JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), true, fieldId));
+ JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, fieldId));
g.getEvalBlock()._if(outVV.invoke("copyFromSafe")
.arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
.arg(outIndex)
@@ -376,8 +376,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
container.add(v);
allocators.add(RemovingRecordBatch.getAllocator4(v));
- JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(vv.getField().getType(), fieldId, false));
- JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), outputFieldId, false));
+ JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(vv.getField().getType(), false, fieldId));
+ JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, outputFieldId));
g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
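On the code-generation side the matching change is the TypedFieldId constructor: the hyper-reader flag now comes before the field id, the same reordering applied throughout this patch wherever generated readers are declared. A short sketch of the new argument order, mirroring the patched lines above:

    // old order: new TypedFieldId(type, fieldId, isHyper)
    // new order: new TypedFieldId(type, isHyper, fieldId)
    JVar inVV  = g.declareVectorValueSetupAndMember("buildBatch",
        new TypedFieldId(vv.getField().getType(), true,  fieldId));
    JVar outVV = g.declareVectorValueSetupAndMember("outgoing",
        new TypedFieldId(vv.getField().getType(), false, fieldId));
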
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index baa232e..c07878a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -43,7 +43,7 @@ public final class JoinStatus {
private int rightPosition;
private int svRightPosition;
private IterOutcome lastRight;
-
+
private int outputPosition;
public RightSourceMode rightSourceMode = RightSourceMode.INCOMING;
public MergeJoinBatch outputBatch;
@@ -54,7 +54,7 @@ public final class JoinStatus {
public boolean ok = true;
private boolean initialSet = false;
private boolean leftRepeating = false;
-
+
public JoinStatus(RecordBatch left, RecordBatch right, MergeJoinBatch output) {
super();
this.left = left;
@@ -70,7 +70,7 @@ public final class JoinStatus {
initialSet = true;
}
}
-
+
public final void advanceLeft(){
leftPosition++;
}
@@ -90,6 +90,10 @@ public final class JoinStatus {
return (rightSourceMode == RightSourceMode.INCOMING) ? rightPosition : svRightPosition;
}
+ public final int getRightCount(){
+ return right.getRecordCount();
+ }
+
public final void setRightPosition(int pos) {
rightPosition = pos;
}
@@ -176,7 +180,7 @@ public final class JoinStatus {
}
if(b.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) b.getSelectionVector2().clear();
}
-
+
/**
* Check if the left record position can advance by one in the current batch.
*/
@@ -230,5 +234,5 @@ public final class JoinStatus {
private boolean eitherMatches(IterOutcome outcome){
return lastLeft == outcome || lastRight == outcome;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index f43934e..af0d378 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -97,8 +97,8 @@ public abstract class JoinTemplate implements JoinWorker {
while (status.isLeftPositionAllowed()) {
if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
return false;
-
- status.incOutputPos();
+
+ status.incOutputPos();
status.advanceLeft();
}
}
@@ -113,7 +113,7 @@ public abstract class JoinTemplate implements JoinWorker {
case -1:
// left key < right key
if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
- if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
+ if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
return false;
status.incOutputPos();
}
@@ -135,7 +135,7 @@ public abstract class JoinTemplate implements JoinWorker {
doCompareNextLeftKey(status.getLeftPosition()) != 0)
// this record marks the end of repeated keys
status.notifyLeftStoppedRepeating();
-
+
boolean crossedBatchBoundaries = false;
int initialRightPosition = status.getRightPosition();
do {
@@ -143,11 +143,11 @@ public abstract class JoinTemplate implements JoinWorker {
if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
return false;
- if (!doCopyRight(status.getRightPosition(), status.getOutPosition()))
+ if (!doCopyRight(status.getRightPosition(), status.getOutPosition()))
return false;
-
+
status.incOutputPos();
-
+
// If the left key has duplicates and we're about to cross a boundary in the right batch, add the
// right table's record batch to the sv4 builder before calling next. These records will need to be
// copied again for each duplicate left key.
@@ -170,7 +170,7 @@ public abstract class JoinTemplate implements JoinWorker {
status.notifyLeftStoppedRepeating();
} else if (status.isLeftRepeating() && crossedBatchBoundaries) {
try {
- // build the right batches and
+ // build the right batches and
status.outputBatch.batchBuilder.build();
status.setSV4AdvanceMode();
} catch (SchemaChangeException e) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index a2c424f..3d496d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -350,10 +350,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
public TypedFieldId getValueVectorId(SchemaPath path) {
return outgoingContainer.getValueVectorId(path);
}
-
+
@Override
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
- return outgoingContainer.getValueAccessorById(fieldId, clazz);
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+ return outgoingContainer.getValueAccessorById(clazz, ids);
}
@Override
@@ -373,7 +373,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
/**
* Creates a generate class which implements the copy and compare methods.
- *
+ *
* @return instance of a new merger based on generated code
* @throws SchemaChangeException
*/
@@ -443,8 +443,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// declare incoming value vector and assign it to the array
JVar inVV = cg.declareVectorValueSetupAndMember("incomingBatches[" + batchIdx + "]",
new TypedFieldId(vv.getField().getType(),
- fieldIdx,
- false));
+ false,
+ fieldIdx));
// add vv to initialization list (e.g. { vv1, vv2, vv3 } )
incomingVectorInitBatch.add(inVV);
@@ -501,11 +501,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
TypeProtos.MinorType minor = vvRead.getMajorType().getMinorType();
Class cmpVectorClass = TypeHelper.getValueVectorClass(minor, mode);
+ JExpression arr = JExpr.newArray(cg.getModel().INT).add(JExpr.lit(vvRead.getFieldId().getFieldIds()[0]));
comparisonVectorInitBatch.add(
((JExpression) incomingBatchesVar.component(JExpr.lit(b)))
.invoke("getValueAccessorById")
- .arg(JExpr.lit(vvRead.getFieldId().getFieldId()))
.arg(cg.getModel()._ref(cmpVectorClass).boxify().dotclass())
+ .arg(arr)
.invoke("getValueVector"));
}
@@ -583,8 +584,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// declare outgoing value vectors
JVar outgoingVV = cg.declareVectorValueSetupAndMember("outgoingBatch",
new TypedFieldId(vvOut.getField().getType(),
- fieldIdx,
- vvOut.isHyper()));
+ vvOut.isHyper(), fieldIdx));
// assign to the appropriate slot in the outgoingVector array (in order of iteration)
cg.getSetupBlock().assign(outgoingVectors.component(JExpr.lit(fieldIdx)), outgoingVV);
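Because getValueAccessorById now expects an id array, the generated comparison setup has to emit a Java array literal instead of a single int literal. The codemodel lines from the hunk, with a comment sketching what the generated expression renders as (CmpVectorClass and <id> stand in for the class and first field id of the compared column):

    JExpression arr = JExpr.newArray(cg.getModel().INT)
        .add(JExpr.lit(vvRead.getFieldId().getFieldIds()[0]));
    // generated form per comparison vector:
    //   incomingBatches[b]
    //       .getValueAccessorById(CmpVectorClass.class, new int[]{<id>})
    //       .getValueVector()
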
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
index dd7011a..3398443 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
@@ -82,7 +82,7 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit
this.svMode = incoming.getSchema().getSelectionVectorMode();
this.outBatch = outgoing;
this.outputField = outputField;
- partitionValues = (IntVector) outBatch.getValueAccessorById(outBatch.getValueVectorId(outputField).getFieldId(), IntVector.class).getValueVector();
+ partitionValues = (IntVector) outBatch.getValueAccessorById(IntVector.class, outBatch.getValueVectorId(outputField).getFieldIds()).getValueVector();
switch(svMode){
case FOUR_BYTE:
case TWO_BYTE:
@@ -98,7 +98,7 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit
public abstract int doEval(@Named("inIndex") int inIndex, @Named("partitionIndex") int partitionIndex);
-
+
}
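This is the standard caller-side rewrite for the new accessor: resolve the TypedFieldId for a path once, then hand its whole id array to getValueAccessorById instead of a single int. The patched one-liner above, unpacked for readability (outBatch and outputField as in setup()):

    // look up the id path of the partition-number column, then fetch the vector by that path
    int[] ids = outBatch.getValueVectorId(outputField).getFieldIds();
    IntVector partitionValues =
        (IntVector) outBatch.getValueAccessorById(IntVector.class, ids).getValueVector();
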
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 6e115a7..deef25f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -219,8 +219,8 @@ public class OutgoingRecordBatch implements VectorAccessible {
}
@Override
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
- return vectorContainer.getValueAccessorById(fieldId, clazz);
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds) {
+ return vectorContainer.getValueAccessorById(clazz, fieldIds);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 6048085..bcd484c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -94,7 +94,7 @@ public class PartitionSenderRootExec implements RootExec {
if (!ok) {
stop();
-
+
return false;
}
@@ -153,7 +153,7 @@ public class PartitionSenderRootExec implements RootExec {
}
-
+
private void generatePartitionFunction() throws SchemaChangeException {
LogicalExpression filterExpression = operator.getExpr();
@@ -166,7 +166,7 @@ public class PartitionSenderRootExec implements RootExec {
}
cg.addExpr(new ReturnValueExpression(expr));
-
+
try {
Partitioner p = context.getImplementationClass(cg);
p.setup(context, incoming, outgoing);
@@ -214,7 +214,7 @@ public class PartitionSenderRootExec implements RootExec {
"outgoingVectors");
// create 2d array and build initialization list. For example:
- // outgoingVectors = new ValueVector[][] {
+ // outgoingVectors = new ValueVector[][] {
// new ValueVector[] {vv1, vv2},
// new ValueVector[] {vv3, vv4}
// });
@@ -229,8 +229,8 @@ public class PartitionSenderRootExec implements RootExec {
// declare outgoing value vector and assign it to the array
JVar outVV = cg.declareVectorValueSetupAndMember("outgoing[" + batchId + "]",
new TypedFieldId(vv.getField().getType(),
- fieldId,
- false));
+ false,
+ fieldId));
// add vv to initialization list (e.g. { vv1, vv2, vv3 } )
outgoingVectorInitBatch.add(outVV);
++fieldId;
@@ -248,8 +248,8 @@ public class PartitionSenderRootExec implements RootExec {
for (VectorWrapper<?> vvIn : incoming) {
// declare incoming value vectors
JVar incomingVV = cg.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vvIn.getField().getType(),
- fieldId,
- vvIn.isHyper()));
+ vvIn.isHyper(),
+ fieldId));
// generate the copyFrom() invocation with explicit cast to the appropriate type
Class<?> vvType = TypeHelper.getValueVectorClass(vvIn.getField().getType().getMinorType(),
@@ -307,7 +307,7 @@ public class PartitionSenderRootExec implements RootExec {
}
}
}
-
+
public void stop() {
logger.debug("Partition sender stopping.");
ok = false;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 347092a..b94f403 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -20,9 +20,7 @@ package org.apache.drill.exec.physical.impl.project;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
-import java.util.Set;
-import com.sun.codemodel.JExpr;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.FieldReference;
@@ -31,7 +29,6 @@ import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.PathSegment.NameSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
@@ -44,7 +41,6 @@ import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.MaterializedField;
@@ -52,12 +48,13 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.util.VectorUtil;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
+import com.carrotsearch.hppc.IntOpenHashSet;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.sun.codemodel.JExpr;
public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
@@ -92,6 +89,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
int incomingRecordCount = incoming.getRecordCount();
for(ValueVector v : this.allocationVectors){
AllocationHelper.allocate(v, incomingRecordCount, 250);
+// v.allocateNew();
}
int outputRecords = projector.projectRecords(0, incomingRecordCount, 0);
if (outputRecords < incomingRecordCount) {
@@ -177,14 +175,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
final ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getFunctionRegistry());
- Set<Integer> transferFieldIds = new HashSet();
+ IntOpenHashSet transferFieldIds = new IntOpenHashSet();
boolean isAnyWildcard = isAnyWildcard(exprs);
if(isAnyWildcard){
for(VectorWrapper<?> wrapper : incoming){
ValueVector vvIn = wrapper.getValueVector();
- String name = vvIn.getField().getDef().getName(vvIn.getField().getDef().getNameCount() - 1).getName();
+
+ String name = vvIn.getField().getPath().getLastSegment().getNameSegment().getPath();
FieldReference ref = new FieldReference(name);
TransferPair tp = wrapper.getValueVector().getTransferPair(ref);
transfers.add(tp);
@@ -202,17 +201,19 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
// add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
+ && !((ValueVectorReadExpression) expr).hasReadPath()
&& !isAnyWildcard
- &&!transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldId())
- && !((ValueVectorReadExpression) expr).isArrayElement()) {
+ && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])
+ && !((ValueVectorReadExpression) expr).hasReadPath()) {
ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
- ValueVector vvIn = incoming.getValueAccessorById(vectorRead.getFieldId().getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode())).getValueVector();
+ TypedFieldId id = vectorRead.getFieldId();
+ ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
Preconditions.checkNotNull(incoming);
TransferPair tp = vvIn.getTransferPair(getRef(namedExpression));
transfers.add(tp);
container.add(tp.getTo());
- transferFieldIds.add(vectorRead.getFieldId().getFieldId());
+ transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
// logger.debug("Added transfer.");
}else{
// need to do evaluation.
@@ -221,6 +222,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
TypedFieldId fid = container.add(vector);
ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
HoldingContainer hc = cg.addExpr(write);
+
+
cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
logger.debug("Added eval.");
}
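The project operator now keys its transfer bookkeeping off the first entry of the field id path (an IntOpenHashSet of primitive ints instead of a boxed Set<Integer>) and resolves the input vector through the TypedFieldId itself. A condensed sketch of the transfer branch, assuming the same locals as the hunk above (expr, namedExpression, incoming, container, transfers, transferFieldIds):

    ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
    TypedFieldId id = vectorRead.getFieldId();
    // the intermediate vector class and the full id path come straight from the TypedFieldId
    ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(),
        id.getFieldIds()).getValueVector();
    TransferPair tp = vvIn.getTransferPair(getRef(namedExpression));
    transfers.add(tp);
    container.add(tp.getTo());
    // remember the top-level column so it is not transferred twice
    transferFieldIds.add(id.getFieldIds()[0]);
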
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index 60e5993..aa0ecf6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -33,12 +33,12 @@ import com.google.common.collect.ImmutableList;
public abstract class ProjectorTemplate implements Projector {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectorTemplate.class);
-
+
private ImmutableList<TransferPair> transfers;
private SelectionVector2 vector2;
private SelectionVector4 vector4;
private SelectionVectorMode svMode;
-
+
public ProjectorTemplate() throws SchemaChangeException{
}
@@ -47,18 +47,18 @@ public abstract class ProjectorTemplate implements Projector {
switch(svMode){
case FOUR_BYTE:
throw new UnsupportedOperationException();
-
-
+
+
case TWO_BYTE:
final int count = recordCount;
for(int i = 0; i < count; i++, firstOutputIndex++){
doEval(vector2.getIndex(i), firstOutputIndex);
}
return recordCount;
-
-
+
+
case NONE:
-
+
final int countN = recordCount;
int i;
for (i = startIndex; i < startIndex + countN; i++, firstOutputIndex++) {
@@ -76,8 +76,9 @@ public abstract class ProjectorTemplate implements Projector {
t.transfer();
}
return recordCount;
-
-
+
+
+
default:
throw new UnsupportedOperationException();
}
@@ -86,7 +87,7 @@ public abstract class ProjectorTemplate implements Projector {
@Override
public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException{
- this.svMode = incoming.getSchema().getSelectionVectorMode();
+ this.svMode = incoming.getSchema().getSelectionVectorMode();
switch(svMode){
case FOUR_BYTE:
this.vector4 = incoming.getSelectionVector4();
@@ -103,7 +104,7 @@ public abstract class ProjectorTemplate implements Projector {
public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 4018991..62af0b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -86,7 +86,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
default:
throw new UnsupportedOperationException();
}
-
+
container.buildSchema(SelectionVectorMode.NONE);
}
@@ -156,12 +156,12 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
public void cleanup(){
super.cleanup();
}
-
+
private class StraightCopier implements Copier{
private List<TransferPair> pairs = Lists.newArrayList();
private List<ValueVector> out = Lists.newArrayList();
-
+
@Override
public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators){
for(VectorWrapper<?> vv : incoming){
@@ -183,7 +183,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
public List<ValueVector> getOut() {
return out;
}
-
+
}
private Copier getStraightCopier(){
@@ -192,10 +192,10 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
container.addCollection(copier.getOut());
return copier;
}
-
+
private Copier getGenerated2Copier() throws SchemaChangeException{
Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
-
+
List<VectorAllocator> allocators = Lists.newArrayList();
for(VectorWrapper<?> i : incoming){
ValueVector v = TypeHelper.getNewVector(i.getField(), oContext.getAllocator());
@@ -218,12 +218,12 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE);
return getGenerated4Copier(incoming, context, oContext.getAllocator(), container, this);
}
-
+
public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing) throws SchemaChangeException{
List<VectorAllocator> allocators = Lists.newArrayList();
for(VectorWrapper<?> i : batch){
-
+
ValueVector v = TypeHelper.getNewVector(i.getField(), allocator);
container.add(v);
allocators.add(getAllocator4(v));
@@ -239,20 +239,20 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
throw new SchemaChangeException("Failure while attempting to load generated class", e);
}
}
-
+
public static void generateCopies(ClassGenerator g, VectorAccessible batch, boolean hyper){
// we have parallel ids for each value vector so we don't actually have to deal with managing the ids at all.
int fieldId = 0;
-
+
JExpression inIndex = JExpr.direct("inIndex");
JExpression outIndex = JExpr.direct("outIndex");
g.rotateBlock();
for(VectorWrapper<?> vv : batch){
- JVar inVV = g.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vv.getField().getType(), fieldId, vv.isHyper()));
- JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), fieldId, false));
+ JVar inVV = g.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vv.getField().getType(), vv.isHyper(), fieldId));
+ JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, fieldId));
if(hyper){
-
+
g.getEvalBlock()._if(
outVV
.invoke("copyFromSafe")
@@ -268,20 +268,20 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
}else{
g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(inIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
}
-
-
+
+
fieldId++;
}
g.rotateBlock();
g.getEvalBlock()._return(JExpr.TRUE);
}
-
+
@Override
public WritableBatch getWritableBatch() {
return WritableBatch.get(this);
}
-
+
public static VectorAllocator getAllocator4(ValueVector outgoing){
if(outgoing instanceof FixedWidthVector){
return new FixedVectorAllocator((FixedWidthVector) outgoing);
@@ -291,6 +291,6 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
throw new UnsupportedOperationException();
}
}
-
-
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 8d3a3e5..f96a1bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -100,9 +100,9 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
}
@Override
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
validateReadState();
- return incoming.getValueAccessorById(fieldId, clazz);
+ return incoming.getValueAccessorById(clazz, ids);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
index d87a9f5..a546852 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
@@ -242,8 +242,8 @@ public class BatchGroup implements VectorAccessible {
}
@Override
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
- return currentContainer.getValueAccessorById(fieldId, clazz);
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+ return currentContainer.getValueAccessorById(clazz, ids);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 930f851..4b6c37d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -373,12 +373,16 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
ValueVector[] vectors = new ValueVector[batchGroupList.size() * 2];
int i = 0;
for (BatchGroup group : batchGroupList) {
- vectors[i++] = group.getValueAccessorById(group.getValueVectorId(field.getAsSchemaPath()).getFieldId(),
- field.getValueClass()).getValueVector();
+ vectors[i++] = group.getValueAccessorById(
+ field.getValueClass(),
+ group.getValueVectorId(field.getPath()).getFieldIds()
+ ).getValueVector();
if (group.hasSecond()) {
VectorContainer c = group.getSecondContainer();
- vectors[i++] = c.getValueAccessorById(c.getValueVectorId(field.getAsSchemaPath()).getFieldId(),
- field.getValueClass()).getValueVector();
+ vectors[i++] = c.getValueAccessorById(
+ field.getValueClass(),
+ c.getValueVectorId(field.getPath()).getFieldIds()
+ ).getValueVector();
} else {
vectors[i] = vectors[i - 1].getTransferPair().getTo(); //this vector should never be used. Just want to avoid having null elements in the hyper vector
i++;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 214f81c..844d6db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -31,7 +31,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements RecordBatch{
final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
-
+
protected final VectorContainer container = new VectorContainer();
protected final T popConfig;
protected final FragmentContext context;
@@ -43,7 +43,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
this.popConfig = popConfig;
this.oContext = new OperatorContext(popConfig, context);
}
-
+
@Override
public Iterator<VectorWrapper<?>> iterator() {
return container.iterator();
@@ -67,14 +67,14 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
public void kill() {
killIncoming();
}
-
+
protected abstract void killIncoming();
-
+
public void cleanup(){
container.clear();
oContext.close();
}
-
+
@Override
public SelectionVector2 getSelectionVector2() {
throw new UnsupportedOperationException();
@@ -91,16 +91,16 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
}
@Override
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
- return container.getValueAccessorById(fieldId, clazz);
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+ return container.getValueAccessorById(clazz, ids);
}
-
+
@Override
public WritableBatch getWritableBatch() {
// logger.debug("Getting writable batch.");
WritableBatch batch = WritableBatch.get(this);
return batch;
-
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
index a6a4621..b44a233 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -20,21 +20,25 @@ package org.apache.drill.exec.record;
import java.lang.reflect.Array;
import com.google.common.base.Preconditions;
+
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractContainerVector;
+import org.apache.drill.exec.vector.complex.MapVector;
public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HyperVectorWrapper.class);
-
+
private T[] vectors;
private MaterializedField f;
private final boolean releasable;
-
+
public HyperVectorWrapper(MaterializedField f, T[] v){
this(f, v, true);
}
-
+
public HyperVectorWrapper(MaterializedField f, T[] v, boolean releasable){
assert(v.length > 0);
this.f = f;
@@ -72,9 +76,51 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
public void clear() {
if(!releasable) return;
for(T x : vectors){
- x.clear();
+ x.clear();
+ }
+ }
+
+ @Override
+ public VectorWrapper<?> getChildWrapper(int[] ids) {
+ if(ids.length == 1) return this;
+
+ ValueVector[] vectors = new ValueVector[this.vectors.length];
+ int index = 0;
+
+ for(ValueVector v : this.vectors){
+ ValueVector vector = v;
+ for(int i = 1; i < ids.length; i++){
+ MapVector map = (MapVector) vector;
+ vector = map.getVectorById(ids[i]);
+ }
+ vectors[index] = vector;
+ index++;
+ }
+ return new HyperVectorWrapper<ValueVector>(vectors[0].getField(), vectors);
+ }
+
+ @Override
+ public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath) {
+ ValueVector v = vectors[0];
+ if(!expectedPath.getRootSegment().segmentEquals(v.getField().getPath().getRootSegment())) return null;
+
+ if(v instanceof AbstractContainerVector){
+ // we're looking for a multi path.
+ AbstractContainerVector c = (AbstractContainerVector) v;
+ TypedFieldId.Builder builder = TypedFieldId.newBuilder();
+ builder.intermediateType(v.getField().getType());
+ builder.hyper();
+ builder.addId(id);
+ return c.getFieldIdIfMatches(builder, true, expectedPath.getRootSegment().getChild());
+
+ }else{
+ return TypedFieldId.newBuilder() //
+ .intermediateType(v.getField().getType()) //
+ .finalType(v.getField().getType()) //
+ .addId(id) //
+ .hyper() //
+ .build();
}
-
}
@Override
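The new getChildWrapper is what makes a multi-part field id usable against a hyper batch: the first id selects this wrapper itself, and every further id descends one MapVector level in each underlying vector. A hedged usage sketch (the wrapper, the map layout and the concrete id values are assumptions for illustration, not taken from the patch):

    // 'wrapper' is assumed to be a HyperVectorWrapper whose vectors are MapVectors
    // and whose maps contain a child vector with id 2.
    int[] idPath = new int[]{5, 2};   // {this wrapper's own id, child id inside each map}
    VectorWrapper<?> child = wrapper.getChildWrapper(idPath);
    // 'child' now wraps, for every underlying batch n, the vector returned by
    // ((MapVector) vectors[n]).getVectorById(2).
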
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index d93e258..439552f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -17,39 +17,58 @@
*/
package org.apache.drill.exec.record;
-import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.PathSegment;
-import org.apache.drill.common.expression.PathSegment.NameSegment;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
-import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
-import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
-import com.beust.jcommander.internal.Lists;
+import com.google.hive12.common.collect.Lists;
public class MaterializedField{
- private final FieldDef def;
+ private SchemaPath path;
+ private MajorType type;
+ private List<MaterializedField> children = Lists.newArrayList();
- public MaterializedField(FieldDef def) {
- this.def = def;
+ private MaterializedField(SchemaPath path, MajorType type) {
+ super();
+ this.path = path;
+ this.type = type;
}
- public static MaterializedField create(FieldDef def){
- return new MaterializedField(def);
+ public static MaterializedField create(SerializedField serField){
+ return new MaterializedField(SchemaPath.create(serField.getNamePart()), serField.getMajorType());
+ }
+
+ public SerializedField.Builder getAsBuilder(){
+ return SerializedField.newBuilder() //
+ .setMajorType(type) //
+ .setNamePart(path.getAsNamePart());
+ }
+
+ public void addChild(MaterializedField field){
+ children.add(field);
}
public MaterializedField clone(FieldReference ref){
- return create(ref, def.getMajorType());
+ return create(ref, type);
+ }
+
+ public String getLastName(){
+ PathSegment seg = path.getRootSegment();
+ while(seg.getChild() != null) seg = seg.getChild();
+ return seg.getNameSegment().getPath();
+ }
+
+
+ // TODO: rewrite without as direct match rather than conversion then match.
+ public boolean matches(SerializedField field){
+ MaterializedField f = create(field);
+ return f.equals(this);
}
public static MaterializedField create(String path, MajorType type){
@@ -58,43 +77,20 @@ public class MaterializedField{
}
public static MaterializedField create(SchemaPath path, MajorType type) {
- FieldDef.Builder b = FieldDef.newBuilder();
- b.setMajorType(type);
- addSchemaPathToFieldDef(path, b);
- return create(b.build());
- }
-
- private static void addSchemaPathToFieldDef(SchemaPath path, FieldDef.Builder builder) {
- for (PathSegment p = path.getRootSegment();; p = p.getChild()) {
- NamePart.Builder b = NamePart.newBuilder();
- if (p.isArray()) {
- b.setType(Type.ARRAY);
- } else {
- b.setName(p.getNameSegment().getPath().toString());
- b.setType(Type.NAME);
- }
- builder.addName(b.build());
- if(p.isLastPath()) break;
- }
+ return new MaterializedField(path, type);
}
- public FieldDef getDef() {
- return def;
+ public SchemaPath getPath(){
+ return path;
}
+ /**
+ * Get the schema path. Deprecated, use getPath() instead.
+ * @return the SchemaPath of this field.
+ */
+ @Deprecated
public SchemaPath getAsSchemaPath(){
- List<NamePart> nameList = Lists.newArrayList(def.getNameList());
- Collections.reverse(nameList);
- PathSegment seg = null;
- for(NamePart p : nameList){
- if(p.getType() == NamePart.Type.ARRAY){
- throw new UnsupportedOperationException();
- }else{
- seg = new NameSegment(p.getName(), seg);
- }
- }
- if( !(seg instanceof NameSegment) ) throw new UnsupportedOperationException();
- return new SchemaPath( (NameSegment) seg);
+ return path;
}
// public String getName(){
@@ -119,29 +115,29 @@ public class MaterializedField{
// }
public int getWidth() {
- return def.getMajorType().getWidth();
+ return type.getWidth();
}
public MajorType getType() {
- return def.getMajorType();
+ return type;
}
public int getScale() {
- return def.getMajorType().getScale();
+ return type.getScale();
}
public int getPrecision() {
- return def.getMajorType().getPrecision();
+ return type.getPrecision();
}
public boolean isNullable() {
- return def.getMajorType().getMode() == DataMode.OPTIONAL;
+ return type.getMode() == DataMode.OPTIONAL;
}
public DataMode getDataMode() {
- return def.getMajorType().getMode();
+ return type.getMode();
}
public MaterializedField getOtherNullableVersion(){
- MajorType mt = def.getMajorType();
+ MajorType mt = type;
DataMode newDataMode = null;
switch(mt.getMode()){
case OPTIONAL:
@@ -153,7 +149,7 @@ public class MaterializedField{
default:
throw new UnsupportedOperationException();
}
- return new MaterializedField(def.toBuilder().setMajorType(mt.toBuilder().setMode(newDataMode).build()).build());
+ return new MaterializedField(path, mt.toBuilder().setMode(newDataMode).build());
}
public Class<?> getValueClass() {
@@ -161,33 +157,19 @@ public class MaterializedField{
}
public boolean matches(SchemaPath path) {
- Iterator<NamePart> iter = def.getNameList().iterator();
+ if(!path.isSimplePath()) return false;
- for (PathSegment p = path.getRootSegment();; p = p.getChild()) {
- if(p == null) break;
- if (!iter.hasNext()) return false;
- NamePart n = iter.next();
-
- if (p.isArray()) {
- if (n.getType() == Type.ARRAY) continue;
- return false;
- } else {
- if (p.getNameSegment().getPath().equalsIgnoreCase(n.getName())) continue;
- return false;
- }
-
- }
- // we've reviewed all path segments. confirm that we don't have any extra name parts.
- return !iter.hasNext();
+ return this.path.equals(path);
}
-
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((def == null) ? 0 : def.hashCode());
+ result = prime * result + ((children == null) ? 0 : children.hashCode());
+ result = prime * result + ((path == null) ? 0 : path.hashCode());
+ result = prime * result + ((type == null) ? 0 : type.hashCode());
return result;
}
@@ -200,20 +182,30 @@ public class MaterializedField{
if (getClass() != obj.getClass())
return false;
MaterializedField other = (MaterializedField) obj;
- if (def == null) {
- if (other.def != null)
+ if (children == null) {
+ if (other.children != null)
+ return false;
+ } else if (!children.equals(other.children))
+ return false;
+ if (path == null) {
+ if (other.path != null)
+ return false;
+ } else if (!path.equals(other.path))
+ return false;
+ if (type == null) {
+ if (other.type != null)
return false;
- } else if (!def.equals(other.def))
+ } else if (!type.equals(other.type))
return false;
return true;
}
@Override
public String toString() {
- return "MaterializedField [" + def.toString() + "]";
+ return "MaterializedField [path=" + path + ", type=" + type + "]";
}
public String toExpr(){
- return this.getAsSchemaPath().toExpr();
+ return path.toExpr();
}
}
\ No newline at end of file
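To make the shape of the refactored class concrete, here is a minimal sketch (not part of this patch) of creating and matching a field now that MaterializedField carries a SchemaPath and MajorType directly instead of a FieldDef; the field name and the use of the Types helper from drill-common are illustrative only:

import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.record.MaterializedField;

public class MaterializedFieldSketch {
  public static void main(String[] args) {
    // An OPTIONAL INT field named "age"; create(String, MajorType) builds the field from a simple name.
    MajorType intType = Types.optional(MinorType.INT);
    MaterializedField field = MaterializedField.create("age", intType);

    // The field now exposes its path and type directly.
    System.out.println(field.getPath());       // the field's SchemaPath
    System.out.println(field.getLastName());   // "age"
    System.out.println(field.isNullable());    // true, since the mode is OPTIONAL

    // matches(SchemaPath) only accepts simple paths and compares them for equality.
    System.out.println(field.matches(field.getPath()));  // true
  }
}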
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 31283c6..60fdd4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -27,7 +27,7 @@ import org.apache.drill.exec.vector.ValueVector;
* A record batch contains a set of field values for a particular range of records. In the case of a record batch
* composed of ValueVectors, ideally a batch fits within L2 cache (~256k per core). The set of value vectors does not
* change unless the next() IterOutcome is a *_NEW_SCHEMA type.
- *
+ *
* Note that the Iterator provided by a RecordBatch must align with the rank positions of the field ids
* returned by getValueVectorId().
*/
@@ -56,21 +56,21 @@ public interface RecordBatch extends VectorAccessible {
/**
* Access the FragmentContext of the current query fragment. Useful for reporting failure information or other query
* level information.
- *
+ *
* @return
*/
public FragmentContext getContext();
/**
* Provide the schema of the current RecordBatch. This changes if and only if a *_NEW_SCHEMA IterOutcome is provided.
- *
+ *
* @return
*/
public BatchSchema getSchema();
/**
* Provide the number of records that are within this record batch.
- *
+ *
* @return
*/
public int getRecordCount();
@@ -89,7 +89,7 @@ public interface RecordBatch extends VectorAccessible {
* Get the value vector type and id for the given schema path. The TypedFieldId should store a fieldId which is the
* same as the ordinal position of the field within the Iterator provided by this class's implementation of
* Iterable<ValueVector>.
- *
+ *
* @param path
* The path where the vector should be located.
* @return The local field id associated with this vector. If no field matches this path, this will return a null
@@ -97,24 +97,24 @@ public interface RecordBatch extends VectorAccessible {
*/
public abstract TypedFieldId getValueVectorId(SchemaPath path);
@Override
- public abstract VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz);
+ public abstract VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids);
/**
* Update the data in each Field reading interface for the next range of records. Once a RecordBatch returns an
* IterOutcome.NONE, the consumer should no longer call next(). Behavior at that point is undefined and likely to throw
* an exception.
- *
+ *
* @return An IterOutcome describing the result of the iteration.
*/
public IterOutcome next();
/**
* Get a writable version of this batch. Takes over ownership of existing buffers.
- *
+ *
* @return
*/
public WritableBatch getWritableBatch();
-
+
public void cleanup();
}
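The javadoc above spells out the consumer contract; to make it concrete, here is a hedged sketch (not part of this patch) of the typical downstream loop, assuming the usual IterOutcome values:

import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;

public class ConsumeSketch {
  // `upstream` is a hypothetical incoming batch from an upstream operator.
  static void consume(RecordBatch upstream) {
    while (true) {
      IterOutcome outcome = upstream.next();
      switch (outcome) {
        case NONE:
          return;                                      // finished; calling next() again is undefined
        case STOP:
          return;                                      // upstream failed; stop consuming
        case OK_NEW_SCHEMA:
          BatchSchema schema = upstream.getSchema();   // the schema only changes on *_NEW_SCHEMA
          // fall through: the batch announcing the schema also carries records
        case OK:
          int records = upstream.getRecordCount();
          // read values via getValueVectorId(path) / getValueAccessorById(clazz, ids)
          break;
        default:
          break;                                       // e.g. NOT_YET for asynchronous sources
      }
    }
  }
}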
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index ed450af..10d959f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -23,13 +23,14 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.vector.ValueVector;
import com.google.common.collect.Maps;
@@ -63,25 +64,24 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
boolean schemaChanged = schema == null;
// logger.info("Load, ThreadID: {}", Thread.currentThread().getId(), new RuntimeException("For Stack Trace Only"));
// System.out.println("Load, ThreadId: " + Thread.currentThread().getId());
- Map<FieldDef, ValueVector> oldFields = Maps.newHashMap();
+ Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap();
for(VectorWrapper<?> w : container){
ValueVector v = w.getValueVector();
- oldFields.put(v.getField().getDef(), v);
+ oldFields.put(v.getField(), v);
}
VectorContainer newVectors = new VectorContainer();
- List<FieldMetadata> fields = def.getFieldList();
+ List<SerializedField> fields = def.getFieldList();
int bufOffset = 0;
- for (FieldMetadata fmd : fields) {
- FieldDef fieldDef = fmd.getDef();
+ for (SerializedField fmd : fields) {
+ MaterializedField fieldDef = MaterializedField.create(fmd);
ValueVector v = oldFields.remove(fieldDef);
if(v == null) {
// if we arrive here, we didn't have a matching vector.
schemaChanged = true;
- MaterializedField m = new MaterializedField(fieldDef);
- v = TypeHelper.getNewVector(m, allocator);
+ v = TypeHelper.getNewVector(fieldDef, allocator);
}
if (fmd.getValueCount() == 0){
v.clear();
@@ -136,8 +136,8 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
return valueCount;
}
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz){
- return container.getValueAccessorById(fieldId, clazz);
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids){
+ return container.getValueAccessorById(clazz, ids);
}
public WritableBatch getWritableBatch(){
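For orientation, a rough usage sketch (not part of this patch) of the reworked loader, assuming the existing RecordBatchLoader(allocator) constructor and load(def, buffer) entry point; since vectors are now keyed by MaterializedField rather than FieldDef, matching fields are reused across loads:

import io.netty.buffer.ByteBuf;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;

public class LoaderSketch {
  // `allocator`, `def` and `data` come from the surrounding fragment or client code.
  static void loadBatch(BufferAllocator allocator, RecordBatchDef def, ByteBuf data) throws Exception {
    RecordBatchLoader loader = new RecordBatchLoader(allocator);
    boolean schemaChanged = loader.load(def, data);
    if (schemaChanged) {
      // a SerializedField arrived that no existing MaterializedField matched, so new
      // vectors were created; downstream state keyed on the schema must be rebuilt
    }
    for (VectorWrapper<?> w : loader) {
      // each wrapper's field is the MaterializedField built from the SerializedField metadata
      System.out.println(w.getField());
    }
  }
}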
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
index b7a8248..692fe62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
@@ -17,11 +17,15 @@
*/
package org.apache.drill.exec.record;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractContainerVector;
+import org.apache.drill.exec.vector.complex.MapVector;
public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleVectorWrapper.class);
-
+
private T v;
public SimpleVectorWrapper(T v){
@@ -53,8 +57,7 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper
public boolean isHyper() {
return false;
}
-
-
+
@SuppressWarnings("unchecked")
@Override
public VectorWrapper<T> cloneAndTransfer() {
@@ -71,4 +74,56 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper
public static <T extends ValueVector> SimpleVectorWrapper<T> create(T v){
return new SimpleVectorWrapper<T>(v);
}
+
+
+ @Override
+ public VectorWrapper<?> getChildWrapper(int[] ids) {
+ if(ids.length == 1) return this;
+
+ ValueVector vector = v;
+
+ for(int i = 1; i < ids.length; i++){
+ MapVector map = (MapVector) vector;
+ vector = map.getVectorById(ids[i]);
+ }
+
+ return new SimpleVectorWrapper<ValueVector>(vector);
+ }
+
+ @Override
+ public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath) {
+ if(!expectedPath.getRootSegment().segmentEquals(v.getField().getPath().getRootSegment())) return null;
+ PathSegment seg = expectedPath.getRootSegment();
+
+ if(v instanceof AbstractContainerVector){
+ // we're looking for a multi path.
+ AbstractContainerVector c = (AbstractContainerVector) v;
+ TypedFieldId.Builder builder = TypedFieldId.newBuilder();
+ builder.intermediateType(v.getField().getType());
+ builder.addId(id);
+ return c.getFieldIdIfMatches(builder, true, expectedPath.getRootSegment().getChild());
+
+ }else{
+ TypedFieldId.Builder builder = TypedFieldId.newBuilder();
+ builder.intermediateType(v.getField().getType());
+ builder.addId(id);
+ builder.finalType(v.getField().getType());
+ if(seg.isLastPath()){
+ return builder.build();
+ }else{
+ PathSegment child = seg.getChild();
+ if(child.isArray() && child.isLastPath()){
+ builder.remainder(child);
+ builder.withIndex();
+ return builder.build();
+ }else{
+ return null;
+ }
+
+ }
+
+ }
+ }
+
+
}
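To show how the two new methods cooperate, here is a rough sketch (not part of this patch) of resolving a possibly nested column against a single wrapper; the helper name is illustrative:

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.ValueVector;

public class WrapperLookupSketch {
  // `wrapper` wraps vector number `i` of a container; `path` may be nested, e.g. a map member.
  static ValueVector find(VectorWrapper<?> wrapper, int i, SchemaPath path) {
    TypedFieldId id = wrapper.getFieldIdIfMatches(i, path);
    if (id == null) {
      return null;  // the root name did not match, or the remainder could not be resolved
    }
    // getFieldIds() holds the ordinal chain [i, childOrdinal, ...];
    // getChildWrapper() walks that chain down through MapVector children.
    VectorWrapper<?> leaf = wrapper.getChildWrapper(id.getFieldIds());
    return leaf.getValueVector();
  }
}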
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
index ba2c7b2..9645be9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
@@ -23,5 +23,5 @@ public interface TransferPair {
public void transfer();
public void splitAndTransfer(int startIndex, int length);
public ValueVector getTo();
- public void copyValue(int from, int to);
+ public boolean copyValueSafe(int from, int to);
}
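The rename from copyValue to copyValueSafe follows the "safe" mutator convention used elsewhere in Drill: the boolean return reports whether the destination had room for the value. A minimal sketch (not part of this patch) of a copy loop honoring that contract:

import org.apache.drill.exec.record.TransferPair;

public class CopySketch {
  // Copy `count` values through the pair, stopping at the first index the destination cannot absorb.
  static int copyAll(TransferPair pair, int count) {
    for (int i = 0; i < count; i++) {
      if (!pair.copyValueSafe(i, i)) {
        return i;  // destination is full; the caller must reallocate or flush before continuing
      }
    }
    return count;
  }
}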
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
index 0fbd0ae..24a8251 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
@@ -17,34 +17,174 @@
*/
package org.apache.drill.exec.record;
+import java.util.Arrays;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.carrotsearch.hppc.IntArrayList;
+import com.google.common.base.Preconditions;
public class TypedFieldId {
- final MajorType type;
- final int fieldId;
+ final MajorType finalType;
+ final MajorType secondaryFinal;
+ final MajorType intermediateType;
+ final int[] fieldIds;
final boolean isHyperReader;
+ final PathSegment remainder;
+
+ public TypedFieldId(MajorType type, int... fieldIds){
+ this(type, type, type, false, null, fieldIds);
+ }
- public TypedFieldId(MajorType type, int fieldId){
- this(type, fieldId, false);
+ public TypedFieldId(MajorType type, IntArrayList breadCrumb, PathSegment remainder){
+ this(type, type, type, false, remainder, breadCrumb.toArray());
}
-
- public TypedFieldId(MajorType type, int fieldId, boolean isHyper) {
+
+ public TypedFieldId(MajorType type, boolean isHyper, int... fieldIds){
+ this(type, type, type, isHyper, null, fieldIds);
+ }
+
+ public TypedFieldId(MajorType intermediateType, MajorType secondaryFinal, MajorType finalType, boolean isHyper, PathSegment remainder, int... fieldIds) {
super();
- this.type = type;
- this.fieldId = fieldId;
+ this.intermediateType = intermediateType;
+ this.finalType = finalType;
+ this.secondaryFinal = secondaryFinal;
+ this.fieldIds = fieldIds;
this.isHyperReader = isHyper;
+ this.remainder = remainder;
+ }
+
+
+
+ public TypedFieldId cloneWithChild(int id){
+ int[] fieldIds = ArrayUtils.add(this.fieldIds, id);
+ return new TypedFieldId(intermediateType, secondaryFinal, finalType, isHyperReader, remainder, fieldIds);
+ }
+
+ public PathSegment getLastSegment(){
+ if(remainder == null) return null;
+ PathSegment seg = remainder;
+ while(seg.getChild() != null){
+ seg = seg.getChild();
+ }
+ return seg;
+ }
+
+ public TypedFieldId cloneWithRemainder(PathSegment remainder){
+ return new TypedFieldId(intermediateType, secondaryFinal, finalType, isHyperReader, remainder, fieldIds);
+ }
+
+ public boolean hasRemainder(){
+ return remainder != null;
+ }
+
+ public PathSegment getRemainder(){
+ return remainder;
}
public boolean isHyperReader(){
return isHyperReader;
}
-
- public MajorType getType() {
- return type;
+
+ public MajorType getIntermediateType() {
+ return intermediateType;
+ }
+
+ public Class<? extends ValueVector> getIntermediateClass(){
+ return (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(intermediateType.getMinorType(), intermediateType.getMode());
+ }
+
+ public MajorType getFinalType(){
+ return finalType;
}
- public int getFieldId() {
- return fieldId;
+ public int[] getFieldIds() {
+ return fieldIds;
+ }
+
+
+
+ public MajorType getSecondaryFinal() {
+ return secondaryFinal;
+ }
+
+ public static Builder newBuilder(){
+ return new Builder();
+ }
+
+ public static class Builder{
+ final IntArrayList ids = new IntArrayList();
+ MajorType finalType;
+ MajorType intermediateType;
+ PathSegment remainder;
+ boolean hyperReader = false;
+ boolean withIndex = false;
+
+ public Builder addId(int id){
+ ids.add(id);
+ return this;
+ }
+
+ public Builder withIndex(){
+ withIndex = true;
+ return this;
+ }
+
+ public Builder remainder(PathSegment remainder){
+ this.remainder = remainder;
+ return this;
+ }
+
+ public Builder hyper(){
+ this.hyperReader = true;
+ return this;
+ }
+
+ public Builder finalType(MajorType finalType){
+ this.finalType = finalType;
+ return this;
+ }
+
+ public Builder intermediateType(MajorType intermediateType){
+ this.intermediateType = intermediateType;
+ return this;
+ }
+
+ public TypedFieldId build(){
+ Preconditions.checkNotNull(finalType);
+ if(intermediateType == null) intermediateType = finalType;
+
+ MajorType actualFinalType = finalType;
+ MajorType secondaryFinal = finalType;
+
+ // if this has an index, switch to required type for output
+ if(withIndex && intermediateType == finalType) actualFinalType = finalType.toBuilder().setMode(DataMode.REQUIRED).build();
+
+ // if this isn't a direct access, switch the final type to nullable as offsets may be null.
+ // TODO: there is a bug here with some things.
+ if(intermediateType != finalType) actualFinalType = finalType.toBuilder().setMode(DataMode.OPTIONAL).build();
+
+ return new TypedFieldId(intermediateType, secondaryFinal, actualFinalType, hyperReader, remainder, ids.toArray());
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + Arrays.hashCode(fieldIds);
+ result = prime * result + ((finalType == null) ? 0 : finalType.hashCode());
+ result = prime * result + ((intermediateType == null) ? 0 : intermediateType.hashCode());
+ result = prime * result + (isHyperReader ? 1231 : 1237);
+ result = prime * result + ((remainder == null) ? 0 : remainder.hashCode());
+ result = prime * result + ((secondaryFinal == null) ? 0 : secondaryFinal.hashCode());
+ return result;
}
@Override
@@ -56,20 +196,32 @@ public class TypedFieldId {
if (getClass() != obj.getClass())
return false;
TypedFieldId other = (TypedFieldId) obj;
- if (fieldId != other.fieldId)
+ if (!Arrays.equals(fieldIds, other.fieldIds))
return false;
- if (type == null) {
- if (other.type != null)
+ if (finalType == null) {
+ if (other.finalType != null)
return false;
- } else if (!type.equals(other.type))
+ } else if (!finalType.equals(other.finalType))
+ return false;
+ if (intermediateType == null) {
+ if (other.intermediateType != null)
+ return false;
+ } else if (!intermediateType.equals(other.intermediateType))
+ return false;
+ if (isHyperReader != other.isHyperReader)
+ return false;
+ if (remainder == null) {
+ if (other.remainder != null)
+ return false;
+ } else if (!remainder.equals(other.remainder))
+ return false;
+ if (secondaryFinal == null) {
+ if (other.secondaryFinal != null)
+ return false;
+ } else if (!secondaryFinal.equals(other.secondaryFinal))
return false;
return true;
}
- @Override
- public String toString() {
- return "TypedFieldId [type=" + type + ", fieldId=" + fieldId + ", isSuperReader=" + isHyperReader + "]";
- }
-
}
\ No newline at end of file
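Since the field id is now an ordinal chain rather than a single int, the builder is the natural way to assemble one. A hedged sketch (not part of this patch), with illustrative ordinals and types, of building an id that reaches one level into a map:

import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.record.TypedFieldId;

public class TypedFieldIdSketch {
  public static void main(String[] args) {
    // Suppose field 3 of the batch is a MAP and its child at ordinal 1 is the BIGINT we want.
    MajorType mapType = Types.required(MinorType.MAP);
    MajorType bigIntType = Types.optional(MinorType.BIGINT);

    TypedFieldId id = TypedFieldId.newBuilder()
        .intermediateType(mapType)   // type of the top-level vector we enter first
        .finalType(bigIntType)       // type of the leaf vector the id ultimately points at
        .addId(3)                    // ordinal of the map within the container
        .addId(1)                    // ordinal of the child within the map
        .build();

    // id.getFieldIds() is now [3, 1]; consumers hand it to getValueAccessorById(clazz, ids).
    System.out.println(java.util.Arrays.toString(id.getFieldIds()));
  }
}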
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
index a8100b2..474a0a6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
@@ -28,7 +28,7 @@ import org.apache.drill.exec.vector.ValueVector;
* To change this template use File | Settings | File Templates.
*/
public interface VectorAccessible extends Iterable<VectorWrapper<?>> {
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz);
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds);
public TypedFieldId getValueVectorId(SchemaPath path);
public BatchSchema getSchema();
public int getRecordCount();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 25289a8..1c7714e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -22,13 +22,15 @@ import java.util.Iterator;
import java.util.List;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
import com.beust.jcommander.internal.Lists;
import com.google.common.base.Preconditions;
-public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccessible {
+public class VectorContainer extends AbstractMapVector implements Iterable<VectorWrapper<?>>, VectorAccessible {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainer.class);
protected final List<VectorWrapper<?>> wrappers = Lists.newArrayList();
@@ -61,6 +63,10 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
add(vv, releasable);
}
+ public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz){
+ return null;
+ }
+
/**
* Get a set of transferred clones of this container. Note that this guarantees that the vectors in the cloned
* container have the same TypedFieldIds as the existing container, allowing interchangeability in generated code. In
@@ -94,7 +100,7 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
schema = null;
int i = wrappers.size();
wrappers.add(SimpleVectorWrapper.create(vv));
- return new TypedFieldId(vv.getField().getType(), i, false);
+ return new TypedFieldId(vv.getField().getType(), i);
}
public void add(ValueVector[] hyperVector) {
@@ -129,29 +135,33 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
public TypedFieldId getValueVectorId(SchemaPath path) {
for (int i = 0; i < wrappers.size(); i++) {
VectorWrapper<?> va = wrappers.get(i);
- SchemaPath w = va.getField().getAsSchemaPath();
- if (w.equals(path)){
- return new TypedFieldId(va.getField().getType(), i, va.isHyper());
+ TypedFieldId id = va.getFieldIdIfMatches(i, path);
+ if(id != null){
+ return id;
}
}
- if(path.getRootSegment().isNamed() && path.getRootSegment().getNameSegment().getPath().equals("_MAP") && path.getRootSegment().isLastPath()) throw new UnsupportedOperationException("Drill does not yet support map references.");
return null;
}
+
+
@Override
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
- VectorWrapper<?> va = wrappers.get(fieldId);
- if(va!= null && clazz == null){
- return (VectorWrapper<?>) va;
- }
- if (va != null && va.getVectorClass() != clazz) {
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds) {
+ Preconditions.checkArgument(fieldIds.length >= 1);
+ VectorWrapper<?> va = wrappers.get(fieldIds[0]);
+
+ if(va == null) return null;
+
+ if (fieldIds.length == 1 && clazz != null && !clazz.isAssignableFrom(va.getVectorClass())) {
throw new IllegalStateException(String.format(
"Failure while reading vector. Expected vector class of %s but was holding vector class %s.",
clazz.getCanonicalName(), va.getVectorClass().getCanonicalName()));
}
- return (VectorWrapper<?>) va;
+
+ return (VectorWrapper<?>) va.getChildWrapper(fieldIds);
+
}
public BatchSchema getSchema() {
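Putting the container pieces together, a rough sketch (not part of this patch) of resolving a nested column: getValueVectorId now asks each wrapper for a match, and the resulting ordinal chain feeds the varargs getValueAccessorById; the column name is illustrative:

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.ValueVector;

public class ContainerLookupSketch {
  // Resolve a possibly nested column, e.g. `address.zip`, out of an already populated container.
  static ValueVector lookup(VectorContainer container, SchemaPath path) {
    TypedFieldId id = container.getValueVectorId(path);   // delegates to each wrapper's getFieldIdIfMatches
    if (id == null) {
      return null;                                        // no wrapper matched the path
    }
    VectorWrapper<?> wrapper =
        container.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds());
    return wrapper == null ? null : wrapper.getValueVector();
  }
}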
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
index 401b50e..dc8ffe5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
@@ -17,11 +17,14 @@
*/
package org.apache.drill.exec.record;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.vector.ValueVector;
public interface VectorWrapper<T extends ValueVector> {
+
+
public Class<T> getVectorClass();
public MaterializedField getField();
public T getValueVector();
@@ -29,4 +32,11 @@ public interface VectorWrapper<T extends ValueVector> {
public boolean isHyper();
public void clear();
public VectorWrapper<T> cloneAndTransfer();
+ public VectorWrapper<?> getChildWrapper(int[] ids);
+
+ /**
+ * Traverse the object graph and determine whether the provided SchemaPath matches data within the Wrapper. If so, return a TypedFieldId associated with this path.
+ * @return TypedFieldId
+ */
+ public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath);
}