You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/28 02:30:38 UTC

[3/5] DRILL-185: Extend code/clean code generator and add first aggregate functions.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggTemplate.java
new file mode 100644
index 0000000..25e3675
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggTemplate.java
@@ -0,0 +1,288 @@
+package org.apache.drill.exec.physical.impl.aggregate;
+
+import javax.inject.Named;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.aggregate.AggBatch.AggOutcome;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
+
+public abstract class AggTemplate implements Aggregator {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Aggregator.class);
+  private static final boolean EXTRA_DEBUG = false;
+  private static final String TOO_BIG_ERROR = "Couldn't add value to an empty batch.  This likely means that a single value is too long for a varlen field.";
+  private IterOutcome lastOutcome = null;
+  private boolean first = true;
+  private boolean newSchema = false;
+  private int previousIndex = 0;
+  private int underlyingIndex = 0;
+  private int currentIndex;
+  private int addedRecordCount = 0;
+  private boolean pendingOutput = false;
+  private IterOutcome outcome;
+  private int outputCount = 0;
+  private RecordBatch incoming;
+  private BatchSchema schema;
+  private RecordBatch outgoing;
+  private VectorAllocator[] allocators;
+  private FragmentContext context;
+  private InternalBatch remainderBatch;
+
+
+  @Override
+  public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException {
+    this.allocators = allocators;
+    this.context = context;
+    this.incoming = incoming;
+    this.schema = incoming.getSchema();
+    this.allocators = allocators;
+    this.outgoing = outgoing;
+    setupInterior(incoming, outgoing);
+    this.currentIndex = this.getVectorIndex(underlyingIndex);
+  }
+
+  
+  private void allocateOutgoing() {
+    for (VectorAllocator a : allocators) {
+      if(EXTRA_DEBUG) logger.debug("Allocating {} with {} records.", a, 20000);
+      a.alloc(20000);
+    }
+  }
+  
+  @Override
+  public IterOutcome getOutcome() {
+    return outcome;
+  }
+
+  @Override
+  public int getOutputCount() {
+    return outputCount;
+  }
+
+  private AggOutcome tooBigFailure(){
+    context.fail(new Exception(TOO_BIG_ERROR));
+    this.outcome = IterOutcome.STOP;
+    return AggOutcome.CLEANUP_AND_RETURN;
+  }
+  
+  @Override
+  public AggOutcome doWork() {
+    try{ // outside loop to ensure that first is set to false after the first run.
+      
+      // if we're in the first state, allocate outgoing.
+      if(first){
+        allocateOutgoing();
+      }
+      
+      // pick up a remainder batch if we have one.
+      if(remainderBatch != null){
+        if (!outputToBatch( previousIndex )) return tooBigFailure();
+        remainderBatch.clear();
+        remainderBatch = null;
+        return setOkAndReturn();
+      }
+      
+      
+      // setup for new output and pick any remainder.
+      if (pendingOutput) {
+        allocateOutgoing();
+        pendingOutput = false;
+        if (!outputToBatch( previousIndex)) return tooBigFailure();
+      }
+  
+      if(newSchema){
+        return AggOutcome.UPDATE_AGGREGATOR;
+      }
+      
+      if(lastOutcome != null){
+        outcome = lastOutcome;
+        return AggOutcome.CLEANUP_AND_RETURN;
+      }
+      
+      outside: while(true){
+      // loop through existing records, adding as necessary.
+        for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
+          if(EXTRA_DEBUG) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
+          if (isSame( previousIndex, currentIndex )) {
+            if(EXTRA_DEBUG) logger.debug("Values were found the same, adding.");
+            addRecordInc(currentIndex);
+          } else {
+            if(EXTRA_DEBUG) logger.debug("Values were different, outputting previous batch.");
+            if (outputToBatch(previousIndex)) {
+              if(EXTRA_DEBUG) logger.debug("Output successful.");
+              addRecordInc(currentIndex);
+            } else {
+              if(EXTRA_DEBUG) logger.debug("Output failed.");
+              if(outputCount == 0) return tooBigFailure();
+              
+              // mark the pending output but move forward for the next cycle.
+              pendingOutput = true;
+              previousIndex = currentIndex;
+              incIndex();
+              return setOkAndReturn();
+              
+            }
+          }
+          previousIndex = currentIndex;
+        }
+        
+        
+        InternalBatch previous = null;
+        
+        try{
+          while(true){
+            previous = new InternalBatch(incoming);
+            IterOutcome out = incoming.next();
+            if(EXTRA_DEBUG) logger.debug("Received IterOutcome of {}", out);
+            switch(out){
+            case NONE:
+              lastOutcome = out;
+              if(addedRecordCount > 0){
+                if( !outputToBatchPrev( previous, previousIndex, outputCount) ) remainderBatch = previous;
+                if(EXTRA_DEBUG) logger.debug("Received no more batches, returning.");
+                return setOkAndReturn();
+              }else{
+                outcome = out;
+                return AggOutcome.CLEANUP_AND_RETURN;
+              }
+              
+
+              
+            case NOT_YET:
+              this.outcome = out;
+              return AggOutcome.RETURN_OUTCOME;
+              
+            case OK_NEW_SCHEMA:
+              if(EXTRA_DEBUG) logger.debug("Received new schema.");
+              if(addedRecordCount > 0){
+                if( !outputToBatchPrev( previous, previousIndex, outputCount) ) remainderBatch = previous;
+                if(EXTRA_DEBUG) logger.debug("Wrote out end of previous batch, returning.");
+                newSchema = true;
+                return setOkAndReturn();
+              }
+              cleanup();
+              return AggOutcome.UPDATE_AGGREGATOR;   
+            case OK:
+              resetIndex();
+              if(incoming.getRecordCount() == 0){
+                continue;
+              }else{
+                if(isSamePrev(previousIndex , previous, currentIndex)){
+                  if(EXTRA_DEBUG) logger.debug("New value was same as last value of previous batch, adding.");
+                  addRecordInc(currentIndex);
+                  previousIndex = currentIndex;
+                  incIndex();
+                  if(EXTRA_DEBUG) logger.debug("Continuing outside");
+                  continue outside;
+                }else{ // not the same
+                  if(EXTRA_DEBUG) logger.debug("This is not the same as the previous, add record and continue outside.");
+                  previousIndex = currentIndex;
+                  if(addedRecordCount > 0){
+                    if( !outputToBatchPrev( previous, previousIndex, outputCount) ){
+                      remainderBatch = previous;
+                      return setOkAndReturn(); 
+                    }
+                    continue outside;
+                  }
+                }
+              }
+            case STOP:
+            default:
+              lastOutcome = out;
+              outcome = out;
+              return AggOutcome.CLEANUP_AND_RETURN;
+            }
+
+
+        }
+        }finally{
+          // make sure to clear previous if we haven't saved it.
+          if(remainderBatch == null && previous != null){
+            previous.clear();
+          }
+        }
+      }
+    }finally{
+      if(first) first = !first;
+    }
+    
+  }
+  
+  
+  private final void incIndex(){
+    underlyingIndex++;
+    if(underlyingIndex >= incoming.getRecordCount()){
+      currentIndex = Integer.MAX_VALUE;
+      return;
+    }
+    currentIndex = getVectorIndex(underlyingIndex);
+  }
+  
+  private final void resetIndex(){
+    underlyingIndex = -1;
+    incIndex();
+  }
+  
+  private final AggOutcome setOkAndReturn(){
+    if(first){
+      this.outcome = IterOutcome.OK_NEW_SCHEMA;
+    }else{
+      this.outcome = IterOutcome.OK;
+    }
+    for(VectorWrapper<?> v : outgoing){
+      v.getValueVector().getMutator().setValueCount(outputCount);
+    }
+    return AggOutcome.RETURN_OUTCOME;
+  }
+
+  private final boolean outputToBatch(int inIndex){
+    boolean success = outputRecordKeys(inIndex, outputCount) //
+        && outputRecordValues(outputCount) //
+        && resetValues();
+    if(success){
+      if(EXTRA_DEBUG) logger.debug("Outputting values to {}", outputCount);
+      outputCount++;
+      addedRecordCount = 0;
+    }
+    
+    return success;
+  }
+
+  private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex){
+    boolean success = outputRecordKeysPrev(b1, inIndex, outIndex) //
+        && outputRecordValues(outIndex) //
+        && resetValues();
+    if(success){
+      outputCount++;
+      addedRecordCount = 0;
+    }
+    
+    return success;
+  }
+  
+  private void addRecordInc(int index){
+    addRecord(index);
+    this.addedRecordCount++;
+  }
+  
+  @Override
+  public void cleanup(){
+    if(remainderBatch != null) remainderBatch.clear(); 
+  }
+
+
+  public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
+  public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2);
+  public abstract boolean isSamePrev(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int b2Index);
+  public abstract void addRecord(@Named("index") int index);
+  public abstract boolean outputRecordKeys(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+  public abstract boolean outputRecordKeysPrev(@Named("previous") InternalBatch previous, @Named("previousIndex") int previousIndex, @Named("outIndex") int outIndex);
+  public abstract boolean outputRecordValues(@Named("outIndex") int outIndex);
+  public abstract int getVectorIndex(@Named("recordIndex") int recordIndex);
+  public abstract boolean resetValues();
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/Aggregator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/Aggregator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/Aggregator.java
new file mode 100644
index 0000000..fdda8c8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/Aggregator.java
@@ -0,0 +1,26 @@
+package org.apache.drill.exec.physical.impl.aggregate;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.aggregate.AggBatch.AggOutcome;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
+
+public interface Aggregator {
+
+  public static TemplateClassDefinition<Aggregator> TEMPLATE_DEFINITION = new TemplateClassDefinition<Aggregator>(Aggregator.class, AggTemplate.class);
+  
+  public abstract void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing,
+      VectorAllocator[] allocators) throws SchemaChangeException;
+
+  public abstract IterOutcome getOutcome();
+
+  public abstract int getOutputCount();
+
+  public abstract AggOutcome doWork();
+
+  public abstract void cleanup();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/BatchIterator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/BatchIterator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/BatchIterator.java
new file mode 100644
index 0000000..2f9cabe
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/BatchIterator.java
@@ -0,0 +1,9 @@
+package org.apache.drill.exec.physical.impl.aggregate;
+
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+public interface BatchIterator {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchIterator.class);
+  
+  public IterOutcome next();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
new file mode 100644
index 0000000..343dbe5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
@@ -0,0 +1,65 @@
+package org.apache.drill.exec.physical.impl.aggregate;
+
+import java.util.Iterator;
+
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public class InternalBatch implements Iterable<VectorWrapper<?>>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InternalBatch.class);
+  
+  private final VectorContainer container;
+  private final BatchSchema schema;
+  private final SelectionVector2 sv2;
+  private final SelectionVector4 sv4;
+  
+  public InternalBatch(RecordBatch incoming){
+    switch(incoming.getSchema().getSelectionVectorMode()){
+    case FOUR_BYTE:
+      this.sv4 = incoming.getSelectionVector4().createNewWrapperCurrent();
+      this.sv2 = null;
+      break;
+    case TWO_BYTE:
+      this.sv4 = null;
+      this.sv2 = incoming.getSelectionVector2().clone(); 
+      break;
+    default:
+      this.sv4 = null;
+      this.sv2 = null;
+    }
+    this.schema = incoming.getSchema();
+    this.container = VectorContainer.getTransferClone(incoming);
+  }
+
+  public BatchSchema getSchema() {
+    return schema;
+  }
+
+  public SelectionVector2 getSv2() {
+    return sv2;
+  }
+
+  public SelectionVector4 getSv4() {
+    return sv4;
+  }
+
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    return container.iterator();
+  }
+
+  public void clear(){
+    if(sv2 != null) sv2.clear();
+    if(sv4 != null) sv4.clear();
+    container.clear();
+  }
+  
+  public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz){
+    return container.getVectorAccessor(fieldId, clazz);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
deleted file mode 100644
index 0fad224..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.apache.drill.exec.physical.impl.filter;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RecordBatch;
-
-public interface FilterEvaluator {
-  public void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
-  public boolean doEval(int inIndex, int outIndex);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
index 18aa484..a7e8f0c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
@@ -1,5 +1,7 @@
 package org.apache.drill.exec.physical.impl.filter;
 
+import javax.inject.Named;
+
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -78,6 +80,7 @@ public abstract class FilterTemplate implements Filterer{
     outgoingSelectionVector.setRecordCount(svIndex);
   }
   
-  protected abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
-  protected abstract boolean doEval(int inIndex, int outIndex);
+  public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
+  public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
index fb08ef3..fec147f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
@@ -12,7 +12,6 @@ public interface Filterer {
   public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException;
   public void filterBatch(int recordCount);
   
-  public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION = new TemplateClassDefinition<Filterer>( //
-      Filterer.class, "org.apache.drill.exec.physical.impl.filter.FilterTemplate", FilterEvaluator.class, FilterSignature.class);
+  public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate.class);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index bc1ef4e..0cefc52 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -19,25 +19,33 @@
 package org.apache.drill.exec.physical.impl.partitionsender;
 
 import java.util.Iterator;
-import java.util.List;
 
-import com.beust.jcommander.internal.Lists;
-import com.google.common.base.Preconditions;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.proto.GeneralRPCProtos;
-import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.FragmentWritableBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.SchemaBuilder;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.bit.BitTunnel;
-import org.apache.drill.exec.vector.*;
+import org.apache.drill.exec.vector.TypeHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import org.apache.drill.exec.work.foreman.ErrorHelper;
 
+import com.google.common.base.Preconditions;
+
 /**
  * OutgoingRecordBatch is a holder of value vectors which are to be sent to another host.  Thus,
  * next() will never be called on this object.  When a record batch is ready to send (e.g. nearing size
@@ -121,9 +129,9 @@ public class OutgoingRecordBatch implements RecordBatch {
     // NOTE: the value vector is directly referenced by generated code; therefore references
     // must remain valid.
     recordCount = 0;
-    for (VectorWrapper v : vectorContainer) {
+    for (VectorWrapper<?> v : vectorContainer) {
       logger.debug("Reallocating vv to capacity " + incoming.getRecordCount() + " after flush.");
-      getAllocator(v.getValueVector(), v.getValueVector()).alloc(incoming.getRecordCount());
+      VectorAllocator.getAllocator(v.getValueVector(), v.getValueVector()).alloc(incoming.getRecordCount());
     }
     if (!ok) { throw new SchemaChangeException("Flush ended NOT OK!"); }
     return true;
@@ -139,14 +147,14 @@ public class OutgoingRecordBatch implements RecordBatch {
     vectorContainer = new VectorContainer();
 
     SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
-    for (VectorWrapper v : incoming) {
+    for (VectorWrapper<?> v : incoming) {
 
       // add field to the output schema
       bldr.addField(v.getField());
 
       // allocate a new value vector
       ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), context.getAllocator());
-      getAllocator(v.getValueVector(), outgoingVector).alloc(recordCapacity);
+      VectorAllocator.getAllocator(v.getValueVector(), outgoingVector).alloc(recordCapacity);
       vectorContainer.add(outgoingVector);
       logger.debug("Reallocating to cap " + recordCapacity + " because of newly init'd vector : " + v.getValueVector());
     }
@@ -162,7 +170,7 @@ public class OutgoingRecordBatch implements RecordBatch {
     isLast = false;
     recordCount = 0;
     recordCapacity = 0;
-    for (VectorWrapper v : vectorContainer)
+    for (VectorWrapper<?> v : vectorContainer)
       v.getValueVector().clear();
   }
 
@@ -227,49 +235,6 @@ public class OutgoingRecordBatch implements RecordBatch {
     return WritableBatch.get(this);
   }
 
-  private VectorAllocator getAllocator(ValueVector in, ValueVector outgoing){
-    if(outgoing instanceof FixedWidthVector){
-      return new FixedVectorAllocator((FixedWidthVector) outgoing);
-    }else if(outgoing instanceof VariableWidthVector && in instanceof VariableWidthVector){
-      return new VariableVectorAllocator( (VariableWidthVector) in, (VariableWidthVector) outgoing);
-    }else{
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  private class FixedVectorAllocator implements VectorAllocator{
-    FixedWidthVector out;
-
-    public FixedVectorAllocator(FixedWidthVector out) {
-      super();
-      this.out = out;
-    }
-
-    public void alloc(int recordCount){
-      out.allocateNew(recordCount);
-      out.getMutator().setValueCount(recordCount);
-    }
-  }
-
-  private class VariableVectorAllocator implements VectorAllocator{
-    VariableWidthVector in;
-    VariableWidthVector out;
-
-    public VariableVectorAllocator(VariableWidthVector in, VariableWidthVector out) {
-      super();
-      this.in = in;
-      this.out = out;
-    }
-
-    public void alloc(int recordCount){
-      out.allocateNew(in.getByteCapacity(), recordCount);
-      out.getMutator().setValueCount(recordCount);
-    }
-  }
-
-  public interface VectorAllocator{
-    public void alloc(int recordCount);
-  }  
   
   private StatusHandler statusHandler = new StatusHandler();
   private class StatusHandler extends BaseRpcOutcomeListener<GeneralRPCProtos.Ack> {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 992ffdf..aa96d3f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -31,9 +31,5 @@ public interface Partitioner {
 
   public abstract void partitionBatch(RecordBatch incoming);
 
-  public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION =
-      new TemplateClassDefinition<>(Partitioner.class,
-                                    "org.apache.drill.exec.physical.impl.partitionsender.PartitionerTemplate",
-                                    PartitionerEvaluator.class,
-                                    PartitionerInnerSignature.class);
+  public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerEvaluator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerEvaluator.java
deleted file mode 100644
index 8c92fdc..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerEvaluator.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-
-package org.apache.drill.exec.physical.impl.partitionsender;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RecordBatch;
-
-public interface PartitionerEvaluator {
-  public void doSetup(FragmentContext context, RecordBatch incoming, OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
-  public void doEval(int inIndex, int outIndex);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 7198c3a..4fdd740 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -18,6 +18,8 @@
 
 package org.apache.drill.exec.physical.impl.partitionsender;
 
+import javax.inject.Named;
+
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
@@ -51,7 +53,8 @@ public abstract class PartitionerTemplate implements Partitioner {
 
   }
 
-  protected abstract void doSetup(FragmentContext context, RecordBatch incoming, OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
-  protected abstract void doEval(int inIndex, int outIndex);
+  public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
+  public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
deleted file mode 100644
index 75632e7..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.apache.drill.exec.physical.impl.project;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RecordBatch;
-
-public interface ProjectEvaluator {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectEvaluator.class);
-  
-  public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
-  public abstract void doEval(int inIndex, int outIndex) throws SchemaChangeException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 8f06290..c44d988 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -5,12 +5,8 @@ import java.util.List;
 
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.PathSegment;
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.CodeGenerator;
@@ -19,14 +15,12 @@ import org.apache.drill.exec.expr.ValueVectorReadExpression;
 import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
-import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
-import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.TypeHelper;
@@ -76,7 +70,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
     for(int i =0; i < exprs.size(); i++){
       final NamedExpression namedExpression = exprs.get(i);
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector);
-      final MaterializedField outputField = getMaterializedField(namedExpression.getRef(), expr);
+      final MaterializedField outputField = MaterializedField.create(namedExpression.getRef(), expr.getMajorType());
       if(collector.hasErrors()){
         throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
       }
@@ -87,7 +81,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
         ValueVector vvIn = incoming.getValueAccessorById(vectorRead.getFieldId().getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode())).getValueVector();
         Preconditions.checkNotNull(incoming);
 
-        TransferPair tp = vvIn.getTransferPair();
+        TransferPair tp = vvIn.getTransferPair(namedExpression.getRef());
         transfers.add(tp);
         container.add(tp.getTo());
         logger.debug("Added transfer.");
@@ -95,7 +89,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
         // need to do evaluation.
         ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator());
         allocationVectors.add(vector);
-        ValueVectorWriteExpression write = new ValueVectorWriteExpression(container.add(vector), expr);
+        TypedFieldId fid = container.add(vector);
+        ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr);
         cg.addExpr(write);
         logger.debug("Added eval.");
       }
@@ -112,29 +107,5 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
     }
   }
   
-  private MaterializedField getMaterializedField(FieldReference reference, LogicalExpression expr){
-    return new MaterializedField(getFieldDef(reference, expr.getMajorType()));
-  }
-
-  private FieldDef getFieldDef(SchemaPath path, MajorType type){
-    return FieldDef //
-        .newBuilder() //
-        .addAllName(getNameParts(path.getRootSegment())) //
-        .setMajorType(type) //
-        .build();
-  }
-  
-  private List<NamePart> getNameParts(PathSegment seg){
-    List<NamePart> parts = Lists.newArrayList();
-    while(seg != null){
-      if(seg.isArray()){
-        parts.add(NamePart.newBuilder().setType(Type.ARRAY).build());
-      }else{
-        parts.add(NamePart.newBuilder().setType(Type.NAME).setName(seg.getNameSegment().getPath().toString()).build());
-      }
-      seg = seg.getChild();
-    }
-    return parts;
-  }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
index ba83e61..327f8b8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
@@ -3,7 +3,6 @@ package org.apache.drill.exec.physical.impl.project;
 import java.util.List;
 
 import org.apache.drill.exec.compile.TemplateClassDefinition;
-import org.apache.drill.exec.compile.sig.DefaultGeneratorSignature;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
@@ -12,11 +11,8 @@ import org.apache.drill.exec.record.TransferPair;
 public interface Projector {
 
   public abstract void setup(FragmentContext context, RecordBatch incoming,  RecordBatch outgoing, List<TransferPair> transfers)  throws SchemaChangeException;
-
-  
   public abstract int projectRecords(int recordCount, int firstOutputIndex);
 
-  public static TemplateClassDefinition<Projector> TEMPLATE_DEFINITION = new TemplateClassDefinition<Projector>( //
-      Projector.class, "org.apache.drill.exec.physical.impl.project.ProjectorTemplate", ProjectEvaluator.class);
+  public static TemplateClassDefinition<Projector> TEMPLATE_DEFINITION = new TemplateClassDefinition<Projector>(Projector.class, ProjectorTemplate.class);
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index 5f15c2d..0f4b90d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -2,6 +2,8 @@ package org.apache.drill.exec.physical.impl.project;
 
 import java.util.List;
 
+import javax.inject.Named;
+
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -71,8 +73,9 @@ public abstract class ProjectorTemplate implements Projector {
     doSetup(context, incoming, outgoing);
   }
 
-  protected abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
-  protected abstract void doEval(int inIndex, int outIndex);
+  public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
+  public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+
 
   
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Comparator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Comparator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Comparator.java
deleted file mode 100644
index 86d0d61..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Comparator.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.drill.exec.physical.impl.sort;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RecordBatch;
-
-public interface Comparator {
-  
-  public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
-  public abstract int doEval(int inIndex, int outIndex);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/ReadIndexRewriter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/ReadIndexRewriter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/ReadIndexRewriter.java
deleted file mode 100644
index 02fffa5..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/ReadIndexRewriter.java
+++ /dev/null
@@ -1,87 +0,0 @@
-package org.apache.drill.exec.physical.impl.sort;
-
-import java.util.List;
-
-import org.apache.drill.common.expression.FunctionCall;
-import org.apache.drill.common.expression.IfExpression;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
-import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
-import org.apache.drill.common.expression.ValueExpressions.LongExpression;
-import org.apache.drill.common.expression.ValueExpressions.QuotedString;
-import org.apache.drill.common.expression.visitors.ExprVisitor;
-import org.apache.drill.exec.expr.ValueVectorReadExpression;
-
-import com.google.common.collect.Lists;
-
-public class ReadIndexRewriter implements ExprVisitor<LogicalExpression, String, RuntimeException> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReadIndexRewriter.class);
-
-  private String batchName;
-  
-  
-  @Override
-  public LogicalExpression visitUnknown(LogicalExpression e, String newIndexName) {
-    if (e instanceof ValueVectorReadExpression) {
-      ValueVectorReadExpression old = (ValueVectorReadExpression) e;
-      return new ValueVectorReadExpression(old.getTypedFieldId(), newIndexName);
-    } else {
-      throw new UnsupportedOperationException(String.format(
-          "ReadIndex rewriter doesn't know how to rewrite expression of type %s.", e.getClass().getName()));
-    }
-  }
-
-  @Override
-  public LogicalExpression visitFunctionCall(FunctionCall call, String newIndexName) {
-    List<LogicalExpression> args = Lists.newArrayList();
-    for (int i = 0; i < call.args.size(); ++i) {
-      LogicalExpression newExpr = call.args.get(i).accept(this, null);
-      args.add(newExpr);
-    }
-
-    return new FunctionCall(call.getDefinition(), args, call.getPosition());
-  }
-
-  @Override
-  public LogicalExpression visitIfExpression(IfExpression ifExpr, String newIndexName) {
-    List<IfExpression.IfCondition> conditions = Lists.newArrayList(ifExpr.conditions);
-    LogicalExpression newElseExpr = ifExpr.elseExpression.accept(this, null);
-
-    for (int i = 0; i < conditions.size(); ++i) {
-      IfExpression.IfCondition condition = conditions.get(i);
-
-      LogicalExpression newCondition = condition.condition.accept(this, null);
-      LogicalExpression newExpr = condition.expression.accept(this, null);
-      conditions.set(i, new IfExpression.IfCondition(newCondition, newExpr));
-    }
-
-    return IfExpression.newBuilder().setElse(newElseExpr).addConditions(conditions).build();
-  }
-
-  @Override
-  public LogicalExpression visitSchemaPath(SchemaPath path, String newIndexName) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public LogicalExpression visitLongConstant(LongExpression intExpr, String value) throws RuntimeException {
-    return intExpr;
-  }
-
-  @Override
-  public LogicalExpression visitDoubleConstant(DoubleExpression dExpr, String value) throws RuntimeException {
-    return dExpr;
-  }
-
-  @Override
-  public LogicalExpression visitBooleanConstant(BooleanExpression e, String value) throws RuntimeException {
-    return e;
-  }
-
-  @Override
-  public LogicalExpression visitQuotedStringConstant(QuotedString e, String value) throws RuntimeException {
-    return e;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
index a21af09..7e44429 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
@@ -14,7 +14,7 @@ import com.google.common.collect.Lists;
 /**
  * Holds the data for a particular record batch for later manipulation.
  */
-class RecordBatchData {
+public class RecordBatchData {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchData.class);
   
   final List<ValueVector> vectors = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index c9bd55d..8cffda3 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -9,6 +9,7 @@ import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.CodeGenerator;
@@ -32,6 +33,10 @@ import com.sun.codemodel.JExpr;
 public class SortBatch extends AbstractRecordBatch<Sort> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortBatch.class);
 
+  public static final MappingSet MAIN_MAPPING = new MappingSet( (String) null, null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP);
+  public static final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP);
+  public static final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP);
+
   private static long MAX_SORT_BYTES = 8l * 1024 * 1024 * 1024;
 
   private final RecordBatch incoming;
@@ -47,7 +52,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
 
   @Override
   public int getRecordCount() {
-    return sv4.getLength();
+    return sv4.getCount();
   }
 
   @Override
@@ -127,18 +132,18 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
   
   private Sorter createNewSorter() throws ClassTransformationException, IOException, SchemaChangeException{
     CodeGenerator<Sorter> g = new CodeGenerator<Sorter>(Sorter.TEMPLATE_DEFINITION, context.getFunctionRegistry());
-    g.setMappingSet(SortSignature.MAIN_MAPPING);
+    g.setMappingSet(MAIN_MAPPING);
     
     for(OrderDef od : popConfig.getOrderings()){
       // first, we rewrite the evaluation stack for each side of the comparison.
       ErrorCollector collector = new ErrorCollectorImpl(); 
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), this, collector);
       if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
-      g.setMappingSet(SortSignature.LEFT_MAPPING);
+      g.setMappingSet(LEFT_MAPPING);
       HoldingContainer left = g.addExpr(expr, false);
-      g.setMappingSet(SortSignature.RIGHT_MAPPING);
+      g.setMappingSet(RIGHT_MAPPING);
       HoldingContainer right = g.addExpr(expr, false);
-      g.setMappingSet(SortSignature.MAIN_MAPPING);
+      g.setMappingSet(MAIN_MAPPING);
       
       // next we wrap the two comparison sides and add the expression block for the comparison.
       FunctionCall f = new FunctionCall(ComparatorFunctions.COMPARE_TO, ImmutableList.of((LogicalExpression) new HoldingContainerExpression(left), new HoldingContainerExpression(right)), ExpressionPosition.UNKNOWN);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortSignature.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortSignature.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortSignature.java
deleted file mode 100644
index 7614f3e..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortSignature.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.apache.drill.exec.physical.impl.sort;
-
-import javax.inject.Named;
-
-import org.apache.drill.exec.compile.sig.CodeGeneratorSignature;
-import org.apache.drill.exec.compile.sig.DefaultGeneratorSignature;
-import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RecordBatch;
-
-public interface SortSignature extends CodeGeneratorSignature{
-  
-  public static final MappingSet MAIN_MAPPING = new MappingSet("null", "null", DefaultGeneratorSignature.DEFAULT_SCALAR_MAP, DefaultGeneratorSignature.DEFAULT_SCALAR_MAP);
-  public static final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", "null", DefaultGeneratorSignature.DEFAULT_SCALAR_MAP, DefaultGeneratorSignature.DEFAULT_SCALAR_MAP);
-  public static final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", "null", DefaultGeneratorSignature.DEFAULT_SCALAR_MAP, DefaultGeneratorSignature.DEFAULT_SCALAR_MAP);
-
-  public void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
-  public int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex);
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
index d312fb4..ddd066f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
@@ -1,5 +1,7 @@
 package org.apache.drill.exec.physical.impl.sort;
 
+import javax.inject.Named;
+
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
@@ -40,6 +42,7 @@ public abstract class SortTemplate implements Sorter, IndexedSortable{
     return doEval(sv1, sv2);
   }
 
-  public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
-  public abstract int doEval(int leftIndex, int rightIndex);
+  public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
+  public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
index 1a76423..9cf98e3 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
@@ -11,7 +11,6 @@ public interface Sorter {
   public void setup(FragmentContext context, RecordBatch hyperBatch) throws SchemaChangeException;
   public void sort(SelectionVector4 vector4, VectorContainer container);
   
-  public static TemplateClassDefinition<Sorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<Sorter>( //
-      Sorter.class, "org.apache.drill.exec.physical.impl.sort.SortTemplate", Comparator.class, SortSignature.class);
+  public static TemplateClassDefinition<Sorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<Sorter>(Sorter.class, SortTemplate.class);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
index 363bbee..7495e31 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
@@ -1,18 +1,14 @@
 package org.apache.drill.exec.physical.impl.svremover;
 
 import org.apache.drill.exec.compile.TemplateClassDefinition;
-import org.apache.drill.exec.compile.sig.DefaultGeneratorSignature;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.VectorAllocator;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
 public interface Copier {
-  public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Copier>( //
-      Copier.class, "org.apache.drill.exec.physical.impl.svremover.CopierTemplate2", CopyEvaluator.class);
-
-  public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Copier>( //
-      Copier.class, "org.apache.drill.exec.physical.impl.svremover.CopierTemplate4", CopyEvaluator.class);
+  public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Copier>(Copier.class, CopierTemplate2.class);
+  public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Copier>(Copier.class, CopierTemplate4.class);
 
   public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException;
   public abstract void copyRecords();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
index 4dc38f2..1f99f04 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
@@ -1,10 +1,12 @@
 package org.apache.drill.exec.physical.impl.svremover;
 
+import javax.inject.Named;
+
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.VectorAllocator;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
 public abstract class CopierTemplate2 implements Copier{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate2.class);
@@ -36,8 +38,9 @@ public abstract class CopierTemplate2 implements Copier{
     }
   }
   
-  public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
-  public abstract void doEval(int incoming, int outgoing);
+  public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
+  public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+
         
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
index 2cf033e..6bf952d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
@@ -1,11 +1,13 @@
 package org.apache.drill.exec.physical.impl.svremover;
 
+import javax.inject.Named;
+
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.VectorAllocator;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
 public abstract class CopierTemplate4 implements Copier{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate4.class);
@@ -30,18 +32,18 @@ public abstract class CopierTemplate4 implements Copier{
 
   @Override
   public void copyRecords(){
-    final int recordCount = sv4.getLength();
+    final int recordCount = sv4.getCount();
     allocateVectors(recordCount);
     int outgoingPosition = 0;
-    final int end = sv4.getStart() + sv4.getLength();
-    for(int svIndex = sv4.getStart(); svIndex < end; svIndex++, outgoingPosition++){
+    for(int svIndex = 0; svIndex < sv4.getCount(); svIndex++, outgoingPosition++){
       int deRefIndex = sv4.get(svIndex);
       doEval(deRefIndex, outgoingPosition);
     }
   }
   
-  public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
-  public abstract void doEval(int incoming, int outgoing);
+  public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
+  public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+
         
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopyEvaluator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopyEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopyEvaluator.java
deleted file mode 100644
index 25c51bb..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopyEvaluator.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.drill.exec.physical.impl.svremover;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RecordBatch;
-
-public interface CopyEvaluator {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopyEvaluator.class);
-  
-  public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing);
-  public abstract void doEval(int incoming, int outgoing);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index e4fd9a0..521061c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -19,14 +19,14 @@ import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.TypeHelper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.exec.vector.allocator.FixedVectorAllocator;
+import org.apache.drill.exec.vector.allocator.VariableEstimatedVector;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.sun.codemodel.JBlock;
-import com.sun.codemodel.JClass;
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JExpression;
-import com.sun.codemodel.JType;
 import com.sun.codemodel.JVar;
 
 public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVectorRemover>{
@@ -118,7 +118,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
     for(VectorWrapper<?> i : incoming){
       ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
       container.add(v);
-      allocators.add(getAllocator(i.getValueVector(), v));
+      allocators.add(VectorAllocator.getAllocator(i.getValueVector(), v));
     }
 
     try {
@@ -203,62 +203,4 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
   }
   
   
-  private VectorAllocator getAllocator(ValueVector in, ValueVector outgoing){
-    if(outgoing instanceof FixedWidthVector){
-      return new FixedVectorAllocator((FixedWidthVector) outgoing);
-    }else if(outgoing instanceof VariableWidthVector && in instanceof VariableWidthVector){
-      return new VariableVectorAllocator( (VariableWidthVector) in, (VariableWidthVector) outgoing);
-    }else{
-      throw new UnsupportedOperationException();
-    }
-  }
-  
-  private class FixedVectorAllocator implements VectorAllocator{
-    FixedWidthVector out;
-    
-    public FixedVectorAllocator(FixedWidthVector out) {
-      super();
-      this.out = out;
-    }
-
-    public void alloc(int recordCount){
-      out.allocateNew(recordCount);
-      out.getMutator().setValueCount(recordCount);
-    }
-  }
-  
-  private class VariableEstimatedVector implements VectorAllocator{
-    VariableWidthVector out;
-    int avgWidth;
-    
-    public VariableEstimatedVector(VariableWidthVector out, int avgWidth) {
-      super();
-      this.out = out;
-      this.avgWidth = avgWidth;
-    }
-    
-    public void alloc(int recordCount){
-      out.allocateNew(avgWidth * recordCount, recordCount);
-      out.getMutator().setValueCount(recordCount);
-    }
-  }
-  private class VariableVectorAllocator implements VectorAllocator{
-    VariableWidthVector in;
-    VariableWidthVector out;
-    
-    public VariableVectorAllocator(VariableWidthVector in, VariableWidthVector out) {
-      super();
-      this.in = in;
-      this.out = out;
-    }
-
-    public void alloc(int recordCount){
-      out.allocateNew(in.getByteCapacity(), recordCount);
-      out.getMutator().setValueCount(recordCount);
-    }
-  }
-  
-  public interface VectorAllocator{
-    public void alloc(int recordCount);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
index e8a5cf8..bbab867 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -1,7 +1,10 @@
 package org.apache.drill.exec.record;
 
+import java.lang.reflect.Array;
+
 import org.apache.drill.exec.vector.ValueVector;
 
+
 public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HyperVectorWrapper.class);
   
@@ -48,6 +51,18 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
     
   }
   
+  @Override
+  @SuppressWarnings("unchecked")
+  public VectorWrapper<T> cloneAndTransfer() {
+    T[] newVectors = (T[]) Array.newInstance(vectors.getClass().getComponentType(), vectors.length);
+    for(int i =0; i < newVectors.length; i++){
+      TransferPair tp = vectors[i].getTransferPair();
+      tp.transfer();
+      newVectors[i] = (T) tp.getTo();
+    }
+    return new HyperVectorWrapper<T>(f, newVectors);
+  }
+
   public static <T extends ValueVector> HyperVectorWrapper<T> create(MaterializedField f, T[] v){
     return new HyperVectorWrapper<T>(f, v);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index abde61a..d974441 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -18,7 +18,10 @@
 package org.apache.drill.exec.record;
 
 import java.util.Iterator;
+import java.util.List;
 
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -28,6 +31,8 @@ import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
 import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
 import org.apache.drill.exec.vector.TypeHelper;
 
+import com.google.common.collect.Lists;
+
 public class MaterializedField{
   private final FieldDef def;
 
@@ -39,6 +44,10 @@ public class MaterializedField{
     return new MaterializedField(def);
   }
   
+  public MaterializedField clone(FieldReference ref){
+    return create(ref, def.getMajorType());
+  }
+  
   public static MaterializedField create(SchemaPath path, MajorType type) {
     FieldDef.Builder b = FieldDef.newBuilder();
     b.setMajorType(type);
@@ -172,5 +181,4 @@ public class MaterializedField{
     return "MaterializedField [" + def.toString() + "]";
   }
 
-  
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
index 94700a2..62ca8a4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
@@ -38,6 +38,14 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper
   }
   
   
+  @SuppressWarnings("unchecked")
+  @Override
+  public VectorWrapper<T> cloneAndTransfer() {
+    TransferPair tp = v.getTransferPair();
+    tp.transfer();
+    return new SimpleVectorWrapper<T>((T) tp.getTo());
+  }
+
   @Override
   public void release() {
     v.clear();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
index a90e2d8..2035cac 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
@@ -5,4 +5,5 @@ import org.apache.drill.exec.vector.ValueVector;
 public interface TransferPair {
   public void transfer();
   public ValueVector getTo();
+  public void copyValue(int from, int to);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 57aad79..25036fc 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -29,16 +29,35 @@ public class VectorContainer implements Iterable<VectorWrapper<?>> {
       add(vArr);
     }
   }
-  
-  public void addHyperList(List<ValueVector> vectors){
+
+  public void addHyperList(List<ValueVector> vectors) {
     schema = null;
     ValueVector[] vv = new ValueVector[vectors.size()];
-    for(int i =0; i < vv.length; i++){
+    for (int i = 0; i < vv.length; i++) {
       vv[i] = vectors.get(i);
     }
     add(vv);
   }
 
+  /**
+   * Get a set of transferred clones of this container. Note that this guarantees that the vectors in the cloned
+   * container have the same TypedFieldIds as the existing container, allowing interchangeability in generated code.
+   * 
+   * @param incoming The RecordBatch iterator the contains the batch we should take over. 
+   * @return A cloned vector container.
+   */
+  public static VectorContainer getTransferClone(RecordBatch incoming) {
+    VectorContainer vc = new VectorContainer();
+    for (VectorWrapper<?> w : incoming) {
+      vc.cloneAndTransfer(w);
+    }
+    return vc;
+  }
+
+  private void cloneAndTransfer(VectorWrapper<?> wrapper) {
+    wrappers.add(wrapper.cloneAndTransfer());
+  }
+
   public void addCollection(Iterable<ValueVector> vectors) {
     schema = null;
     for (ValueVector vv : vectors) {
@@ -101,11 +120,13 @@ public class VectorContainer implements Iterable<VectorWrapper<?>> {
     return (VectorWrapper<T>) va;
   }
 
-  public BatchSchema getSchema(){
-    Preconditions.checkNotNull(schema, "Schema is currently null.  You must call buildSchema(SelectionVectorMode) before this container can return a schema.");
+  public BatchSchema getSchema() {
+    Preconditions
+        .checkNotNull(schema,
+            "Schema is currently null.  You must call buildSchema(SelectionVectorMode) before this container can return a schema.");
     return schema;
   }
-  
+
   public void buildSchema(SelectionVectorMode mode) {
     SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(mode);
     for (VectorWrapper<?> v : wrappers) {
@@ -121,7 +142,8 @@ public class VectorContainer implements Iterable<VectorWrapper<?>> {
 
   public void clear() {
     // TODO: figure out a better approach for this.
-    // we don't clear schema because we want empty batches to carry previous schema to avoid extra schema update for no data.
+    // we don't clear schema because we want empty batches to carry previous schema to avoid extra schema update for no
+    // data.
     // schema = null;
     for (VectorWrapper<?> w : wrappers) {
       w.release();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
index e40dee4..1c5308e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
@@ -11,4 +11,5 @@ public interface VectorWrapper<T extends ValueVector> {
   public T[] getValueVectors();
   public boolean isHyper();
   public void release();
+  public VectorWrapper<T> cloneAndTransfer();
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index 2020f92..a778b1d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -42,7 +42,7 @@ public class SelectionVector4 {
     return recordCount;
   }
   
-  public int getCurrentCount(){
+  public int getCount(){
     return length;
   }
   
@@ -54,15 +54,23 @@ public class SelectionVector4 {
   }
   
   public int get(int index){
-    return vector.getInt(index*4);
+    return vector.getInt( (start+index)*4);
   }
-
-  public int getStart() {
-    return start;
-  }
-
-  public int getLength() {
-    return length;
+  
+  /**
+   * Caution: This method shares the underlying buffer between this vector and the newly created one.
+   * @return Newly created single batch SelectionVector4.
+   * @throws SchemaChangeException 
+   */
+  public SelectionVector4 createNewWrapperCurrent(){
+    try {
+      vector.retain();
+      SelectionVector4 sv4 = new SelectionVector4(vector, length, length);
+      sv4.start = this.start;
+      return sv4;
+    } catch (SchemaChangeException e) {
+      throw new IllegalStateException("This shouldn't happen.");
+    }
   }
   
   public boolean next(){
@@ -73,5 +81,9 @@ public class SelectionVector4 {
     return true;
   }
   
+  public void clear(){
+    this.vector.clear();
+  }
+  
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index 024aa21..c5375b2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -87,14 +87,15 @@ public class MockRecordReader implements RecordReader {
 
   @Override
   public int next() {
+    if(recordsRead >= this.config.getRecords()) return 0;
+    
+    int recordSetSize = Math.min(batchRecordCount, this.config.getRecords() - recordsRead);
     
-    int recordSetSize = Math.min(batchRecordCount, this.config.getRecords()- recordsRead);
-
     recordsRead += recordSetSize;
     for(ValueVector v : valueVectors){
       AllocationHelper.allocate(v, recordSetSize, 50, 5);
       
-      logger.debug("MockRecordReader:  Generating random data for VV of type " + v.getClass().getName());
+//      logger.debug(String.format("MockRecordReader:  Generating %d records of random data for VV of type %s.", recordSetSize, v.getClass().getName()));
       ValueVector.Mutator m = v.getMutator();
       m.setValueCount(recordSetSize);
       m.generateTestData();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
index 661ceba..358334e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
@@ -1,5 +1,6 @@
 package org.apache.drill.exec.vector;
 
+import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.MaterializedField;
 
@@ -24,6 +25,10 @@ abstract class BaseValueVector implements ValueVector{
     return field;
   }
   
+  public MaterializedField getField(FieldReference ref){
+    return getField().clone(ref);
+  }
+  
   abstract class BaseAccessor implements ValueVector.Accessor{
     public abstract int getValueCount();
     public void reset(){}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 77ba98e..8d21298 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -2,12 +2,9 @@ package org.apache.drill.exec.vector;
 
 import io.netty.buffer.ByteBuf;
 
-import java.util.Random;
-
+import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
-import org.apache.drill.exec.record.DeadBuf;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
 
@@ -98,10 +95,14 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     return new Accessor();
   }
 
-  public TransferPair getTransferPair() {
-    return new TransferImpl();
+  public TransferPair getTransferPair(){
+    return new TransferImpl(getField());
+  }
+  public TransferPair getTransferPair(FieldReference ref){
+    return new TransferImpl(getField().clone(ref));
   }
 
+
   public void transferTo(BitVector target) {
     target.data = data;
     target.data.retain();
@@ -112,8 +113,8 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   private class TransferImpl implements TransferPair {
     BitVector to;
 
-    public TransferImpl() {
-      this.to = new BitVector(getField(), allocator);
+    public TransferImpl(MaterializedField field) {
+      this.to = new BitVector(field, allocator);
     }
 
     public BitVector getTo() {
@@ -123,6 +124,11 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     public void transfer() {
       transferTo(to);
     }
+
+    @Override
+    public void copyValue(int fromIndex, int toIndex) {
+      to.copyFrom(fromIndex, toIndex, BitVector.this);
+    }
   }
 
   public class Accessor extends BaseAccessor {
@@ -199,6 +205,12 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     final void set(int index, NullableBitHolder holder) {
       set(index, holder.value);
     }
+    
+    public boolean setSafe(int index, int value) {
+      if(index >= getValueCapacity()) return false;
+      set(index, value);
+      return true;
+    }
 
     public final void setValueCount(int valueCount) {
       BitVector.this.valueCount = valueCount;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index dfe8e8c..7d86067 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
 
 import java.io.Closeable;
 
+import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
@@ -58,6 +59,9 @@ public interface ValueVector extends Closeable {
    * @return 
    */
   public TransferPair getTransferPair();
+  
+  
+  public TransferPair getTransferPair(FieldReference ref);
 
   /**
    * Given the current buffer allocation, return the maximum number of values that this buffer can contain.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/FixedVectorAllocator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/FixedVectorAllocator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/FixedVectorAllocator.java
new file mode 100644
index 0000000..987dcf9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/FixedVectorAllocator.java
@@ -0,0 +1,24 @@
+package org.apache.drill.exec.vector.allocator;
+
+import org.apache.drill.exec.vector.FixedWidthVector;
+
+public class FixedVectorAllocator extends VectorAllocator{
+  FixedWidthVector out;
+  
+  public FixedVectorAllocator(FixedWidthVector out) {
+    super();
+    this.out = out;
+  }
+
+  public void alloc(int recordCount){
+    out.allocateNew(recordCount);
+    out.getMutator().setValueCount(recordCount);
+  }
+
+  @Override
+  public String toString() {
+    return "FixedVectorAllocator [out=" + out + "]";
+  }
+  
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VariableEstimatedVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VariableEstimatedVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VariableEstimatedVector.java
new file mode 100644
index 0000000..7b9f4a7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VariableEstimatedVector.java
@@ -0,0 +1,19 @@
+package org.apache.drill.exec.vector.allocator;
+
+import org.apache.drill.exec.vector.VariableWidthVector;
+
+public class VariableEstimatedVector extends VectorAllocator{
+  VariableWidthVector out;
+  int avgWidth;
+  
+  public VariableEstimatedVector(VariableWidthVector out, int avgWidth) {
+    super();
+    this.out = out;
+    this.avgWidth = avgWidth;
+  }
+  
+  public void alloc(int recordCount){
+    out.allocateNew(avgWidth * recordCount, recordCount);
+    out.getMutator().setValueCount(recordCount);
+  }
+}
\ No newline at end of file