You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by hg...@apache.org on 2015/05/11 10:18:02 UTC

[2/2] drill git commit: DRILL-2150: Create an abstraction for repeated value vectors.

DRILL-2150: Create an abstraction for repeated value vectors.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4689468e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4689468e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4689468e

Branch: refs/heads/master
Commit: 4689468ef11a70c782f64af451807e1e10cdce65
Parents: a3ec52a
Author: Hanifi Gunes <hg...@maprtech.com>
Authored: Wed Apr 29 14:54:24 2015 -0700
Committer: Hanifi Gunes <hg...@maprtech.com>
Committed: Mon May 11 01:20:26 2015 -0700

----------------------------------------------------------------------
 .../main/codegen/templates/ComplexReaders.java  |   2 +-
 .../main/codegen/templates/ComplexWriters.java  |  16 +-
 .../codegen/templates/FixedValueVectors.java    |   1 +
 .../src/main/codegen/templates/ListWriters.java |   6 +-
 .../src/main/codegen/templates/MapWriters.java  |   2 +-
 .../codegen/templates/RepeatedValueVectors.java | 240 ++-------
 .../exception/SchemaChangeRuntimeException.java |  42 ++
 .../impl/flatten/FlattenRecordBatch.java        |  25 +-
 .../physical/impl/flatten/FlattenTemplate.java  |  25 +-
 .../exec/physical/impl/flatten/Flattener.java   |   6 +-
 .../impl/project/ProjectRecordBatch.java        |  12 +-
 .../apache/drill/exec/store/VectorHolder.java   |  10 +-
 .../text/compliant/RepeatedVarCharOutput.java   |   8 +-
 .../columnreaders/FixedWidthRepeatedReader.java |  22 +-
 .../columnreaders/ParquetRecordReader.java      |  15 +-
 .../exec/store/text/DrillTextRecordReader.java  |   2 +-
 .../drill/exec/vector/AddOrGetResult.java       |  38 ++
 .../drill/exec/vector/AllocationHelper.java     |   8 +-
 .../drill/exec/vector/BaseDataValueVector.java  |   5 +-
 .../exec/vector/BaseRepeatedValueVector.java    | 206 ++++++++
 .../drill/exec/vector/BaseValueVector.java      |  21 +-
 .../drill/exec/vector/ContainerVectorLike.java  |  39 ++
 .../exec/vector/RepeatedFixedWidthVector.java   |  53 --
 .../vector/RepeatedFixedWidthVectorLike.java    |  56 ++
 .../drill/exec/vector/RepeatedValueVector.java  |  85 +++
 .../vector/RepeatedVariableWidthVector.java     |  47 --
 .../vector/RepeatedVariableWidthVectorLike.java |  47 ++
 .../drill/exec/vector/RepeatedVector.java       |  25 -
 .../drill/exec/vector/SchemaChangeCallBack.java |   6 +-
 .../apache/drill/exec/vector/ValueVector.java   |   7 +-
 .../drill/exec/vector/VectorDescriptor.java     |  57 ++
 .../apache/drill/exec/vector/ZeroVector.java    | 170 ++++++
 .../drill/exec/vector/complex/MapVector.java    |   3 +-
 .../exec/vector/complex/RepeatedListVector.java | 525 ++++++++++---------
 .../exec/vector/complex/RepeatedMapVector.java  | 111 ++--
 .../exec/record/vector/TestValueVector.java     |   7 -
 36 files changed, 1246 insertions(+), 704 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/codegen/templates/ComplexReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ComplexReaders.java b/exec/java-exec/src/main/codegen/templates/ComplexReaders.java
index fa1dac4..068efb4 100644
--- a/exec/java-exec/src/main/codegen/templates/ComplexReaders.java
+++ b/exec/java-exec/src/main/codegen/templates/ComplexReaders.java
@@ -89,7 +89,7 @@ public class ${nullMode}${name}ReaderImpl extends AbstractFieldReader {
   }
   
   public int size(){
-    return vector.getAccessor().getCount(idx());
+    return vector.getAccessor().getInnerValueCountAt(idx());
   }
   
   public void read(int arrayIndex, ${minor.class?cap_first}Holder h){

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
index 49c75d1..980f9ac 100644
--- a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
@@ -77,24 +77,24 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
 
   public void write(${minor.class?cap_first}Holder h){
     mutator.addSafe(idx(), h);
-    vector.getMutator().setValueCount(idx());
+    vector.getMutator().setValueCount(idx()+1);
   }
   
   public void write(Nullable${minor.class?cap_first}Holder h){
     mutator.addSafe(idx(), h);
-    vector.getMutator().setValueCount(idx());
+    vector.getMutator().setValueCount(idx()+1);
   }
 
   <#if !(minor.class == "Decimal9" || minor.class == "Decimal18" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense")>
   public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>){
     mutator.addSafe(idx(), <#list fields as field>${field.name}<#if field_has_next>, </#if></#list>);
-    vector.getMutator().setValueCount(idx());
+    vector.getMutator().setValueCount(idx()+1);
   }
   </#if>
   
   public void setPosition(int idx){
     super.setPosition(idx);
-    mutator.startNewGroup(idx);
+    mutator.startNewValue(idx);
   }
   
   
@@ -102,24 +102,24 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
   
   public void write(${minor.class}Holder h){
     mutator.setSafe(idx(), h);
-    vector.getMutator().setValueCount(idx());
+    vector.getMutator().setValueCount(idx()+1);
   }
   
   public void write(Nullable${minor.class}Holder h){
     mutator.setSafe(idx(), h);
-    vector.getMutator().setValueCount(idx());
+    vector.getMutator().setValueCount(idx()+1);
   }
 
   <#if !(minor.class == "Decimal9" || minor.class == "Decimal18" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense")>
   public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>){
     mutator.setSafe(idx(), <#if mode == "Nullable">1, </#if><#list fields as field>${field.name}<#if field_has_next>, </#if></#list>);
-    vector.getMutator().setValueCount(idx());
+    vector.getMutator().setValueCount(idx()+1);
   }
 
   <#if mode == "Nullable">
   public void writeNull(){
     mutator.setNull(idx());
-    vector.getMutator().setValueCount(idx());
+    vector.getMutator().setValueCount(idx()+1);
   }
   </#if>
   </#if>

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
index a805b8e..7d85810 100644
--- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
@@ -764,6 +764,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
        allocationMonitor = 0;
      }
      VectorTrimmer.trim(data, idx);
+     data.writerIndex(valueCount * ${type.width});
    }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/codegen/templates/ListWriters.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ListWriters.java b/exec/java-exec/src/main/codegen/templates/ListWriters.java
index 6df4248..ab78603 100644
--- a/exec/java-exec/src/main/codegen/templates/ListWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/ListWriters.java
@@ -46,7 +46,7 @@ public class ${mode}ListWriter extends AbstractFieldWriter{
   protected final ${containerClass} container;
   private Mode mode = Mode.INIT;
   private FieldWriter writer;
-  protected RepeatedVector innerVector;
+  protected RepeatedValueVector innerVector;
   
   <#if mode == "Repeated">private int currentChildIndex = 0;</#if>
   public ${mode}ListWriter(String name, ${containerClass} container, FieldWriter parent){
@@ -158,7 +158,7 @@ public class ${mode}ListWriter extends AbstractFieldWriter{
   public void start(){
     
     final RepeatedListVector list = (RepeatedListVector) container;
-    final RepeatedListVector.Mutator mutator = list.getMutator();
+    final RepeatedListVector.RepeatedMutator mutator = list.getMutator();
     
     // make sure that the current vector can support the end position of this list.
     if(container.getValueCapacity() <= idx()){
@@ -169,7 +169,7 @@ public class ${mode}ListWriter extends AbstractFieldWriter{
     RepeatedListHolder h = new RepeatedListHolder();
     list.getAccessor().get(idx(), h);
     if(h.start >= h.end){
-      mutator.startNewGroup(idx());  
+      mutator.startNewValue(idx());  
     }
     currentChildIndex = container.getMutator().add(idx());
     if(writer != null){

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/codegen/templates/MapWriters.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/MapWriters.java b/exec/java-exec/src/main/codegen/templates/MapWriters.java
index 38df84b..06a6813 100644
--- a/exec/java-exec/src/main/codegen/templates/MapWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/MapWriters.java
@@ -116,7 +116,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter{
 
     map.getAccessor().get(idx(), h);
     if(h.start >= h.end){
-      container.getMutator().startNewGroup(idx());  
+      container.getMutator().startNewValue(idx());  
     }
     currentChildIndex = container.getMutator().add(idx());
     for(FieldWriter w: fields.values()){

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index c0fba66..37b8fac 100644
--- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -19,9 +19,9 @@
 import java.lang.Override;
 
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.BaseRepeatedValueVector;
 import org.apache.drill.exec.vector.BaseValueVector;
-import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
-import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVectorLike;
 import org.mortbay.jetty.servlet.Holder;
 
 <@pp.dropOutputFile />
@@ -48,14 +48,11 @@ package org.apache.drill.exec.vector;
  * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker.
  */
 
-public final class Repeated${minor.class}Vector extends BaseValueVector implements Repeated<#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector {
+public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector implements Repeated<#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>VectorLike {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Repeated${minor.class}Vector.class);
 
-  private int parentValueCount;
-  private int childValueCount;
-
-  private final UInt4Vector offsets;   // offsets to start of each record
-  private final ${minor.class}Vector values;
+  // we maintain local reference to concrete vector type for performance reasons.
+  private ${minor.class}Vector values;
   private final FieldReader reader = new Repeated${minor.class}ReaderImpl(Repeated${minor.class}Vector.this);
   private final Mutator mutator = new Mutator();
   private final Accessor accessor = new Accessor();
@@ -63,56 +60,57 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
   
   public Repeated${minor.class}Vector(MaterializedField field, BufferAllocator allocator) {
     super(field, allocator);
-    this.offsets = new UInt4Vector(null, allocator);
-    MaterializedField mf = MaterializedField.create(field.getPath(), Types.required(field.getType().getMinorType()));
-    this.values = new ${minor.class}Vector(mf, allocator);
+    addOrGetVector(VectorDescriptor.create(Types.required(field.getType().getMinorType())));
   }
 
   @Override
-  public FieldReader getReader(){
-    return reader;
+  public Mutator getMutator() {
+    return mutator;
   }
 
-  public int getValueCapacity(){
-    return Math.min(values.getValueCapacity(), offsets.getValueCapacity() - 1);
+  @Override
+  public Accessor getAccessor() {
+    return accessor;
   }
 
-  public int getBufferSize(){
-    if(accessor.getGroupCount() == 0){
-      return 0;
-    }
-    return offsets.getBufferSize() + values.getBufferSize();
+  @Override
+  public FieldReader getReader(){
+    return reader;
   }
 
-  public UInt4Vector getOffsetVector(){
-    return offsets;
-  }
-  
-  public ${minor.class}Vector getValuesVector(){
+  @Override
+  public ${minor.class}Vector getDataVector(){
     return values;
   }
-  
-  public DrillBuf getBuffer(){
-    return values.getBuffer();
-  }
-  
+
+  @Override
   public TransferPair getTransferPair(){
     return new TransferImpl(getField());
   }
+
+  @Override
   public TransferPair getTransferPair(FieldReference ref){
     return new TransferImpl(getField().clone(ref));
   }
 
+  @Override
   public TransferPair makeTransferPair(ValueVector to) {
     return new TransferImpl((Repeated${minor.class}Vector) to);
   }
-  
+
+  @Override
+  public AddOrGetResult<${minor.class}Vector> addOrGetVector(VectorDescriptor descriptor) {
+    final AddOrGetResult<${minor.class}Vector> result = super.addOrGetVector(descriptor);
+    if (result.isCreated()) {
+      values = result.getVector();
+    }
+    return result;
+  }
+
   public void transferTo(Repeated${minor.class}Vector target){
     target.clear();
     offsets.transferTo(target.offsets);
     values.transferTo(target.values);
-    target.parentValueCount = parentValueCount;
-    target.childValueCount = childValueCount;
     clear();
   }
 
@@ -132,8 +130,6 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
       normalizedPos = a.get(startIndex+i) - startPos;
       m.set(i, normalizedPos);
     }
-    to.parentValueCount = groups;
-    to.childValueCount  = valuesToCopy;
     m.setValueCount(groups == 0 ? 0 : groups + 1);
   }
   
@@ -167,33 +163,27 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
   }
 
     public void copyFrom(int inIndex, int outIndex, Repeated${minor.class}Vector v){
-      int count = v.getAccessor().getCount(inIndex);
-      getMutator().startNewGroup(outIndex);
+      final int count = v.getAccessor().getInnerValueCountAt(inIndex);
+      getMutator().startNewValue(outIndex);
       for (int i = 0; i < count; i++) {
         getMutator().add(outIndex, v.getAccessor().get(inIndex, i));
       }
     }
 
     public void copyFromSafe(int inIndex, int outIndex, Repeated${minor.class}Vector v){
-      int count = v.getAccessor().getCount(inIndex);
-      getMutator().startNewGroup(outIndex);
+      final int count = v.getAccessor().getInnerValueCountAt(inIndex);
+      getMutator().startNewValue(outIndex);
       for (int i = 0; i < count; i++) {
         getMutator().addSafe(outIndex, v.getAccessor().get(inIndex, i));
       }
     }
 
-  @Override
-  public void setInitialCapacity(int numRecords) {
-    offsets.setInitialCapacity(numRecords + 1);
-    values.setInitialCapacity(numRecords * DEFAULT_REPEAT_PER_RECORD);
-  }
 
   public boolean allocateNewSafe(){
     if(!offsets.allocateNewSafe()) return false;
     offsets.zeroVector();
     if(!values.allocateNewSafe()) return false;
     mutator.reset();
-    accessor.reset();
     return true;
   }
   
@@ -202,36 +192,28 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
     offsets.zeroVector();
     values.allocateNew();
     mutator.reset();
-    accessor.reset();
   }
 
   <#if type.major == "VarLen">
   @Override
-  public SerializedField getMetadata() {
-    return getMetadataBuilder() //
-             .setGroupCount(this.parentValueCount) //
-             .setValueCount(this.childValueCount) //
-             .setVarByteLength(values.getVarByteLength()) //
-             .setBufferLength(getBufferSize()) //
-             .build();
+  protected SerializedField.Builder getMetadataBuilder() {
+    return super.getMetadataBuilder()
+            .setVarByteLength(values.getVarByteLength());
   }
   
-  public void allocateNew(int totalBytes, int parentValueCount, int childValueCount) {
-    offsets.allocateNew(parentValueCount+1);
+  public void allocateNew(int totalBytes, int valueCount, int innerValueCount) {
+    offsets.allocateNew(valueCount+1);
     offsets.zeroVector();
-    values.allocateNew(totalBytes, childValueCount);
+    values.allocateNew(totalBytes, innerValueCount);
     mutator.reset();
-    accessor.reset();
   }
   
   @Override
-  public int load(int dataBytes, int parentValueCount, int childValueCount, DrillBuf buf){
+  public int load(int dataBytes, int valueCount, int innerValueCount, DrillBuf buf){
     clear();
-    this.parentValueCount = parentValueCount;
-    this.childValueCount = childValueCount;
     int loaded = 0;
-    loaded += offsets.load(parentValueCount+1, buf.slice(loaded, buf.capacity() - loaded));
-    loaded += values.load(dataBytes + 4*(childValueCount + 1), childValueCount, buf.slice(loaded, buf.capacity() - loaded));
+    loaded += offsets.load(valueCount+1, buf.slice(loaded, buf.capacity() - loaded));
+    loaded += values.load(dataBytes + 4*(innerValueCount + 1), innerValueCount, buf.slice(loaded, buf.capacity() - loaded));
     return loaded;
   }
   
@@ -247,32 +229,20 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
   }
 
   <#else>
-  
-  @Override
-  public SerializedField getMetadata() {
-    return getMetadataBuilder()
-             .setGroupCount(this.parentValueCount)
-             .setValueCount(this.childValueCount)
-             .setBufferLength(getBufferSize())
-             .build();
-  }
-  
-  public void allocateNew(int parentValueCount, int childValueCount) {
+
+  public void allocateNew(int valueCount, int innerValueCount) {
     clear();
-    offsets.allocateNew(parentValueCount+1);
+    offsets.allocateNew(valueCount+1);
     offsets.zeroVector();
-    values.allocateNew(childValueCount);
+    values.allocateNew(innerValueCount);
     mutator.reset();
-    accessor.reset();
   }
   
-  public int load(int parentValueCount, int childValueCount, DrillBuf buf){
+  public int load(int valueCount, int innerValueCount, DrillBuf buf){
     clear();
-    this.parentValueCount = parentValueCount;
-    this.childValueCount = childValueCount;
     int loaded = 0;
-    loaded += offsets.load(parentValueCount+1, buf.slice(loaded, buf.capacity() - loaded));
-    loaded += values.load(childValueCount, buf.slice(loaded, buf.capacity() - loaded));
+    loaded += offsets.load(valueCount+1, buf.slice(loaded, buf.capacity() - loaded));
+    loaded += values.load(innerValueCount, buf.slice(loaded, buf.capacity() - loaded));
     return loaded;
   }
   
@@ -284,49 +254,12 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
   }
   </#if>
 
-  @Override
-  public DrillBuf[] getBuffers(boolean clear) {
-    DrillBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(false), values.getBuffers(false), DrillBuf.class);
-    if (clear) {
-      for (DrillBuf buffer:buffers) {
-        buffer.retain();
-      }
-      clear();
-    }
-    return buffers;
-  }
-
-  public void clear(){
-    offsets.clear();
-    values.clear();
-    parentValueCount = 0;
-    childValueCount = 0;
-  }
-
-  public Mutator getMutator(){
-    return mutator;
-  }
-  
-  public Accessor getAccessor(){
-    return accessor;
-  }
 
   // This is declared a subclass of the accessor declared inside of FixedWidthVector, this is also used for
   // variable length vectors, as they should ahve consistent interface as much as possible, if they need to diverge
   // in the future, the interface shold be declared in the respective value vector superclasses for fixed and variable
   // and we should refer to each in the generation template
-  public final class Accessor extends BaseValueVector.BaseAccessor implements RepeatedFixedWidthVector.RepeatedAccessor{
-
-    /**
-     * Get the elements at the given index.
-     */
-    public int getCount(int index) {
-      return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index);
-    }
-
-    public ValueVector getAllChildValues() {
-      return values;
-    }
+  public final class Accessor extends BaseRepeatedValueVector.BaseRepeatedAccessor {
 
     public List<${friendlyType}> getObject(int index) {
       List<${friendlyType}> vals = new JsonStringArrayList();
@@ -337,10 +270,6 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
       }
       return vals;
     }
-
-    public int getGroupSizeAtIndex(int index){
-      return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index);
-    }
     
     public ${friendlyType} getSingleObject(int index, int arrayIndex){
       int start = offsets.getAccessor().get(index);
@@ -360,12 +289,7 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
            </#if> get(int index, int positionIndex) {
       return values.getAccessor().get(offsets.getAccessor().get(index) + positionIndex);
     }
-        
-           
-    public boolean isNull(int index){
-      return false;
-    }
-    
+
     public void get(int index, Repeated${minor.class}Holder holder){
       holder.start = offsets.getAccessor().get(index);
       holder.end =  offsets.getAccessor().get(index+1);
@@ -375,61 +299,24 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
     public void get(int index, int positionIndex, ${minor.class}Holder holder) {
       int offset = offsets.getAccessor().get(index);
       assert offset >= 0;
-      assert positionIndex < getCount(index);
+      assert positionIndex < getInnerValueCountAt(index);
       values.getAccessor().get(offset + positionIndex, holder);
     }
     
     public void get(int index, int positionIndex, Nullable${minor.class}Holder holder) {
       int offset = offsets.getAccessor().get(index);
       assert offset >= 0;
-      if (positionIndex >= getCount(index)) {
+      if (positionIndex >= getInnerValueCountAt(index)) {
         holder.isSet = 0;
         return;
       }
       values.getAccessor().get(offset + positionIndex, holder);
     }
-
-    public MaterializedField getField() {
-      return field;
-    }
-    
-    public int getGroupCount(){
-      return parentValueCount;
-    }
-    
-    public int getValueCount(){
-      return childValueCount;
-    }
-    
-    public void reset(){
-      
-    }
   }
   
-  public final class Mutator extends BaseValueVector.BaseMutator implements RepeatedMutator {
-
-    
-    private Mutator(){
-    }
-
-    public void setRepetitionAtIndexSafe(int index, int repetitionCount) {
-      offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index) + repetitionCount);
-    }
-
-    public BaseDataValueVector getDataVector() {
-      return values;
-    }
-
-    public void setValueCounts(int parentValueCount, int childValueCount){
-      Repeated${minor.class}Vector.this.parentValueCount = parentValueCount;
-      Repeated${minor.class}Vector.this.childValueCount = childValueCount;
-      values.getMutator().setValueCount(childValueCount);
-      offsets.getMutator().setValueCount(parentValueCount == 0 ? 0 : parentValueCount + 1);
-    }
+  public final class Mutator extends BaseRepeatedValueVector.BaseRepeatedMutator implements RepeatedMutator {
 
-    public void startNewGroup(int index) {
-      offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index));
-    }
+    private Mutator() { }
 
     /**
      * Add an element to the given record index.  This is similar to the set() method in other
@@ -468,7 +355,7 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
     
     public void setSafe(int index, Repeated${minor.class}Holder h){
       ${minor.class}Holder ih = new ${minor.class}Holder();
-      getMutator().startNewGroup(index);
+      getMutator().startNewValue(index);
       for(int i = h.start; i < h.end; i++){
         h.vector.getAccessor().get(i, ih);
         getMutator().addSafe(index, ih);
@@ -511,17 +398,6 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
         add(index, innerHolder);
       }
     }
-    
-    /**
-     * Set the number of value groups in this repeated field.
-     * @param groupCount Count of Value Groups.
-     */
-    public void setValueCount(int groupCount) {
-      parentValueCount = groupCount;
-      childValueCount = offsets.getAccessor().get(groupCount);
-      offsets.getMutator().setValueCount(groupCount == 0 ? 0 : groupCount+1);
-      values.getMutator().setValueCount(childValueCount);
-    }
 
     public void generateTestData(final int valCount){
       int[] sizes = {1,2,0,6};

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SchemaChangeRuntimeException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SchemaChangeRuntimeException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SchemaChangeRuntimeException.java
new file mode 100644
index 0000000..f2a7e63
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SchemaChangeRuntimeException.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.exception;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
+public class SchemaChangeRuntimeException extends DrillRuntimeException {
+  public SchemaChangeRuntimeException() {
+    super();
+  }
+
+  public SchemaChangeRuntimeException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+
+  public SchemaChangeRuntimeException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public SchemaChangeRuntimeException(String message) {
+    super(message);
+  }
+
+  public SchemaChangeRuntimeException(Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 7a5b352..00a78fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -21,15 +21,13 @@ import java.io.IOException;
 import java.util.List;
 
 import com.carrotsearch.hppc.IntOpenHashSet;
-import com.google.common.base.Preconditions;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -52,9 +50,8 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.vector.RepeatedVector;
+import org.apache.drill.exec.vector.RepeatedValueVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
@@ -129,13 +126,13 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
 
   private void setFlattenVector() {
     try {
-      flattener.setFlattenField((RepeatedVector) incoming.getValueAccessorById(
-          incoming.getSchema().getColumn(
-              incoming.getValueVectorId(
-                  popConfig.getColumn()).getFieldIds()[0]).getValueClass(),
-          incoming.getValueVectorId(popConfig.getColumn()).getFieldIds()).getValueVector());
+      final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
+      final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
+      final RepeatedValueVector vector = RepeatedValueVector.class.cast(incoming.getValueAccessorById(
+          field.getValueClass(), typedFieldId.getFieldIds()).getValueVector());
+      flattener.setFlattenField(vector);
     } catch (Exception ex) {
-      throw new DrillRuntimeException("Trying to flatten a non-repeated filed.");
+      throw UserException.unsupportedError(ex).message("Trying to flatten a non-repeated field.").build();
     }
   }
 
@@ -152,7 +149,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     // inside of the the flattener for the current batch
     setFlattenVector();
 
-    int childCount = incomingRecordCount == 0 ? 0 : flattener.getFlattenField().getAccessor().getValueCount();
+    int childCount = incomingRecordCount == 0 ? 0 : flattener.getFlattenField().getAccessor().getInnerValueCount();
     int outputRecords = flattener.flattenRecords(0, incomingRecordCount, 0);
     // TODO - change this to be based on the repeated vector length
     if (outputRecords < childCount) {
@@ -178,7 +175,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
   }
 
   private void handleRemainder() {
-    int remainingRecordCount = flattener.getFlattenField().getAccessor().getValueCount() - remainderIndex;
+    int remainingRecordCount = flattener.getFlattenField().getAccessor().getInnerValueCount() - remainderIndex;
     if (!doAlloc()) {
       outOfMemory = true;
       return;
@@ -271,7 +268,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     if (flattenField instanceof RepeatedMapVector) {
       tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap(reference);
     } else {
-      ValueVector vvIn = ((RepeatedVector)flattenField).getAccessor().getAllChildValues();
+      final ValueVector vvIn = RepeatedValueVector.class.cast(flattenField).getDataVector();
       // vvIn may be null because of fast schema return for repeated list vectors
       if (vvIn != null) {
         tp = vvIn.getTransferPair(reference);

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
index 96209a2..b8d040c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
@@ -31,8 +31,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
 
 import com.google.common.collect.ImmutableList;
 
-import org.apache.drill.exec.vector.RepeatedFixedWidthVector.RepeatedAccessor;
-import org.apache.drill.exec.vector.RepeatedVector;
+import org.apache.drill.exec.vector.RepeatedValueVector;
 
 public abstract class FlattenTemplate implements Flattener {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenTemplate.class);
@@ -43,9 +42,9 @@ public abstract class FlattenTemplate implements Flattener {
   private SelectionVector2 vector2;
   private SelectionVector4 vector4;
   private SelectionVectorMode svMode;
-  RepeatedVector fieldToFlatten;
-  RepeatedAccessor accessor;
-  private int groupIndex;
+  private RepeatedValueVector fieldToFlatten;
+  private RepeatedValueVector.RepeatedAccessor accessor;
+  private int valueIndex;
 
   // this allows for groups to be written between batches if we run out of space, for cases where we have finished
   // a batch on the boundary it will be set to 0
@@ -60,12 +59,12 @@ public abstract class FlattenTemplate implements Flattener {
   }
 
   @Override
-  public void setFlattenField(RepeatedVector flattenField) {
+  public void setFlattenField(RepeatedValueVector flattenField) {
     this.fieldToFlatten = flattenField;
-    this.accessor = flattenField.getAccessor();
+    this.accessor = RepeatedValueVector.RepeatedAccessor.class.cast(flattenField.getAccessor());
   }
 
-  public RepeatedVector getFlattenField() {
+  public RepeatedValueVector getFlattenField() {
     return fieldToFlatten;
   }
 
@@ -84,14 +83,14 @@ public abstract class FlattenTemplate implements Flattener {
           childIndexWithinCurrGroup = 0;
         }
         outer: {
-          final int groupCount = accessor.getGroupCount();
-          for ( ; groupIndex < groupCount; groupIndex++) {
-            currGroupSize = accessor.getGroupSizeAtIndex(groupIndex);
+          final int valueCount = accessor.getValueCount();
+          for ( ; valueIndex < valueCount; valueIndex++) {
+            currGroupSize = accessor.getInnerValueCountAt(valueIndex);
             for ( ; childIndexWithinCurrGroup < currGroupSize; childIndexWithinCurrGroup++) {
               if (firstOutputIndex == OUTPUT_BATCH_SIZE) {
                 break outer;
               }
-              doEval(groupIndex, firstOutputIndex);
+              doEval(valueIndex, firstOutputIndex);
               firstOutputIndex++;
               childIndex++;
             }
@@ -133,7 +132,7 @@ public abstract class FlattenTemplate implements Flattener {
 
   @Override
   public void resetGroupIndex() {
-    this.groupIndex = 0;
+    this.valueIndex = 0;
     this.currGroupSize = 0;
     this.childIndex = 0;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
index 2141ca2..323bf43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
@@ -24,14 +24,14 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
-import org.apache.drill.exec.vector.RepeatedVector;
+import org.apache.drill.exec.vector.RepeatedValueVector;
 
 public interface Flattener {
 
   public abstract void setup(FragmentContext context, RecordBatch incoming,  RecordBatch outgoing, List<TransferPair> transfers)  throws SchemaChangeException;
   public abstract int flattenRecords(int startIndex, int recordCount, int firstOutputIndex);
-  public void setFlattenField(RepeatedVector repeatedColumn);
-  public RepeatedVector getFlattenField();
+  public void setFlattenField(RepeatedValueVector repeatedColumn);
+  public RepeatedValueVector getFlattenField();
   public void resetGroupIndex();
 
   public static TemplateClassDefinition<Flattener> TEMPLATE_DEFINITION = new TemplateClassDefinition<Flattener>(Flattener.class, FlattenTemplate.class);

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 32ffb6f..946d117 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -63,6 +63,7 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
 import com.carrotsearch.hppc.IntOpenHashSet;
@@ -415,7 +416,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
         ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef());
         cg.addExpr(expr);
-      } else{
+      } else {
         // need to do evaluation.
         final ValueVector vector = container.addOrGet(outputField, callBack);
         allocationVectors.add(vector);
@@ -424,6 +425,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
         final HoldingContainer hc = cg.addExpr(write);
 
+        // We cannot do multiple transfers from the same vector. However, we still need to instantiate the output vector.
+        if (expr instanceof ValueVectorReadExpression) {
+          final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
+          if (!vectorRead.hasReadPath()) {
+            final TypedFieldId id = vectorRead.getFieldId();
+            final ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
+            vvIn.makeTransferPair(vector);
+          }
+        }
         logger.debug("Added eval for project expression.");
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
index 8387d49..e602fd7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
@@ -18,9 +18,9 @@
 package org.apache.drill.exec.store;
 
 import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVectorLike;
 import org.apache.drill.exec.vector.RepeatedMutator;
-import org.apache.drill.exec.vector.RepeatedVariableWidthVector;
+import org.apache.drill.exec.vector.RepeatedVariableWidthVectorLike;
 import org.apache.drill.exec.vector.ValueVector;
 
 public class VectorHolder {
@@ -35,7 +35,7 @@ public class VectorHolder {
   public VectorHolder(int length, ValueVector vector) {
     this.length = length;
     this.vector = vector;
-    if (vector instanceof RepeatedFixedWidthVector || vector instanceof  RepeatedVariableWidthVector) {
+    if (vector instanceof RepeatedFixedWidthVectorLike || vector instanceof RepeatedVariableWidthVectorLike) {
       repeated = true;
     }
   }
@@ -43,7 +43,7 @@ public class VectorHolder {
   public VectorHolder(ValueVector vector) {
     this.length = vector.getValueCapacity();
     this.vector = vector;
-    if (vector instanceof RepeatedFixedWidthVector || vector instanceof  RepeatedVariableWidthVector) {
+    if (vector instanceof RepeatedFixedWidthVectorLike || vector instanceof RepeatedVariableWidthVectorLike) {
       repeated = true;
     }
   }
@@ -90,7 +90,7 @@ public class VectorHolder {
 
   public void populateVectorLength() {
     ValueVector.Mutator mutator = vector.getMutator();
-    if (vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) {
+    if (vector instanceof RepeatedFixedWidthVectorLike || vector instanceof RepeatedVariableWidthVectorLike) {
       mutator.setValueCount(groupCount);
     } else {
       mutator.setValueCount(count);

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
index 3ad5c2a..40276f4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
@@ -192,7 +192,7 @@ class RepeatedVarCharOutput extends TextOutput {
   }
 
   private void loadVarCharDataAddress(){
-    DrillBuf buf = vector.getValuesVector().getBuffer();
+    DrillBuf buf = vector.getDataVector().getBuffer();
     checkBuf(buf);
     this.characterData = buf.memoryAddress();
     this.characterDataOriginal = buf.memoryAddress();
@@ -200,7 +200,7 @@ class RepeatedVarCharOutput extends TextOutput {
   }
 
   private void loadVarCharOffsetAddress(){
-    DrillBuf buf = vector.getValuesVector().getOffsetVector().getBuffer();
+    DrillBuf buf = vector.getDataVector().getOffsetVector().getBuffer();
     checkBuf(buf);
     this.charLengthOffset = buf.memoryAddress() + 4;
     this.charLengthOffsetOriginal = buf.memoryAddress() + 4; // add four as offsets conceptually start at 1. (first item is 0..1)
@@ -208,14 +208,14 @@ class RepeatedVarCharOutput extends TextOutput {
   }
 
   private void expandVarCharOffsets(){
-    vector.getValuesVector().getOffsetVector().reAlloc();
+    vector.getDataVector().getOffsetVector().reAlloc();
     long diff = charLengthOffset - charLengthOffsetOriginal;
     loadVarCharOffsetAddress();
     charLengthOffset += diff;
   }
 
   private void expandVarCharData(){
-    vector.getValuesVector().reAlloc();
+    vector.getDataVector().reAlloc();
     long diff = characterData - characterDataOriginal;
     loadVarCharDataAddress();
     characterData += diff;

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
index 7f8b611..2b929a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
@@ -20,7 +20,10 @@ package org.apache.drill.exec.store.parquet.columnreaders;
 import java.io.IOException;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVectorLike;
+import org.apache.drill.exec.vector.RepeatedValueVector;
+import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
 
 import parquet.column.ColumnDescriptor;
@@ -29,7 +32,7 @@ import parquet.hadoop.metadata.ColumnChunkMetaData;
 
 public class FixedWidthRepeatedReader extends VarLengthColumn {
 
-  RepeatedFixedWidthVector castedRepeatedVector;
+  RepeatedValueVector castedRepeatedVector;
   ColumnReader dataReader;
   int dataTypeLengthInBytes;
   // we can do a vector copy of the data once we figure out how much we need to copy
@@ -47,9 +50,9 @@ public class FixedWidthRepeatedReader extends VarLengthColumn {
   boolean notFishedReadingList;
   byte[] leftOverBytes;
 
-  FixedWidthRepeatedReader(ParquetRecordReader parentReader, ColumnReader dataReader, int dataTypeLengthInBytes, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector valueVector, SchemaElement schemaElement) throws ExecutionSetupException {
+  FixedWidthRepeatedReader(ParquetRecordReader parentReader, ColumnReader dataReader, int dataTypeLengthInBytes, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, RepeatedValueVector valueVector, SchemaElement schemaElement) throws ExecutionSetupException {
     super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, valueVector, schemaElement);
-    castedRepeatedVector = (RepeatedFixedWidthVector) valueVector;
+    this.castedRepeatedVector = valueVector;
     this.dataTypeLengthInBytes = dataTypeLengthInBytes;
     this.dataReader = dataReader;
     this.dataReader.pageReader.clear();
@@ -65,7 +68,7 @@ public class FixedWidthRepeatedReader extends VarLengthColumn {
     bytesReadInCurrentPass = 0;
     valuesReadInCurrentPass = 0;
     pageReader.valuesReadyToRead = 0;
-    dataReader.vectorData = castedRepeatedVector.getMutator().getDataVector().getBuffer();
+    dataReader.vectorData = BaseDataValueVector.class.cast(castedRepeatedVector.getDataVector()).getBuffer();
     dataReader.valuesReadInCurrentPass = 0;
     repeatedGroupsReadInCurrentPass = 0;
   }
@@ -200,8 +203,8 @@ public class FixedWidthRepeatedReader extends VarLengthColumn {
       currentValueListLength += numLeftoverVals;
     }
     // this should not fail
-    castedRepeatedVector.getMutator().setRepetitionAtIndexSafe(repeatedGroupsReadInCurrentPass,
-        currentValueListLength);
+    final UInt4Vector offsets = castedRepeatedVector.getOffsetVector();
+    offsets.getMutator().setSafe(repeatedGroupsReadInCurrentPass + 1, offsets.getAccessor().get(repeatedGroupsReadInCurrentPass));
     // This field is being referenced in the superclass determineSize method, so we need to set it here
     // again going to make this the length in BYTES to avoid repetitive multiplication/division
     dataTypeLengthInBits = repeatedValuesInCurrentList * dataTypeLengthInBytes;
@@ -218,12 +221,13 @@ public class FixedWidthRepeatedReader extends VarLengthColumn {
     dataReader.valuesReadInCurrentPass = 0;
     dataReader.readValues(valuesToRead);
     valuesReadInCurrentPass += valuesToRead;
-    castedRepeatedVector.getMutator().setValueCounts(repeatedGroupsReadInCurrentPass, valuesReadInCurrentPass);
+    castedRepeatedVector.getMutator().setValueCount(repeatedGroupsReadInCurrentPass);
+    castedRepeatedVector.getDataVector().getMutator().setValueCount(valuesReadInCurrentPass);
   }
 
   @Override
   public int capacity() {
-    return castedRepeatedVector.getMutator().getDataVector().getBuffer().capacity();
+    return BaseDataValueVector.class.cast(castedRepeatedVector.getDataVector()).getBuffer().capacity();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 2f07fb3..0cbd480 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -41,7 +41,7 @@ import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.parquet.DirectCodecFactory;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.drill.exec.vector.RepeatedValueVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -274,7 +274,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
     }
 
     try {
-      ValueVector v;
+      ValueVector vector;
       SchemaElement schemaElement;
       ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>();
       // initialize all of the column read status objects
@@ -292,23 +292,24 @@ public class ParquetRecordReader extends AbstractRecordReader {
         }
 
         fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
-        v = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+        vector = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
         if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
           if (column.getMaxRepetitionLevel() > 0) {
+            final RepeatedValueVector repeatedVector = RepeatedValueVector.class.cast(vector);
             ColumnReader dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength,
                 column, columnChunkMetaData, recordsPerBatch,
-                ((RepeatedFixedWidthVector) v).getMutator().getDataVector(), schemaElement);
+                repeatedVector.getDataVector(), schemaElement);
             varLengthColumns.add(new FixedWidthRepeatedReader(this, dataReader,
-                getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, v, schemaElement));
+                getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, repeatedVector, schemaElement));
           }
           else {
             columnStatuses.add(ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength,
-                column, columnChunkMetaData, recordsPerBatch, v,
+                column, columnChunkMetaData, recordsPerBatch, vector,
                 schemaElement));
           }
         } else {
           // create a reader and add it to the appropriate list
-          varLengthColumns.add(ColumnReaderFactory.getReader(this, -1, column, columnChunkMetaData, false, v, schemaElement));
+          varLengthColumns.add(ColumnReaderFactory.getReader(this, -1, column, columnChunkMetaData, false, vector, schemaElement));
         }
       }
       varLengthReader = new VarLenBinaryReader(this, varLengthColumns);

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index e25bd74..c59ade9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -162,7 +162,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
         // index of the scanned field
         int p = 0;
         int i = 0;
-        vector.getMutator().startNewGroup(recordCount);
+        vector.getMutator().startNewValue(recordCount);
         // Process each field in this line
         while (end < value.getLength() - 1) {
           if(numCols > 0 && p >= numCols) {

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AddOrGetResult.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AddOrGetResult.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AddOrGetResult.java
new file mode 100644
index 0000000..7d1f08d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AddOrGetResult.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+import com.google.common.base.Preconditions;
+
+public class AddOrGetResult<V extends ValueVector> {
+  private final V vector;
+  private final boolean created;
+
+  public AddOrGetResult(V vector, boolean created) {
+    this.vector = Preconditions.checkNotNull(vector);
+    this.created = created;
+  }
+
+  public V getVector() {
+    return vector;
+  }
+
+  public boolean isCreated() {
+    return created;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
index bf465c7..eddefd0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
@@ -31,10 +31,10 @@ public class AllocationHelper {
       ((FixedWidthVector) v).allocateNew(valueCount);
     } else if (v instanceof VariableWidthVector) {
       ((VariableWidthVector) v).allocateNew(valueCount * bytesPerValue, valueCount);
-    }else if(v instanceof RepeatedFixedWidthVector){
-      ((RepeatedFixedWidthVector) v).allocateNew(valueCount, childValCount);
-    }else if(v instanceof RepeatedVariableWidthVector){
-      ((RepeatedVariableWidthVector) v).allocateNew(childValCount * bytesPerValue, valueCount, childValCount);
+    }else if(v instanceof RepeatedFixedWidthVectorLike){
+      ((RepeatedFixedWidthVectorLike) v).allocateNew(valueCount, childValCount);
+    }else if(v instanceof RepeatedVariableWidthVectorLike){
+      ((RepeatedVariableWidthVectorLike) v).allocateNew(childValCount * bytesPerValue, valueCount, childValCount);
     }else{
       v.allocateNew();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index 0c6097c..6d356f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -22,8 +22,8 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.MaterializedField;
 
 
-public abstract class BaseDataValueVector<V extends BaseValueVector<V, A, M>, A extends BaseValueVector.BaseAccessor,
-    M extends BaseValueVector.BaseMutator> extends BaseValueVector<V, A, M> {
+public abstract class BaseDataValueVector<A extends BaseValueVector.BaseAccessor, M extends BaseValueVector.BaseMutator>
+    extends BaseValueVector<A, M> {
 
   protected DrillBuf data;
 
@@ -36,6 +36,7 @@ public abstract class BaseDataValueVector<V extends BaseValueVector<V, A, M>, A
   public void clear() {
     data.release();
     data = allocator.getEmpty();
+    super.clear();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseRepeatedValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseRepeatedValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseRepeatedValueVector.java
new file mode 100644
index 0000000..bcf0793
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseRepeatedValueVector.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ObjectArrays;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeRuntimeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.record.MaterializedField;
+
+public abstract class BaseRepeatedValueVector<A extends RepeatedValueVector.RepeatedAccessor, M extends RepeatedValueVector.RepeatedMutator>
+  extends BaseValueVector<A, M> implements RepeatedValueVector<A, M> {
+
+  public final static ValueVector DEFAULT_DATA_VECTOR = ZeroVector.INSTANCE;
+  public final static String OFFSETS_VECTOR_NAME = "offsets";
+  public final static String DATA_VECTOR_NAME = "data";
+
+  private final static MaterializedField offsetsField =
+      MaterializedField.create(OFFSETS_VECTOR_NAME, Types.required(TypeProtos.MinorType.UINT4));
+
+  protected final UInt4Vector offsets;
+  protected ValueVector vector;
+
+  protected BaseRepeatedValueVector(MaterializedField field, BufferAllocator allocator) {
+    this(field, allocator, DEFAULT_DATA_VECTOR);
+  }
+
+  protected BaseRepeatedValueVector(MaterializedField field, BufferAllocator allocator, ValueVector vector) {
+    super(field, allocator);
+    this.offsets = new UInt4Vector(offsetsField, allocator);
+    this.vector = Preconditions.checkNotNull(vector, "data vector cannot be null");
+  }
+
+  @Override
+  public boolean allocateNewSafe() {
+    if (!offsets.allocateNewSafe()) {
+      return false;
+    }
+    offsets.zeroVector();
+    return vector.allocateNewSafe();
+  }
+
+  @Override
+  public UInt4Vector getOffsetVector() {
+    return offsets;
+  }
+
+  @Override
+  public ValueVector getDataVector() {
+    return vector;
+  }
+
+  @Override
+  public void setInitialCapacity(int numRecords) {
+    offsets.setInitialCapacity(numRecords + 1);
+    vector.setInitialCapacity(numRecords * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD);
+  }
+
+  @Override
+  public int getValueCapacity() {
+    final int offsetValueCapacity = offsets.getValueCapacity() - 1;
+    if (vector == DEFAULT_DATA_VECTOR) {
+      return offsetValueCapacity;
+    }
+    return Math.min(vector.getValueCapacity(), offsetValueCapacity);
+  }
+
+  @Override
+  protected UserBitShared.SerializedField.Builder getMetadataBuilder() {
+    return super.getMetadataBuilder()
+        .setGroupCount(getAccessor().getValueCount())
+        .setValueCount(getAccessor().getInnerValueCount())
+        .addChild(vector.getMetadata());
+  }
+
+  @Override
+  public int getBufferSize() {
+    if (getAccessor().getValueCount() == 0) {
+      return 0;
+    }
+    return offsets.getBufferSize() + vector.getBufferSize();
+  }
+
+  @Override
+  public Iterator<ValueVector> iterator() {
+    return Collections.singleton(getDataVector()).iterator();
+  }
+
+  @Override
+  public void clear() {
+    offsets.clear();
+    vector.clear();
+    super.clear();
+  }
+
+  @Override
+  public DrillBuf[] getBuffers(boolean clear) {
+    final DrillBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(false), vector.getBuffers(false), DrillBuf.class);
+    if (clear) {
+      for (DrillBuf buffer:buffers) {
+        buffer.retain();
+      }
+      clear();
+    }
+    return buffers;
+  }
+
+  /**
+   * Returns 1 if the inner data vector has been explicitly set via {@link #addOrGetVector}, otherwise 0.
+   *
+   * @see ContainerVectorLike#size
+   */
+  @Override
+  public int size() {
+    return vector == DEFAULT_DATA_VECTOR ? 0:1;
+  }
+
+  @Override
+  public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor) {
+    boolean created = false;
+    if (vector == DEFAULT_DATA_VECTOR) {
+      vector = TypeHelper.getNewVector(MaterializedField.create(DATA_VECTOR_NAME, descriptor.getType()), allocator);
+      getField().addChild(vector.getField());
+      created = true;
+    }
+
+    final TypeProtos.MajorType actual = vector.getField().getType();
+    if (!actual.equals(descriptor.getType())) {
+      final String msg = String.format("Inner vector type mismatch. Requested type: [%s], actual type: [%s]",
+          descriptor.getType(), actual);
+      throw new SchemaChangeRuntimeException(msg);
+    }
+
+    return new AddOrGetResult<>((T)vector, created);
+  }
+
+  public abstract class BaseRepeatedAccessor extends BaseValueVector.BaseAccessor implements RepeatedAccessor {
+
+    @Override
+    public int getValueCount() {
+      return Math.max(offsets.getAccessor().getValueCount() - 1, 0);
+    }
+
+    @Override
+    public int getInnerValueCount() {
+      return vector.getAccessor().getValueCount();
+    }
+
+    @Override
+    public int getInnerValueCountAt(int index) {
+      return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index);
+    }
+
+    @Override
+    public boolean isNull(int index) {
+      return false;
+    }
+
+    @Override
+    public boolean isEmpty(int index) {
+      return false;
+    }
+  }
+
+  public abstract class BaseRepeatedMutator extends BaseValueVector.BaseMutator implements RepeatedMutator {
+
+    @Override
+    public void startNewValue(int index) {
+      offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index));
+      setValueCount(index+1);
+    }
+
+    @Override
+    public void setValueCount(int valueCount) {
+      // TODO: populate offset end points
+      offsets.getMutator().setValueCount(valueCount == 0 ? 0 : valueCount+1);
+      final int childValueCount = valueCount == 0 ? 0 : offsets.getAccessor().get(valueCount);
+      vector.getMutator().setValueCount(childValueCount);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
index 22f0fe7..67c489d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
@@ -25,9 +25,10 @@ import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
 
-public abstract class BaseValueVector<V extends BaseValueVector<V, A, M>, A extends BaseValueVector.BaseAccessor,
-    M extends BaseValueVector.BaseMutator> implements ValueVector<V, A, M> {
+public abstract class BaseValueVector<A extends ValueVector.Accessor, M extends ValueVector.Mutator>
+    implements ValueVector<A, M> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseValueVector.class);
 
   protected final BufferAllocator allocator;
@@ -40,6 +41,11 @@ public abstract class BaseValueVector<V extends BaseValueVector<V, A, M>, A exte
   }
 
   @Override
+  public void clear() {
+    getMutator().reset();
+  }
+
+  @Override
   public void close() {
     clear();
   }
@@ -54,6 +60,11 @@ public abstract class BaseValueVector<V extends BaseValueVector<V, A, M>, A exte
   }
 
   @Override
+  public TransferPair getTransferPair() {
+    return getTransferPair(new FieldReference(getField().getPath()));
+  }
+
+  @Override
   public SerializedField getMetadata() {
     return getMetadataBuilder().build();
   }
@@ -76,11 +87,15 @@ public abstract class BaseValueVector<V extends BaseValueVector<V, A, M>, A exte
   public abstract static class BaseMutator implements ValueVector.Mutator {
     protected BaseMutator() { }
 
+    @Override
+    public void generateTestData(int values) { }
+
+    // TODO: consider making the mutator stateless (if possible) under a separate issue.
     public void reset() { }
   }
 
   @Override
-  public Iterator<ValueVector<V,A,M>> iterator() {
+  public Iterator<ValueVector> iterator() {
     return Iterators.emptyIterator();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ContainerVectorLike.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ContainerVectorLike.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ContainerVectorLike.java
new file mode 100644
index 0000000..95e3365
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ContainerVectorLike.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+/**
+ * A mix-in used for introducing container vector-like behaviour.
+ */
+public interface ContainerVectorLike {
+
+  /**
+   * Creates and adds a child vector if none with the same name exists, else returns the vector instance.
+   *
+   * @param  descriptor vector descriptor
+   * @return  result of operation wrapping vector corresponding to the given descriptor and whether it's newly created
+   * @throws org.apache.drill.common.exceptions.DrillRuntimeException
+   *    if schema change is not permissible between the given and existing data vector types.
+   */
+  <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor);
+
+  /**
+   * Returns the number of child vectors in this container vector-like instance.
+   */
+  int size();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
deleted file mode 100644
index eaae7ad..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.vector;
-
-import io.netty.buffer.DrillBuf;
-
-public interface RepeatedFixedWidthVector extends ValueVector, RepeatedVector {
-  /**
-   * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.
-   *
-   * @param parentValueCount   Number of separate repeating groupings.
-   * @param childValueCount   Number of supported values in the vector.
-   */
-  public void allocateNew(int parentValueCount, int childValueCount);
-
-  /**
-   * Load the records in the provided buffer based on the given number of values.
-   * @param parentValueCount   Number of separate repeating groupings.
-   * @param valueCount Number atomic values the buffer contains.
-   * @param buf Incoming buffer.
-   * @return The number of bytes of the buffer that were consumed.
-   */
-  public int load(int parentValueCount, int childValueCount, DrillBuf buf);
-
-  public abstract RepeatedMutator getMutator();
-
-  public interface RepeatedAccessor extends Accessor {
-    public int getGroupCount();
-    public int getValueCount();
-    public int getGroupSizeAtIndex(int index);
-    public ValueVector getAllChildValues();
-  }
-  public interface RepeatedMutator extends Mutator {
-    public void setValueCounts(int parentValueCount, int childValueCount);
-    public void setRepetitionAtIndexSafe(int index, int repetitionCount);
-    public BaseDataValueVector getDataVector();
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVectorLike.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVectorLike.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVectorLike.java
new file mode 100644
index 0000000..450c673
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVectorLike.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * A {@link org.apache.drill.exec.vector.ValueVector} mix-in that can be used in conjunction with
+ * {@link org.apache.drill.exec.vector.RepeatedValueVector} subtypes.
+ */
+public interface RepeatedFixedWidthVectorLike {
+  /**
+   * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.
+   *
+   * @param valueCount   Number of separate repeating groupings.
+   * @param innerValueCount   Number of supported values in the vector.
+   */
+  public void allocateNew(int valueCount, int innerValueCount);
+
+  /**
+   * Load the records in the provided buffer based on the given number of values.
+   * @param valueCount   Number of separate repeating groupings.
+   * @param innerValueCount Number of atomic values the buffer contains.
+   * @param buf Incoming buffer.
+   * @return The number of bytes of the buffer that were consumed.
+   */
+  public int load(int valueCount, int innerValueCount, DrillBuf buf);
+
+//  public interface RepeatedAccessor extends Accessor {
+//    public int getGroupCount();
+//    public int getValueCount();
+//    public int getGroupSizeAtIndex(int index);
+//    public ValueVector getAllChildValues();
+//  }
+//
+//  public interface RepeatedMutator extends Mutator {
+//    public void setValueCounts(int parentValueCount, int childValueCount);
+//    public void setRepetitionAtIndexSafe(int index, int repetitionCount);
+//    public BaseDataValueVector getDataVector();
+//  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedValueVector.java
new file mode 100644
index 0000000..d5a8281
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedValueVector.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+/**
+ * An abstraction representing repeated value vectors.
+ *
+ * A repeated vector contains values that may either be flat or nested. A value consists of zero or more cells (inner values).
+ * Current design maintains data and offsets vectors. Each cell is stored in the data vector. Repeated vector
+ * uses the offset vector to determine the sequence of cells pertaining to an individual value.
+ *
+ * @param <A>  repeated accessor type
+ * @param <M>  repeated mutator type
+ */
+public interface RepeatedValueVector<A extends RepeatedValueVector.RepeatedAccessor, M extends RepeatedValueVector.RepeatedMutator>
+    extends ValueVector<A, M>, ContainerVectorLike {
+
+  final static int DEFAULT_REPEAT_PER_RECORD = 5;
+
+  /**
+   * Returns the underlying offset vector or null if none exists.
+   *
+   * TODO(DRILL-2995): eliminate exposing low-level interfaces.
+   */
+  UInt4Vector getOffsetVector();
+
+  /**
+   * Returns the underlying data vector or null if none exists.
+   */
+  ValueVector getDataVector();
+
+  @Override
+  A getAccessor();
+
+  @Override
+  M getMutator();
+
+  interface RepeatedAccessor extends ValueVector.Accessor {
+    /**
+     * Returns total number of cells that vector contains.
+     *
+     * The result includes empty and null-valued cells.
+     */
+    int getInnerValueCount();
+
+
+    /**
+     * Returns number of cells that the value at the given index contains.
+     */
+    int getInnerValueCountAt(int index);
+
+    /**
+     * Returns true if the value at the given index is empty, false otherwise.
+     *
+     * @param index  value index
+     */
+    boolean isEmpty(int index);
+  }
+
+  interface RepeatedMutator extends ValueVector.Mutator {
+    /**
+     * Starts a new value that is a container of cells.
+     *
+     * @param index  index of new value to start
+     */
+    void startNewValue(int index);
+
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
deleted file mode 100644
index a499341..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.vector;
-
-import io.netty.buffer.DrillBuf;
-
-public interface RepeatedVariableWidthVector extends ValueVector, RepeatedVector {
-  /**
-   * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.
-   *
-   * @param totalBytes   Desired size of the underlying data buffer.
-   * @param parentValueCount   Number of separate repeating groupings.
-   * @param childValueCount   Number of supported values in the vector.
-   */
-  public void allocateNew(int totalBytes, int parentValueCount, int childValueCount);
-
-  /**
-   * Provide the maximum amount of variable width bytes that can be stored int his vector.
-   * @return
-   */
-  public int getByteCapacity();
-
-  /**
-   * Load the records in the provided buffer based on the given number of values.
-   * @param dataBytes   The number of bytes associated with the data array.
-   * @param parentValueCount   Number of separate repeating groupings.
-   * @param childValueCount   Number of supported values in the vector.
-   * @param buf Incoming buffer.
-   * @return The number of bytes of the buffer that were consumed.
-   */
-  public int load(int dataBytes, int parentValueCount, int childValueCount, DrillBuf buf);
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVectorLike.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVectorLike.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVectorLike.java
new file mode 100644
index 0000000..ac8589e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVectorLike.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+import io.netty.buffer.DrillBuf;
+
+public interface RepeatedVariableWidthVectorLike {
+  /**
+   * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.
+   *
+   * @param totalBytes   Desired size of the underlying data buffer.
+   * @param parentValueCount   Number of separate repeating groupings.
+   * @param childValueCount   Number of supported values in the vector.
+   */
+  public void allocateNew(int totalBytes, int parentValueCount, int childValueCount);
+
+  /**
+   * Provide the maximum amount of variable width bytes that can be stored in this vector.
+   * @return the maximum number of variable width bytes that can be stored in this vector
+   */
+  public int getByteCapacity();
+
+  /**
+   * Load the records in the provided buffer based on the given number of values.
+   * @param dataBytes   The number of bytes associated with the data array.
+   * @param parentValueCount   Number of separate repeating groupings.
+   * @param childValueCount   Number of supported values in the vector.
+   * @param buf Incoming buffer.
+   * @return The number of bytes of the buffer that were consumed.
+   */
+  public int load(int dataBytes, int parentValueCount, int childValueCount, DrillBuf buf);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java
deleted file mode 100644
index df4279a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.vector;
-
-public interface RepeatedVector extends ValueVector {
-  public static final int DEFAULT_REPEAT_PER_RECORD = 4;
-
-  public RepeatedFixedWidthVector.RepeatedAccessor getAccessor();
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/4689468e/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java
index 386ee34..de05131 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java
@@ -28,9 +28,9 @@ public class SchemaChangeCallBack implements CallBack {
   }
 
   public boolean getSchemaChange() {
-    boolean schemaChange = this.schemaChange;
-    this.schemaChange = false;
-    return schemaChange;
+    final boolean current = schemaChange;
+    schemaChange = false;
+    return current;
   }
 }