You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/12 04:48:30 UTC

[08/10] Add support for RepeatedMapVector, MapVector and RepeatedListVector.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 5f26054..039445b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -62,9 +62,9 @@ public abstract class HashAggTemplate implements HashAggregator {
 
   private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
   private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
-  
+
   private static final boolean EXTRA_DEBUG_1 = false;
-  private static final boolean EXTRA_DEBUG_2 = false; 
+  private static final boolean EXTRA_DEBUG_2 = false;
   private static final String TOO_BIG_ERROR = "Couldn't add value to an empty batch.  This likely means that a single value is too long for a varlen field.";
   private boolean first = true;
   private boolean newSchema = false;
@@ -88,7 +88,7 @@ public abstract class HashAggTemplate implements HashAggregator {
 
   List<VectorAllocator> wsAllocators = Lists.newArrayList();  // allocators for the workspace vectors
   ErrorCollector collector = new ErrorCollectorImpl();
-  
+
   private MaterializedField[] materializedValueFields;
   private boolean allFlushed = false;
 
@@ -102,13 +102,13 @@ public abstract class HashAggTemplate implements HashAggregator {
       aggrValuesContainer = new VectorContainer();
 
       ValueVector vector ;
-      
-      for(int i = 0; i < materializedValueFields.length; i++) { 
+
+      for(int i = 0; i < materializedValueFields.length; i++) {
         MaterializedField outputField = materializedValueFields[i];
         // Create a type-specific ValueVector for this value
         vector = TypeHelper.getNewVector(outputField, allocator) ;
         VectorAllocator.getAllocator(vector, 50 /* avg. width */).alloc(HashTable.BATCH_SIZE) ;
-        
+
         aggrValuesContainer.add(vector) ;
       }
 
@@ -124,8 +124,8 @@ public abstract class HashAggTemplate implements HashAggregator {
       setupInterior(incoming, outgoing, aggrValuesContainer);
     }
 
-    private boolean outputValues() { 
-      for (int i = 0; i <= maxOccupiedIdx; i++) { 
+    private boolean outputValues() {
+      for (int i = 0; i <= maxOccupiedIdx; i++) {
         if (outputRecordValues(i, outputCount) ) {
           if (EXTRA_DEBUG_2) logger.debug("Outputting values to {}", outputCount) ;
           outputCount++;
@@ -139,7 +139,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     private void clear() {
       aggrValuesContainer.clear();
     }
-    
+
     // Code-generated methods (implemented in HashAggBatch)
 
     @RuntimeOverridden
@@ -155,19 +155,19 @@ public abstract class HashAggTemplate implements HashAggregator {
 
   @Override
   public void setup(HashAggregate hashAggrConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incoming, RecordBatch outgoing,
-                    LogicalExpression[] valueExprs, 
+                    LogicalExpression[] valueExprs,
                     List<TypedFieldId> valueFieldIds,
                     TypedFieldId[] groupByOutFieldIds,
-                    VectorAllocator[] keyAllocators, VectorAllocator[] valueAllocators) 
+                    VectorAllocator[] keyAllocators, VectorAllocator[] valueAllocators)
     throws SchemaChangeException, ClassTransformationException, IOException {
-    
+
     if (valueExprs == null || valueFieldIds == null) {
       throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables.");
     }
     if (valueFieldIds.size() < valueExprs.length) {
       throw new IllegalArgumentException("Wrong number of workspace variables.");
     }
-     
+
     this.context = context;
     this.allocator = allocator;
     this.incoming = incoming;
@@ -175,11 +175,11 @@ public abstract class HashAggTemplate implements HashAggregator {
     this.keyAllocators = keyAllocators;
     this.valueAllocators = valueAllocators;
     this.outgoing = outgoing;
-    
+
     this.hashAggrConfig = hashAggrConfig;
 
-    // currently, hash aggregation is only applicable if there are group-by expressions. 
-    // For non-grouped (a.k.a Plain) aggregations that don't involve DISTINCT, there is no 
+    // currently, hash aggregation is only applicable if there are group-by expressions.
+    // For non-grouped (a.k.a Plain) aggregations that don't involve DISTINCT, there is no
     // need to create hash table.  However, for plain aggregations with DISTINCT ..
     //      e.g SELECT COUNT(DISTINCT a1) FROM t1 ;
     // we need to build a hash table on the aggregation column a1.
@@ -188,14 +188,14 @@ public abstract class HashAggTemplate implements HashAggregator {
       throw new IllegalArgumentException("Currently, hash aggregation is only applicable if there are group-by expressions.");
     }
 
-    this.htIdxHolder = new IntHolder(); 
+    this.htIdxHolder = new IntHolder();
     materializedValueFields = new MaterializedField[valueFieldIds.size()];
 
     if (valueFieldIds.size() > 0) {
       int i = 0;
-      FieldReference ref = new FieldReference("dummy", ExpressionPosition.UNKNOWN, valueFieldIds.get(0).getType());
+      FieldReference ref = new FieldReference("dummy", ExpressionPosition.UNKNOWN, valueFieldIds.get(0).getIntermediateType());
       for (TypedFieldId id : valueFieldIds) {
-        materializedValueFields[i++] = MaterializedField.create(ref, id.getType());
+        materializedValueFields[i++] = MaterializedField.create(ref, id.getIntermediateType());
       }
     }
 
@@ -203,7 +203,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     this.htable = ht.createAndSetupHashTable(groupByOutFieldIds) ;
 
     batchHolders = new ArrayList<BatchHolder>();
-    addBatchHolder(); 
+    addBatchHolder();
 
     doSetup(incoming);
   }
@@ -211,21 +211,21 @@ public abstract class HashAggTemplate implements HashAggregator {
   @Override
   public AggOutcome doWork() {
     try{
-      // Note: Keeping the outer and inner try blocks here to maintain some similarity with 
-      // StreamingAggregate which does somethings conditionally in the outer try block. 
+      // Note: Keeping the outer and inner try blocks here to maintain some similarity with
+      // StreamingAggregate which does somethings conditionally in the outer try block.
       // In the future HashAggregate may also need to perform some actions conditionally
-      // in the outer try block. 
+      // in the outer try block.
 
       outside: while(true) {
         // loop through existing records, aggregating the values as necessary.
         if (EXTRA_DEBUG_1) logger.debug ("Starting outer loop of doWork()...");
         for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
           if(EXTRA_DEBUG_2) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
-          checkGroupAndAggrValues(currentIndex); 
+          checkGroupAndAggrValues(currentIndex);
         }
 
         if (EXTRA_DEBUG_1) logger.debug("Processed {} records", underlyingIndex);
-        
+
         try{
 
           while(true){
@@ -239,10 +239,10 @@ public abstract class HashAggTemplate implements HashAggregator {
             case NOT_YET:
               this.outcome = out;
               return AggOutcome.RETURN_OUTCOME;
-              
+
             case OK_NEW_SCHEMA:
               if(EXTRA_DEBUG_1) logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
-              newSchema = true;              
+              newSchema = true;
               this.cleanup();
               // TODO: new schema case needs to be handled appropriately
               return AggOutcome.UPDATE_AGGREGATOR;
@@ -254,20 +254,20 @@ public abstract class HashAggTemplate implements HashAggregator {
               } else {
                 checkGroupAndAggrValues(currentIndex);
                 incIndex();
-                
+
                 if(EXTRA_DEBUG_1) logger.debug("Continuing outside loop");
                 continue outside;
               }
 
             case NONE:
               outcome = out;
-              outputKeysAndValues() ; 
-              
+              outputKeysAndValues() ;
+
               // cleanup my internal state since there is nothing more to return
               this.cleanup();
               // cleanup incoming batch since output of aggregation does not need
               // any references to the incoming
-              
+
               incoming.cleanup();
               return setOkAndReturn();
 
@@ -294,7 +294,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     // now otherwise downstream operators will break.
     // TODO: allow outputting arbitrarily large number of records in batches
     assert (numGroupedRecords < Character.MAX_VALUE);
-    
+
     for (VectorAllocator a : keyAllocators) {
       if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} records.", a, numGroupedRecords);
       a.alloc(numGroupedRecords);
@@ -320,14 +320,14 @@ public abstract class HashAggTemplate implements HashAggregator {
   public void cleanup(){
     htable.clear();
     htable = null;
-    htIdxHolder = null; 
+    htIdxHolder = null;
     materializedValueFields = null;
 
     for (BatchHolder bh : batchHolders) {
       bh.clear();
     }
     batchHolders.clear();
-    batchHolders = null; 
+    batchHolders = null;
   }
 
   private AggOutcome tooBigFailure(){
@@ -335,7 +335,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     this.outcome = IterOutcome.STOP;
     return AggOutcome.CLEANUP_AND_RETURN;
   }
-  
+
   private final AggOutcome setOkAndReturn(){
     if(first){
       this.outcome = IterOutcome.OK_NEW_SCHEMA;
@@ -356,20 +356,20 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
     currentIndex = getVectorIndex(underlyingIndex);
   }
-  
+
   private final void resetIndex(){
     underlyingIndex = -1;
     incIndex();
   }
 
   private void addBatchHolder() {
-    BatchHolder bh = new BatchHolder(); 
+    BatchHolder bh = new BatchHolder();
     batchHolders.add(bh);
 
     if (EXTRA_DEBUG_1) logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
 
     int batchIdx = batchHolders.size() - 1;
-    bh.setup(batchIdx); 
+    bh.setup(batchIdx);
   }
 
   private boolean outputKeysAndValues() {
@@ -392,20 +392,20 @@ public abstract class HashAggTemplate implements HashAggregator {
     return allFlushed;
   }
 
-  // Check if a group is present in the hash table; if not, insert it in the hash table. 
-  // The htIdxHolder contains the index of the group in the hash table container; this same 
-  // index is also used for the aggregation values maintained by the hash aggregate. 
+  // Check if a group is present in the hash table; if not, insert it in the hash table.
+  // The htIdxHolder contains the index of the group in the hash table container; this same
+  // index is also used for the aggregation values maintained by the hash aggregate.
   private boolean checkGroupAndAggrValues(int incomingRowIdx) {
     if (incomingRowIdx < 0) {
       throw new IllegalArgumentException("Invalid incoming row index.");
     }
 
-    /** for debugging 
+    /** for debugging
     Object tmp = (incoming).getValueAccessorById(0, BigIntVector.class).getValueVector();
     BigIntVector vv0 = null;
     BigIntHolder holder = null;
 
-    if (tmp != null) { 
+    if (tmp != null) {
       vv0 = ((BigIntVector) tmp);
       holder = new BigIntHolder();
       holder.value = vv0.getAccessor().get(incomingRowIdx) ;
@@ -432,7 +432,7 @@ public abstract class HashAggTemplate implements HashAggregator {
         //  logger.debug("group-by key = {} already present at hash table index = {}", holder.value, currentIdx) ;
         //}
 
-      } 
+      }
       else if (putStatus == HashTable.PutStatus.KEY_ADDED) {
         if (EXTRA_DEBUG_2) logger.debug("Group-by key was added to hash table, inserting new aggregate values") ;
 
@@ -441,17 +441,17 @@ public abstract class HashAggTemplate implements HashAggregator {
         //  logger.debug("group-by key = {} added at hash table index = {}", holder.value, currentIdx) ;
         //}
       }
-      
+
       if (bh.updateAggrValues(incomingRowIdx, idxWithinBatch)) {
         numGroupedRecords++;
         return true;
       }
-      
-    } 
+
+    }
 
     return false;
   }
- 
+
   // Code-generated methods (implemented in HashAggBatch)
   public abstract void doSetup(@Named("incoming") RecordBatch incoming);
   public abstract int getVectorIndex(@Named("recordIndex") int recordIndex);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
index 34845b3..3e6def1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
@@ -28,12 +28,12 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
 
 public class InternalBatch implements Iterable<VectorWrapper<?>>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InternalBatch.class);
-  
+
   private final VectorContainer container;
   private final BatchSchema schema;
   private final SelectionVector2 sv2;
   private final SelectionVector4 sv4;
-  
+
   public InternalBatch(RecordBatch incoming){
     switch(incoming.getSchema().getSelectionVectorMode()){
     case FOUR_BYTE:
@@ -42,7 +42,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
       break;
     case TWO_BYTE:
       this.sv4 = null;
-      this.sv2 = incoming.getSelectionVector2().clone(); 
+      this.sv2 = incoming.getSelectionVector2().clone();
       break;
     default:
       this.sv4 = null;
@@ -74,9 +74,9 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
     if(sv4 != null) sv4.clear();
     container.clear();
   }
-  
-  public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz){
-    return container.getValueAccessorById(fieldId, clazz);
+
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int[] fieldIds){
+    return container.getValueAccessorById(clazz, fieldIds);
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 883052a..72d0462 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -348,8 +348,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
                 container.add(v);
                 allocators.add(RemovingRecordBatch.getAllocator4(v));
 
-                JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), fieldId, true));
-                JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), fieldId, false));
+                JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), true, fieldId));
+                JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, fieldId));
                 g.getEvalBlock()._if(outVV.invoke("copyFromSafe")
                   .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
                   .arg(outIndex)
@@ -376,8 +376,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
                 container.add(v);
                 allocators.add(RemovingRecordBatch.getAllocator4(v));
 
-                JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(vv.getField().getType(), fieldId, false));
-                JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), outputFieldId, false));
+                JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(vv.getField().getType(), false, fieldId));
+                JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, outputFieldId));
 
                 g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index baa232e..c07878a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -43,7 +43,7 @@ public final class JoinStatus {
   private int rightPosition;
   private int svRightPosition;
   private IterOutcome lastRight;
-  
+
   private int outputPosition;
   public RightSourceMode rightSourceMode = RightSourceMode.INCOMING;
   public MergeJoinBatch outputBatch;
@@ -54,7 +54,7 @@ public final class JoinStatus {
   public boolean ok = true;
   private boolean initialSet = false;
   private boolean leftRepeating = false;
-  
+
   public JoinStatus(RecordBatch left, RecordBatch right, MergeJoinBatch output) {
     super();
     this.left = left;
@@ -70,7 +70,7 @@ public final class JoinStatus {
       initialSet = true;
     }
   }
-  
+
   public final void advanceLeft(){
     leftPosition++;
   }
@@ -90,6 +90,10 @@ public final class JoinStatus {
     return (rightSourceMode == RightSourceMode.INCOMING) ? rightPosition : svRightPosition;
   }
 
+  public final int getRightCount(){
+    return right.getRecordCount();
+  }
+
   public final void setRightPosition(int pos) {
     rightPosition = pos;
   }
@@ -176,7 +180,7 @@ public final class JoinStatus {
     }
     if(b.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) b.getSelectionVector2().clear();
   }
-  
+
   /**
    * Check if the left record position can advance by one in the current batch.
    */
@@ -230,5 +234,5 @@ public final class JoinStatus {
   private boolean eitherMatches(IterOutcome outcome){
     return lastLeft == outcome || lastRight == outcome;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index f43934e..af0d378 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -97,8 +97,8 @@ public abstract class JoinTemplate implements JoinWorker {
           while (status.isLeftPositionAllowed()) {
             if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
               return false;
-            
-            status.incOutputPos();  
+
+            status.incOutputPos();
             status.advanceLeft();
           }
         }
@@ -113,7 +113,7 @@ public abstract class JoinTemplate implements JoinWorker {
       case -1:
         // left key < right key
         if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
-          if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) 
+          if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
             return false;
           status.incOutputPos();
         }
@@ -135,7 +135,7 @@ public abstract class JoinTemplate implements JoinWorker {
                  doCompareNextLeftKey(status.getLeftPosition()) != 0)
           // this record marks the end of repeated keys
           status.notifyLeftStoppedRepeating();
-        
+
         boolean crossedBatchBoundaries = false;
         int initialRightPosition = status.getRightPosition();
         do {
@@ -143,11 +143,11 @@ public abstract class JoinTemplate implements JoinWorker {
           if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
             return false;
 
-          if (!doCopyRight(status.getRightPosition(), status.getOutPosition())) 
+          if (!doCopyRight(status.getRightPosition(), status.getOutPosition()))
             return false;
-          
+
           status.incOutputPos();
-          
+
           // If the left key has duplicates and we're about to cross a boundary in the right batch, add the
           // right table's record batch to the sv4 builder before calling next.  These records will need to be
           // copied again for each duplicate left key.
@@ -170,7 +170,7 @@ public abstract class JoinTemplate implements JoinWorker {
           status.notifyLeftStoppedRepeating();
         } else if (status.isLeftRepeating() && crossedBatchBoundaries) {
           try {
-            // build the right batches and 
+            // build the right batches and
             status.outputBatch.batchBuilder.build();
             status.setSV4AdvanceMode();
           } catch (SchemaChangeException e) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index a2c424f..3d496d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -350,10 +350,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   public TypedFieldId getValueVectorId(SchemaPath path) {
     return outgoingContainer.getValueVectorId(path);
   }
-  
+
   @Override
-  public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
-    return outgoingContainer.getValueAccessorById(fieldId, clazz);
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+    return outgoingContainer.getValueAccessorById(clazz, ids);
   }
 
   @Override
@@ -373,7 +373,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
   /**
    * Creates a generate class which implements the copy and compare methods.
-   * 
+   *
    * @return instance of a new merger based on generated code
    * @throws SchemaChangeException
    */
@@ -443,8 +443,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         // declare incoming value vector and assign it to the array
         JVar inVV = cg.declareVectorValueSetupAndMember("incomingBatches[" + batchIdx + "]",
           new TypedFieldId(vv.getField().getType(),
-            fieldIdx,
-            false));
+            false,
+            fieldIdx));
 
         // add vv to initialization list (e.g. { vv1, vv2, vv3 } )
         incomingVectorInitBatch.add(inVV);
@@ -501,11 +501,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         TypeProtos.MinorType minor = vvRead.getMajorType().getMinorType();
         Class cmpVectorClass = TypeHelper.getValueVectorClass(minor, mode);
 
+        JExpression arr = JExpr.newArray(cg.getModel().INT).add(JExpr.lit(vvRead.getFieldId().getFieldIds()[0]));
         comparisonVectorInitBatch.add(
             ((JExpression) incomingBatchesVar.component(JExpr.lit(b)))
                .invoke("getValueAccessorById")
-                 .arg(JExpr.lit(vvRead.getFieldId().getFieldId()))
                  .arg(cg.getModel()._ref(cmpVectorClass).boxify().dotclass())
+                 .arg(arr)
                    .invoke("getValueVector"));
 
       }
@@ -583,8 +584,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       // declare outgoing value vectors
       JVar outgoingVV = cg.declareVectorValueSetupAndMember("outgoingBatch",
                                                             new TypedFieldId(vvOut.getField().getType(),
-                                                                             fieldIdx,
-                                                                             vvOut.isHyper()));
+                                                                             vvOut.isHyper(), fieldIdx));
 
       // assign to the appropriate slot in the outgoingVector array (in order of iteration)
       cg.getSetupBlock().assign(outgoingVectors.component(JExpr.lit(fieldIdx)), outgoingVV);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
index dd7011a..3398443 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
@@ -82,7 +82,7 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit
     this.svMode = incoming.getSchema().getSelectionVectorMode();
     this.outBatch = outgoing;
     this.outputField = outputField;
-    partitionValues = (IntVector) outBatch.getValueAccessorById(outBatch.getValueVectorId(outputField).getFieldId(), IntVector.class).getValueVector();
+    partitionValues = (IntVector) outBatch.getValueAccessorById(IntVector.class, outBatch.getValueVectorId(outputField).getFieldIds()).getValueVector();
     switch(svMode){
     case FOUR_BYTE:
     case TWO_BYTE:
@@ -98,7 +98,7 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit
   public abstract int doEval(@Named("inIndex") int inIndex, @Named("partitionIndex") int partitionIndex);
 
 
-  
+
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 6e115a7..deef25f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -219,8 +219,8 @@ public class OutgoingRecordBatch implements VectorAccessible {
   }
 
   @Override
-  public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
-    return vectorContainer.getValueAccessorById(fieldId, clazz);
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds) {
+    return vectorContainer.getValueAccessorById(clazz, fieldIds);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 6048085..bcd484c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -94,7 +94,7 @@ public class PartitionSenderRootExec implements RootExec {
 
     if (!ok) {
       stop();
-      
+
       return false;
     }
 
@@ -153,7 +153,7 @@ public class PartitionSenderRootExec implements RootExec {
   }
 
 
-  
+
   private void generatePartitionFunction() throws SchemaChangeException {
 
     LogicalExpression filterExpression = operator.getExpr();
@@ -166,7 +166,7 @@ public class PartitionSenderRootExec implements RootExec {
     }
 
     cg.addExpr(new ReturnValueExpression(expr));
-    
+
     try {
       Partitioner p = context.getImplementationClass(cg);
       p.setup(context, incoming, outgoing);
@@ -214,7 +214,7 @@ public class PartitionSenderRootExec implements RootExec {
                                           "outgoingVectors");
 
     // create 2d array and build initialization list.  For example:
-    //     outgoingVectors = new ValueVector[][] { 
+    //     outgoingVectors = new ValueVector[][] {
     //                              new ValueVector[] {vv1, vv2},
     //                              new ValueVector[] {vv3, vv4}
     //                       });
@@ -229,8 +229,8 @@ public class PartitionSenderRootExec implements RootExec {
         // declare outgoing value vector and assign it to the array
         JVar outVV = cg.declareVectorValueSetupAndMember("outgoing[" + batchId + "]",
                                                          new TypedFieldId(vv.getField().getType(),
-                                                                          fieldId,
-                                                                          false));
+                                                                          false,
+                                                                          fieldId));
         // add vv to initialization list (e.g. { vv1, vv2, vv3 } )
         outgoingVectorInitBatch.add(outVV);
         ++fieldId;
@@ -248,8 +248,8 @@ public class PartitionSenderRootExec implements RootExec {
     for (VectorWrapper<?> vvIn : incoming) {
       // declare incoming value vectors
       JVar incomingVV = cg.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vvIn.getField().getType(),
-                                                                                         fieldId,
-                                                                                         vvIn.isHyper()));
+          vvIn.isHyper(),
+          fieldId));
 
       // generate the copyFrom() invocation with explicit cast to the appropriate type
       Class<?> vvType = TypeHelper.getValueVectorClass(vvIn.getField().getType().getMinorType(),
@@ -307,7 +307,7 @@ public class PartitionSenderRootExec implements RootExec {
       }
     }
   }
-  
+
   public void stop() {
     logger.debug("Partition sender stopping.");
     ok = false;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 347092a..b94f403 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -20,9 +20,7 @@ package org.apache.drill.exec.physical.impl.project;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
-import com.sun.codemodel.JExpr;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.FieldReference;
@@ -31,7 +29,6 @@ import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.PathSegment.NameSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -44,7 +41,6 @@ import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
@@ -52,12 +48,13 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.util.VectorUtil;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
+import com.carrotsearch.hppc.IntOpenHashSet;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.sun.codemodel.JExpr;
 
 public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
@@ -92,6 +89,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
     int incomingRecordCount = incoming.getRecordCount();
     for(ValueVector v : this.allocationVectors){
       AllocationHelper.allocate(v, incomingRecordCount, 250);
+//      v.allocateNew();
     }
     int outputRecords = projector.projectRecords(0, incomingRecordCount, 0);
     if (outputRecords < incomingRecordCount) {
@@ -177,14 +175,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
 
     final ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getFunctionRegistry());
 
-    Set<Integer> transferFieldIds = new HashSet();
+    IntOpenHashSet transferFieldIds = new IntOpenHashSet();
 
     boolean isAnyWildcard = isAnyWildcard(exprs);
 
     if(isAnyWildcard){
       for(VectorWrapper<?> wrapper : incoming){
         ValueVector vvIn = wrapper.getValueVector();
-        String name = vvIn.getField().getDef().getName(vvIn.getField().getDef().getNameCount() - 1).getName();
+
+        String name = vvIn.getField().getPath().getLastSegment().getNameSegment().getPath();
         FieldReference ref = new FieldReference(name);
         TransferPair tp = wrapper.getValueVector().getTransferPair(ref);
         transfers.add(tp);
@@ -202,17 +201,19 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
 
         // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
         if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
+                && !((ValueVectorReadExpression) expr).hasReadPath()
                 && !isAnyWildcard
-                &&!transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldId())
-                && !((ValueVectorReadExpression) expr).isArrayElement()) {
+                && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])
+                && !((ValueVectorReadExpression) expr).hasReadPath()) {
           ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
-          ValueVector vvIn = incoming.getValueAccessorById(vectorRead.getFieldId().getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode())).getValueVector();
+          TypedFieldId id = vectorRead.getFieldId();
+          ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
           Preconditions.checkNotNull(incoming);
 
           TransferPair tp = vvIn.getTransferPair(getRef(namedExpression));
           transfers.add(tp);
           container.add(tp.getTo());
-          transferFieldIds.add(vectorRead.getFieldId().getFieldId());
+          transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
 //          logger.debug("Added transfer.");
         }else{
           // need to do evaluation.
@@ -221,6 +222,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
           TypedFieldId fid = container.add(vector);
           ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
           HoldingContainer hc = cg.addExpr(write);
+
+
           cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
           logger.debug("Added eval.");
         }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index 60e5993..aa0ecf6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -33,12 +33,12 @@ import com.google.common.collect.ImmutableList;
 
 public abstract class ProjectorTemplate implements Projector {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectorTemplate.class);
-  
+
   private ImmutableList<TransferPair> transfers;
   private SelectionVector2 vector2;
   private SelectionVector4 vector4;
   private SelectionVectorMode svMode;
-  
+
   public ProjectorTemplate() throws SchemaChangeException{
   }
 
@@ -47,18 +47,18 @@ public abstract class ProjectorTemplate implements Projector {
     switch(svMode){
     case FOUR_BYTE:
       throw new UnsupportedOperationException();
-      
-      
+
+
     case TWO_BYTE:
       final int count = recordCount;
       for(int i = 0; i < count; i++, firstOutputIndex++){
         doEval(vector2.getIndex(i), firstOutputIndex);
       }
       return recordCount;
-      
-      
+
+
     case NONE:
-      
+
       final int countN = recordCount;
       int i;
       for (i = startIndex; i < startIndex + countN; i++, firstOutputIndex++) {
@@ -76,8 +76,9 @@ public abstract class ProjectorTemplate implements Projector {
           t.transfer();
       }
       return recordCount;
-      
-      
+
+
+
     default:
       throw new UnsupportedOperationException();
     }
@@ -86,7 +87,7 @@ public abstract class ProjectorTemplate implements Projector {
   @Override
   public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers)  throws SchemaChangeException{
 
-    this.svMode = incoming.getSchema().getSelectionVectorMode(); 
+    this.svMode = incoming.getSchema().getSelectionVectorMode();
     switch(svMode){
     case FOUR_BYTE:
       this.vector4 = incoming.getSelectionVector4();
@@ -103,7 +104,7 @@ public abstract class ProjectorTemplate implements Projector {
   public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
 
 
-  
+
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 4018991..62af0b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -86,7 +86,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
     default:
       throw new UnsupportedOperationException();
     }
-    
+
     container.buildSchema(SelectionVectorMode.NONE);
 
   }
@@ -156,12 +156,12 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
   public void cleanup(){
     super.cleanup();
   }
-  
+
   private class StraightCopier implements Copier{
 
     private List<TransferPair> pairs = Lists.newArrayList();
     private List<ValueVector> out = Lists.newArrayList();
-    
+
     @Override
     public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators){
       for(VectorWrapper<?> vv : incoming){
@@ -183,7 +183,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
     public List<ValueVector> getOut() {
       return out;
     }
-    
+
   }
 
   private Copier getStraightCopier(){
@@ -192,10 +192,10 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
     container.addCollection(copier.getOut());
     return copier;
   }
-  
+
   private Copier getGenerated2Copier() throws SchemaChangeException{
     Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
-    
+
     List<VectorAllocator> allocators = Lists.newArrayList();
     for(VectorWrapper<?> i : incoming){
       ValueVector v = TypeHelper.getNewVector(i.getField(), oContext.getAllocator());
@@ -218,12 +218,12 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
     Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE);
     return getGenerated4Copier(incoming, context, oContext.getAllocator(), container, this);
   }
-  
+
   public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing) throws SchemaChangeException{
 
     List<VectorAllocator> allocators = Lists.newArrayList();
     for(VectorWrapper<?> i : batch){
-      
+
       ValueVector v = TypeHelper.getNewVector(i.getField(), allocator);
       container.add(v);
       allocators.add(getAllocator4(v));
@@ -239,20 +239,20 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
       throw new SchemaChangeException("Failure while attempting to load generated class", e);
     }
   }
-  
+
   public static void generateCopies(ClassGenerator g, VectorAccessible batch, boolean hyper){
     // we have parallel ids for each value vector so we don't actually have to deal with managing the ids at all.
     int fieldId = 0;
-    
+
     JExpression inIndex = JExpr.direct("inIndex");
     JExpression outIndex = JExpr.direct("outIndex");
     g.rotateBlock();
     for(VectorWrapper<?> vv : batch){
-      JVar inVV = g.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vv.getField().getType(), fieldId, vv.isHyper()));
-      JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), fieldId, false));
+      JVar inVV = g.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vv.getField().getType(), vv.isHyper(), fieldId));
+      JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, fieldId));
 
       if(hyper){
-        
+
         g.getEvalBlock()._if(
             outVV
             .invoke("copyFromSafe")
@@ -268,20 +268,20 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
       }else{
         g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(inIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
       }
-      
-      
+
+
       fieldId++;
     }
     g.rotateBlock();
     g.getEvalBlock()._return(JExpr.TRUE);
   }
-  
+
 
   @Override
   public WritableBatch getWritableBatch() {
     return WritableBatch.get(this);
   }
-  
+
   public static VectorAllocator getAllocator4(ValueVector outgoing){
     if(outgoing instanceof FixedWidthVector){
       return new FixedVectorAllocator((FixedWidthVector) outgoing);
@@ -291,6 +291,6 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
       throw new UnsupportedOperationException();
     }
   }
-  
-  
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 8d3a3e5..f96a1bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -100,9 +100,9 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
   }
 
   @Override
-  public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
     validateReadState();
-    return incoming.getValueAccessorById(fieldId, clazz);
+    return incoming.getValueAccessorById(clazz, ids);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
index d87a9f5..a546852 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
@@ -242,8 +242,8 @@ public class BatchGroup implements VectorAccessible {
   }
 
   @Override
-  public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
-    return currentContainer.getValueAccessorById(fieldId, clazz);
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+    return currentContainer.getValueAccessorById(clazz, ids);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 930f851..4b6c37d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -373,12 +373,16 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       ValueVector[] vectors = new ValueVector[batchGroupList.size() * 2];
       int i = 0;
       for (BatchGroup group : batchGroupList) {
-        vectors[i++] = group.getValueAccessorById(group.getValueVectorId(field.getAsSchemaPath()).getFieldId(),
-                field.getValueClass()).getValueVector();
+        vectors[i++] = group.getValueAccessorById(
+            field.getValueClass(),
+            group.getValueVectorId(field.getPath()).getFieldIds()
+                ).getValueVector();
         if (group.hasSecond()) {
           VectorContainer c = group.getSecondContainer();
-          vectors[i++] = c.getValueAccessorById(c.getValueVectorId(field.getAsSchemaPath()).getFieldId(),
-                  field.getValueClass()).getValueVector();
+          vectors[i++] = c.getValueAccessorById(
+              field.getValueClass(),
+              c.getValueVectorId(field.getPath()).getFieldIds()
+                  ).getValueVector();
         } else {
           vectors[i] = vectors[i - 1].getTransferPair().getTo(); //this vector should never be used. Just want to avoid having null elements in the hyper vector
           i++;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 214f81c..844d6db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -31,7 +31,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
 
 public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements RecordBatch{
   final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
-  
+
   protected final VectorContainer container = new VectorContainer();
   protected final T popConfig;
   protected final FragmentContext context;
@@ -43,7 +43,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     this.popConfig = popConfig;
     this.oContext = new OperatorContext(popConfig, context);
   }
-  
+
   @Override
   public Iterator<VectorWrapper<?>> iterator() {
     return container.iterator();
@@ -67,14 +67,14 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   public void kill() {
     killIncoming();
   }
-  
+
   protected abstract void killIncoming();
-  
+
   public void cleanup(){
     container.clear();
     oContext.close();
   }
- 
+
   @Override
   public SelectionVector2 getSelectionVector2() {
     throw new UnsupportedOperationException();
@@ -91,16 +91,16 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   }
 
   @Override
-  public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
-    return container.getValueAccessorById(fieldId, clazz);
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+    return container.getValueAccessorById(clazz, ids);
   }
 
-  
+
   @Override
   public WritableBatch getWritableBatch() {
 //    logger.debug("Getting writable batch.");
     WritableBatch batch = WritableBatch.get(this);
     return batch;
-    
+
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
index a6a4621..b44a233 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -20,21 +20,25 @@ package org.apache.drill.exec.record;
 import java.lang.reflect.Array;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractContainerVector;
+import org.apache.drill.exec.vector.complex.MapVector;
 
 
 public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HyperVectorWrapper.class);
-  
+
   private T[] vectors;
   private MaterializedField f;
   private final boolean releasable;
-  
+
   public HyperVectorWrapper(MaterializedField f, T[] v){
     this(f, v, true);
   }
-  
+
   public HyperVectorWrapper(MaterializedField f, T[] v, boolean releasable){
     assert(v.length > 0);
     this.f = f;
@@ -72,9 +76,51 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
   public void clear() {
     if(!releasable) return;
     for(T x : vectors){
-      x.clear();  
+      x.clear();
+    }
+  }
+
+  @Override
+  public VectorWrapper<?> getChildWrapper(int[] ids) {
+    if(ids.length == 1) return this;
+
+    ValueVector[] vectors = new ValueVector[this.vectors.length];
+    int index = 0;
+
+    for(ValueVector v : this.vectors){
+      ValueVector vector = v;
+      for(int i = 1; i < ids.length; i++){
+        MapVector map = (MapVector) vector;
+        vector = map.getVectorById(ids[i]);
+      }
+      vectors[index] = vector;
+      index++;
+    }
+    return new HyperVectorWrapper<ValueVector>(vectors[0].getField(), vectors);
+  }
+
+  @Override
+  public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath) {
+    ValueVector v = vectors[0];
+    if(!expectedPath.getRootSegment().segmentEquals(v.getField().getPath().getRootSegment())) return null;
+
+    if(v instanceof AbstractContainerVector){
+      // we're looking for a multi path.
+      AbstractContainerVector c = (AbstractContainerVector) v;
+      TypedFieldId.Builder builder = TypedFieldId.newBuilder();
+      builder.intermediateType(v.getField().getType());
+      builder.hyper();
+      builder.addId(id);
+      return c.getFieldIdIfMatches(builder, true, expectedPath.getRootSegment().getChild());
+
+    }else{
+      return TypedFieldId.newBuilder() //
+          .intermediateType(v.getField().getType()) //
+          .finalType(v.getField().getType()) //
+          .addId(id) //
+          .hyper() //
+          .build();
     }
-    
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index d93e258..439552f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -17,39 +17,58 @@
  */
 package org.apache.drill.exec.record;
 
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.PathSegment;
-import org.apache.drill.common.expression.PathSegment.NameSegment;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
-import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
-import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 
-import com.beust.jcommander.internal.Lists;
+import com.google.hive12.common.collect.Lists;
 
 public class MaterializedField{
-  private final FieldDef def;
+  private SchemaPath path;
+  private MajorType type;
+  private List<MaterializedField> children = Lists.newArrayList();
 
-  public MaterializedField(FieldDef def) {
-    this.def = def;
+  private MaterializedField(SchemaPath path, MajorType type) {
+    super();
+    this.path = path;
+    this.type = type;
   }
 
-  public static MaterializedField create(FieldDef def){
-    return new MaterializedField(def);
+  public static MaterializedField create(SerializedField serField){
+    return new MaterializedField(SchemaPath.create(serField.getNamePart()), serField.getMajorType());
+  }
+
+  public SerializedField.Builder getAsBuilder(){
+    return SerializedField.newBuilder() //
+        .setMajorType(type) //
+        .setNamePart(path.getAsNamePart());
+  }
+
+  public void addChild(MaterializedField field){
+    children.add(field);
   }
 
   public MaterializedField clone(FieldReference ref){
-    return create(ref, def.getMajorType());
+    return create(ref, type);
+  }
+
+  public String getLastName(){
+    PathSegment seg = path.getRootSegment();
+    while(seg.getChild() != null) seg = seg.getChild();
+    return seg.getNameSegment().getPath();
+  }
+
+
+  // TODO: rewrite without as direct match rather than conversion then match.
+  public boolean matches(SerializedField field){
+    MaterializedField f = create(field);
+    return f.equals(this);
   }
 
   public static MaterializedField create(String path, MajorType type){
@@ -58,43 +77,20 @@ public class MaterializedField{
   }
 
   public static MaterializedField create(SchemaPath path, MajorType type) {
-    FieldDef.Builder b = FieldDef.newBuilder();
-    b.setMajorType(type);
-    addSchemaPathToFieldDef(path, b);
-    return create(b.build());
-  }
-
-  private static void addSchemaPathToFieldDef(SchemaPath path, FieldDef.Builder builder) {
-    for (PathSegment p = path.getRootSegment();; p = p.getChild()) {
-      NamePart.Builder b = NamePart.newBuilder();
-      if (p.isArray()) {
-        b.setType(Type.ARRAY);
-      } else {
-        b.setName(p.getNameSegment().getPath().toString());
-        b.setType(Type.NAME);
-      }
-      builder.addName(b.build());
-      if(p.isLastPath()) break;
-    }
+    return new MaterializedField(path, type);
   }
 
-  public FieldDef getDef() {
-    return def;
+  public SchemaPath getPath(){
+    return path;
   }
 
+  /**
+   * Get the schema path.  Deprecated, use getPath() instead.
+   * @return the SchemaPath of this field.
+   */
+  @Deprecated
   public SchemaPath getAsSchemaPath(){
-    List<NamePart> nameList = Lists.newArrayList(def.getNameList());
-    Collections.reverse(nameList);
-    PathSegment seg = null;
-    for(NamePart p : nameList){
-      if(p.getType() == NamePart.Type.ARRAY){
-        throw new UnsupportedOperationException();
-      }else{
-        seg = new NameSegment(p.getName(), seg);
-      }
-    }
-    if( !(seg instanceof NameSegment) ) throw new UnsupportedOperationException();
-    return new SchemaPath( (NameSegment) seg);
+    return path;
   }
 
 //  public String getName(){
@@ -119,29 +115,29 @@ public class MaterializedField{
 //  }
 
   public int getWidth() {
-    return def.getMajorType().getWidth();
+    return type.getWidth();
   }
 
   public MajorType getType() {
-    return def.getMajorType();
+    return type;
   }
 
   public int getScale() {
-      return def.getMajorType().getScale();
+      return type.getScale();
   }
   public int getPrecision() {
-      return def.getMajorType().getPrecision();
+      return type.getPrecision();
   }
   public boolean isNullable() {
-    return def.getMajorType().getMode() == DataMode.OPTIONAL;
+    return type.getMode() == DataMode.OPTIONAL;
   }
 
   public DataMode getDataMode() {
-    return def.getMajorType().getMode();
+    return type.getMode();
   }
 
   public MaterializedField getOtherNullableVersion(){
-    MajorType mt = def.getMajorType();
+    MajorType mt = type;
     DataMode newDataMode = null;
     switch(mt.getMode()){
     case OPTIONAL:
@@ -153,7 +149,7 @@ public class MaterializedField{
     default:
       throw new UnsupportedOperationException();
     }
-    return new MaterializedField(def.toBuilder().setMajorType(mt.toBuilder().setMode(newDataMode).build()).build());
+    return new MaterializedField(path, mt.toBuilder().setMode(newDataMode).build());
   }
 
   public Class<?> getValueClass() {
@@ -161,33 +157,19 @@ public class MaterializedField{
   }
 
   public boolean matches(SchemaPath path) {
-    Iterator<NamePart> iter = def.getNameList().iterator();
+    if(!path.isSimplePath()) return false;
 
-    for (PathSegment p = path.getRootSegment();; p = p.getChild()) {
-      if(p == null) break;
-      if (!iter.hasNext()) return false;
-      NamePart n = iter.next();
-
-      if (p.isArray()) {
-        if (n.getType() == Type.ARRAY) continue;
-        return false;
-      } else {
-        if (p.getNameSegment().getPath().equalsIgnoreCase(n.getName())) continue;
-        return false;
-      }
-
-    }
-    // we've reviewed all path segments. confirm that we don't have any extra name parts.
-    return !iter.hasNext();
+    return this.path.equals(path);
   }
 
 
-
   @Override
   public int hashCode() {
     final int prime = 31;
     int result = 1;
-    result = prime * result + ((def == null) ? 0 : def.hashCode());
+    result = prime * result + ((children == null) ? 0 : children.hashCode());
+    result = prime * result + ((path == null) ? 0 : path.hashCode());
+    result = prime * result + ((type == null) ? 0 : type.hashCode());
     return result;
   }
 
@@ -200,20 +182,30 @@ public class MaterializedField{
     if (getClass() != obj.getClass())
       return false;
     MaterializedField other = (MaterializedField) obj;
-    if (def == null) {
-      if (other.def != null)
+    if (children == null) {
+      if (other.children != null)
+        return false;
+    } else if (!children.equals(other.children))
+      return false;
+    if (path == null) {
+      if (other.path != null)
+        return false;
+    } else if (!path.equals(other.path))
+      return false;
+    if (type == null) {
+      if (other.type != null)
         return false;
-    } else if (!def.equals(other.def))
+    } else if (!type.equals(other.type))
       return false;
     return true;
   }
 
   @Override
   public String toString() {
-    return "MaterializedField [" + def.toString() + "]";
+    return "MaterializedField [path=" + path + ", type=" + type + "]";
   }
 
   public String toExpr(){
-    return this.getAsSchemaPath().toExpr();
+    return path.toExpr();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 31283c6..60fdd4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -27,7 +27,7 @@ import org.apache.drill.exec.vector.ValueVector;
  * A record batch contains a set of field values for a particular range of records. In the case of a record batch
  * composed of ValueVectors, ideally a batch fits within L2 cache (~256k per core). The set of value vectors do not
  * change unless the next() IterOutcome is a *_NEW_SCHEMA type.
- * 
+ *
  * A key thing to know is that the Iterator provided by record batch must align with the rank positions of the field ids
  * provided utilizing getValueVectorId();
  */
@@ -56,21 +56,21 @@ public interface RecordBatch extends VectorAccessible {
   /**
    * Access the FragmentContext of the current query fragment. Useful for reporting failure information or other query
    * level information.
-   * 
+   *
    * @return
    */
   public FragmentContext getContext();
 
   /**
    * Provide the schema of the current RecordBatch. This changes if and only if a *_NEW_SCHEMA IterOutcome is provided.
-   * 
+   *
    * @return
    */
   public BatchSchema getSchema();
 
   /**
    * Provide the number of records that are within this record count
-   * 
+   *
    * @return
    */
   public int getRecordCount();
@@ -89,7 +89,7 @@ public interface RecordBatch extends VectorAccessible {
    * Get the value vector type and id for the given schema path. The TypedFieldId should store a fieldId which is the
    * same as the ordinal position of the field within the Iterator provided this classes implementation of
    * Iterable<ValueVector>.
-   * 
+   *
    * @param path
    *          The path where the vector should be located.
    * @return The local field id associated with this vector. If no field matches this path, this will return a null
@@ -97,24 +97,24 @@ public interface RecordBatch extends VectorAccessible {
    */
   public abstract TypedFieldId getValueVectorId(SchemaPath path);
   @Override
-  public abstract VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz);
+  public abstract VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids);
 
   /**
    * Update the data in each Field reading interface for the next range of records. Once a RecordBatch returns an
    * IterOutcome.NONE, the consumer should no longer next(). Behavior at this point is undetermined and likely to throw
    * an exception.
-   * 
+   *
    * @return An IterOutcome describing the result of the iteration.
    */
   public IterOutcome next();
 
   /**
    * Get a writable version of this batch. Takes over owernship of existing buffers.
-   * 
+   *
    * @return
    */
   public WritableBatch getWritableBatch();
-  
+
   public void cleanup();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index ed450af..10d959f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -23,13 +23,14 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import javax.jdo.metadata.FieldMetadata;
+
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.collect.Maps;
@@ -63,25 +64,24 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     boolean schemaChanged = schema == null;
 //    logger.info("Load, ThreadID: {}", Thread.currentThread().getId(), new RuntimeException("For Stack Trace Only"));
 //    System.out.println("Load, ThreadId: " + Thread.currentThread().getId());
-    Map<FieldDef, ValueVector> oldFields = Maps.newHashMap();
+    Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap();
     for(VectorWrapper<?> w : container){
       ValueVector v = w.getValueVector();
-      oldFields.put(v.getField().getDef(), v);
+      oldFields.put(v.getField(), v);
     }
 
     VectorContainer newVectors = new VectorContainer();
 
-    List<FieldMetadata> fields = def.getFieldList();
+    List<SerializedField> fields = def.getFieldList();
 
     int bufOffset = 0;
-    for (FieldMetadata fmd : fields) {
-      FieldDef fieldDef = fmd.getDef();
+    for (SerializedField fmd : fields) {
+      MaterializedField fieldDef = MaterializedField.create(fmd);
       ValueVector v = oldFields.remove(fieldDef);
       if(v == null) {
         // if we arrive here, we didn't have a matching vector.
         schemaChanged = true;
-        MaterializedField m = new MaterializedField(fieldDef);
-        v = TypeHelper.getNewVector(m, allocator);
+        v = TypeHelper.getNewVector(fieldDef, allocator);
       }
       if (fmd.getValueCount() == 0){
         v.clear();
@@ -136,8 +136,8 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     return valueCount;
   }
 
-  public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz){
-    return container.getValueAccessorById(fieldId, clazz);
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids){
+    return container.getValueAccessorById(clazz, ids);
   }
 
   public WritableBatch getWritableBatch(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
index b7a8248..692fe62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
@@ -17,11 +17,15 @@
  */
 package org.apache.drill.exec.record;
 
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractContainerVector;
+import org.apache.drill.exec.vector.complex.MapVector;
 
 public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleVectorWrapper.class);
-  
+
   private T v;
 
   public SimpleVectorWrapper(T v){
@@ -53,8 +57,7 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper
   public boolean isHyper() {
     return false;
   }
-  
-  
+
   @SuppressWarnings("unchecked")
   @Override
   public VectorWrapper<T> cloneAndTransfer() {
@@ -71,4 +74,56 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper
   public static <T extends ValueVector> SimpleVectorWrapper<T> create(T v){
     return new SimpleVectorWrapper<T>(v);
   }
+
+
+  @Override
+  public VectorWrapper<?> getChildWrapper(int[] ids) {
+    if(ids.length == 1) return this;
+
+    ValueVector vector = v;
+
+    for(int i = 1; i < ids.length; i++){
+      MapVector map = (MapVector) vector;
+      vector = map.getVectorById(ids[i]);
+    }
+
+    return new SimpleVectorWrapper<ValueVector>(vector);
+  }
+
+  @Override
+  public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath) {
+    if(!expectedPath.getRootSegment().segmentEquals(v.getField().getPath().getRootSegment())) return null;
+    PathSegment seg = expectedPath.getRootSegment();
+
+    if(v instanceof AbstractContainerVector){
+      // we're looking for a multi path.
+      AbstractContainerVector c = (AbstractContainerVector) v;
+      TypedFieldId.Builder builder = TypedFieldId.newBuilder();
+      builder.intermediateType(v.getField().getType());
+      builder.addId(id);
+      return c.getFieldIdIfMatches(builder, true, expectedPath.getRootSegment().getChild());
+
+    }else{
+      TypedFieldId.Builder builder = TypedFieldId.newBuilder();
+      builder.intermediateType(v.getField().getType());
+      builder.addId(id);
+      builder.finalType(v.getField().getType());
+      if(seg.isLastPath()){
+        return builder.build();
+      }else{
+        PathSegment child = seg.getChild();
+        if(child.isArray() && child.isLastPath()){
+          builder.remainder(child);
+          builder.withIndex();
+          return builder.build();
+        }else{
+          return null;
+        }
+
+      }
+
+    }
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
index ba2c7b2..9645be9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
@@ -23,5 +23,5 @@ public interface TransferPair {
   public void transfer();
   public void splitAndTransfer(int startIndex, int length);
   public ValueVector getTo();
-  public void copyValue(int from, int to);
+  public boolean copyValueSafe(int from, int to);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
index 0fbd0ae..24a8251 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
@@ -17,34 +17,174 @@
  */
 package org.apache.drill.exec.record;
 
+import java.util.Arrays;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.carrotsearch.hppc.IntArrayList;
+import com.google.common.base.Preconditions;
 
 public class TypedFieldId {
-  final MajorType type;
-  final int fieldId;
+  final MajorType finalType;
+  final MajorType secondaryFinal;
+  final MajorType intermediateType;
+  final int[] fieldIds;
   final boolean isHyperReader;
+  final PathSegment remainder;
+
+  public TypedFieldId(MajorType type, int... fieldIds){
+    this(type, type, type, false, null, fieldIds);
+  }
 
-  public TypedFieldId(MajorType type, int fieldId){
-    this(type, fieldId, false);
+  public TypedFieldId(MajorType type, IntArrayList breadCrumb, PathSegment remainder){
+    this(type, type, type, false, remainder, breadCrumb.toArray());
   }
-  
-  public TypedFieldId(MajorType type, int fieldId, boolean isHyper) {
+
+  public TypedFieldId(MajorType type, boolean isHyper, int... fieldIds){
+    this(type, type, type, isHyper, null, fieldIds);
+  }
+
+  public TypedFieldId(MajorType intermediateType, MajorType secondaryFinal, MajorType finalType, boolean isHyper, PathSegment remainder, int... fieldIds) {
     super();
-    this.type = type;
-    this.fieldId = fieldId;
+    this.intermediateType = intermediateType;
+    this.finalType = finalType;
+    this.secondaryFinal = secondaryFinal;
+    this.fieldIds = fieldIds;
     this.isHyperReader = isHyper;
+    this.remainder = remainder;
+  }
+
+
+
+  public TypedFieldId cloneWithChild(int id){
+    int[] fieldIds = ArrayUtils.add(this.fieldIds, id);
+    return new TypedFieldId(intermediateType, secondaryFinal, finalType, isHyperReader, remainder, fieldIds);
+  }
+
+  public PathSegment getLastSegment(){
+    if(remainder == null) return null;
+    PathSegment seg = remainder;
+    while(seg.getChild() != null){
+      seg = seg.getChild();
+    }
+    return seg;
+  }
+
+  public TypedFieldId cloneWithRemainder(PathSegment remainder){
+    return new TypedFieldId(intermediateType, secondaryFinal, finalType, isHyperReader, remainder, fieldIds);
+  }
+
+  public boolean hasRemainder(){
+    return remainder != null;
+  }
+
+  public PathSegment getRemainder(){
+    return remainder;
   }
 
   public boolean isHyperReader(){
     return isHyperReader;
   }
-  
-  public MajorType getType() {
-    return type;
+
+  public MajorType getIntermediateType() {
+    return intermediateType;
+  }
+
+  public Class<? extends ValueVector> getIntermediateClass(){
+    return (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(intermediateType.getMinorType(), intermediateType.getMode());
+  }
+
+  public MajorType getFinalType(){
+    return finalType;
   }
 
-  public int getFieldId() {
-    return fieldId;
+  public int[] getFieldIds() {
+    return fieldIds;
+  }
+
+
+
+  public MajorType getSecondaryFinal() {
+    return secondaryFinal;
+  }
+
+  public static Builder newBuilder(){
+    return new Builder();
+  }
+
+  public static class Builder{
+    final IntArrayList ids = new IntArrayList();
+    MajorType finalType;
+    MajorType intermediateType;
+    PathSegment remainder;
+    boolean hyperReader = false;
+    boolean withIndex = false;
+
+    public Builder addId(int id){
+      ids.add(id);
+      return this;
+    }
+
+    public Builder withIndex(){
+      withIndex = true;
+      return this;
+    }
+
+    public Builder remainder(PathSegment remainder){
+      this.remainder = remainder;
+      return this;
+    }
+
+    public Builder hyper(){
+      this.hyperReader = true;
+      return this;
+    }
+
+    public Builder finalType(MajorType finalType){
+      this.finalType = finalType;
+      return this;
+    }
+
+    public Builder intermediateType(MajorType intermediateType){
+      this.intermediateType = intermediateType;
+      return this;
+    }
+
+    public TypedFieldId build(){
+      Preconditions.checkNotNull(intermediateType);
+      Preconditions.checkNotNull(finalType);
+
+      if(intermediateType == null) intermediateType = finalType;
+      MajorType actualFinalType = finalType;
+      MajorType secondaryFinal = finalType;
+
+      // if this has an index, switch to required type for output
+      if(withIndex && intermediateType == finalType) actualFinalType = finalType.toBuilder().setMode(DataMode.REQUIRED).build();
+
+      // if this isn't a direct access, switch the final type to nullable as offsets may be null.
+      // TODO: there is a bug here with some things.
+      if(intermediateType != finalType) actualFinalType = finalType.toBuilder().setMode(DataMode.OPTIONAL).build();
+
+      return new TypedFieldId(intermediateType, secondaryFinal, actualFinalType, hyperReader, remainder, ids.toArray());
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + Arrays.hashCode(fieldIds);
+    result = prime * result + ((finalType == null) ? 0 : finalType.hashCode());
+    result = prime * result + ((intermediateType == null) ? 0 : intermediateType.hashCode());
+    result = prime * result + (isHyperReader ? 1231 : 1237);
+    result = prime * result + ((remainder == null) ? 0 : remainder.hashCode());
+    result = prime * result + ((secondaryFinal == null) ? 0 : secondaryFinal.hashCode());
+    return result;
   }
 
   @Override
@@ -56,20 +196,32 @@ public class TypedFieldId {
     if (getClass() != obj.getClass())
       return false;
     TypedFieldId other = (TypedFieldId) obj;
-    if (fieldId != other.fieldId)
+    if (!Arrays.equals(fieldIds, other.fieldIds))
       return false;
-    if (type == null) {
-      if (other.type != null)
+    if (finalType == null) {
+      if (other.finalType != null)
         return false;
-    } else if (!type.equals(other.type))
+    } else if (!finalType.equals(other.finalType))
+      return false;
+    if (intermediateType == null) {
+      if (other.intermediateType != null)
+        return false;
+    } else if (!intermediateType.equals(other.intermediateType))
+      return false;
+    if (isHyperReader != other.isHyperReader)
+      return false;
+    if (remainder == null) {
+      if (other.remainder != null)
+        return false;
+    } else if (!remainder.equals(other.remainder))
+      return false;
+    if (secondaryFinal == null) {
+      if (other.secondaryFinal != null)
+        return false;
+    } else if (!secondaryFinal.equals(other.secondaryFinal))
       return false;
     return true;
   }
 
-  @Override
-  public String toString() {
-    return "TypedFieldId [type=" + type + ", fieldId=" + fieldId + ", isSuperReader=" + isHyperReader + "]";
-  }
 
-  
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
index a8100b2..474a0a6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
@@ -28,7 +28,7 @@ import org.apache.drill.exec.vector.ValueVector;
  * To change this template use File | Settings | File Templates.
  */
 public interface VectorAccessible extends Iterable<VectorWrapper<?>> {
-  public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz);
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds);
   public TypedFieldId getValueVectorId(SchemaPath path);
   public BatchSchema getSchema();
   public int getRecordCount();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 25289a8..1c7714e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -22,13 +22,15 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
 
 import com.beust.jcommander.internal.Lists;
 import com.google.common.base.Preconditions;
 
-public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccessible {
+public class VectorContainer extends AbstractMapVector implements Iterable<VectorWrapper<?>>, VectorAccessible {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainer.class);
 
   protected final List<VectorWrapper<?>> wrappers = Lists.newArrayList();
@@ -61,6 +63,10 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
     add(vv, releasable);
   }
 
+  public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz){
+    return null;
+  }
+
   /**
    * Get a set of transferred clones of this container. Note that this guarantees that the vectors in the cloned
    * container have the same TypedFieldIds as the existing container, allowing interchangeability in generated code. In
@@ -94,7 +100,7 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
     schema = null;
     int i = wrappers.size();
     wrappers.add(SimpleVectorWrapper.create(vv));
-    return new TypedFieldId(vv.getField().getType(), i, false);
+    return new TypedFieldId(vv.getField().getType(), i);
   }
 
   public void add(ValueVector[] hyperVector) {
@@ -129,29 +135,33 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
   public TypedFieldId getValueVectorId(SchemaPath path) {
     for (int i = 0; i < wrappers.size(); i++) {
       VectorWrapper<?> va = wrappers.get(i);
-      SchemaPath w = va.getField().getAsSchemaPath();
-      if (w.equals(path)){
-        return new TypedFieldId(va.getField().getType(), i, va.isHyper());
+      TypedFieldId id = va.getFieldIdIfMatches(i, path);
+      if(id != null){
+        return id;
       }
     }
 
-    if(path.getRootSegment().isNamed() && path.getRootSegment().getNameSegment().getPath().equals("_MAP") && path.getRootSegment().isLastPath()) throw new UnsupportedOperationException("Drill does not yet support map references.");
     return null;
   }
 
 
+
+
   @Override
-  public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
-    VectorWrapper<?> va = wrappers.get(fieldId);
-    if(va!= null && clazz == null){
-      return (VectorWrapper<?>) va;
-    }
-    if (va != null && va.getVectorClass() != clazz) {
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds) {
+    Preconditions.checkArgument(fieldIds.length >= 1);
+    VectorWrapper<?> va = wrappers.get(fieldIds[0]);
+
+    if(va == null) return null;
+
+    if (fieldIds.length == 1 && clazz != null && !clazz.isAssignableFrom(va.getVectorClass())) {
       throw new IllegalStateException(String.format(
           "Failure while reading vector.  Expected vector class of %s but was holding vector class %s.",
           clazz.getCanonicalName(), va.getVectorClass().getCanonicalName()));
     }
-    return (VectorWrapper<?>) va;
+
+    return (VectorWrapper<?>) va.getChildWrapper(fieldIds);
+
   }
 
   public BatchSchema getSchema() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
index 401b50e..dc8ffe5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
@@ -17,11 +17,14 @@
  */
 package org.apache.drill.exec.record;
 
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.vector.ValueVector;
 
 
 public interface VectorWrapper<T extends ValueVector> {
 
+
+
   public Class<T> getVectorClass();
   public MaterializedField getField();
   public T getValueVector();
@@ -29,4 +32,11 @@ public interface VectorWrapper<T extends ValueVector> {
   public boolean isHyper();
   public void clear();
   public VectorWrapper<T> cloneAndTransfer();
+  public VectorWrapper<?> getChildWrapper(int[] ids);
+
+  /**
+   * Traverse the object graph and determine whether the provided SchemaPath matches data within the Wrapper.  If so, return a TypedFieldId associated with this path.
+   * @return TypedFieldId
+   */
+  public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath);
 }