You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/15 22:24:06 UTC

[6/7] git commit: Separate allocate and load methods. rename setRecordCount to setValueCount add setGroupAndValueCount to RepeatedVectors. add a number of marker/cross-inheritance interfaces.

Separate allocate and load methods.
rename setRecordCount to setValueCount
add setGroupAndValueCount to RepeatedVectors.
add a number of marker/cross-inheritance interfaces.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/9ca9eb9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/9ca9eb9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/9ca9eb9b

Branch: refs/heads/execwork
Commit: 9ca9eb9b3d88e86e28d1b688d9cd943e6a7f08df
Parents: 36793bb
Author: Jacques Nadeau <ja...@apache.org>
Authored: Mon Jul 15 10:50:07 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jul 15 13:13:59 2013 -0700

----------------------------------------------------------------------
 .../templates/FixedValueVectors.java            | 118 +++++---
 .../templates/NullableValueVectors.java         | 220 ++++++++++-----
 .../templates/RepeatedValueVectors.java         | 273 ++++++++++++++-----
 .../templates/VariableLengthVectors.java        | 177 +++++++-----
 .../exec/physical/config/MockRecordReader.java  |  37 ++-
 .../drill/exec/record/MaterializedField.java    |  16 ++
 .../drill/exec/record/RecordBatchLoader.java    |   7 +-
 .../apache/drill/exec/record/WritableBatch.java |   3 +-
 .../drill/exec/store/JSONRecordReader.java      |   4 +-
 .../apache/drill/exec/store/VectorHolder.java   |  21 +-
 .../drill/exec/vector/BaseDataValueVector.java  |  47 ++++
 .../drill/exec/vector/BaseValueVector.java      |  38 +++
 .../org/apache/drill/exec/vector/BitVector.java | 112 +++++---
 .../apache/drill/exec/vector/ByteHolder.java    |  12 +
 .../drill/exec/vector/FixedWidthVector.java     |  23 ++
 .../drill/exec/vector/NonRepeatedMutator.java   |   7 +
 .../exec/vector/RepeatedFixedWidthVector.java   |  22 ++
 .../vector/RepeatedVariableWidthVector.java     |  24 ++
 .../apache/drill/exec/vector/ValueVector.java   | 179 ++++--------
 .../drill/exec/vector/VariableWidthVector.java  |  29 ++
 .../apache/drill/exec/work/foreman/Foreman.java |   5 +-
 .../physical/impl/TestSimpleFragmentRun.java    |   2 +-
 .../exec/record/vector/TestValueVector.java     |  81 +++---
 .../drill/exec/store/JSONRecordReaderTest.java  |   2 +-
 24 files changed, 995 insertions(+), 464 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
index 09dd5d8..7583d9f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
@@ -12,6 +12,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 import org.apache.drill.exec.record.DeadBuf;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.MsgPack2Vector;
 
 import java.util.Random;
@@ -26,71 +27,100 @@ import java.util.Random;
  * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker.
  */
 @SuppressWarnings("unused")
-public final class ${minor.class}Vector extends ValueVector {
+public final class ${minor.class}Vector extends BaseDataValueVector implements FixedWidthVector{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${minor.class}Vector.class);
 
+ 
+  private final Accessor accessor = new Accessor();
+  private final Mutator mutator = new Mutator();
+  
   public ${minor.class}Vector(MaterializedField field, BufferAllocator allocator) {
     super(field, allocator);
   }
 
+  public int getValueCapacity(){
+    return (int) (data.capacity() *1.0 / ${type.width});
+  }
+
+  public Accessor getAccessor(){
+    return accessor;
+  }
+  
+  public Mutator getMutator(){
+    return mutator;
+  }
+
   /**
-   * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.
-   *
+   * Allocate a new buffer that supports setting at least the provided number of values.  May actually be sized bigger depending on underlying buffer rounding size. Must be called prior to using the ValueVector.
    * @param valueCount
-   *          The number of values which can be contained within this vector.
    */
   public void allocateNew(int valueCount) {
-    totalBytes = valueCount * ${type.width};
-    allocateNew(totalBytes, allocator.buffer(totalBytes), valueCount);
+    clear();
+    this.data = allocator.buffer(valueCount * ${type.width});
+    this.data.retain();
+    this.data.readerIndex(0);
   }
-
+  
   @Override
-  public int getAllocatedSize() {
-    return (int) Math.ceil(totalBytes);
+  public FieldMetadata getMetadata() {
+    return FieldMetadata.newBuilder()
+             .setDef(getField().getDef())
+             .setValueCount(recordCount)
+             .setBufferLength(recordCount * ${type.width})
+             .build();
   }
 
-  /**
-   * Get the size requirement (in bytes) for the given number of values.  Only accurate
-   * for fixed width value vectors.
-   */
   @Override
-  public int getSizeFromCount(int valueCount) {
-    return valueCount * ${type.width};
+  public int load(int valueCount, ByteBuf buf){
+    clear();
+    this.recordCount = valueCount;
+    int len = recordCount * ${type.width};
+    data = buf.slice(0, len);
+    data.retain();
+    return len;
   }
-
-  public Mutator getMutator() {
-    return new Mutator();
+  
+  @Override
+  public void load(FieldMetadata metadata, ByteBuf buffer) {
+    assert this.field.getDef().equals(metadata.getDef());
+    int loaded = load(metadata.getValueCount(), buffer);
+    assert metadata.getBufferLength() == loaded;
   }
+  
+  public final class Accessor extends BaseValueVector.BaseAccessor{
 
- <#if (type.width > 8)>
-
-  public ${minor.javaType!type.javaType} get(int index) {
-    ByteBuf dst = allocator.buffer(${type.width});
-    data.getBytes(index * ${type.width}, dst, 0, ${type.width});
-    return dst;
-  }
+    public int getRecordCount() {
+      return recordCount;
+    }
+    
+    <#if (type.width > 8)>
 
-  @Override
-  public Object getObject(int index) {
-    ByteBuf dst = allocator.buffer(${type.width});
-    data.getBytes(index, dst, 0, ${type.width});
-    return dst;
-  }
+    public ${minor.javaType!type.javaType} get(int index) {
+      ByteBuf dst = allocator.buffer(${type.width});
+      data.getBytes(index * ${type.width}, dst, 0, ${type.width});
+      return dst;
+    }
 
+    @Override
+    public Object getObject(int index) {
+      ByteBuf dst = allocator.buffer(${type.width});
+      data.getBytes(index, dst, 0, ${type.width});
+      return dst;
+    }
 
- <#else> <#-- type.width <= 8 -->
+    <#else> <#-- type.width <= 8 -->
 
-  public ${minor.javaType!type.javaType} get(int index) {
-    return data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
-  }
+    public ${minor.javaType!type.javaType} get(int index) {
+      return data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
+    }
 
-  public Object getObject(int index) {
-    return get(index);
-  }
+    public Object getObject(int index) {
+      return get(index);
+    }
 
 
- </#if> <#-- type.width -->
- 
+   </#if> <#-- type.width -->
+ }
  
  /**
   * ${minor.class}.Mutator implements a mutable vector of fixed width values.  Elements in the
@@ -101,7 +131,7 @@ public final class ${minor.class}Vector extends ValueVector {
   *
   * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker.
   */
-  public class Mutator implements ValueVector.Mutator{
+  public final class Mutator extends BaseValueVector.BaseMutator{
 
     private Mutator(){};
    /**
@@ -147,9 +177,9 @@ public final class ${minor.class}Vector extends ValueVector {
    }
   </#if> <#-- type.width -->
   
-   @Override
-   public void setRecordCount(int recordCount) {
-     ${minor.class}Vector.this.setRecordCount(recordCount);
+   public void setValueCount(int recordCount) {
+     ${minor.class}Vector.this.recordCount = recordCount;
+     data.writerIndex(${type.width} * recordCount);
    }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
index c7de73f..3232f87 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
@@ -10,12 +10,15 @@ import io.netty.buffer.ByteBuf;
 
 import java.io.Closeable;
 import java.util.Random;
+import java.util.Vector;
 
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.SchemaDefProtos;
 import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 import org.apache.drill.exec.record.DeadBuf;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.BitVector;
 import org.apache.drill.exec.vector.UInt2Vector;
 import org.apache.drill.exec.vector.UInt4Vector;
 
@@ -27,93 +30,180 @@ import org.apache.drill.exec.vector.UInt4Vector;
  * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker.
  */
 @SuppressWarnings("unused")
-public final class Nullable${minor.class}Vector extends ValueVector {
+public final class Nullable${minor.class}Vector extends BaseValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector {
 
+  private int recordCount;
   private final BitVector bits;
   private final ${minor.class}Vector values;
+  private final Accessor accessor = new Accessor();
+  private final Mutator mutator = new Mutator();
 
   public Nullable${minor.class}Vector(MaterializedField field, BufferAllocator allocator) {
     super(field, allocator);
-    bits = new BitVector(null, allocator);
-    values = new ${minor.class}Vector(null, allocator);
+    this.bits = new BitVector(null, allocator);
+    this.values = new ${minor.class}Vector(null, allocator);
   }
-
-  /**
-   * Get the element at the specified position.
-   *
-   * @param   index   position of the value
-   * @return  value of the element, if not null
-   * @throws  NullValueException if the value is null
-   */
-  public <#if type.major == "VarLen">byte[]<#else>${minor.javaType!type.javaType}</#if> get(int index) {
-    assert !isNull(index);
-    return values.get(index);
+  
+  public int getValueCapacity(){
+    return bits.getValueCapacity();
+  }
+  
+  @Override
+  public ByteBuf[] getBuffers() {
+    return new ByteBuf[]{bits.data, values.data};
   }
 
-
-  public boolean isNull(int index) {
-    return bits.get(index) == 0;
+  @Override
+  public void clear() {
+    recordCount = 0;
+    bits.clear();
+    values.clear();
+  }
+  
+  int getBufferSize(){
+    return values.getBufferSize() + bits.getBufferSize();
   }
 
-  public int isSet(int index){
-    return bits.get(index);
+  <#if type.major == "VarLen">
+  @Override
+  public FieldMetadata getMetadata() {
+    return FieldMetadata.newBuilder()
+             .setDef(getField().getDef())
+             .setValueCount(recordCount)
+             .setVarByteLength(values.getVarByteLength())
+             .setBufferLength(getBufferSize())
+             .build();
   }
   
-  /**
-   * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.
-   *
-   * @param valueCount   The number of values which may be contained by this vector.
-   */
-  public void allocateNew(int totalBytes, ByteBuf sourceBuffer, int valueCount) {
-    values.allocateNew(totalBytes, sourceBuffer, valueCount);
+  @Override
+  public void allocateNew(int totalBytes, int valueCount) {
+    values.allocateNew(totalBytes, valueCount);
     bits.allocateNew(valueCount);
+    mutator.reset();
+    accessor.reset();
   }
 
   @Override
-  public int getAllocatedSize() {
-    return bits.getAllocatedSize() + values.getAllocatedSize();
+  public int load(int dataBytes, int valueCount, ByteBuf buf){
+    clear();
+    this.recordCount = valueCount;
+    int loaded = bits.load(valueCount, buf);
+    
+    // remove bits part of buffer.
+    buf = buf.slice(loaded, buf.capacity() - loaded);
+    loaded += values.load(dataBytes, valueCount, buf);
+    return loaded;
   }
-
-  /**
-   * Get the size requirement (in bytes) for the given number of values.  Only accurate
-   * for fixed width value vectors.
-   */
-  public int getTotalSizeFromCount(int valueCount) {
-    return values.getSizeFromCount(valueCount) + bits.getSizeFromCount(valueCount);
+  
+  @Override
+  public void load(FieldMetadata metadata, ByteBuf buffer) {
+    assert this.field.getDef().equals(metadata.getDef());
+    int loaded = load(metadata.getVarByteLength(), metadata.getValueCount(), buffer);
+    assert metadata.getBufferLength() == loaded;
   }
   
-  public int getSizeFromCount(int valueCount){
-    return getTotalSizeFromCount(valueCount);
+  @Override
+  public int getByteCapacity(){
+    return values.getByteCapacity();
   }
 
+  <#else>
   @Override
-  public MaterializedField getField() {
-    return field;
+  public FieldMetadata getMetadata() {
+    return FieldMetadata.newBuilder()
+             .setDef(getField().getDef())
+             .setValueCount(recordCount)
+             .setBufferLength(getBufferSize())
+             .build();
   }
-
+  
   @Override
-  public ByteBuf[] getBuffers() {
-    return new ByteBuf[]{bits.data, values.data};
+  public void allocateNew(int valueCount) {
+    values.allocateNew(valueCount);
+    bits.allocateNew(valueCount);
+    mutator.reset();
+    accessor.reset();
   }
-
-
+  
+  @Override
+  public int load(int valueCount, ByteBuf buf){
+    clear();
+    this.recordCount = valueCount;
+    int loaded = bits.load(valueCount, buf);
+    
+    // remove bits part of buffer.
+    buf = buf.slice(loaded, buf.capacity() - loaded);
+    loaded += values.load(valueCount, buf);
+    return loaded;
+  }
+  
   @Override
-  public Object getObject(int index) {
-    return isNull(index) ? null : values.getObject(index);
+  public void load(FieldMetadata metadata, ByteBuf buffer) {
+    assert this.field.getDef().equals(metadata.getDef());
+    int loaded = load(metadata.getValueCount(), buffer);
+    assert metadata.getBufferLength() == loaded;
+  }
+  
+  </#if>
+  
+  public Accessor getAccessor(){
+    return accessor;
   }
   
   public Mutator getMutator(){
-    return new Mutator();
+    return mutator;
+  }
+  
+  public ${minor.class}Vector convertToRequiredVector(){
+    ${minor.class}Vector v = new ${minor.class}Vector(getField().getOtherNullableVersion(), allocator);
+    v.data = values.data;
+    v.recordCount = this.recordCount;
+    v.data.retain();
+    clear();
+    return v;
   }
   
-  public class Mutator implements ValueVector.Mutator{
+  
+  
+  public final class Accessor implements ValueVector.Accessor{
 
-    private final BitVector.Mutator bitMutator;
-    private final ${minor.class}Vector.Mutator valueMutator;
+    /**
+     * Get the element at the specified position.
+     *
+     * @param   index   position of the value
+     * @return  value of the element, if not null
+     * @throws  NullValueException if the value is null
+     */
+    public <#if type.major == "VarLen">byte[]<#else>${minor.javaType!type.javaType}</#if> get(int index) {
+      assert !isNull(index);
+      return values.getAccessor().get(index);
+    }
+
+    public boolean isNull(int index) {
+      return isSet(index) == 0;
+    }
+
+    public int isSet(int index){
+      return bits.getAccessor().get(index);
+    }
+    
+    @Override
+    public Object getObject(int index) {
+      return isNull(index) ? null : values.getAccessor().getObject(index);
+    }
+    
+    public int getRecordCount(){
+      return recordCount;
+    }
+    
+    public void reset(){}
+  }
+  
+  public final class Mutator implements ValueVector.Mutator{
+    
+    private int setCount;
     
     private Mutator(){
-      bitMutator = bits.getMutator();
-      valueMutator = values.getMutator();
     }
 
     /**
@@ -123,28 +213,30 @@ public final class Nullable${minor.class}Vector extends ValueVector {
      * @param bytes   array of bytes to write
      */
     public void set(int index, <#if type.major == "VarLen">byte[]<#elseif (type.width < 4)>int<#else>${minor.javaType!type.javaType}</#if> value) {
-      setNotNull(index);
-      valueMutator.set(index, value);
-    }
-
-    public void setNull(int index) {
-      bitMutator.set(index, 0);
+      setCount++;
+      bits.getMutator().set(index, 1);
+      values.getMutator().set(index, value);
     }
 
-    private void setNotNull(int index) {
-      bitMutator.set(index, 1);
+    public void setValueCount(int recordCount) {
+      assert recordCount >= 0;
+      Nullable${minor.class}Vector.this.recordCount = recordCount;
+      values.getMutator().setValueCount(recordCount);
+      bits.getMutator().setValueCount(recordCount);
     }
     
-    @Override
-    public void setRecordCount(int recordCount) {
-      Nullable${minor.class}Vector.this.setRecordCount(recordCount);
-      bits.setRecordCount(recordCount);
+    public boolean noNulls(){
+      return recordCount == setCount;
     }
     
     public void randomizeData(){
       throw new UnsupportedOperationException();
     }
     
+    public void reset(){
+      setCount = 0;
+    }
+    
   }
 }
 </#list>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
index 4acc4cc..363e4c8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
@@ -7,12 +7,18 @@ import org.apache.drill.exec.vector.UInt4Vector;
 <@pp.changeOutputFile name="Repeated${minor.class}Vector.java" />
 package org.apache.drill.exec.vector;
 
+
+
+
+
+
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import io.netty.buffer.ByteBuf;
 
 import java.io.Closeable;
 import java.util.Random;
+import java.util.Vector;
 
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.SchemaDefProtos;
@@ -30,100 +36,216 @@ import org.apache.drill.exec.record.MaterializedField;
  * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker.
  */
 
- public final class Repeated${minor.class}Vector extends ValueVector {
-
-  private final UInt2Vector countVector;    // number of repeated elements in each record
-  private final UInt4Vector offsetVector;   // offsets to start of each record
-  private final ${minor.class}Vector valuesVector;
+ public final class Repeated${minor.class}Vector extends BaseValueVector implements Repeated<#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector {
 
+  private MaterializedField field;
+  
+  private int parentValueCount;
+  private int childValueCount;
+  
+  private final UInt2Vector counts;    // number of repeated elements in each record
+  private final UInt4Vector offsets;   // offsets to start of each record
+  private final ${minor.class}Vector values;
+  private final Mutator mutator = new Mutator();
+  private final Accessor accessor = new Accessor();
+  
+  
   public Repeated${minor.class}Vector(MaterializedField field, BufferAllocator allocator) {
     super(field, allocator);
-    this.countVector = new UInt2Vector(null, allocator);
-    this.offsetVector = new UInt4Vector(null, allocator);
-    this.valuesVector = new ${minor.class}Vector(null, allocator);
+    this.counts = new UInt2Vector(null, allocator);
+    this.offsets = new UInt4Vector(null, allocator);
+    this.values = new ${minor.class}Vector(null, allocator);
   }
 
-  public void allocateNew(int totalBytes, ByteBuf sourceBuffer, int valueCount) {
-    super.allocateNew(totalBytes, sourceBuffer, valueCount);
-    countVector.allocateNew(valueCount);
-    offsetVector.allocateNew(valueCount);
+  public int getValueCapacity(){
+    return values.getValueCapacity();
+  }
+  
+  int getBufferSize(){
+    return counts.getBufferSize() + offsets.getBufferSize() + values.getBufferSize();
+  }
+  
+  <#if type.major == "VarLen">
+  @Override
+  public FieldMetadata getMetadata() {
+    return FieldMetadata.newBuilder()
+             .setDef(getField().getDef())
+             .setGroupCount(this.parentValueCount)
+             .setValueCount(this.childValueCount)
+             .setVarByteLength(values.getVarByteLength())
+             .setBufferLength(getBufferSize())
+             .build();
+  }
+  
+  public void allocateNew(int totalBytes, int parentValueCount, int childValueCount) {
+    counts.allocateNew(parentValueCount);
+    offsets.allocateNew(parentValueCount);
+    values.allocateNew(totalBytes, childValueCount);
+    mutator.reset();
+    accessor.reset();
   }
 
+  
 
-  /**
-   * Get a value for the given record.  Each element in the repeated field is accessed by
-   * the positionIndex param.
-   *
-   * @param  index           record containing the repeated field
-   * @param  positionIndex   position within the repeated field
-   * @return element at the given position in the given record
-   */
-  public <#if type.major == "VarLen">byte[]
-         <#else>${minor.javaType!type.javaType}
-         </#if> get(int index, int positionIndex) {
-
-    assert positionIndex < countVector.get(index);
-    return valuesVector.get(offsetVector.get(index) + positionIndex);
+  
+  @Override
+  public int load(int dataBytes, int parentValueCount, int childValueCount, ByteBuf buf){
+    clear();
+    this.parentValueCount = parentValueCount;
+    this.childValueCount = childValueCount;
+    int loaded = 0;
+    loaded += counts.load(parentValueCount, buf);
+    loaded += offsets.load(parentValueCount, buf.slice(loaded, buf.capacity() - loaded));
+    loaded += values.load(dataBytes, childValueCount, buf.slice(loaded, buf.capacity() - loaded));
+    return loaded;
   }
-
-  public MaterializedField getField() {
-    return field;
+  
+  @Override
+  public void load(FieldMetadata metadata, ByteBuf buffer) {
+    assert this.field.getDef().equals(metadata.getDef());
+    int loaded = load(metadata.getVarByteLength(), metadata.getGroupCount(), metadata.getValueCount(), buffer);
+    assert metadata.getBufferLength() == loaded;
+  }
+  
+  public int getByteCapacity(){
+    return values.getByteCapacity();
   }
 
-  /**
-   * Get the size requirement (in bytes) for the given number of values.  Only accurate
-   * for fixed width value vectors.
-   */
-  public int getTotalSizeFromCount(int valueCount) {
-    return valuesVector.getSizeFromCount(valueCount) +
-           countVector.getSizeFromCount(valueCount) +
-           offsetVector.getSizeFromCount(valueCount);
+  <#else>
+  
+  @Override
+  public FieldMetadata getMetadata() {
+    return FieldMetadata.newBuilder()
+             .setDef(getField().getDef())
+             .setGroupCount(this.parentValueCount)
+             .setValueCount(this.childValueCount)
+             .setBufferLength(getBufferSize())
+             .build();
   }
   
-  public int getSizeFromCount(int valueCount){
-    return getTotalSizeFromCount(valueCount);
+  public void allocateNew(int parentValueCount, int childValueCount) {
+    clear();
+    values.allocateNew(childValueCount);
+    counts.allocateNew(parentValueCount);
+    offsets.allocateNew(parentValueCount);
+    mutator.reset();
+    accessor.reset();
   }
-
-  /**
-   * Get the explicitly specified size of the allocated buffer, if available.  Otherwise
-   * calculate the size based on width and record count.
-   */
-  public int getAllocatedSize() {
-    return valuesVector.getAllocatedSize() +
-           countVector.getAllocatedSize() +
-           offsetVector.getAllocatedSize();
+  
+  public int load(int parentValueCount, int childValueCount, ByteBuf buf){
+    clear();
+    this.parentValueCount = parentValueCount;
+    this.childValueCount = childValueCount;
+    int loaded = 0;
+    loaded += counts.load(parentValueCount, buf);
+    loaded += offsets.load(parentValueCount, buf.slice(loaded, buf.capacity() - loaded));
+    loaded += values.load(childValueCount, buf.slice(loaded, buf.capacity() - loaded));
+    return loaded;
   }
-
-  /**
-   * Get the elements at the given index.
-   */
-  public int getCount(int index) {
-    return countVector.get(index);
+  
+  @Override
+  public void load(FieldMetadata metadata, ByteBuf buffer) {
+    assert this.field.getDef().equals(metadata.getDef());
+    int loaded = load(metadata.getGroupCount(), metadata.getValueCount(), buffer);
+    assert metadata.getBufferLength() == loaded;
   }
+  </#if>
+  
+//  /**
+//   * Get the size requirement (in bytes) for the given number of values.  Only accurate
+//   * for fixed width value vectors.
+//   */
+//  public int getTotalSizeFromCount(int valueCount) {
+//    return values.getSizeFromCount(valueCount) +
+//           counts.getSizeFromCount(valueCount) +
+//           offsets.getSizeFromCount(valueCount);
+//  }
+//  
+//  public int getSizeFromCount(int valueCount){
+//    return getTotalSizeFromCount(valueCount);
+//  }
+
+//  /**
+//   * Get the explicitly specified size of the allocated buffer, if available.  Otherwise
+//   * calculate the size based on width and record count.
+//   */
+//  public int getAllocatedSize() {
+//    return values.getAllocatedSize() +
+//           counts.getAllocatedSize() +
+//           offsets.getAllocatedSize();
+//  }
+
+
 
   public ByteBuf[] getBuffers() {
-    return new ByteBuf[]{countVector.data, offsetVector.data, data};
+    return new ByteBuf[]{counts.data, offsets.data, values.data};
   }
 
-  public Object getObject(int index) {
-    return data.slice(index, getSizeFromCount(countVector.get(index)));
+  public void clear(){
+    counts.clear();
+    offsets.clear();
+    values.clear();
+    parentValueCount = 0;
+    childValueCount = 0;
   }
 
   public Mutator getMutator(){
-    return new Mutator();
+    return mutator;
+  }
+  
+  public Accessor getAccessor(){
+    return accessor;
   }
   
-  public class Mutator implements ValueVector.Mutator{
+  public final class Accessor implements ValueVector.Accessor{
+    /**
+     * Get the elements at the given index.
+     */
+    public int getCount(int index) {
+      return counts.getAccessor().get(index);
+    }
+    
+    public Object getObject(int index) {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Get a value for the given record.  Each element in the repeated field is accessed by
+     * the positionIndex param.
+     *
+     * @param  index           record containing the repeated field
+     * @param  positionIndex   position within the repeated field
+     * @return element at the given position in the given record
+     */
+    public <#if type.major == "VarLen">byte[]
+           <#else>${minor.javaType!type.javaType}
+           </#if> get(int index, int positionIndex) {
 
+      assert positionIndex < counts.getAccessor().get(index);
+      return values.getAccessor().get(offsets.getAccessor().get(index) + positionIndex);
+    }
+
+    public MaterializedField getField() {
+      return field;
+    }
+    
+    public int getGroupCount(){
+      return parentValueCount;
+    }
+    
+    public int getValueCount(){
+      return childValueCount;
+    }
     
-    private final UInt2Vector.Mutator countMutator;
-    private final ${minor.class}Vector.Mutator valuesMutator;
-    private final UInt4Vector.Mutator offsetMutator;
+    public void reset(){
+      
+    }
+  }
+  
+  public final class Mutator implements ValueVector.Mutator{
+
     
     private Mutator(){
-      this.countMutator = countVector.getMutator();
-      this.offsetMutator = offsetVector.getMutator();
-      this.valuesMutator = valuesVector.getMutator();
     }
 
     /**
@@ -137,21 +259,28 @@ import org.apache.drill.exec.record.MaterializedField;
                                <#elseif type.major == "VarLen"> byte[]
                                <#else> int
                                </#if> value) {
-      countMutator.set(index, countVector.get(index) + 1);
-      offsetMutator.set(index, offsetVector.get(index - 1) + countVector.get(index-1));
-      valuesMutator.set(offsetVector.get(index), value);
+      counts.getMutator().set(index, counts.getAccessor().get(index) + 1);
+      offsets.getMutator().set(index, offsets.getAccessor().get(index - 1) + counts.getAccessor().get(index-1));
+      values.getMutator().set(offsets.getAccessor().get(index), value);
     }
+
     
-    public void setRecordCount(int recordCount) {
-      valuesMutator.setRecordCount(recordCount);
-      offsetMutator.setRecordCount(recordCount);
-      countMutator.setRecordCount(recordCount);
+    public void setGroupAndValueCount(int groupCount, int valueCount) {
+      parentValueCount = groupCount;
+      childValueCount = valueCount;
+      counts.getMutator().setValueCount(groupCount);
+      offsets.getMutator().setValueCount(groupCount);
+      values.getMutator().setValueCount(valueCount);
     }
     
     public void randomizeData(){
       throw new UnsupportedOperationException();
     }
     
+    public void reset(){
+      
+    }
+    
   }
 }
 </#list>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
index 954836a..c615258 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
@@ -11,6 +11,7 @@ import static com.google.common.base.Preconditions.checkState;
 import io.netty.buffer.ByteBuf;
 
 import java.io.Closeable;
+import java.nio.ByteBuffer;
 import java.util.Random;
 
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -18,10 +19,11 @@ import org.apache.drill.exec.proto.SchemaDefProtos;
 import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 import org.apache.drill.exec.record.DeadBuf;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.ByteHolder;
 
 /**
  * ${minor.class}Vector implements a vector of variable width values.  Elements in the vector
- * are accessed by position from the logical start of the vector.  A fixed width lengthVector
+ * are accessed by position from the logical start of the vector.  A fixed width offsetVector
  * is used to convert an element's position to it's offset from the start of the (0-based)
  * ByteBuf.  Size is inferred by adjacent elements.
  *   The width of each element is ${type.width} byte(s)
@@ -30,81 +32,125 @@ import org.apache.drill.exec.record.MaterializedField;
  * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker.
  */
 @SuppressWarnings("unused")
-public final class ${minor.class}Vector extends ValueVector {
+public final class ${minor.class}Vector extends BaseDataValueVector implements VariableWidthVector{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${minor.class}Vector.class);
 
-  private final UInt${type.width}Vector lengthVector;
-  private final UInt${type.width}Vector.Mutator lengthVectorMutator;
-
+  private final UInt${type.width}Vector offsetVector;
+  private final Accessor accessor = new Accessor();
+  private final Mutator mutator = new Mutator();
+  
   public ${minor.class}Vector(MaterializedField field, BufferAllocator allocator) {
     super(field, allocator);
-    this.lengthVector = new UInt${type.width}Vector(null, allocator);
-    this.lengthVectorMutator = lengthVector.getMutator();
+    this.offsetVector = new UInt${type.width}Vector(null, allocator);
   }
 
-  public byte[] get(int index) {
-    checkArgument(index >= 0);
-    int startIdx = 0;
-    int size = 0;
-    if (index == 0) {
-      size = lengthVector.get(1);
-    } else {
-      startIdx = lengthVector.get(index);
-      size = lengthVector.get(index + 1) - startIdx;
-    }
-    checkState(size >= 0);
-    byte[] dst = new byte[size];
-    data.getBytes(startIdx, dst, 0, size);
-    return dst;
-  }
 
-  @Override
-  public int getAllocatedSize() {
-    return lengthVector.getAllocatedSize() + totalBytes;
+  int getSizeFromCount(int valueCount) {
+    return valueCount * ${type.width};
   }
-
+  
+  public int getValueCapacity(){
+    return offsetVector.getValueCapacity();
+  }
+  
+  public int getByteCapacity(){
+    return data.capacity(); 
+  }
+  
   /**
-   * Get the size requirement (in bytes) for the given number of values.  Only accurate
-   * for fixed width value vectors.
+   * Return the number of bytes contained in the current var len byte vector.
+   * @return
    */
-  public int getSizeFromCount(int valueCount) {
-    return valueCount * ${type.width};
+  public int getVarByteLength(){
+    return offsetVector.getAccessor().get(recordCount); 
   }
-
+  
   @Override
-  protected void clear() {
-    super.clear();
-    lengthVector.clear();
+  public FieldMetadata getMetadata() {
+    int len = recordCount * ${type.width} + getVarByteLength();
+    return FieldMetadata.newBuilder()
+             .setDef(getField().getDef())
+             .setValueCount(recordCount)
+             .setVarByteLength(getVarByteLength())
+             .setBufferLength(len)
+             .build();
   }
 
-  /**
-   * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.
-   *
-   * @param valueCount
-   *          The number of values which can be contained within this vector.
-   */
-  public void allocateNew(int totalBytes, ByteBuf sourceBuffer, int valueCount) {
-    super.allocateNew(totalBytes, sourceBuffer, valueCount);
-    lengthVector.allocateNew(valueCount);
+  public int load(int dataBytes, int valueCount, ByteBuf buf){
+    this.recordCount = valueCount;
+    int loaded = offsetVector.load(valueCount+1, buf);
+    data = buf.slice(loaded, dataBytes);
+    data.retain();
+    return loaded + dataBytes;
+  }
+  
+  @Override
+  public void load(FieldMetadata metadata, ByteBuf buffer) {
+    assert this.field.getDef().equals(metadata.getDef());
+    int loaded = load(metadata.getVarByteLength(), metadata.getValueCount(), buffer);
+    assert metadata.getBufferLength() == loaded;
+  }
+  
+  @Override
+  public void clear() {
+    super.clear();
+    offsetVector.clear();
   }
 
   @Override
   public ByteBuf[] getBuffers() {
-    return new ByteBuf[]{lengthVector.data, data};
+    return new ByteBuf[]{offsetVector.data, this.data};
   }
-
-  public Object getObject(int index) {
-    return get(index);
+  
+  public void allocateNew(int totalBytes, int valueCount) {
+    clear();
+    assert totalBytes >= 0;
+    data = allocator.buffer(totalBytes);
+    data.retain();
+    data.readerIndex(0);
+    offsetVector.allocateNew(valueCount+1);
   }
 
+  public Accessor getAccessor(){
+    return accessor;
+  }
+  
   public Mutator getMutator() {
-    return new Mutator();
+    return mutator;
   }
   
+  public final class Accessor extends BaseValueVector.BaseAccessor{
+    
+    public byte[] get(int index) {
+      assert index >= 0;
+      int startIdx = offsetVector.getAccessor().get(index);
+      int length = offsetVector.getAccessor().get(index + 1) - startIdx;
+      assert length >= 0;
+      byte[] dst = new byte[length];
+      data.getBytes(startIdx, dst, 0, length);
+      return dst;
+    }
+    
+    public void get(int index, ByteHolder holder){
+      assert index >= 0;
+      holder.start = offsetVector.getAccessor().get(index);
+      holder.length = offsetVector.getAccessor().get(index + 1) - holder.start;
+      assert holder.length >= 0;
+      holder.buffer = offsetVector.data;
+    }
+    
+    public Object getObject(int index) {
+      return get(index);
+    }
+    
+    public int getRecordCount() {
+      return recordCount;
+    }
+  }
   
   /**
    * Mutable${minor.class} implements a vector of variable width values.  Elements in the vector
-   * are accessed by position from the logical start of the vector.  A fixed width lengthVector
+   * are accessed by position from the logical start of the vector.  A fixed width offsetVector
    * is used to convert an element's position to it's offset from the start of the (0-based)
    * ByteBuf.  Size is inferred by adjacent elements.
    *   The width of each element is ${type.width} byte(s)
@@ -112,7 +158,7 @@ public final class ${minor.class}Vector extends ValueVector {
    *
    * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker.
    */
-  public class Mutator implements ValueVector.Mutator{
+  public final class Mutator extends BaseValueVector.BaseMutator{
 
     /**
      * Set the variable length element at the specified index to the supplied byte array.
@@ -121,23 +167,24 @@ public final class ${minor.class}Vector extends ValueVector {
      * @param bytes   array of bytes to write
      */
     public void set(int index, byte[] bytes) {
-      checkArgument(index >= 0);
-      if (index == 0) {
-        lengthVectorMutator.set(0, 0);
-        lengthVectorMutator.set(1, bytes.length);
-        data.setBytes(0, bytes);
-      } else {
-        int currentOffset = lengthVector.get(index);
-        // set the end offset of the buffer
-        lengthVectorMutator.set(index + 1, currentOffset + bytes.length);
-        data.setBytes(currentOffset, bytes);
-      }
+      assert index >= 0;
+      int currentOffset = offsetVector.getAccessor().get(index);
+      offsetVector.getMutator().set(index + 1, currentOffset + bytes.length);
+      data.setBytes(currentOffset, bytes);
     }
 
-    @Override
-    public void setRecordCount(int recordCount) {
-      ${minor.class}Vector.this.setRecordCount(recordCount);
-      lengthVector.setRecordCount(recordCount);
+    public void set(int index, int start, int length, ByteBuf buffer){
+      assert index >= 0;
+      int currentOffset = offsetVector.getAccessor().get(index);
+      offsetVector.getMutator().set(index + 1, currentOffset + length);
+      ByteBuf bb = buffer.slice(start, length);
+      data.setBytes(currentOffset, bb);
+    }
+
+    public void setValueCount(int recordCount) {
+      ${minor.class}Vector.this.recordCount = recordCount;
+      data.writerIndex(recordCount * ${type.width});
+      offsetVector.getMutator().setValueCount(recordCount+1);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
index cd3371d..0f4619c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
@@ -28,8 +28,11 @@ import org.apache.drill.exec.proto.SchemaDefProtos.DataMode;
 import org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.NonRepeatedMutator;
 import org.apache.drill.exec.vector.TypeHelper;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
 
 public class MockRecordReader implements RecordReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
@@ -39,6 +42,7 @@ public class MockRecordReader implements RecordReader {
   private FragmentContext context;
   private ValueVector[] valueVectors;
   private int recordsRead;
+  private int batchRecordCount;
 
   public MockRecordReader(FragmentContext context, MockScanEntry config) {
     this.context = context;
@@ -60,7 +64,14 @@ public class MockRecordReader implements RecordReader {
     MaterializedField f = MaterializedField.create(new SchemaPath(name), fieldId, 0, type);
     ValueVector v;
     v = TypeHelper.getNewVector(f, context.getAllocator());
-    v.allocateNew(length);
+    if(v instanceof FixedWidthVector){
+      ((FixedWidthVector)v).allocateNew(length);  
+    }else if(v instanceof VariableWidthVector){
+      ((VariableWidthVector)v).allocateNew(50*length, length);
+    }else{
+      throw new UnsupportedOperationException(String.format("Unable to get allocate vector %s", v.getClass().getName()));
+    }
+    
     return v;
 
   }
@@ -71,7 +82,7 @@ public class MockRecordReader implements RecordReader {
       this.output = output;
       int estimateRowSize = getEstimatedRecordSize(config.getTypes());
       valueVectors = new ValueVector[config.getTypes().length];
-      int batchRecordCount = 250000 / estimateRowSize;
+      batchRecordCount = 250000 / estimateRowSize;
 
       for (int i = 0; i < config.getTypes().length; i++) {
         valueVectors[i] = getVector(i, config.getTypes()[i].getName(), config.getTypes()[i].getMajorType(), batchRecordCount);
@@ -86,13 +97,29 @@ public class MockRecordReader implements RecordReader {
 
   @Override
   public int next() {
-    int recordSetSize = Math.min(valueVectors[0].capacity(), this.config.getRecords()- recordsRead);
+    
+    int recordSetSize = Math.min(batchRecordCount, this.config.getRecords()- recordsRead);
+
     recordsRead += recordSetSize;
     for(ValueVector v : valueVectors){
+      if(v instanceof FixedWidthVector){
+        ((FixedWidthVector)v).allocateNew(recordSetSize);
+      }else if(v instanceof VariableWidthVector){
+        ((VariableWidthVector)v).allocateNew(50*recordSetSize, recordSetSize);
+      }else{
+        throw new UnsupportedOperationException();
+      }
+      
       logger.debug("MockRecordReader:  Generating random data for VV of type " + v.getClass().getName());
-      v.randomizeData();
+      ValueVector.Mutator m = v.getMutator();
+      m.randomizeData();
+      
+      if(m instanceof NonRepeatedMutator){
+        ((NonRepeatedMutator)m).setValueCount(recordSetSize);  
+      }else{
+        throw new UnsupportedOperationException();
+      }
       
-      v.getMutator().setRecordCount(recordSetSize);
     }
     return recordSetSize;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index d1858f1..05fb576 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -105,6 +105,22 @@ public class MaterializedField implements Comparable<MaterializedField> {
   public DataMode getDataMode() {
     return def.getMajorType().getMode();
   }
+  
+  public MaterializedField getOtherNullableVersion(){
+    MajorType mt = def.getMajorType();
+    DataMode newDataMode = null;
+    switch(mt.getMode()){
+    case OPTIONAL:
+      newDataMode = DataMode.REQUIRED;
+      break;
+    case REQUIRED:
+      newDataMode = DataMode.OPTIONAL;
+      break;
+    default:
+      throw new UnsupportedOperationException();
+    }
+    return new MaterializedField(def.toBuilder().setMajorType(mt.toBuilder().setMode(newDataMode).build()).build());
+  }
 
   public boolean matches(SchemaPath path) {
     Iterator<NamePart> iter = def.getNameList().iterator();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index be43026..a2dbd81 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -52,7 +52,7 @@ public class RecordBatchLoader implements Iterable<IntObjectCursor<ValueVector>>
    * @param def
    *          The definition for the record batch.
    * @param buf
-   *          The buffer that holds the data ssociated with the record batch
+   *          The buffer that holds the data associated with the record batch
    * @return Whether or not the schema changed since the previous load.
    * @throws SchemaChangeException 
    */
@@ -71,7 +71,8 @@ public class RecordBatchLoader implements Iterable<IntObjectCursor<ValueVector>>
       ValueVector v = vectors.remove(fieldDef.getFieldId());
       if (v != null) {
         if (v.getField().getDef().equals(fieldDef)) {
-          v.allocateNew(fmd.getBufferLength(), buf.slice(bufOffset, fmd.getBufferLength()), recordCount);
+          ValueVector.Mutator m = v.getMutator();
+          v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
           newVectors.put(fieldDef.getFieldId(), v);
           continue;
         } else {
@@ -83,7 +84,7 @@ public class RecordBatchLoader implements Iterable<IntObjectCursor<ValueVector>>
       schemaChanged = true;
       MaterializedField m = new MaterializedField(fieldDef);
       v = TypeHelper.getNewVector(m, allocator);
-      v.allocateNew(fmd.getBufferLength(), buf.slice(bufOffset, fmd.getBufferLength()), recordCount);
+      v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
       newVectors.put(fieldDef.getFieldId(), v);
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index a367b6d..4b97768 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -93,8 +93,7 @@ public class WritableBatch {
         buffers.add(b);
         b.retain();
       }
-      // allocate new buffer to release hold on old buffer.
-      value.allocateNew(value.capacity());
+      value.clear();
     }
 
     public WritableBatch get(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
index e637518..07ae20a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
@@ -422,9 +422,11 @@ public class JSONRecordReader implements RecordReader {
             SchemaDefProtos.MajorType type = field.getFieldType();
             int fieldId = field.getFieldId();
             MaterializedField f = MaterializedField.create(new SchemaPath(field.getFieldName()), fieldId, parentFieldId, type);
+            
             ValueVector v = TypeHelper.getNewVector(f, allocator);
-            v.allocateNew(batchSize);
             VectorHolder holder = new VectorHolder(batchSize, v);
+            holder.allocateNew(batchSize);
+            
             valueVectorMap.put(fieldId, holder);
             outputMutator.addField(fieldId, v);
             return holder;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
index fa0cbd5..d594b9e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
@@ -18,16 +18,20 @@
 
 package org.apache.drill.exec.store;
 
+import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
 
 public class VectorHolder {
     private int length;
     private ValueVector vector;
+    private ValueVector.Mutator mutator;
     private int currentLength;
 
     VectorHolder(int length, ValueVector vector) {
         this.length = length;
         this.vector = vector;
+        this.mutator = vector.getMutator();
     }
 
     public ValueVector getValueVector() {
@@ -51,6 +55,21 @@ public class VectorHolder {
 
     public void reset() {
         currentLength = 0;
-        vector.allocateNew(length);
+        allocateNew(length);
+        
+    }
+    
+    public void allocateNew(int valueLength){
+      if(vector instanceof FixedWidthVector){
+        ((FixedWidthVector)vector).allocateNew(valueLength);  
+      }else if(vector instanceof VariableWidthVector){
+        ((VariableWidthVector)vector).allocateNew(valueLength * 10, valueLength);  
+      }else{
+        throw new UnsupportedOperationException();
+      }
+    }
+    
+    public ValueVector.Mutator getMutator(){
+      return mutator;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
new file mode 100644
index 0000000..dd2b504
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -0,0 +1,47 @@
+package org.apache.drill.exec.vector;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.record.DeadBuf;
+import org.apache.drill.exec.record.MaterializedField;
+
+abstract class BaseDataValueVector extends BaseValueVector{
+
+  protected ByteBuf data = DeadBuf.DEAD_BUFFER;
+  protected int recordCount;
+  
+  public BaseDataValueVector(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator);
+    
+  }
+
+  /**
+   * Release the underlying ByteBuf and reset the ValueVector
+   */
+  @Override
+  public void clear() {
+    if (data != DeadBuf.DEAD_BUFFER) {
+      data.release();
+      data = DeadBuf.DEAD_BUFFER;
+      recordCount = 0;
+    }
+  }
+  
+  @Override
+  public ByteBuf[] getBuffers(){
+    return new ByteBuf[]{data};
+  }
+  
+  public int getBufferSize() {
+    return data.writerIndex();
+  }
+
+  @Override
+  public FieldMetadata getMetadata() {
+    return null;
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
new file mode 100644
index 0000000..a8678f5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
@@ -0,0 +1,38 @@
+package org.apache.drill.exec.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+abstract class BaseValueVector implements ValueVector{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseValueVector.class);
+  
+  protected final BufferAllocator allocator;
+  protected final MaterializedField field;
+
+  BaseValueVector(MaterializedField field, BufferAllocator allocator) {
+    this.allocator = allocator;
+    this.field = field;
+  }
+  
+  @Override
+  public void close() {
+    clear();
+  }
+  
+  @Override
+  public MaterializedField getField() {
+    return field;
+  }
+  
+  abstract class BaseAccessor implements ValueVector.Accessor{
+
+    
+    public void reset(){}
+  }
+  
+  abstract class BaseMutator implements NonRepeatedMutator{
+    public void reset(){}
+  }
+  
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index d18a29d..9d247f5 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -1,8 +1,11 @@
 package org.apache.drill.exec.vector;
 
+import io.netty.buffer.ByteBuf;
+
 import java.util.Random;
 
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 import org.apache.drill.exec.record.DeadBuf;
 import org.apache.drill.exec.record.MaterializedField;
 /**
@@ -13,64 +16,93 @@ import org.apache.drill.exec.record.MaterializedField;
  *
  * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker.
  */
-public final class BitVector extends ValueVector {
+public final class BitVector extends BaseDataValueVector implements FixedWidthVector{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitVector.class);
 
+  private int valueCapacity;
+  
   public BitVector(MaterializedField field, BufferAllocator allocator) {
     super(field, allocator);
   }
 
+  private int getSizeFromCount(int valueCount) {
+    return (int) Math.ceil(valueCount / 8);
+  }
+  
   /**
-   * Get the byte holding the desired bit, then mask all other bits.  Iff the result is 0, the
-   * bit was not set.
+   * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.
    *
-   * @param  index   position of the bit in the vector
-   * @return 1 if set, otherwise 0
+   * @param valueCount  The number of values which can be contained within this vector.
    */
-  public int get(int index) {
-    // logger.debug("BIT GET: index: {}, byte: {}, mask: {}, masked byte: {}",
-    //             index,
-    //             data.getByte((int)Math.floor(index/8)),
-    //             (int)Math.pow(2, (index % 8)),
-    //             data.getByte((int)Math.floor(index/8)) & (int)Math.pow(2, (index % 8)));
-    return ((data.getByte((int)Math.floor(index/8)) & (int)Math.pow(2, (index % 8))) == 0) ? 0 : 1;
+  public void allocateNew(int valueCount) {
+    clear();
+    valueCapacity = valueCount;
+    int valueSize = getSizeFromCount(valueCount);
+    data = allocator.buffer(valueSize);
+    for (int i = 0; i < getSizeFromCount(valueCount); i++) {
+      data.setByte(i, 0);
+    }
   }
-
+  
   @Override
-  public Object getObject(int index) {
-    return new Boolean(get(index) != 0);
+  public int load(int valueCount, ByteBuf buf){
+    clear();
+    this.recordCount = valueCount;
+    int len = getSizeFromCount(valueCount);
+    data = buf.slice(0, len);
+    data.retain();
+    return len;
   }
-
-  /**
-   * Get the size requirement (in bytes) for the given number of values.
-   */
+  
   @Override
-  public int getSizeFromCount(int valueCount) {
-    return (int) Math.ceil(valueCount / 8);
+  public void load(FieldMetadata metadata, ByteBuf buffer) {
+    assert this.field.getDef().equals(metadata.getDef());
+    int loaded = load(metadata.getValueCount(), buffer);
+    assert metadata.getBufferLength() == loaded;
   }
-
+  
   @Override
-  public int getAllocatedSize() {
-    return totalBytes;
+  public int getValueCapacity() {
+    return valueCapacity;
   }
 
   public Mutator getMutator() {
     return new Mutator();
   }
 
-  /**
-   * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.
-   *
-   * @param valueCount  The number of values which can be contained within this vector.
-   */
-  @Override
-  public void allocateNew(int valueCount) {
-    allocateNew(getSizeFromCount(valueCount), null, valueCount);
-    for (int i = 0; i < getSizeFromCount(valueCount); i++) {
-      data.setByte(i, 0);
-    }
+  public Accessor getAccessor(){
+    return new Accessor();
   }
+  
+  
+  public class Accessor extends BaseAccessor{
 
+    /**
+     * Get the byte holding the desired bit, then mask all other bits.  Iff the result is 0, the
+     * bit was not set.
+     *
+     * @param  index   position of the bit in the vector
+     * @return 1 if set, otherwise 0
+     */
+    public int get(int index) {
+      // logger.debug("BIT GET: index: {}, byte: {}, mask: {}, masked byte: {}",
+      //             index,
+      //             data.getByte((int)Math.floor(index/8)),
+      //             (int)Math.pow(2, (index % 8)),
+      //             data.getByte((int)Math.floor(index/8)) & (int)Math.pow(2, (index % 8)));
+      return ((data.getByte((int)Math.floor(index/8)) & (int)Math.pow(2, (index % 8))) == 0) ? 0 : 1;
+    }
+    
+    @Override
+    public Object getObject(int index) {
+      return new Boolean(get(index) != 0);
+    }
+    
+    public int getRecordCount() {
+      return recordCount;
+    }
+    
+  }
   
   /**
    * MutableBit implements a vector of bit-width values.  Elements in the vector are accessed
@@ -79,7 +111,7 @@ public final class BitVector extends ValueVector {
    *
    * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker.
    */
-  public class Mutator implements ValueVector.Mutator{
+  public class Mutator extends BaseMutator{
 
     private Mutator(){}
     
@@ -102,10 +134,9 @@ public final class BitVector extends ValueVector {
       data.setByte((int) Math.floor(index/8), currentByte);
     }
 
-    
-    @Override
-    public void setRecordCount(int recordCount) {
-      BitVector.this.setRecordCount(recordCount);
+    public void setValueCount(int recordCount) {
+      BitVector.this.recordCount = recordCount;
+      data.writerIndex(getSizeFromCount(recordCount));
     }
 
     @Override
@@ -119,5 +150,6 @@ public final class BitVector extends ValueVector {
         }
       }
     }
+
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ByteHolder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ByteHolder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ByteHolder.java
new file mode 100644
index 0000000..45d8019
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ByteHolder.java
@@ -0,0 +1,12 @@
+package org.apache.drill.exec.vector;
+
+import io.netty.buffer.ByteBuf;
+
+public class ByteHolder {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ByteHolder.class);
+  
+  public ByteBuf buffer;
+  public int start;
+  public int length;
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
new file mode 100644
index 0000000..0e3e3e9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
@@ -0,0 +1,23 @@
+package org.apache.drill.exec.vector;
+
+import io.netty.buffer.ByteBuf;
+
+public interface FixedWidthVector extends ValueVector{
+  
+  /**
+   * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.
+   *
+   * @param totalBytes   Desired size of the underlying data buffer.
+   * @param valueCount   Number of values in the vector.
+   */
+  public void allocateNew(int valueCount);
+  
+  /**
+   * Load the records in the provided buffer based on the given number of values.
+   * @param valueCount Number of values the buffer contains.
+   * @param buf Incoming buffer.
+   * @return The number of bytes of the buffer that were consumed.
+   */
+  public int load(int valueCount, ByteBuf buf);
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/NonRepeatedMutator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/NonRepeatedMutator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/NonRepeatedMutator.java
new file mode 100644
index 0000000..e9bdcbd
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/NonRepeatedMutator.java
@@ -0,0 +1,7 @@
+package org.apache.drill.exec.vector;
+
+import org.apache.drill.exec.vector.ValueVector.Mutator;
+
+public interface NonRepeatedMutator extends Mutator{
+  public void setValueCount(int recordCount);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
new file mode 100644
index 0000000..35261d7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
@@ -0,0 +1,22 @@
+package org.apache.drill.exec.vector;
+
+import io.netty.buffer.ByteBuf;
+
+public interface RepeatedFixedWidthVector extends ValueVector{
+  /**
+   * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.
+   *
+   * @param parentValueCount   Number of separate repeating groupings.
+   * @param childValueCount   Number of supported values in the vector.
+   */
+  public void allocateNew(int parentValueCount, int childValueCount);
+  
+  /**
+   * Load the records in the provided buffer based on the given number of values.
+   * @param parentValueCount   Number of separate repeating groupings.
+   * @param valueCount Number atomic values the buffer contains.
+   * @param buf Incoming buffer.
+   * @return The number of bytes of the buffer that were consumed.
+   */
+  public int load(int parentValueCount, int childValueCount, ByteBuf buf);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
new file mode 100644
index 0000000..4f22481
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
@@ -0,0 +1,24 @@
+package org.apache.drill.exec.vector;
+
+import io.netty.buffer.ByteBuf;
+
+public interface RepeatedVariableWidthVector extends ValueVector{
+  /**
+   * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.
+   *
+   * @param totalBytes   Desired size of the underlying data buffer.
+   * @param parentValueCount   Number of separate repeating groupings.
+   * @param childValueCount   Number of supported values in the vector.
+   */
+  public void allocateNew(int totalBytes, int parentValueCount, int childValueCount);
+  
+  /**
+   * Load the records in the provided buffer based on the given number of values.
+   * @param dataBytes   The number of bytes associated with the data array.
+   * @param parentValueCount   Number of separate repeating groupings.
+   * @param childValueCount   Number of supported values in the vector.
+   * @param buf Incoming buffer.
+   * @return The number of bytes of the buffer that were consumed.
+   */
+  public int load(int dataBytes, int parentValueCount, int childValueCount, ByteBuf buf);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index 718478e..328182b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -21,9 +21,8 @@ import io.netty.buffer.ByteBuf;
 
 import java.io.Closeable;
 
-import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
 import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
-import org.apache.drill.exec.record.DeadBuf;
 import org.apache.drill.exec.record.MaterializedField;
 
 /**
@@ -31,42 +30,25 @@ import org.apache.drill.exec.record.MaterializedField;
  * value vectors.  The template approach was chosen due to the lack of multiple inheritence.  It
  * is also important that all related logic be as efficient as possible.
  */
-public abstract class ValueVector implements Closeable {
+public interface ValueVector extends Closeable {
 
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ValueVector.class);
+//  /**
+//   * Get the explicitly specified size of the allocated buffer, if available.  Otherwise
+//   * calculate the size based on width and record count.
+//   */
+//  public abstract int getAllocatedSize();
 
-  protected final BufferAllocator allocator;
-  protected ByteBuf data = DeadBuf.DEAD_BUFFER;
-  protected MaterializedField field;
-  protected int recordCount;
-  protected int totalBytes;
-
-  ValueVector(MaterializedField field, BufferAllocator allocator) {
-    this.allocator = allocator;
-    this.field = field;
-  }
 
   /**
-   * Get the explicitly specified size of the allocated buffer, if available.  Otherwise
-   * calculate the size based on width and record count.
+   * Alternative to clear().  Allows use as closeable in try-with-resources.
    */
-  public abstract int getAllocatedSize();
-
+  public void close();
+  
   /**
-   * Get the size requirement (in bytes) for the given number of values.  Takes derived
-   * type specs into account.
+   * Release the underlying ByteBuf and reset the ValueVector to empty.
    */
-  public abstract int getSizeFromCount(int valueCount);
-
-  /**
-   * Get the Java Object representation of the element at the specified position
-   *
-   * @param index   Index of the value to get
-   */
-  public abstract Object getObject(int index);
-
+  public void clear();
   
-  public abstract Mutator getMutator();
   
   /**
    * Return the underlying buffers associated with this vector. Note that this doesn't impact the
@@ -76,117 +58,72 @@ public abstract class ValueVector implements Closeable {
    *
    * @return The underlying ByteBuf.
    */
-  public ByteBuf[] getBuffers() {
-    return new ByteBuf[]{data};
-  }
-
-  /**
-   * Returns the maximum number of values contained within this vector.
-   * @return Vector size
-   */
-  public int capacity() {
-    return getRecordCount();
-  }
-
+  public abstract ByteBuf[] getBuffers();
+  
   /**
-   * Release supporting resources.
+   * Load the data provided in the buffer.  Typically used when deserializing from the wire.
+   * @param metadata Metadata used to decode the incoming buffer.
+   * @param buffer The buffer that contains the ValueVector.
    */
-  @Override
-  public void close() {
-    clear();
-  }
+  public void load(FieldMetadata metadata, ByteBuf buffer);
 
-  /**
-   * Get information about how this field is materialized.
-   * @return
-   */
-  public MaterializedField getField() {
-    return field;
-  }
 
   /**
-   * Get the number of records allocated for this value vector.
-   * @return number of allocated records
+   * Given the current buffer allocation, return the maximum number of values that this buffer can contain.
+   * @return Maximum values buffer can contain.  In the case of a Repeated field, this is the number of atoms, not repeated groups.
    */
-  public int getRecordCount() {
-    return recordCount;
-  }
-
+  public int getValueCapacity();
+  
   /**
-   * Get the metadata for this field.
+   * Get information about how this field is materialized.
    * @return
    */
-  public FieldMetadata getMetadata() {
-    int len = 0;
-    for(ByteBuf b : getBuffers()){
-      len += b.writerIndex();
-    }
-    return FieldMetadata.newBuilder()
-             .setDef(getField().getDef())
-             .setValueCount(getRecordCount())
-             .setBufferLength(len)
-             .build();
-  }
-
+  public MaterializedField getField();
+  
   /**
-   * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.
-   *
-   * @param totalBytes   Optional desired size of the underlying buffer.  Specifying 0 will
-   *                     estimate the size based on valueCount.
-   * @param sourceBuffer Optional ByteBuf to use for storage (null will allocate automatically).
-   * @param valueCount   Number of values in the vector.
+   * Get the metadata for this field.  Used in serialization
+   * @return FieldMetadata for this field.
    */
-  public void allocateNew(int totalBytes, ByteBuf sourceBuffer, int valueCount) {
-    clear();
-    this.recordCount = valueCount;
-    this.totalBytes = totalBytes > 0 ? totalBytes : getSizeFromCount(valueCount);
-    this.data = (sourceBuffer != null) ? sourceBuffer : allocator.buffer(this.totalBytes);
-    this.data.retain();
-    data.readerIndex(0);
-  }
-
+  public FieldMetadata getMetadata();
+  
   /**
-   * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.
-   *
-   * @param valueCount
-   *          The number of elements which can be contained within this vector.
+   * Get Accessor to read value vector data.
+   * @return 
    */
-  public void allocateNew(int valueCount) {
-    allocateNew(0, null, valueCount);
-  }
-
+  public abstract Accessor getAccessor();
+  
   /**
-   * Release the underlying ByteBuf and reset the ValueVector
+   * Get a Mutator to update this vectors data.
+   * @return
    */
-  protected void clear() {
-    if (data != DeadBuf.DEAD_BUFFER) {
-      data.release();
-      data = DeadBuf.DEAD_BUFFER;
-      recordCount = 0;
-      totalBytes = 0;
-    }
-  }
+  public abstract Mutator getMutator();
 
-  //public abstract <T extends Mutator> T getMutator();
   
-  /**
-   * Define the number of records that are in this value vector.
-   * @param recordCount Number of records active in this vector.
-   */
-  void setRecordCount(int recordCount) {
-    data.writerIndex(getSizeFromCount(recordCount));
-    this.recordCount = recordCount;
+  public interface Accessor{
+
+//    /**
+//     * Get the number of records allocated for this value vector.
+//     * @return number of allocated records
+//     */
+//    public int getRecordCount();
+
+    /**
+     * Get the Java Object representation of the element at the specified position.  Useful for testing.
+     *
+     * @param index   Index of the value to get
+     */
+    public abstract Object getObject(int index);
+    
+    public void reset();
   }
-
-  /**
-   * For testing only -- randomize the buffer contents
-   */
-  public void randomizeData() { }
-
   
-  public static interface Mutator{
+  
+    
+  
+  
+  public interface Mutator{
+    public void reset();
     public void randomizeData();
-    public void setRecordCount(int recordCount);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
new file mode 100644
index 0000000..c26cbab
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
@@ -0,0 +1,29 @@
+package org.apache.drill.exec.vector;
+
+import io.netty.buffer.ByteBuf;
+
+public interface VariableWidthVector extends ValueVector{
+
+  /**
+   * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.
+   *
+   * @param totalBytes   Desired size of the underlying data buffer.
+   * @param valueCount   Number of values in the vector.
+   */
+  public void allocateNew(int totalBytes, int valueCount);
+  
+  /**
+   * Provide the maximum amount of variable width bytes that can be stored int his vector.
+   * @return
+   */
+  public int getByteCapacity();
+  
+  /**
+   * Load the records in the provided buffer based on the given number of values.
+   * @param dataBytes   The number of bytes associated with the data array.
+   * @param valueCount Number of values the buffer contains.
+   * @param buf Incoming buffer.
+   * @return The number of bytes of the buffer that were consumed.
+   */
+  public int load(int dataBytes, int valueCount, ByteBuf buf);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index ba103ed..a90382a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -230,8 +230,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
   }
 
   public QueryResult getResult(UserClientConnection connection, RequestResults req) {
-
-    return null;
+    throw new UnsupportedOperationException();
   }
 
 
@@ -254,7 +253,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     }
     
     void cleanupAndSendResult(QueryResult result){
-      ForemanManagerListener.this.cleanupAndSendResult(result);
+      Foreman.this.cleanupAndSendResult(result);
     }
     
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
index 3fe0622..cac6aa2 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
@@ -86,7 +86,7 @@ public class TestSimpleFragmentRun extends PopUnitTestBase {
           } else {
             System.out.print("\t");
           }
-          System.out.print(v.value.getObject(i));
+          System.out.print(v.value.getAccessor().getObject(i));
         }
         if(!first) System.out.println();
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
index ae4f644..6a1f3ad 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
@@ -1,7 +1,6 @@
 package org.apache.drill.exec.record.vector;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 
 import java.nio.charset.Charset;
 
@@ -46,14 +45,14 @@ public class TestValueVector {
     m.set(100, 102);
     m.set(1022, 103);
     m.set(1023, 104);
-    assertEquals(100, v.get(0));
-    assertEquals(101, v.get(1));
-    assertEquals(102, v.get(100));
-    assertEquals(103, v.get(1022));
-    assertEquals(104, v.get(1023));
+    assertEquals(100, v.getAccessor().get(0));
+    assertEquals(101, v.getAccessor().get(1));
+    assertEquals(102, v.getAccessor().get(100));
+    assertEquals(103, v.getAccessor().get(1022));
+    assertEquals(104, v.getAccessor().get(1023));
 
     // Ensure unallocated space returns 0
-    assertEquals(0, v.get(3));
+    assertEquals(0, v.getAccessor().get(3));
   }
 
   @Test
@@ -74,7 +73,7 @@ public class TestValueVector {
     // Create a new value vector for 1024 integers
     NullableVarChar2Vector v = new NullableVarChar2Vector(field, allocator);
     NullableVarChar2Vector.Mutator m = v.getMutator();
-    v.allocateNew(1024);
+    v.allocateNew(1024*10, 1024);
 
     // Create and set 3 sample strings
     String str1 = new String("AAAAA1");
@@ -85,14 +84,14 @@ public class TestValueVector {
     m.set(2, str3.getBytes(Charset.forName("UTF-8")));
 
     // Check the sample strings
-    assertEquals(str1, new String(v.get(0), Charset.forName("UTF-8")));
-    assertEquals(str2, new String(v.get(1), Charset.forName("UTF-8")));
-    assertEquals(str3, new String(v.get(2), Charset.forName("UTF-8")));
+    assertEquals(str1, new String(v.getAccessor().get(0), Charset.forName("UTF-8")));
+    assertEquals(str2, new String(v.getAccessor().get(1), Charset.forName("UTF-8")));
+    assertEquals(str3, new String(v.getAccessor().get(2), Charset.forName("UTF-8")));
 
     // Ensure null value throws
     boolean b = false;
     try {
-      v.get(3);
+      v.getAccessor().get(3);
     } catch(AssertionError e) { 
       b = true;
     }finally{
@@ -130,17 +129,17 @@ public class TestValueVector {
     m.set(100, 102);
     m.set(1022, 103);
     m.set(1023, 104);
-    assertEquals(100, v.get(0));
-    assertEquals(101, v.get(1));
-    assertEquals(102, v.get(100));
-    assertEquals(103, v.get(1022));
-    assertEquals(104, v.get(1023));
+    assertEquals(100, v.getAccessor().get(0));
+    assertEquals(101, v.getAccessor().get(1));
+    assertEquals(102, v.getAccessor().get(100));
+    assertEquals(103, v.getAccessor().get(1022));
+    assertEquals(104, v.getAccessor().get(1023));
 
     // Ensure null values throw
     {
       boolean b = false;
       try {
-        v.get(3);
+        v.getAccessor().get(3);
       } catch(AssertionError e) { 
         b = true;
       }finally{
@@ -155,7 +154,7 @@ public class TestValueVector {
     {
       boolean b = false;
       try {
-        v.get(0);
+        v.getAccessor().get(0);
       } catch(AssertionError e) { 
         b = true;
       }finally{
@@ -170,18 +169,18 @@ public class TestValueVector {
     m.set(100, 102);
     m.set(1022, 103);
     m.set(1023, 104);
-    assertEquals(100, v.get(0));
-    assertEquals(101, v.get(1));
-    assertEquals(102, v.get(100));
-    assertEquals(103, v.get(1022));
-    assertEquals(104, v.get(1023));
+    assertEquals(100, v.getAccessor().get(0));
+    assertEquals(101, v.getAccessor().get(1));
+    assertEquals(102, v.getAccessor().get(100));
+    assertEquals(103, v.getAccessor().get(1022));
+    assertEquals(104, v.getAccessor().get(1023));
 
     // Ensure null values throw
     
     {
       boolean b = false;
       try {
-        v.get(3);
+        v.getAccessor().get(3);
       } catch(AssertionError e) { 
         b = true;
       }finally{
@@ -219,17 +218,17 @@ public class TestValueVector {
     m.set(100, 102.3f);
     m.set(1022, 103.4f);
     m.set(1023, 104.5f);
-    assertEquals(100.1f, v.get(0), 0);
-    assertEquals(101.2f, v.get(1), 0);
-    assertEquals(102.3f, v.get(100), 0);
-    assertEquals(103.4f, v.get(1022), 0);
-    assertEquals(104.5f, v.get(1023), 0);
+    assertEquals(100.1f, v.getAccessor().get(0), 0);
+    assertEquals(101.2f, v.getAccessor().get(1), 0);
+    assertEquals(102.3f, v.getAccessor().get(100), 0);
+    assertEquals(103.4f, v.getAccessor().get(1022), 0);
+    assertEquals(104.5f, v.getAccessor().get(1023), 0);
 
     // Ensure null values throw
     {
       boolean b = false;
       try {
-        v.get(3);
+        v.getAccessor().get(3);
       } catch(AssertionError e) { 
         b = true;
       }finally{
@@ -243,7 +242,7 @@ public class TestValueVector {
     {
       boolean b = false;
       try {
-        v.get(0);
+        v.getAccessor().get(0);
       } catch(AssertionError e) { 
         b = true;
       }finally{
@@ -279,27 +278,27 @@ public class TestValueVector {
     m.set(1, 0);
     m.set(100, 0);
     m.set(1022, 1);
-    assertEquals(1, v.get(0));
-    assertEquals(0, v.get(1));
-    assertEquals(0, v.get(100));
-    assertEquals(1, v.get(1022));
+    assertEquals(1, v.getAccessor().get(0));
+    assertEquals(0, v.getAccessor().get(1));
+    assertEquals(0, v.getAccessor().get(100));
+    assertEquals(1, v.getAccessor().get(1022));
 
     // test setting the same value twice
     m.set(0, 1);
     m.set(0, 1);
     m.set(1, 0);
     m.set(1, 0);
-    assertEquals(1, v.get(0));
-    assertEquals(0, v.get(1));
+    assertEquals(1, v.getAccessor().get(0));
+    assertEquals(0, v.getAccessor().get(1));
 
     // test toggling the values
     m.set(0, 0);
     m.set(1, 1);
-    assertEquals(0, v.get(0));
-    assertEquals(1, v.get(1));
+    assertEquals(0, v.getAccessor().get(0));
+    assertEquals(1, v.getAccessor().get(1));
 
     // Ensure unallocated space returns 0
-    assertEquals(0, v.get(3));
+    assertEquals(0, v.getAccessor().get(3));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9ca9eb9b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
index 117414c..4b35313 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
@@ -73,7 +73,7 @@ public class JSONRecordReaderTest {
             return;
         }
 
-        T val = (T) valueVector.getObject(index);
+        T val = (T) valueVector.getAccessor().getObject(index);
         if (val instanceof byte[]) {
             assertTrue(Arrays.equals((byte[]) value, (byte[]) val));
         } else {