You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/06 23:46:31 UTC

[2/3] Sort operator using HyperBatch concept. Supports single and multikey asc/desc sort. One comparator function definition. Add abstract record batch, vector container and vector wrapper. Various fixes to return to client when query fails.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
new file mode 100644
index 0000000..daf96fc
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -0,0 +1,130 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.List;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.BufferAllocator.PreAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.google.common.collect.ArrayListMultimap;
+
+public class SortRecordBatchBuilder {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortRecordBatchBuilder.class);
+  
+  private final ArrayListMultimap<BatchSchema, RecordBatchData> batches = ArrayListMultimap.create();
+  private final VectorContainer container;
+
+  private int recordCount;
+  private long runningBytes;
+  private long runningBatches;
+  private final long maxBytes;
+  private SelectionVector4 sv4;
+  final PreAllocator svAllocator;
+  
+  public SortRecordBatchBuilder(BufferAllocator a, long maxBytes, VectorContainer container){
+    this.maxBytes = maxBytes;
+    this.svAllocator = a.getPreAllocator();
+    this.container = container;
+  }
+  
+  private long getSize(RecordBatch batch){
+    long bytes = 0;
+    for(VectorWrapper<?> v : batch){
+      bytes += v.getValueVector().getBufferSize();
+    }
+    return bytes;
+  }
+  
+  /**
+   * Add another record batch to the set of record batches.  
+   * @param batch
+   * @return True if the requested add completed successfully.  Returns false in the case that this builder is full and cannot receive additional packages. 
+   * @throws SchemaChangeException
+   */
+  public boolean add(RecordBatch batch){
+    if(batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) throw new UnsupportedOperationException("A sort cannot currently work against a sv4 batch.");
+    if (batch.getRecordCount() == 0) return true; // skip over empty record batches.
+
+    long batchBytes = getSize(batch);
+    if(batchBytes + runningBytes > maxBytes) return false; // enough data memory.
+    if(runningBatches+1 > Character.MAX_VALUE) return false; // allowed in batch.
+    if(!svAllocator.preAllocate(batch.getRecordCount()*4)) return false;  // sv allocation available.
+      
+   
+    RecordBatchData bd = new RecordBatchData(batch);
+    runningBytes += batchBytes;
+    batches.put(batch.getSchema(), bd);
+    recordCount += bd.getRecordCount();
+    return true;
+  }
+
+  public void build(FragmentContext context) throws SchemaChangeException{
+    container.clear();
+    if(batches.keySet().size() > 1) throw new SchemaChangeException("Sort currently only supports a single schema.");
+    if(batches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Sort cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
+    sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
+    BatchSchema schema = batches.keySet().iterator().next();
+    List<RecordBatchData> data = batches.get(schema);
+    
+    // now we're going to generate the sv4 pointers
+    switch(schema.getSelectionVectorMode()){
+    case NONE: {
+      int index = 0;
+      int recordBatchId = 0;
+      for(RecordBatchData d : data){
+        for(int i =0; i < d.getRecordCount(); i++, index++){
+          sv4.set(index, recordBatchId, i);
+        }
+        recordBatchId++;
+      }
+      break;
+    }
+    case TWO_BYTE: {
+      int index = 0;
+      int recordBatchId = 0;
+      for(RecordBatchData d : data){
+        for(int i =0; i < d.getRecordCount(); i++, index++){
+          sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i));
+        }
+        // might as well drop the selection vector since we'll stop using it now.
+        d.getSv2().clear();
+        recordBatchId++;
+      }
+      break;
+    }
+    default:
+      throw new UnsupportedOperationException();
+    }
+    
+    // next, we'll create lists of each of the vector types.
+    ArrayListMultimap<MaterializedField, ValueVector> vectors = ArrayListMultimap.create();
+    for(RecordBatchData rbd : batches.values()){
+      for(ValueVector v : rbd.vectors){
+        vectors.put(v.getField(), v);
+      }
+    }
+    
+    for(MaterializedField f : vectors.keySet()){
+      List<ValueVector> v = vectors.get(f);
+      container.addHyperList(v);
+    }
+    
+    container.buildSchema(SelectionVectorMode.FOUR_BYTE);
+  }
+
+  public SelectionVector4 getSv4() {
+    return sv4;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
new file mode 100644
index 0000000..c45f500
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
@@ -0,0 +1,45 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.QuickSort;
+
+public abstract class SortTemplate implements Sorter, IndexedSortable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortTemplate.class);
+  
+  private SelectionVector4 vector4;
+  
+  
+  public void setup(FragmentContext context, RecordBatch hyperBatch) throws SchemaChangeException{
+    // we pass in the local hyperBatch since that is where we'll be reading data.
+    vector4 = hyperBatch.getSelectionVector4();
+    doSetup(context, hyperBatch, null);
+  }
+  
+  @Override
+  public void sort(SelectionVector4 vector4, VectorContainer container){
+    QuickSort qs = new QuickSort();
+    qs.sort(this, 0, vector4.getTotalCount());
+  }
+
+  @Override
+  public void swap(int sv0, int sv1) {
+    int tmp = vector4.get(sv0);
+    vector4.set(sv0, vector4.get(sv1));
+    vector4.set(sv1, tmp);
+  }
+  
+  @Override
+  public int compare(int inIndex, int outIndex) {
+    int sv1 = vector4.get(inIndex);
+    int sv2 = vector4.get(outIndex);
+    return doEval(sv1, sv2);
+  }
+
+  public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+  public abstract int doEval(int inIndex, int outIndex);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
new file mode 100644
index 0000000..bc4fae5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
@@ -0,0 +1,17 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public interface Sorter {
+  public void setup(FragmentContext context, RecordBatch hyperBatch) throws SchemaChangeException;
+  public void sort(SelectionVector4 vector4, VectorContainer container);
+  
+  public static TemplateClassDefinition<Sorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<Sorter>( //
+      Sorter.class, "org.apache.drill.exec.physical.impl.sort.SortTemplate", Comparator.class, int.class);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
index 55d6ba2..ce17a2b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
@@ -8,8 +8,11 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 
 public interface Copier {
-  public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION = new TemplateClassDefinition<Copier>( //
-      Copier.class, "org.apache.drill.exec.physical.impl.svremover.CopierTemplate", CopyEvaluator.class, null);
+  public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Copier>( //
+      Copier.class, "org.apache.drill.exec.physical.impl.svremover.CopierTemplate2", CopyEvaluator.class, null);
+
+  public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Copier>( //
+      Copier.class, "org.apache.drill.exec.physical.impl.svremover.CopierTemplate4", CopyEvaluator.class, null);
 
   public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException;
   public abstract void copyRecords();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
deleted file mode 100644
index 6a0e2c3..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.drill.exec.physical.impl.svremover;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.VectorAllocator;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-
-public abstract class CopierTemplate implements Copier{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate.class);
-  
-  private SelectionVector2 sv2;
-  private VectorAllocator[] allocators;
-  
-  @Override
-  public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{
-    this.allocators = allocators;
-    this.sv2 = incoming.getSelectionVector2();
-    doSetup(context, incoming, outgoing);
-  }
-  
-  private void allocateVectors(int recordCount){
-    for(VectorAllocator a : allocators){
-      a.alloc(recordCount);
-    }
-  }
-  
-  @Override
-  public void copyRecords(){
-    final int recordCount = sv2.getCount();
-    allocateVectors(recordCount);
-    int outgoingPosition = 0;
-    
-    for(int svIndex = 0; svIndex < recordCount; svIndex++, outgoingPosition++){
-      doEval(svIndex, outgoingPosition);
-    }
-  }
-  
-  public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
-  public abstract void doEval(int incoming, int outgoing);
-        
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
new file mode 100644
index 0000000..4dc38f2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
@@ -0,0 +1,43 @@
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.VectorAllocator;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+
+public abstract class CopierTemplate2 implements Copier{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate2.class);
+  
+  private SelectionVector2 sv2;
+  private VectorAllocator[] allocators;
+  
+  @Override
+  public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{
+    this.allocators = allocators;
+    this.sv2 = incoming.getSelectionVector2();
+    doSetup(context, incoming, outgoing);
+  }
+  
+  private void allocateVectors(int recordCount){
+    for(VectorAllocator a : allocators){
+      a.alloc(recordCount);
+    }
+  }
+  
+  @Override
+  public void copyRecords(){
+    final int recordCount = sv2.getCount();
+    allocateVectors(recordCount);
+    int outgoingPosition = 0;
+    
+    for(int svIndex = 0; svIndex < recordCount; svIndex++, outgoingPosition++){
+      doEval(svIndex, outgoingPosition);
+    }
+  }
+  
+  public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+  public abstract void doEval(int incoming, int outgoing);
+        
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
new file mode 100644
index 0000000..2cf033e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
@@ -0,0 +1,47 @@
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.VectorAllocator;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public abstract class CopierTemplate4 implements Copier{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate4.class);
+  
+  private SelectionVector4 sv4;
+  private VectorAllocator[] allocators;
+
+  
+  private void allocateVectors(int recordCount){
+    for(VectorAllocator a : allocators){
+      a.alloc(recordCount);
+    }
+  }
+  
+  @Override
+  public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{
+    this.allocators = allocators;
+    this.sv4 = incoming.getSelectionVector4();
+    doSetup(context, incoming, outgoing);
+  }
+  
+
+  @Override
+  public void copyRecords(){
+    final int recordCount = sv4.getLength();
+    allocateVectors(recordCount);
+    int outgoingPosition = 0;
+    final int end = sv4.getStart() + sv4.getLength();
+    for(int svIndex = sv4.getStart(); svIndex < end; svIndex++, outgoingPosition++){
+      int deRefIndex = sv4.get(svIndex);
+      doEval(deRefIndex, outgoingPosition);
+    }
+  }
+  
+  public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+  public abstract void doEval(int incoming, int outgoing);
+        
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 68793b0..64e89ee 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -1,24 +1,22 @@
 package org.apache.drill.exec.physical.impl.svremover;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
 
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.VectorHolder;
-import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.physical.config.SelectionVectorRemover;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.SchemaBuilder;
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.TypeHelper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 
@@ -31,36 +29,14 @@ import com.sun.codemodel.JExpression;
 import com.sun.codemodel.JType;
 import com.sun.codemodel.JVar;
 
-public class RemovingRecordBatch implements RecordBatch{
+public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVectorRemover>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemovingRecordBatch.class);
 
-  private final RecordBatch incoming;
-  private final FragmentContext context;
-  private BatchSchema outSchema;
   private Copier copier;
-  private List<ValueVector> outputVectors;
-  private VectorHolder vh;
   private int recordCount;
   
-  public RemovingRecordBatch(RecordBatch incoming, FragmentContext context){
-    this.incoming = incoming;
-    this.context = context;
-  }
-  
-  @Override
-  public Iterator<ValueVector> iterator() {
-    return outputVectors.iterator();
-  }
-
-  @Override
-  public FragmentContext getContext() {
-    return context;
-  }
-
-  @Override
-  public BatchSchema getSchema() {
-    Preconditions.checkNotNull(outSchema);
-    return outSchema;
+  public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext context, RecordBatch incoming) {
+    super(popConfig, context, incoming);
   }
 
   @Override
@@ -69,64 +45,36 @@ public class RemovingRecordBatch implements RecordBatch{
   }
 
   @Override
-  public void kill() {
-    incoming.kill();
-  }
-
-  @Override
-  public SelectionVector2 getSelectionVector2() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public SelectionVector4 getSelectionVector4() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public TypedFieldId getValueVectorId(SchemaPath path) {
-    return vh.getValueVector(path);
-  }
+  protected void setupNewSchema() throws SchemaChangeException {
+    container.clear();
+    
+    switch(incoming.getSchema().getSelectionVectorMode()){
+    case NONE:
+      this.copier = getStraightCopier();
+      break;
+    case TWO_BYTE:
+      this.copier = getGenerated2Copier();
+      break;
+    case FOUR_BYTE:
+      this.copier = getGenerated4Copier();
+      break;
+    default:
+      throw new UnsupportedOperationException();
+    }
+    
+    container.buildSchema(SelectionVectorMode.NONE);
 
-  @Override
-  public <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz) {
-    return vh.getValueVector(fieldId, clazz);
   }
 
   @Override
-  public IterOutcome next() {
-    recordCount = 0;
-    IterOutcome upstream = incoming.next();
-    logger.debug("Upstream... {}", upstream);
-    switch(upstream){
-    case NONE:
-    case NOT_YET:
-    case STOP:
-      return upstream;
-    case OK_NEW_SCHEMA:
-      try{
-        copier = createCopier();
-      }catch(SchemaChangeException ex){
-        incoming.kill();
-        logger.error("Failure during query", ex);
-        context.fail(ex);
-        return IterOutcome.STOP;
-      }
-      // fall through.
-    case OK:
-      recordCount = incoming.getRecordCount();
-      copier.copyRecords();
-      for(ValueVector v : this.outputVectors){
-        ValueVector.Mutator m = v.getMutator();
-        m.setValueCount(recordCount);
-      }
-      return upstream; // change if upstream changed, otherwise normal.
-    default:
-      throw new UnsupportedOperationException();
+  protected void doWork() {
+    recordCount = incoming.getRecordCount();
+    copier.copyRecords();
+    for(VectorWrapper<?> v : container){
+      ValueVector.Mutator m = v.getValueVector().getMutator();
+      m.setValueCount(recordCount);
     }
   }
-  
-  
 
   
   private class StraightCopier implements Copier{
@@ -136,8 +84,8 @@ public class RemovingRecordBatch implements RecordBatch{
     
     @Override
     public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators){
-      for(ValueVector vv : incoming){
-        TransferPair tp = vv.getTransferPair();
+      for(VectorWrapper<?> vv : incoming){
+        TransferPair tp = vv.getValueVector().getTransferPair();
         pairs.add(tp);
         out.add(tp.getTo());
       }
@@ -159,23 +107,23 @@ public class RemovingRecordBatch implements RecordBatch{
   private Copier getStraightCopier(){
     StraightCopier copier = new StraightCopier();
     copier.setupRemover(context, incoming, this, null);
-    outputVectors.addAll(copier.getOut());
+    container.addCollection(copier.getOut());
     return copier;
   }
   
-  private Copier getGeneratedCopier() throws SchemaChangeException{
+  private Copier getGenerated2Copier() throws SchemaChangeException{
     Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
     
     List<VectorAllocator> allocators = Lists.newArrayList();
-    for(ValueVector i : incoming){
-      TransferPair t = i.getTransferPair();
-      outputVectors.add(t.getTo());
-      allocators.add(getAllocator(i, t.getTo()));
+    for(VectorWrapper<?> i : incoming){
+      ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
+      container.add(v);
+      allocators.add(getAllocator(i.getValueVector(), v));
     }
 
     try {
-      final CodeGenerator<Copier> cg = new CodeGenerator<Copier>(Copier.TEMPLATE_DEFINITION, context.getFunctionRegistry());
-      generateCopies(cg);
+      final CodeGenerator<Copier> cg = new CodeGenerator<Copier>(Copier.TEMPLATE_DEFINITION2, context.getFunctionRegistry());
+      generateCopies(cg, false);
       Copier copier = context.getImplementationClass(cg);
       copier.setupRemover(context, incoming, this, allocators.toArray(new VectorAllocator[allocators.size()]));
       return copier;
@@ -184,73 +132,77 @@ public class RemovingRecordBatch implements RecordBatch{
     }
   }
   
-  
-  private Copier createCopier() throws SchemaChangeException{
-    if(outputVectors != null){
-      for(ValueVector v : outputVectors){
-        v.close();
-      }
-    }
-    this.outputVectors = Lists.newArrayList();
-    this.vh = new VectorHolder(outputVectors);
-
-    SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(SelectionVectorMode.NONE);
-    for(ValueVector v : incoming){
-      bldr.addField(v.getField());
-    }
-    this.outSchema = bldr.build();
+  private Copier getGenerated4Copier() throws SchemaChangeException{
+    Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE);
     
-    switch(incoming.getSchema().getSelectionVectorMode()){
-    case NONE:
-      return getStraightCopier();
-    case TWO_BYTE:
-      return getGeneratedCopier();
-    default:
-      throw new UnsupportedOperationException();
+    List<VectorAllocator> allocators = Lists.newArrayList();
+    for(VectorWrapper<?> i : incoming){
+      
+      ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
+      container.add(v);
+      allocators.add(getAllocator4(v));
     }
 
+    try {
+      final CodeGenerator<Copier> cg = new CodeGenerator<Copier>(Copier.TEMPLATE_DEFINITION4, context.getFunctionRegistry());
+      generateCopies(cg, true);
+      Copier copier = context.getImplementationClass(cg);
+      copier.setupRemover(context, incoming, this, allocators.toArray(new VectorAllocator[allocators.size()]));
+      return copier;
+    } catch (ClassTransformationException | IOException e) {
+      throw new SchemaChangeException("Failure while attempting to load generated class", e);
+    }
   }
   
-  private void generateCopies(CodeGenerator<Copier> g){
+  private void generateCopies(CodeGenerator<Copier> g, boolean hyper){
     // we have parallel ids for each value vector so we don't actually have to deal with managing the ids at all.
     int fieldId = 0;
     
-
-
     JExpression inIndex = JExpr.direct("inIndex");
     JExpression outIndex = JExpr.direct("outIndex");
     g.rotateBlock();
-    for(ValueVector vv : incoming){
-      JClass vvClass = (JClass) g.getModel()._ref(vv.getClass());
-      JVar inVV = declareVVSetup("incoming", g, fieldId, vvClass);
-      JVar outVV = declareVVSetup("outgoing", g, fieldId, vvClass);
+    for(VectorWrapper<?> vv : incoming){
+      JVar inVV = g.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vv.getField().getType(), fieldId, vv.isHyper()));
+      JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), fieldId, false));
+
+      if(hyper){
+        
+        g.getBlock().add( 
+            outVV
+            .invoke("copyFrom")
+            .arg(
+                inIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
+            .arg(outIndex)
+            .arg(
+                inVV.component(inIndex.shrz(JExpr.lit(16)))
+                )
+            );  
+      }else{
+        g.getBlock().add(outVV.invoke("copyFrom").arg(inIndex).arg(outIndex).arg(inVV));
+      }
       
-      g.getBlock().add(outVV.invoke("copyFrom").arg(inIndex).arg(outIndex).arg(inVV));
       
       fieldId++;
     }
   }
   
-  private JVar declareVVSetup(String varName, CodeGenerator<?> g, int fieldId, JClass vvClass){
-    JVar vv = g.declareClassField("vv", vvClass);
-    JClass t = (JClass) g.getModel()._ref(SchemaChangeException.class);
-    JType objClass = g.getModel()._ref(Object.class);
-    JBlock b = g.getSetupBlock();
-    JVar obj = b.decl( //
-        objClass, //
-        g.getNextVar("tmp"), // 
-        JExpr.direct(varName).invoke("getValueVectorById").arg(JExpr.lit(fieldId)).arg( vvClass.dotclass()));
-        b._if(obj.eq(JExpr._null()))._then()._throw(JExpr._new(t).arg(JExpr.lit(String.format("Failure while loading vector %s with id %d", vv.name(), fieldId))));
-        b.assign(vv, JExpr.cast(vvClass, obj));
-        
-    return vv;
-  }
-  
+
   @Override
   public WritableBatch getWritableBatch() {
     return WritableBatch.get(this);
   }
   
+  private VectorAllocator getAllocator4(ValueVector outgoing){
+    if(outgoing instanceof FixedWidthVector){
+      return new FixedVectorAllocator((FixedWidthVector) outgoing);
+    }else if(outgoing instanceof VariableWidthVector ){
+      return new VariableEstimatedVector( (VariableWidthVector) outgoing, 50);
+    }else{
+      throw new UnsupportedOperationException();
+    }
+  }
+  
+  
   private VectorAllocator getAllocator(ValueVector in, ValueVector outgoing){
     if(outgoing instanceof FixedWidthVector){
       return new FixedVectorAllocator((FixedWidthVector) outgoing);
@@ -273,11 +225,23 @@ public class RemovingRecordBatch implements RecordBatch{
       out.allocateNew(recordCount);
       out.getMutator().setValueCount(recordCount);
     }
-
+  }
+  
+  private class VariableEstimatedVector implements VectorAllocator{
+    VariableWidthVector out;
+    int avgWidth;
     
+    public VariableEstimatedVector(VariableWidthVector out, int avgWidth) {
+      super();
+      this.out = out;
+      this.avgWidth = avgWidth;
+    }
     
+    public void alloc(int recordCount){
+      out.allocateNew(avgWidth * recordCount, recordCount);
+      out.getMutator().setValueCount(recordCount);
+    }
   }
-  
   private class VariableVectorAllocator implements VectorAllocator{
     VariableWidthVector in;
     VariableWidthVector out;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
index 4671baa..88708e2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
@@ -16,7 +16,7 @@ public class SVRemoverCreator implements BatchCreator<SelectionVectorRemover>{
   @Override
   public RecordBatch getBatch(FragmentContext context, SelectionVectorRemover config, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
-    return new RemovingRecordBatch(children.iterator().next(), context);
+    return new RemovingRecordBatch(config, context, children.iterator().next());
   }
   
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
new file mode 100644
index 0000000..a2584b8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -0,0 +1,78 @@
+package org.apache.drill.exec.record;
+
+import java.util.Iterator;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements RecordBatch{
+  final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+  
+  protected final VectorContainer container = new VectorContainer();
+  protected final T popConfig;
+  protected final FragmentContext context;
+  
+  protected AbstractRecordBatch(T popConfig, FragmentContext context) {
+    super();
+    this.context = context;
+    this.popConfig = popConfig;
+  }
+  
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    return container.iterator();
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    return context;
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    return container.getSchema();
+  }
+
+  @Override
+  public void kill() {
+    container.clear();
+    killIncoming();
+    cleanup();
+  }
+  
+  protected abstract void killIncoming();
+  
+  protected void cleanup(){
+  }
+  
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    return container.getValueVector(path);
+  }
+
+  @Override
+  public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
+    return container.getVectorAccessor(fieldId, clazz);
+  }
+
+  
+  @Override
+  public WritableBatch getWritableBatch() {
+    return WritableBatch.get(this);
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
new file mode 100644
index 0000000..10cc7ab
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -0,0 +1,52 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+
+public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSingleRecordBatch.class);
+  
+  protected final RecordBatch incoming;
+  
+  public AbstractSingleRecordBatch(T popConfig, FragmentContext context, RecordBatch incoming) {
+    super(popConfig, context);
+    this.incoming = incoming;
+  }
+
+  @Override
+  protected void killIncoming() {
+    incoming.kill();
+  }
+
+  @Override
+  public IterOutcome next() {
+    IterOutcome upstream = incoming.next();
+    
+    switch(upstream){
+    case NONE:
+    case NOT_YET:
+    case STOP:
+      container.clear();
+      return upstream;
+    case OK_NEW_SCHEMA:
+      try{
+        setupNewSchema();
+      }catch(SchemaChangeException ex){
+        kill();
+        logger.error("Failure during query", ex);
+        context.fail(ex);
+        return IterOutcome.STOP;
+      }
+      // fall through.
+    case OK:
+      doWork();
+      return upstream; // change if upstream changed, otherwise normal.
+    default:
+      throw new UnsupportedOperationException();
+    }
+  }
+  
+  protected abstract void setupNewSchema() throws SchemaChangeException;
+  protected abstract void doWork();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
new file mode 100644
index 0000000..e8a5cf8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -0,0 +1,54 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.vector.ValueVector;
+
+public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HyperVectorWrapper.class);
+  
+  private T[] vectors;
+  private MaterializedField f;
+
+  public HyperVectorWrapper(MaterializedField f, T[] v){
+    assert(v.length > 0);
+    this.f = f;
+    this.vectors = v;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Class<T> getVectorClass() {
+    return (Class<T>) vectors.getClass().getComponentType();
+  }
+
+  @Override
+  public MaterializedField getField() {
+    return f;
+  }
+
+  @Override
+  public T getValueVector() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public T[] getValueVectors() {
+    return vectors;
+  }
+
+  @Override
+  public boolean isHyper() {
+    return true;
+  }
+
+  @Override
+  public void release() {
+    for(T x : vectors){
+      x.clear();  
+    }
+    
+  }
+  
+  public static <T extends ValueVector> HyperVectorWrapper<T> create(MaterializedField f, T[] v){
+    return new HyperVectorWrapper<T>(f, v);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
deleted file mode 100644
index d820e0e..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-
-public class InvalidValueAccessor extends ExecutionSetupException{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InvalidValueAccessor.class);
-
-  public InvalidValueAccessor() {
-    super();
-  }
-
-  public InvalidValueAccessor(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
-    super(message, cause, enableSuppression, writableStackTrace);
-  }
-
-  public InvalidValueAccessor(String message, Throwable cause) {
-    super(message, cause);
-  }
-
-  public InvalidValueAccessor(String message) {
-    super(message);
-  }
-
-  public InvalidValueAccessor(Throwable cause) {
-    super(cause);
-  }
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index fc4e759..f747968 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.record;
 
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -32,7 +31,7 @@ import org.apache.drill.exec.vector.ValueVector;
  * A key thing to know is that the Iterator provided by record batch must align with the rank positions of the field ids
  * provided utilizing getValueVectorId();
  */
-public interface RecordBatch extends Iterable<ValueVector> {
+public interface RecordBatch extends Iterable<VectorWrapper<?>> {
 
   /**
    * Describes the outcome of a RecordBatch being incremented forward.
@@ -93,8 +92,7 @@ public interface RecordBatch extends Iterable<ValueVector> {
    *         TypedFieldId
    */
   public abstract TypedFieldId getValueVectorId(SchemaPath path);
-
-  public abstract <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz);
+  public abstract VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz);
 
   /**
    * Update the data in each Field reading interface for the next range of records. Once a RecordBatch returns an
@@ -112,43 +110,4 @@ public interface RecordBatch extends Iterable<ValueVector> {
    */
   public WritableBatch getWritableBatch();
 
-  public static class TypedFieldId {
-    final MajorType type;
-    final int fieldId;
-
-    public TypedFieldId(MajorType type, int fieldId) {
-      super();
-      this.type = type;
-      this.fieldId = fieldId;
-    }
-
-    public MajorType getType() {
-      return type;
-    }
-
-    public int getFieldId() {
-      return fieldId;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj)
-        return true;
-      if (obj == null)
-        return false;
-      if (getClass() != obj.getClass())
-        return false;
-      TypedFieldId other = (TypedFieldId) obj;
-      if (fieldId != other.fieldId)
-        return false;
-      if (type == null) {
-        if (other.type != null)
-          return false;
-      } else if (!type.equals(other.type))
-        return false;
-      return true;
-    }
-
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index ebacd6e..593c28c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -29,18 +29,17 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
 import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
-import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
 import org.apache.drill.exec.vector.TypeHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
-public class RecordBatchLoader implements Iterable<ValueVector>{
+public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchLoader.class);
 
-  private List<ValueVector> vectors = Lists.newArrayList();
+  private VectorContainer container = new VectorContainer();
   private final BufferAllocator allocator;
   private int recordCount; 
   private BatchSchema schema;
@@ -66,11 +65,12 @@ public class RecordBatchLoader implements Iterable<ValueVector>{
     boolean schemaChanged = false;
 
     Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap();
-    for(ValueVector v : this.vectors){
+    for(VectorWrapper<?> w : container){
+      ValueVector v = w.getValueVector();
       oldFields.put(v.getField(), v);
     }
     
-    List<ValueVector> newVectors = Lists.newArrayList();
+    VectorContainer newVectors = new VectorContainer();
 
     List<FieldMetadata> fields = def.getFieldList();
     
@@ -79,7 +79,7 @@ public class RecordBatchLoader implements Iterable<ValueVector>{
       FieldDef fieldDef = fmd.getDef();
       ValueVector v = oldFields.remove(fieldDef);
       if(v != null){
-        newVectors.add(v);
+        container.add(v);
         continue;
       }
       
@@ -101,49 +101,51 @@ public class RecordBatchLoader implements Iterable<ValueVector>{
     
     // rebuild the schema.
     SchemaBuilder b = BatchSchema.newBuilder();
-    for(ValueVector v : newVectors){
+    for(VectorWrapper<?> v : newVectors){
       b.addField(v.getField());
     }
     b.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
     this.schema = b.build();
-    vectors = ImmutableList.copyOf(newVectors);
+    container = newVectors;
     return schemaChanged;
 
   }
 
-  public TypedFieldId getValueVector(SchemaPath path) {
-    for(int i =0; i < vectors.size(); i++){
-      ValueVector vv = vectors.get(i);
-      if(vv.getField().matches(path)) return new TypedFieldId(vv.getField().getType(), i); 
-    }
-    return null;
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    return container.getValueVector(path);
   }
   
-  @SuppressWarnings("unchecked")
-  public <T extends ValueVector> T getValueVector(int fieldId, Class<?> clazz) {
-    ValueVector v = vectors.get(fieldId);
-    assert v != null;
-    if (v.getClass() != clazz){
-      logger.warn(String.format(
-          "Failure while reading vector.  Expected vector class of %s but was holding vector class %s.",
-          clazz.getCanonicalName(), v.getClass().getCanonicalName()));
-      return null;
-    }
-    return (T) v;
-  }
+  
+  
+//  
+//  @SuppressWarnings("unchecked")
+//  public <T extends ValueVector> T getValueVector(int fieldId, Class<?> clazz) {
+//    ValueVector v = container.get(fieldId);
+//    assert v != null;
+//    if (v.getClass() != clazz){
+//      logger.warn(String.format(
+//          "Failure while reading vector.  Expected vector class of %s but was holding vector class %s.",
+//          clazz.getCanonicalName(), v.getClass().getCanonicalName()));
+//      return null;
+//    }
+//    return (T) v;
+//  }
 
   public int getRecordCount() {
     return recordCount;
   }
 
-
+  public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz){
+    return container.getVectorAccessor(fieldId, clazz);
+  }
+  
   public WritableBatch getWritableBatch(){
-    return WritableBatch.getBatchNoSV(recordCount, vectors);
+    return WritableBatch.getBatchNoSVWrap(recordCount, container);
   }
 
   @Override
-  public Iterator<ValueVector> iterator() {
-    return this.vectors.iterator();
+  public Iterator<VectorWrapper<?>> iterator() {
+    return this.container.iterator();
   }
 
   public BatchSchema getSchema(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordMaker.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordMaker.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordMaker.java
deleted file mode 100644
index 9bc6e5f..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordMaker.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record;
-
-public class RecordMaker {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordMaker.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordRemapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordRemapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordRemapper.java
deleted file mode 100644
index 86c963d..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordRemapper.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.drill.exec.record;
-
-/**
- * Remove the selection vector from a record batch.
- */
-public class RecordRemapper {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordRemapper.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
new file mode 100644
index 0000000..94700a2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
@@ -0,0 +1,49 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.vector.ValueVector;
+
+public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleVectorWrapper.class);
+  
+  private T v;
+
+  public SimpleVectorWrapper(T v){
+    this.v = v;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Class<T> getVectorClass() {
+    return (Class<T>) v.getClass();
+  }
+
+  @Override
+  public MaterializedField getField() {
+    return v.getField();
+  }
+
+  @Override
+  public T getValueVector() {
+    return v;
+  }
+
+  @Override
+  public T[] getValueVectors() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isHyper() {
+    return false;
+  }
+  
+  
+  @Override
+  public void release() {
+    v.clear();
+  }
+
+  public static <T extends ValueVector> SimpleVectorWrapper<T> create(T v){
+    return new SimpleVectorWrapper<T>(v);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
new file mode 100644
index 0000000..3905fa8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
@@ -0,0 +1,58 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+
+public class TypedFieldId {
+  final MajorType type;
+  final int fieldId;
+  final boolean isHyperReader;
+
+  public TypedFieldId(MajorType type, int fieldId){
+    this(type, fieldId, false);
+  }
+  
+  public TypedFieldId(MajorType type, int fieldId, boolean isHyper) {
+    super();
+    this.type = type;
+    this.fieldId = fieldId;
+    this.isHyperReader = isHyper;
+  }
+
+  public boolean isHyperReader(){
+    return isHyperReader;
+  }
+  
+  public MajorType getType() {
+    return type;
+  }
+
+  public int getFieldId() {
+    return fieldId;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    TypedFieldId other = (TypedFieldId) obj;
+    if (fieldId != other.fieldId)
+      return false;
+    if (type == null) {
+      if (other.type != null)
+        return false;
+    } else if (!type.equals(other.type))
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "TypedFieldId [type=" + type + ", fieldId=" + fieldId + ", isSuperReader=" + isHyperReader + "]";
+  }
+
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
new file mode 100644
index 0000000..923fbd5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -0,0 +1,130 @@
+package org.apache.drill.exec.record;
+
+import java.lang.reflect.Array;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Preconditions;
+
+public class VectorContainer implements Iterable<VectorWrapper<?>> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainer.class);
+
+  private final List<VectorWrapper<?>> wrappers = Lists.newArrayList();
+  private BatchSchema schema;
+
+  public VectorContainer() {
+  }
+
+  public VectorContainer(List<ValueVector> vectors, List<ValueVector[]> hyperVectors) {
+    assert !vectors.isEmpty() || !hyperVectors.isEmpty();
+
+    addCollection(vectors);
+
+    for (ValueVector[] vArr : hyperVectors) {
+      add(vArr);
+    }
+  }
+  
+  public void addHyperList(List<ValueVector> vectors){
+    schema = null;
+    ValueVector[] vv = new ValueVector[vectors.size()];
+    for(int i =0; i < vv.length; i++){
+      vv[i] = vectors.get(i);
+    }
+    add(vv);
+  }
+
+  public void addCollection(Iterable<ValueVector> vectors) {
+    schema = null;
+    for (ValueVector vv : vectors) {
+      wrappers.add(SimpleVectorWrapper.create(vv));
+    }
+  }
+
+  public TypedFieldId add(ValueVector vv) {
+    schema = null;
+    int i = wrappers.size();
+    wrappers.add(SimpleVectorWrapper.create(vv));
+    return new TypedFieldId(vv.getField().getType(), i, false);
+  }
+
+  public void add(ValueVector[] hyperVector) {
+    assert hyperVector.length != 0;
+    schema = null;
+    Class<?> clazz = hyperVector[0].getClass();
+    ValueVector[] c = (ValueVector[]) Array.newInstance(clazz, hyperVector.length);
+    for (int i = 0; i < hyperVector.length; i++) {
+      c[i] = hyperVector[i];
+    }
+    // todo: work with a merged schema.
+    wrappers.add(HyperVectorWrapper.create(hyperVector[0].getField(), c));
+  }
+
+  public void remove(ValueVector v) {
+    schema = null;
+    for (Iterator<VectorWrapper<?>> iter = wrappers.iterator(); iter.hasNext();) {
+      VectorWrapper<?> w = iter.next();
+      if (!w.isHyper() && v == w.getValueVector()) {
+        iter.remove();
+        return;
+      }
+    }
+
+    throw new IllegalStateException("You attempted to remove a vector that didn't exist.");
+  }
+
+  public TypedFieldId getValueVector(SchemaPath path) {
+    for (int i = 0; i < wrappers.size(); i++) {
+      VectorWrapper<?> va = wrappers.get(i);
+      if (va.getField().matches(path))
+        return new TypedFieldId(va.getField().getType(), i, va.isHyper());
+    }
+    return null;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T extends ValueVector> VectorWrapper<T> getVectorAccessor(int fieldId, Class<?> clazz) {
+    VectorWrapper<?> va = wrappers.get(fieldId);
+    assert va != null;
+    if (va.getVectorClass() != clazz) {
+      logger.warn(String.format(
+          "Failure while reading vector.  Expected vector class of %s but was holding vector class %s.",
+          clazz.getCanonicalName(), va.getVectorClass().getCanonicalName()));
+      return null;
+    }
+    return (VectorWrapper<T>) va;
+  }
+
+  public BatchSchema getSchema(){
+    Preconditions.checkNotNull(schema, "Schema is currently null.  You must call buildSchema(SelectionVectorMode) before this container can return a schema.");
+    return schema;
+  }
+  
+  public void buildSchema(SelectionVectorMode mode) {
+    SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(mode);
+    for (VectorWrapper<?> v : wrappers) {
+      bldr.addField(v.getField());
+    }
+    this.schema = bldr.build();
+  }
+
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    return wrappers.iterator();
+  }
+
+  public void clear() {
+    // TODO: figure out a better approach for this.
+    // we don't clear schema because we want empty batches to carry previous schema to avoid extra schema update for no data.
+    // schema = null;
+    for (VectorWrapper<?> w : wrappers) {
+      w.release();
+    }
+    wrappers.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
new file mode 100644
index 0000000..e40dee4
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
@@ -0,0 +1,14 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.vector.ValueVector;
+
+
+public interface VectorWrapper<T extends ValueVector> {
+
+  public Class<T> getVectorClass();
+  public MaterializedField getField();
+  public T getValueVector();
+  public T[] getValueVectors();
+  public boolean isHyper();
+  public void release();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 685cc77..e84bf37 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.vector.ValueVector;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
@@ -57,6 +58,15 @@ public class WritableBatch {
     return buffers;
   }
 
+  public static WritableBatch getBatchNoSVWrap(int recordCount, Iterable<VectorWrapper<?>> vws) {
+    List<ValueVector> vectors = Lists.newArrayList();
+    for(VectorWrapper<?> vw : vws){
+      Preconditions.checkArgument(!vw.isHyper());
+      vectors.add(vw.getValueVector());
+    }
+    return getBatchNoSV(recordCount, vectors);
+  }
+  
   public static WritableBatch getBatchNoSV(int recordCount, Iterable<ValueVector> vectors) {
     List<ByteBuf> buffers = Lists.newArrayList();
     List<FieldMetadata> metadata = Lists.newArrayList();
@@ -83,7 +93,7 @@ public class WritableBatch {
   
   public static WritableBatch get(RecordBatch batch) {
     if(batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() != SelectionVectorMode.NONE) throw new UnsupportedOperationException("Only batches without selections vectors are writable.");
-    return getBatchNoSV(batch.getRecordCount(), batch);
+    return getBatchNoSVWrap(batch.getRecordCount(), batch);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index 1f3874f..2020f92 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -20,22 +20,58 @@ package org.apache.drill.exec.record.selection;
 
 import io.netty.buffer.ByteBuf;
 
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.DeadBuf;
+import org.apache.drill.exec.exception.SchemaChangeException;
 
 public class SelectionVector4 {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector4.class);
 
-  private final BufferAllocator allocator;
-  private ByteBuf buffer = DeadBuf.DEAD_BUFFER;
-
-  public SelectionVector4(BufferAllocator allocator) {
-    this.allocator = allocator;
+  private final ByteBuf vector;
+  private final int recordCount;
+  private int start;
+  private int length;
+  
+  public SelectionVector4(ByteBuf vector, int recordCount, int batchRecordCount) throws SchemaChangeException {
+    if(recordCount > Integer.MAX_VALUE /4) throw new SchemaChangeException(String.format("Currently, Drill can only support allocations up to 2gb in size.  You requested an allocation of %d bytes.", recordCount * 4));
+    this.recordCount = recordCount;
+    this.start = 0;
+    this.length = Math.min(batchRecordCount, recordCount);
+    this.vector = vector;
   }
-
-  public int getCount(){
-    return -1;
+  
+  public int getTotalCount(){
+    return recordCount;
+  }
+  
+  public int getCurrentCount(){
+    return length;
+  }
+  
+  public void set(int index, int compound){
+    vector.setInt(index*4, compound);
+  }
+  public void set(int index, int recordBatch, int recordIndex){
+    vector.setInt(index*4, (recordBatch << 16) | (recordIndex & 65535));
+  }
+  
+  public int get(int index){
+    return vector.getInt(index*4);
   }
 
+  public int getStart() {
+    return start;
+  }
 
+  public int getLength() {
+    return length;
+  }
+  
+  public boolean next(){
+    if(start + length == recordCount) return false;
+    start = start+length;
+    int newEnd = Math.min(start+length, recordCount);
+    length = newEnd - start;
+    return true;
+  }
+  
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java
new file mode 100644
index 0000000..722f4d5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java
@@ -0,0 +1,37 @@
+package org.apache.drill.exec.record.selection;
+
+import java.util.List;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.collect.Lists;
+
+public class SelectionVector4Builder {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector4Builder.class);
+  
+  private List<BatchSchema> schemas = Lists.newArrayList();
+  
+  public void add(RecordBatch batch, boolean newSchema) throws SchemaChangeException{
+    if(!schemas.isEmpty() && newSchema) throw new SchemaChangeException("Currently, the sv4 builder doesn't support embedded types");
+    if(newSchema){
+      schemas.add(batch.getSchema());
+    }
+    
+  }
+  
+  
+  // deals with managing selection vectors.
+  // take a four byte int
+  /**
+   * take a four byte value
+   * use the first two as a pointer.  use the other two as a
+   * 
+   *  we should manage an array of valuevectors
+   */
+  
+  private class VectorSchemaBuilder{
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 36fd199..b2283a2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcBus;
 import org.apache.drill.exec.rpc.RpcException;
@@ -54,29 +55,40 @@ public class QueryResultHandler {
     final QueryResult result = RpcBus.get(pBody, QueryResult.PARSER);
     final QueryResultBatch batch = new QueryResultBatch(result, dBody);
     UserResultsListener l = resultsListener.get(result.getQueryId());
+    
+    boolean failed = batch.getHeader().getQueryState() == QueryState.FAILED;
     // logger.debug("For QueryId [{}], retrieved result listener {}", result.getQueryId(), l);
-    if (l != null) {
-      // logger.debug("Results listener available, using existing.");
-      l.resultArrived(batch);
-      if (result.getIsLastChunk()) {
-        resultsListener.remove(result.getQueryId(), l);
-      }
-    } else {
-      logger.debug("Results listener not available, creating a buffering listener.");
-      // manage race condition where we start getting results before we receive the queryid back.
+    if (l == null) {
       BufferingListener bl = new BufferingListener();
       l = resultsListener.putIfAbsent(result.getQueryId(), bl);
-      if (l != null) {
-        l.resultArrived(batch);
-      } else {
-        bl.resultArrived(batch);
-      }
+      // if we had a succesful insert, use that reference.  Otherwise, just throw away the new bufering listener.
+      if (l == null) l = bl;
     }
+      
+    if(failed){
+      l.submissionFailed(new RpcException("Remote failure while running query." + batch.getHeader().getErrorList()));
+      resultsListener.remove(result.getQueryId(), l);
+    }else{
+      l.resultArrived(batch);
+    }
+    
+    if (
+        (failed || result.getIsLastChunk())
+        && 
+        (!(l instanceof BufferingListener) || ((BufferingListener)l).output != null)
+        ) {
+      resultsListener.remove(result.getQueryId(), l);
+    }
+
   }
 
+  
+  
   private class BufferingListener implements UserResultsListener {
 
     private ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue();
+    private volatile boolean finished = false;
+    private volatile RpcException ex;
     private volatile UserResultsListener output;
 
     public boolean transferTo(UserResultsListener l) {
@@ -87,12 +99,19 @@ public class QueryResultHandler {
           l.resultArrived(r);
           last = r.getHeader().getIsLastChunk();
         }
+        if(ex != null){
+          l.submissionFailed(ex);
+          return true;
+        }
         return last;
       }
     }
 
+    
     @Override
     public void resultArrived(QueryResultBatch result) {
+      if(result.getHeader().getIsLastChunk()) finished = true;
+      
       synchronized (this) {
         if (output == null) {
           this.results.add(result);
@@ -104,7 +123,18 @@ public class QueryResultHandler {
 
     @Override
     public void submissionFailed(RpcException ex) {
-      throw new UnsupportedOperationException("You cannot report failed submissions to a buffering listener.");
+      finished = true;
+      synchronized (this) {
+        if (output == null){
+          this.ex = ex;
+        } else{
+          output.submissionFailed(ex);
+        }
+      }
+    }
+    
+    public boolean isFinished(){
+      return finished;
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index ad54a07..1b1e39a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -28,6 +28,7 @@ abstract class BaseDataValueVector extends BaseValueVector{
       valueCount = 0;
     }
   }
+
   
   @Override
   public ByteBuf[] getBuffers(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SerializableVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SerializableVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SerializableVector.java
new file mode 100644
index 0000000..d43c38e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SerializableVector.java
@@ -0,0 +1,7 @@
+package org.apache.drill.exec.vector;
+
+
+public interface SerializableVector extends ValueVector{
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index 717c087..dfe8e8c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -32,6 +32,8 @@ import org.apache.drill.exec.record.TransferPair;
  */
 public interface ValueVector extends Closeable {
 
+  public int getBufferSize();
+  
   /**
    * Alternative to clear(). Allows use as closeable in try-with-resources.
    */

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
index 7668bdc..9fd33b9 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
@@ -23,7 +23,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
 import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
 import org.apache.drill.exec.work.foreman.ErrorHelper;
 
-public class AbstractFragmentRunnerListener implements FragmentRunnerListener{
+public abstract class AbstractFragmentRunnerListener implements FragmentRunnerListener{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractFragmentRunnerListener.class);
   
   private FragmentContext context;
@@ -90,9 +90,9 @@ public class AbstractFragmentRunnerListener implements FragmentRunnerListener{
     statusChange(handle, statusBuilder.build());
   }
   
-  protected void statusChange(FragmentHandle handle, FragmentStatus status){
+  protected abstract void statusChange(FragmentHandle handle, FragmentStatus status);
     
-  }
+  
   
   @Override
   public final void fail(FragmentHandle handle, String message, Throwable excep) {
@@ -103,7 +103,6 @@ public class AbstractFragmentRunnerListener implements FragmentRunnerListener{
 
   protected void fail(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
     statusChange(handle, statusBuilder.build());
-    // TODO: ensure the foreman handles the exception
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
index 74fcd2b..ef7bcb1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
@@ -38,8 +38,10 @@ public class RemotingFragmentRunnerListener extends AbstractFragmentRunnerListen
     this.tunnel = tunnel;
   }
   
+  
   @Override
   protected void statusChange(FragmentHandle handle, FragmentStatus status) {
+    logger.debug("Sending remote failure.");
     tunnel.sendFragmentStatus(status);
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
index d4c4014..9a33109 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
@@ -31,11 +31,24 @@ public class ErrorHelper {
     DrillPBError.Builder builder = DrillPBError.newBuilder();
     builder.setEndpoint(endpoint);
     builder.setErrorId(id);
+    StringBuilder sb = new StringBuilder();
     if(message != null){
-      builder.setMessage(message);  
-    }else{
-      builder.setMessage(t.getMessage());
+      sb.append(message);
     }
+      
+    do{
+      sb.append(" < ");
+      sb.append(t.getClass().getSimpleName());
+      if(t.getMessage() != null){
+        sb.append(":[ ");
+        sb.append(t.getMessage());
+        sb.append(" ]");
+      }
+    }while(t.getCause() != null && t.getCause() != t);
+    
+    builder.setMessage(sb.toString());
+    
+
     builder.setErrorType(0);
     
     // record the error to the log for later reference.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
index 65fc8c7..af91a6b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
 import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
 import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserProtos.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
 import org.apache.drill.exec.rpc.RpcException;
@@ -61,6 +62,7 @@ class RunningFragmentManager implements FragmentStatusListener{
   private ForemanManagerListener foreman;
   private AtomicInteger remainingFragmentCount;
   private FragmentRunner rootRunner;
+  private volatile QueryId queryId;
   
   public RunningFragmentManager(ForemanManagerListener foreman, TunnelManager tun) {
     super();
@@ -72,6 +74,7 @@ class RunningFragmentManager implements FragmentStatusListener{
 
   public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator, UserClientConnection rootClient, List<PlanFragment> leafFragments) throws ExecutionSetupException{
     remainingFragmentCount.set(leafFragments.size()+1);
+    queryId = rootFragment.getHandle().getQueryId();
 
     // set up the root fragment first so we'll have incoming buffers available.
     {
@@ -146,7 +149,7 @@ class RunningFragmentManager implements FragmentStatusListener{
   private void fail(FragmentStatus status){
     updateStatus(status);
     stopQuery();
-    QueryResult result = QueryResult.newBuilder().setQueryState(QueryState.FAILED).build();
+    QueryResult result = QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.FAILED).addError(status.getError()).build();
     foreman.cleanupAndSendResult(result);
   }
  
@@ -262,7 +265,7 @@ class RunningFragmentManager implements FragmentStatusListener{
 
     @Override
     protected void statusChange(FragmentHandle handle, FragmentStatus status) {
-      RunningFragmentManager.this.updateStatus(status);
+      RunningFragmentManager.this.statusUpdate(status);
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/EscapeTest1.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/EscapeTest1.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/EscapeTest1.java
new file mode 100644
index 0000000..a41a883
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/EscapeTest1.java
@@ -0,0 +1,181 @@
+package org.apache.drill.exec.expr;
+
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
+
+public final class EscapeTest1 {
+  
+  public static class Timer{
+    long n1;
+    String name;
+    public Timer(String name){
+      this.n1 = System.nanoTime();
+      this.name = name;
+    }
+    
+    public void print(long sum){
+      System.out.println(String.format("Completed %s in %d ms.  Output was %d", name, (System.nanoTime() - n1)/1000/1000, sum));
+    }
+  }
+  
+  public static Timer time(String name){
+    return new Timer(name);
+  }
+  
+  public static void main(String args[]){
+    EscapeTest1 et = new EscapeTest1();
+    Monkey m = new Monkey();
+    for(int i =0; i < 10; i++){
+      time("noalloc").print(et.noAlloc());
+      time("alloc").print(et.alloc());
+      time("set noalloc").print(et.setNoAlloc(m));
+      time("set alloc").print(et.setAlloc(m));
+      time("get noalloc").print(et.getNoAlloc(m));
+      time("get alloc").print(et.getAlloc(m));
+      time("get return alloc").print(et.getReturnAlloc(m));
+    }
+  }
+  
+  public long noAlloc(){
+    long sum = 0;
+    for(int i =0; i < 1000000000; i++){
+      sum+= add(i+1, i+2);
+    }
+    return sum;
+  }
+  
+  public long alloc(){
+    long sum = 0;
+    for(int i =0; i < 1000000000; i++){
+      Ad ad = new Ad(i+1, i+2); 
+      sum+= add(ad.x, ad.y);
+    }
+    return sum;
+  }
+  
+  public long setAlloc(Monkey m){
+    long sum = 0;
+    for(int i =0; i < 490000000; i++){
+      EH h = new EH(i+1, i+2);
+      m.set(h);
+      sum += i;
+    }
+    return sum; 
+  }
+  
+  public long setNoAlloc(Monkey m){
+    long sum = 0;
+    for(int i =0; i < 490000000; i++){
+      m.set(i+1, i+2);
+      sum += i;
+    }
+    return sum; 
+  }
+  
+  public long getAlloc(Monkey m){
+    long sum = 0;
+    for(int i =0; i < 490000000; i++){
+      RR r = new RR();
+      m.get(i, i+1, r);
+      sum += r.v1 + r.v2;
+    }
+    return sum; 
+  }
+  
+  public long getNoAlloc(Monkey m){
+    long sum = 0;
+    for(int i =0; i < 490000000; i++){
+      int i1 = m.getV1(i);
+      int i2 = m.getV2(i+1);
+      sum += i1 + i2;
+    }
+    return sum; 
+  }
+  
+  public long getReturnAlloc(Monkey m){
+    long sum = 0;
+    for(int i =0; i < 490000000; i++){
+      RR r = m.get(i, i+1);
+      sum += r.v1 + r.v2;
+    }
+    return sum; 
+  }
+  
+  
+  public class Ad{
+    long x;
+    long y;
+    public Ad(long x, long y) {
+      super();
+      this.x = x;
+      this.y = y;
+    }
+  }
+
+  
+  public static final class EH{
+    int index;
+    int value;
+    public EH(int index, int value) {
+      super();
+      this.index = index;
+      this.value = value;
+    }
+  }
+  
+  public static final class RR{
+    int v1;
+    int v2;
+    
+    public RR(){
+      
+    }
+    public RR(int v1, int v2) {
+      super();
+      this.v1 = v1;
+      this.v2 = v2;
+    }
+  }
+  
+  public long add(long a, long b){
+    return a + b;
+  }
+  
+  
+  public final static class Monkey{
+    final IntBuffer buf;
+    
+    public Monkey(){
+      ByteBuffer bb = ByteBuffer.allocateDirect(Integer.MAX_VALUE);
+      buf = bb.asIntBuffer();
+    }
+    
+    public final void set(int index, int value){
+      buf.put(index, value);
+    }
+    
+    public final void set(EH a){
+      buf.put(a.index, a.value);
+    }
+    
+    public final int getV1(int index){
+      return buf.get(index);
+    }
+    
+    public final int getV2(int index){
+      return buf.get(index);
+    }
+    
+    public final RR get(int index1, int index2){
+      return new RR(buf.get(index1), buf.get(index2));
+    }
+    
+    public final void get(int index1, int index2, RR rr){
+      rr.v1 = buf.get(index1);
+      rr.v2 = buf.get(index2);
+    }
+    
+  }
+  
+  
+}