You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/07/03 19:49:34 UTC

[1/3] drill git commit: DRILL-5517: Size-aware set methods in value vectors

Repository: drill
Updated Branches:
  refs/heads/master 6446e56f2 -> 63e243378


http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index 170c606..0f8d90c 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -251,8 +251,8 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     bits.zeroVector();
     values.zeroVector();
   }
-  </#if>
 
+  </#if>
   @Override
   public void load(SerializedField metadata, DrillBuf buffer) {
     clear();
@@ -374,6 +374,27 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     values.copyFromSafe(fromIndex, thisIndex, from.values);
   }
 
+  @Override
+  public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
+    Nullable${minor.class}Vector fromVector = (Nullable${minor.class}Vector) from;
+    <#if type.major == "VarLen">
+
+    // This method is to be called only for loading the vector
+    // sequentially, so there should be no empties to fill.
+
+    </#if>
+    bits.copyFromSafe(fromIndex, toIndex, fromVector.bits);
+    values.copyFromSafe(fromIndex, toIndex, fromVector.values);
+  }
+
+  @Override
+  public void exchange(ValueVector other) {
+    ${className} target = (${className}) other;
+    bits.exchange(target.bits);
+    values.exchange(target.values);
+    mutator.exchange(other.getMutator());
+  }
+
   public final class Accessor extends BaseDataValueVector.BaseAccessor <#if type.major = "VarLen">implements VariableWidthVector.VariableWidthAccessor</#if> {
     final UInt1Vector.Accessor bAccessor = bits.getAccessor();
     final ${valuesName}.Accessor vAccessor = values.getAccessor();
@@ -383,7 +404,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
      *
      * @param   index   position of the value
      * @return  value of the element, if not null
-     * @throws  NullValueException if the value is null
+     * @throws  IllegalStateException if the value is null
      */
     public <#if type.major == "VarLen">byte[]<#else>${minor.javaType!type.javaType}</#if> get(int index) {
       if (isNull(index)) {
@@ -410,8 +431,8 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     public int getValueLength(int index) {
       return values.getAccessor().getValueLength(index);
     }
-    </#if>
 
+    </#if>
     public void get(int index, Nullable${minor.class}Holder holder){
       vAccessor.get(index, holder);
       holder.isSet = bAccessor.get(index);
@@ -439,22 +460,21 @@ public final class ${className} extends BaseDataValueVector implements <#if type
         return vAccessor.getAsStringBuilder(index);
       }
     }
-    </#if>
 
+    </#if>
     @Override
-    public int getValueCount(){
+    public int getValueCount() {
       return bits.getAccessor().getValueCount();
     }
 
-    public void reset(){}
+    public void reset() {}
   }
 
   public final class Mutator extends BaseDataValueVector.BaseMutator implements NullableVectorDefinitionSetter<#if type.major = "VarLen">, VariableWidthVector.VariableWidthMutator</#if> {
     private int setCount;
-    <#if type.major = "VarLen"> private int lastSet = -1;</#if>
+    <#if type.major = "VarLen">private int lastSet = -1;</#if>
 
-    private Mutator(){
-    }
+    private Mutator() { }
 
     public ${valuesName} getVectorWithValues(){
       return values;
@@ -466,11 +486,12 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     }
 
     /**
-     * Set the variable length element at the specified index to the supplied byte array.
+     * Set the variable length element at the specified index to the supplied value.
      *
      * @param index   position of the bit to set
-     * @param bytes   array of bytes to write
+     * @param value   value to write
      */
+
     public void set(int index, <#if type.major == "VarLen">byte[]<#elseif (type.width < 4)>int<#else>${minor.javaType!type.javaType}</#if> value) {
       setCount++;
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
@@ -486,7 +507,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     }
 
     <#if type.major == "VarLen">
-    private void fillEmpties(int index){
+    private void fillEmpties(int index) {
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
       for (int i = lastSet; i < index; i++) {
         valuesMutator.setSafe(i + 1, emptyByteArray);
@@ -502,27 +523,29 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       values.getMutator().setValueLengthSafe(index, length);
       lastSet = index;
     }
-    </#if>
 
     public void setSafe(int index, byte[] value, int start, int length) {
-      <#if type.major != "VarLen">
-      throw new UnsupportedOperationException();
-      <#else>
-      if (index > lastSet + 1) {
+       if (index > lastSet + 1) {
         fillEmpties(index);
       }
 
       bits.getMutator().setSafe(index, 1);
       values.getMutator().setSafe(index, value, start, length);
       setCount++;
-      <#if type.major == "VarLen">lastSet = index;</#if>
-      </#if>
+      lastSet = index;
+    }
+
+    public void setScalar(int index, byte[] value, int start, int length) throws VectorOverflowException {
+      if (index > lastSet + 1) {
+        fillEmpties(index); // Filling empties cannot overflow the vector
+      }
+      values.getMutator().setScalar(index, value, start, length);
+      bits.getMutator().setSafe(index, 1);
+      setCount++;
+      lastSet = index;
     }
 
     public void setSafe(int index, ByteBuffer value, int start, int length) {
-      <#if type.major != "VarLen">
-      throw new UnsupportedOperationException();
-      <#else>
       if (index > lastSet + 1) {
         fillEmpties(index);
       }
@@ -530,23 +553,38 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       bits.getMutator().setSafe(index, 1);
       values.getMutator().setSafe(index, value, start, length);
       setCount++;
-      <#if type.major == "VarLen">lastSet = index;</#if>
-      </#if>
+      lastSet = index;
     }
 
-    public void setNull(int index){
+    public void setScalar(int index, DrillBuf value, int start, int length) throws VectorOverflowException {
+      if (index > lastSet + 1) {
+        fillEmpties(index); // Filling empties cannot overflow the vector
+      }
+
+      values.getMutator().setScalar(index, value, start, length);
+      bits.getMutator().setSafe(index, 1);
+      setCount++;
+      lastSet = index;
+    }
+
+    </#if>
+    public void setNull(int index) {
       bits.getMutator().setSafe(index, 0);
     }
 
-    public void setSkipNull(int index, ${minor.class}Holder holder){
+    public void setSkipNull(int index, ${minor.class}Holder holder) {
       values.getMutator().set(index, holder);
     }
 
-    public void setSkipNull(int index, Nullable${minor.class}Holder holder){
+    public void setSkipNull(int index, Nullable${minor.class}Holder holder) {
       values.getMutator().set(index, holder);
     }
 
-    public void set(int index, Nullable${minor.class}Holder holder){
+    public void setNullBounded(int index) throws VectorOverflowException {
+      bits.getMutator().setScalar(index, 0);
+    }
+
+    public void set(int index, Nullable${minor.class}Holder holder) {
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
       <#if type.major == "VarLen">
       for (int i = lastSet + 1; i < index; i++) {
@@ -558,7 +596,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
 
-    public void set(int index, ${minor.class}Holder holder){
+    public void set(int index, ${minor.class}Holder holder) {
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
       <#if type.major == "VarLen">
       for (int i = lastSet + 1; i < index; i++) {
@@ -575,7 +613,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     }
 
     <#assign fields = minor.fields!type.fields />
-    public void set(int index, int isSet<#list fields as field><#if field.include!true >, ${field.type} ${field.name}Field</#if></#list> ){
+    public void set(int index, int isSet<#list fields as field><#if field.include!true >, ${field.type} ${field.name}Field</#if></#list> ) {
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
       <#if type.major == "VarLen">
       for (int i = lastSet + 1; i < index; i++) {
@@ -593,11 +631,22 @@ public final class ${className} extends BaseDataValueVector implements <#if type
         fillEmpties(index);
       }
       </#if>
-
       bits.getMutator().setSafe(index, isSet);
       values.getMutator().setSafe(index<#list fields as field><#if field.include!true >, ${field.name}Field</#if></#list>);
       setCount++;
       <#if type.major == "VarLen">lastSet = index;</#if>
+   }
+
+    public void setScalar(int index, int isSet<#list fields as field><#if field.include!true >, ${field.type} ${field.name}Field</#if></#list> ) throws VectorOverflowException {
+      <#if type.major == "VarLen">
+      if (index > lastSet + 1) {
+        fillEmpties(index);
+      }
+      </#if>
+      values.getMutator().setScalar(index<#list fields as field><#if field.include!true >, ${field.name}Field</#if></#list>);
+      bits.getMutator().setSafe(index, isSet);
+      setCount++;
+      <#if type.major == "VarLen">lastSet = index;</#if>
     }
 
     public void setSafe(int index, Nullable${minor.class}Holder value) {
@@ -612,6 +661,18 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
 
+    public void setScalar(int index, Nullable${minor.class}Holder value) throws VectorOverflowException {
+      <#if type.major == "VarLen">
+      if (index > lastSet + 1) {
+        fillEmpties(index);
+      }
+      </#if>
+      values.getMutator().setScalar(index, value);
+      bits.getMutator().setSafe(index, value.isSet);
+      setCount++;
+      <#if type.major == "VarLen">lastSet = index;</#if>
+    }
+
     public void setSafe(int index, ${minor.class}Holder value) {
       <#if type.major == "VarLen">
       if (index > lastSet + 1) {
@@ -624,6 +685,18 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
 
+    public void setScalar(int index, ${minor.class}Holder value) throws VectorOverflowException {
+      <#if type.major == "VarLen">
+      if (index > lastSet + 1) {
+        fillEmpties(index);
+      }
+      </#if>
+      values.getMutator().setScalar(index, value);
+      bits.getMutator().setSafe(index, 1);
+      setCount++;
+      <#if type.major == "VarLen">lastSet = index;</#if>
+    }
+
     <#if !(type.major == "VarLen" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense" || minor.class == "Interval" || minor.class == "IntervalDay")>
     public void setSafe(int index, ${minor.javaType!type.javaType} value) {
       <#if type.major == "VarLen">
@@ -636,6 +709,17 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       setCount++;
     }
 
+    public void setScalar(int index, ${minor.javaType!type.javaType} value) throws VectorOverflowException {
+      <#if type.major == "VarLen">
+      if (index > lastSet + 1) {
+        fillEmpties(index);
+      }
+      </#if>
+      values.getMutator().setScalar(index, value);
+      bits.getMutator().setSafe(index, 1);
+      setCount++;
+    }
+
     </#if>
     <#if minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse">
     public void set(int index, BigDecimal value) {
@@ -650,6 +734,12 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       setCount++;
     }
 
+    public void setScalar(int index, BigDecimal value) throws VectorOverflowException {
+      values.getMutator().setScalar(index, value);
+      bits.getMutator().setSafe(index, 1);
+      setCount++;
+    }
+
     </#if>
     @Override
     public void setValueCount(int valueCount) {
@@ -674,6 +764,17 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       setCount = 0;
       <#if type.major = "VarLen">lastSet = -1;</#if>
     }
+
+    // For nullable vectors, exchanging buffers (done elsewhere)
+    // requires also exchanging mutator state (done here.)
+
+    @Override
+    public void exchange(ValueVector.Mutator other) {
+      final Mutator target = (Mutator) other;
+      int temp = setCount;
+      setCount = target.setCount;
+      target.setCount = temp;
+    }
   }
 }
 </#list>

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java b/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
index 159a8e7..9780b7d 100644
--- a/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -36,10 +36,10 @@ package org.apache.drill.exec.vector;
 <#include "/@includes/vv_imports.ftl" />
 
 /**
- * Repeated${minor.class} implements a vector with multple values per row (e.g. JSON array or
- * repeated protobuf field).  The implementation uses two additional value vectors; one to convert
- * the index offset to the underlying element offset, and another to store the number of values
- * in the vector.
+ * Repeated${minor.class} implements a vector with multiple values per row (e.g. JSON array or
+ * repeated protobuf field).  The implementation uses an additional value vector to convert
+ * the index offset to the underlying element offset. The count of values comes from subtracting
+ * two successive offsets.
  *
  * NB: this class is automatically generated from ${.template_name} and ValueVectorTypes.tdd using FreeMarker.
  */
@@ -178,11 +178,16 @@ public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector
     }
   }
 
+  @Override
+  public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
+    copyFromSafe(fromIndex, toIndex, (Repeated${minor.class}Vector) from);
+  }
+
   public boolean allocateNewSafe() {
-    /* boolean to keep track if all the memory allocation were successful
+    /* boolean to keep track if all the memory allocations were successful.
      * Used in the case of composite vectors when we need to allocate multiple
      * buffers for multiple vectors. If one of the allocations failed we need to
-     * clear all the memory that we allocated
+     * clear all the memory that we allocated.
      */
     boolean success = false;
     try {
@@ -239,12 +244,6 @@ public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector
   @Override
   public void allocateNew(int valueCount, int innerValueCount) {
     clear();
-    /* boolean to keep track if all the memory allocation were successful
-     * Used in the case of composite vectors when we need to allocate multiple
-     * buffers for multiple vectors. If one of the allocations failed we need to//
-     * clear all the memory that we allocated
-     */
-    boolean success = false;
     try {
       offsets.allocateNew(valueCount + 1);
       values.allocateNew(innerValueCount);
@@ -258,9 +257,9 @@ public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector
 
   </#if>
   // This is declared a subclass of the accessor declared inside of FixedWidthVector, this is also used for
-  // variable length vectors, as they should ahve consistent interface as much as possible, if they need to diverge
+  // variable length vectors, as they should have a consistent interface as much as possible, if they need to diverge
   // in the future, the interface shold be declared in the respective value vector superclasses for fixed and variable
-  // and we should refer to each in the generation template
+  // and we should refer to each in the generation template.
   public final class Accessor extends BaseRepeatedValueVector.BaseRepeatedAccessor {
     @Override
     public List<${friendlyType}> getObject(int index) {
@@ -281,7 +280,7 @@ public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector
     }
 
     /**
-     * Get a value for the given record.  Each element in the repeated field is accessed by
+     * Get a value for the given record. Each element in the repeated field is accessed by
      * the positionIndex param.
      *
      * @param  index           record containing the repeated field
@@ -329,7 +328,7 @@ public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector
      * @param value   value to add to the given row
      */
     public void add(int index, <#if type.major == "VarLen">byte[]<#elseif (type.width < 4)>int<#else>${minor.javaType!type.javaType}</#if> value) {
-      int nextOffset = offsets.getAccessor().get(index+1);
+      final int nextOffset = offsets.getAccessor().get(index+1);
       values.getMutator().set(nextOffset, value);
       offsets.getMutator().set(index+1, nextOffset+1);
     }
@@ -339,12 +338,25 @@ public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector
       addSafe(index, bytes, 0, bytes.length);
     }
 
+    public void addEntry(int index, byte[] bytes) throws VectorOverflowException {
+      addEntry(index, bytes, 0, bytes.length);
+    }
+
     public void addSafe(int index, byte[] bytes, int start, int length) {
       final int nextOffset = offsets.getAccessor().get(index+1);
       values.getMutator().setSafe(nextOffset, bytes, start, length);
       offsets.getMutator().setSafe(index+1, nextOffset+1);
     }
 
+    public void addEntry(int index, byte[] bytes, int start, int length) throws VectorOverflowException {
+      if (index >= MAX_ROW_COUNT) {
+        throw new VectorOverflowException();
+      }
+      final int nextOffset = offsets.getAccessor().get(index+1);
+      values.getMutator().setArrayItem(nextOffset, bytes, start, length);
+      offsets.getMutator().setSafe(index+1, nextOffset+1);
+    }
+
     <#else>
     public void addSafe(int index, ${minor.javaType!type.javaType} srcValue) {
       final int nextOffset = offsets.getAccessor().get(index+1);
@@ -352,6 +364,15 @@ public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector
       offsets.getMutator().setSafe(index+1, nextOffset+1);
     }
 
+    public void addEntry(int index, ${minor.javaType!type.javaType} srcValue) throws VectorOverflowException {
+      if (index >= MAX_ROW_COUNT) {
+        throw new VectorOverflowException();
+      }
+      final int nextOffset = offsets.getAccessor().get(index+1);
+      values.getMutator().setArrayItem(nextOffset, srcValue);
+      offsets.getMutator().setSafe(index+1, nextOffset+1);
+    }
+
     </#if>
     public void setSafe(int index, Repeated${minor.class}Holder h) {
       final ${minor.class}Holder ih = new ${minor.class}Holder();
@@ -364,43 +385,104 @@ public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector
     }
 
     public void addSafe(int index, ${minor.class}Holder holder) {
-      int nextOffset = offsets.getAccessor().get(index+1);
+      final int nextOffset = offsets.getAccessor().get(index+1);
       values.getMutator().setSafe(nextOffset, holder);
       offsets.getMutator().setSafe(index+1, nextOffset+1);
     }
 
+    public void addEntry(int index, ${minor.class}Holder holder) throws VectorOverflowException {
+      if (index >= MAX_ROW_COUNT) {
+        throw new VectorOverflowException();
+      }
+      final int nextOffset = offsets.getAccessor().get(index+1);
+      values.getMutator().setArrayItem(nextOffset, holder);
+      offsets.getMutator().setSafe(index+1, nextOffset+1);
+    }
+
     public void addSafe(int index, Nullable${minor.class}Holder holder) {
       final int nextOffset = offsets.getAccessor().get(index+1);
       values.getMutator().setSafe(nextOffset, holder);
       offsets.getMutator().setSafe(index+1, nextOffset+1);
     }
 
+    public void addEntry(int index, Nullable${minor.class}Holder holder) throws VectorOverflowException {
+      if (index >= MAX_ROW_COUNT) {
+        throw new VectorOverflowException();
+      }
+      final int nextOffset = offsets.getAccessor().get(index+1);
+      values.getMutator().setArrayItem(nextOffset, holder);
+      offsets.getMutator().setSafe(index+1, nextOffset+1);
+    }
+
+    /**
+     * Backfill missing offsets from the given last written position to the
+     * given current write position. Used by the "new" size-safe column
+     * writers to allow skipping values. The <tt>set()</tt> and <tt>setSafe()</tt>
+     * <b>do not</b> fill empties. See DRILL-5529.
+     * @param lastWrite the position of the last valid write: the offset
+     * to be copied forward
+     * @param index the current write position to be initialized
+     */
+
+    public void fillEmptiesBounded(int lastWrite, int index)
+            throws VectorOverflowException {
+      if (index >= UInt4Vector.MAX_ROW_COUNT) {
+        throw new VectorOverflowException();
+      }
+      // If last write was 2, offsets are [0, 3, 6]
+      // If next write is 4, offsets must be: [0, 3, 6, 6, 6]
+      // Remember the offsets are one more than row count.
+      final int fillOffset = offsets.getAccessor().get(lastWrite+1);
+      final UInt4Vector.Mutator offsetMutator = offsets.getMutator();
+      for (int i = lastWrite; i < index; i++) {
+        offsetMutator.setSafe(i + 1, fillOffset);
+      }
+    }
+
     <#if (fields?size > 1) && !(minor.class == "Decimal9" || minor.class == "Decimal18" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense")>
-    public void addSafe(int arrayIndex, <#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>) {
-      int nextOffset = offsets.getAccessor().get(arrayIndex+1);
+    public void addSafe(int rowIndex, <#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>) {
+      final int nextOffset = offsets.getAccessor().get(rowIndex+1);
       values.getMutator().setSafe(nextOffset, <#list fields as field>${field.name}<#if field_has_next>, </#if></#list>);
-      offsets.getMutator().setSafe(arrayIndex+1, nextOffset+1);
+      offsets.getMutator().setSafe(rowIndex+1, nextOffset+1);
+    }
+
+    public void addEntry(int rowIndex, <#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>) throws VectorOverflowException {
+      if (rowIndex >= MAX_ROW_COUNT) {
+        throw new VectorOverflowException();
+      }
+      final int nextOffset = offsets.getAccessor().get(rowIndex+1);
+      values.getMutator().setArrayItem(nextOffset, <#list fields as field>${field.name}<#if field_has_next>, </#if></#list>);
+      offsets.getMutator().setSafe(rowIndex+1, nextOffset+1);
     }
 
     </#if>
     <#if minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse">
     public void addSafe(int index, BigDecimal value) {
-      int nextOffset = offsets.getAccessor().get(index+1);
+      final int nextOffset = offsets.getAccessor().get(index+1);
       values.getMutator().setSafe(nextOffset, value);
       offsets.getMutator().setSafe(index+1, nextOffset+1);
     }
 
+    public void addEntry(int index, BigDecimal value) throws VectorOverflowException {
+      if (index >= MAX_ROW_COUNT) {
+        throw new VectorOverflowException();
+      }
+      final int nextOffset = offsets.getAccessor().get(index+1);
+      values.getMutator().setArrayItem(nextOffset, value);
+      offsets.getMutator().setSafe(index+1, nextOffset+1);
+    }
+
     </#if>
     protected void add(int index, ${minor.class}Holder holder) {
-      int nextOffset = offsets.getAccessor().get(index+1);
+      final int nextOffset = offsets.getAccessor().get(index+1);
       values.getMutator().set(nextOffset, holder);
       offsets.getMutator().set(index+1, nextOffset+1);
     }
 
     public void add(int index, Repeated${minor.class}Holder holder) {
 
-      ${minor.class}Vector.Accessor accessor = holder.vector.getAccessor();
-      ${minor.class}Holder innerHolder = new ${minor.class}Holder();
+      final ${minor.class}Vector.Accessor accessor = holder.vector.getAccessor();
+      final ${minor.class}Holder innerHolder = new ${minor.class}Holder();
 
       for(int i = holder.start; i < holder.end; i++) {
         accessor.get(i, innerHolder);

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/codegen/templates/UnionVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/UnionVector.java b/exec/vector/src/main/codegen/templates/UnionVector.java
index 93854e7..207e55a 100644
--- a/exec/vector/src/main/codegen/templates/UnionVector.java
+++ b/exec/vector/src/main/codegen/templates/UnionVector.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -80,6 +80,7 @@ public class UnionVector implements ValueVector {
     this.callBack = callBack;
   }
 
+  @Override
   public BufferAllocator getAllocator() {
     return allocator;
   }
@@ -248,6 +249,11 @@ public class UnionVector implements ValueVector {
     copyFrom(inIndex, outIndex, from);
   }
 
+  @Override
+  public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
+    copyFromSafe(fromIndex, toIndex, (UnionVector) from);
+  }
+
   public ValueVector addVector(ValueVector v) {
     String name = v.getField().getType().getMinorType().name().toLowerCase();
     MajorType type = v.getField().getType();
@@ -485,4 +491,9 @@ public class UnionVector implements ValueVector {
     @Override
     public void generateTestData(int values) { }
   }
+
+  @Override
+  public void exchange(ValueVector other) {
+    throw new UnsupportedOperationException("Union vector does not yet support exchange()");
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
index 581a9f8..9a9e178 100644
--- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -42,20 +42,23 @@ package org.apache.drill.exec.vector;
  * ${minor.class}Vector implements a vector of variable width values.  Elements in the vector
  * are accessed by position from the logical start of the vector.  A fixed width offsetVector
  * is used to convert an element's position to it's offset from the start of the (0-based)
- * DrillBuf.  Size is inferred by adjacent elements.
- *   The width of each element is ${type.width} byte(s)
- *   The equivalent Java primitive is '${minor.javaType!type.javaType}'
- *
+ * DrillBuf. Size is inferred from adjacent elements.
+ * <ul>
+ * <li>The width of each element is ${type.width} byte(s). Note that the actual width is
+ * variable; this width is used as a guess for certain calculations.</li>
+ * <li>The equivalent Java primitive is '${minor.javaType!type.javaType}'</li>
+ * </ul>
  * NB: this class is automatically generated from ${.template_name} and ValueVectorTypes.tdd using FreeMarker.
  */
-public final class ${minor.class}Vector extends BaseDataValueVector implements VariableWidthVector{
+
+public final class ${minor.class}Vector extends BaseDataValueVector implements VariableWidthVector {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${minor.class}Vector.class);
 
   private static final int DEFAULT_RECORD_BYTE_COUNT = 8;
-  private static final int INITIAL_BYTE_COUNT = 4096 * DEFAULT_RECORD_BYTE_COUNT;
+  private static final int INITIAL_BYTE_COUNT = Math.min(INITIAL_VALUE_ALLOCATION * DEFAULT_RECORD_BYTE_COUNT, MAX_BUFFER_SIZE);
   private static final int MIN_BYTE_COUNT = 4096;
-
   public final static String OFFSETS_VECTOR_NAME = "$offsets$";
+
   private final MaterializedField offsetsField = MaterializedField.create(OFFSETS_VECTOR_NAME, Types.required(MinorType.UINT4));
   private final UInt${type.width}Vector offsetVector = new UInt${type.width}Vector(offsetsField, allocator);
   private final FieldReader reader = new ${minor.class}ReaderImpl(${minor.class}Vector.this);
@@ -239,6 +242,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
   }
 
   @Override
+  public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
+    copyFromSafe(fromIndex, toIndex, (${minor.class}Vector) from);
+  }
+
+  @Override
   public int getAllocatedByteCount() {
     return offsetVector.getAllocatedByteCount() + super.getAllocatedByteCount();
   }
@@ -406,6 +414,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     return mutator;
   }
 
+  @Override
+  public void exchange(ValueVector other) {
+    super.exchange(other);
+    ${minor.class}Vector target = (${minor.class}Vector) other;
+    offsetVector.exchange(target.offsetVector);
+  }
+
   public final class Accessor extends BaseValueVector.BaseAccessor implements VariableWidthAccessor {
     final UInt${type.width}Vector.Accessor oAccessor = offsetVector.getAccessor();
     public long getStartEnd(int index){
@@ -519,6 +534,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       }
     }
 
+    public void setScalar(int index, byte[] bytes) throws VectorOverflowException {
+      setScalar(index, bytes, 0, bytes.length);
+    }
+
     /**
      * Set the variable length element at the specified index to the supplied byte array.
      *
@@ -537,7 +556,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     public void setSafe(int index, ByteBuffer bytes, int start, int length) {
       assert index >= 0;
 
-      int currentOffset = offsetVector.getAccessor().get(index);
+      final int currentOffset = offsetVector.getAccessor().get(index);
       offsetVector.getMutator().setSafe(index + 1, currentOffset + length);
       try {
         data.setBytes(currentOffset, bytes, start, length);
@@ -549,6 +568,23 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       }
     }
 
+    public void setScalar(int index, DrillBuf bytes, int start, int length) throws VectorOverflowException {
+      assert index >= 0;
+
+      if (index >= MAX_ROW_COUNT) {
+        throw new VectorOverflowException();
+      }
+      int currentOffset = offsetVector.getAccessor().get(index);
+      final int newSize = currentOffset + length;
+      if (newSize > MAX_BUFFER_SIZE) {
+        throw new VectorOverflowException();
+      }
+      while (! data.setBytesBounded(currentOffset, bytes, start, length)) {
+        reAlloc();
+      }
+      offsetVector.getMutator().setSafe(index + 1, newSize);
+    }
+
     public void setSafe(int index, byte[] bytes, int start, int length) {
       assert index >= 0;
 
@@ -565,6 +601,28 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       }
     }
 
+    public void setScalar(int index, byte[] bytes, int start, int length) throws VectorOverflowException {
+      if (index >= MAX_ROW_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setArrayItem(index, bytes, start, length);
+    }
+
+    public void setArrayItem(int index, byte[] bytes, int start, int length) throws VectorOverflowException {
+      assert index >= 0;
+
+      final int currentOffset = offsetVector.getAccessor().get(index);
+      final int newSize = currentOffset + length;
+      if (newSize > MAX_BUFFER_SIZE) {
+        throw new VectorOverflowException();
+      }
+
+      while (! data.setBytesBounded(currentOffset, bytes, start, length)) {
+        reAlloc();
+      }
+      offsetVector.getMutator().setSafe(index + 1, newSize);
+    }
+
     @Override
     public void setValueLengthSafe(int index, int length) {
       final int offset = offsetVector.getAccessor().get(index);
@@ -574,12 +632,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       offsetVector.getMutator().setSafe(index + 1, offsetVector.getAccessor().get(index) + length);
     }
 
-
-    public void setSafe(int index, int start, int end, DrillBuf buffer){
+    public void setSafe(int index, int start, int end, DrillBuf buffer) {
       final int len = end - start;
       final int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
 
-      offsetVector.getMutator().setSafe( index+1,  outputStart + len);
+      offsetVector.getMutator().setSafe(index+1,  outputStart + len);
       try{
         buffer.getBytes(start, data, outputStart, len);
       } catch (IndexOutOfBoundsException e) {
@@ -588,17 +645,42 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
         }
         buffer.getBytes(start, data, outputStart, len);
       }
+    }
+
+    public void setScalar(int index, int start, int end, DrillBuf buffer) throws VectorOverflowException {
+      if (index >= MAX_ROW_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setArrayItem(index, start, end, buffer);
+    }
 
+    public void setArrayItem(int index, int start, int end, DrillBuf buffer) throws VectorOverflowException {
+      final int len = end - start;
+      final int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
+      final int newSize = outputStart + len;
+      if (newSize > MAX_BUFFER_SIZE) {
+        throw new VectorOverflowException();
+      }
+
+      offsetVector.getMutator().setSafe(index+1, newSize);
+      try{
+        buffer.getBytes(start, data, outputStart, len);
+      } catch (IndexOutOfBoundsException e) {
+        while (data.capacity() < newSize) {
+          reAlloc();
+        }
+        buffer.getBytes(start, data, outputStart, len);
+      }
     }
 
-    public void setSafe(int index, Nullable${minor.class}Holder holder){
+    public void setSafe(int index, Nullable${minor.class}Holder holder) {
       assert holder.isSet == 1;
 
       final int start = holder.start;
       final int end =   holder.end;
       final int len = end - start;
 
-      int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
+      final int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
 
       try {
         holder.buffer.getBytes(start, data, outputStart, len);
@@ -608,15 +690,45 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
         }
         holder.buffer.getBytes(start, data, outputStart, len);
       }
-      offsetVector.getMutator().setSafe( index+1,  outputStart + len);
+      offsetVector.getMutator().setSafe(index+1,  outputStart + len);
+    }
+
+    public void setScalar(int index, Nullable${minor.class}Holder holder) throws VectorOverflowException {
+      if (index >= MAX_ROW_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setArrayItem(index, holder);
     }
 
-    public void setSafe(int index, ${minor.class}Holder holder){
+    public void setArrayItem(int index, Nullable${minor.class}Holder holder) throws VectorOverflowException {
+      assert holder.isSet == 1;
+
       final int start = holder.start;
       final int end =   holder.end;
       final int len = end - start;
+
       final int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
+      final int newSize = outputStart + len;
+      if (newSize > MAX_BUFFER_SIZE) {
+        throw new VectorOverflowException();
+      }
+
+      try {
+        holder.buffer.getBytes(start, data, outputStart, len);
+      } catch (IndexOutOfBoundsException e) {
+        while (data.capacity() < newSize) {
+          reAlloc();
+        }
+        holder.buffer.getBytes(start, data, outputStart, len);
+      }
+      offsetVector.getMutator().setSafe(index+1, newSize);
+    }
 
+    public void setSafe(int index, ${minor.class}Holder holder) {
+      final int start = holder.start;
+      final int end =   holder.end;
+      final int len = end - start;
+      final int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
 
       try {
         holder.buffer.getBytes(start, data, outputStart, len);
@@ -629,6 +741,68 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       offsetVector.getMutator().setSafe( index+1,  outputStart + len);
     }
 
+    public void setScalar(int index, ${minor.class}Holder holder) throws VectorOverflowException {
+      if (index >= MAX_ROW_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setArrayItem(index, holder);
+   }
+
+    public void setArrayItem(int index, ${minor.class}Holder holder) throws VectorOverflowException {
+      final int start = holder.start;
+      final int end =   holder.end;
+      final int len = end - start;
+      final int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
+      final int newSize = outputStart + len;
+      if (newSize > MAX_BUFFER_SIZE) {
+        throw new VectorOverflowException();
+      }
+
+      try {
+        holder.buffer.getBytes(start, data, outputStart, len);
+      } catch (IndexOutOfBoundsException e) {
+        while(data.capacity() < newSize) {
+          reAlloc();
+        }
+        holder.buffer.getBytes(start, data, outputStart, len);
+      }
+      offsetVector.getMutator().setSafe( index+1, newSize);
+    }
+
+    /**
+     * Backfill missing offsets from the given last written position to the
+     * given current write position. Used by the "new" size-safe column
+     * writers to allow skipping values. The <tt>set()</tt> and <tt>setSafe()</tt>
+     * <b>do not</b> fill empties. See DRILL-5529.
+     * @param lastWrite the position of the last valid write: the offset
+     * to be copied forward
+     * @param index the current write position filling occurs up to,
+     * but not including, this position
+     * @throws VectorOverflowException if filling up to the given index would
+     * overfill the vector
+     */
+
+    public void fillEmptiesBounded(int lastWrite, int index)
+            throws VectorOverflowException {
+
+      // Index is the next write index, which might be "virtual",
+      // that is, past the last row at EOF. This check only protects
+      // the actual data written here, which is up to index-1.
+
+      if (index > UInt4Vector.MAX_ROW_COUNT) {
+        throw new VectorOverflowException();
+      }
+      // If last write was 2, offsets are [0, 3, 6]
+      // If next write is 4, offsets must be: [0, 3, 6, 6, 6]
+      // Remember the offsets are one more than row count.
+
+      final int fillOffset = offsetVector.getAccessor().get(lastWrite+1);
+      final UInt4Vector.Mutator offsetMutator = offsetVector.getMutator();
+      for (int i = lastWrite; i < index; i++) {
+        offsetMutator.setSafe(i + 1, fillOffset);
+      }
+    }
+
     protected void set(int index, int start, int length, DrillBuf buffer){
       assert index >= 0;
       final int currentOffset = offsetVector.getAccessor().get(index);
@@ -651,6 +825,19 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       data.setBytes(currentOffset, holder.buffer, holder.start, length);
     }
 
+  <#if (minor.class == "VarChar")>
+    public void setScalar(int index, String value) throws VectorOverflowException {
+      if (index >= MAX_ROW_COUNT) {
+        throw new VectorOverflowException();
+      }
+      // Treat a null string as an empty string.
+      if (value != null) {
+        byte encoded[] = value.getBytes(Charsets.UTF_8);
+        setScalar(index, encoded, 0, encoded.length);
+      }
+    }
+
+  </#if>
     @Override
     public void setValueCount(int valueCount) {
       final int currentByteCapacity = getByteCapacity();
@@ -685,7 +872,6 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     }
   }
 }
-
 </#if> <#-- type.major -->
 </#list>
 </#list>

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index 4def5b8..5ce58ed 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,7 +24,7 @@ import org.apache.drill.exec.record.MaterializedField;
 
 public abstract class BaseDataValueVector extends BaseValueVector {
 
-  protected final static byte[] emptyByteArray = new byte[]{}; // Nullable vectors use this
+  protected final static byte[] emptyByteArray = new byte[0]; // Nullable vectors use this
 
   protected DrillBuf data;
 
@@ -78,9 +78,7 @@ public abstract class BaseDataValueVector extends BaseValueVector {
     return data.writerIndex();
   }
 
-  public DrillBuf getBuffer() {
-    return data;
-  }
+  public DrillBuf getBuffer() { return data; }
 
   /**
    * This method has a similar effect of allocateNew() without actually clearing and reallocating
@@ -89,7 +87,16 @@ public abstract class BaseDataValueVector extends BaseValueVector {
   public void reset() {}
 
   @Override
-  public int getAllocatedByteCount() {
-    return data.capacity();
+  public int getAllocatedByteCount() { return data.capacity(); }
+
+  @Override
+  public void exchange(ValueVector other) {
+    BaseDataValueVector target = (BaseDataValueVector) other;
+    DrillBuf temp = data;
+    data = target.data;
+    target.data = temp;
+    getReader().reset();
+    getMutator().exchange(target.getMutator());
+    // No state in an Accessor to reset
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
index a0d5f65..f4a5847 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -32,6 +32,13 @@ import org.apache.drill.exec.record.TransferPair;
 public abstract class BaseValueVector implements ValueVector {
 //  private static final Logger logger = LoggerFactory.getLogger(BaseValueVector.class);
 
+  /**
+   * Physical maximum allocation. This is the value prior to Drill 1.11.
+   * This size causes memory fragmentation. Please use
+   * {@link ValueVector#MAX_BUFFER_SIZE} in new code.
+   */
+
+  @Deprecated
   public static final int MAX_ALLOCATION_SIZE = Integer.MAX_VALUE;
   public static final int INITIAL_VALUE_ALLOCATION = 4096;
 
@@ -101,6 +108,10 @@ public abstract class BaseValueVector implements ValueVector {
     //TODO: consider making mutator stateless(if possible) on another issue.
     @Override
     public void reset() {}
+
+    // TODO: If mutator becomes stateless, remove this method.
+    @Override
+    public void exchange(ValueVector.Mutator other) { }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
index a6c0cea..0062e77 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -39,6 +39,38 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
 public final class BitVector extends BaseDataValueVector implements FixedWidthVector {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitVector.class);
 
+  /**
+   * Width of each fixed-width value.
+   */
+
+  public static final int VALUE_WIDTH = 1;
+
+  /**
+   * Maximum number of values that this fixed-width vector can hold
+   * and stay below the maximum vector size limit. This is the limit
+   * enforced when the vector is used to hold values in a repeated
+   * vector.
+   */
+
+  public static final int MAX_CAPACITY = MAX_BUFFER_SIZE / VALUE_WIDTH;
+
+  /**
+   * Maximum number of values that this fixed-width vector can hold
+   * and stay below the maximum vector size limit and/or stay below
+   * the maximum item count. This is the limit enforced when the
+   * vector is used to hold required or nullable values.
+   */
+
+  public static final int MAX_COUNT = Math.min(MAX_ROW_COUNT, MAX_CAPACITY);
+
+  /**
+   * Actual maximum vector size, in bytes, given the number of fixed-width
+   * values that either fit in the maximum overall vector size, or that
+   * is no larger than the maximum vector item count.
+   */
+
+  public static final int NET_MAX_SIZE = VALUE_WIDTH * MAX_COUNT;
+
   private final FieldReader reader = new BitReaderImpl(BitVector.this);
   private final Accessor accessor = new Accessor();
   private final Mutator mutator = new Mutator();
@@ -72,7 +104,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
 
   @Override
   public int getValueCapacity() {
-    return (int)Math.min((long)Integer.MAX_VALUE, data.capacity() * 8L);
+    return (int) Math.min((long)Integer.MAX_VALUE, data.capacity() * 8L);
   }
 
   private int getByteIndex(int index) {
@@ -183,6 +215,11 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   }
 
   @Override
+  public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
+    copyFrom(fromIndex, toIndex, (BitVector) from);
+  }
+
+  @Override
   public void load(SerializedField metadata, DrillBuf buffer) {
     Preconditions.checkArgument(this.field.getPath().equals(metadata.getNamePart().getName()), "The field %s doesn't match the provided metadata %s.", this.field, metadata);
     final int valueCount = metadata.getValueCount();
@@ -221,7 +258,6 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     return new TransferImpl((BitVector) to);
   }
 
-
   public void transferTo(BitVector target) {
     target.clear();
     if (target.data != null) {
@@ -401,6 +437,20 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
       set(index, value);
     }
 
+    public void setScalar(int index, int value) throws VectorOverflowException {
+      if (index >= MAX_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, value);
+    }
+
+    public void setArrayItem(int index, int value) throws VectorOverflowException {
+      if (index >= MAX_CAPACITY) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, value);
+    }
+
     public void setSafe(int index, BitHolder holder) {
       while(index >= getValueCapacity()) {
         reAlloc();
@@ -408,6 +458,20 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
       set(index, holder.value);
     }
 
+    public void setScalar(int index, BitHolder holder) throws VectorOverflowException {
+      if (index >= MAX_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, holder);
+    }
+
+    public void setArrayItem(int index, BitHolder holder) throws VectorOverflowException {
+      if (index >= MAX_CAPACITY) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, holder);
+    }
+
     public void setSafe(int index, NullableBitHolder holder) {
       while(index >= getValueCapacity()) {
         reAlloc();
@@ -415,6 +479,20 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
       set(index, holder.value);
     }
 
+    public void setScalar(int index, NullableBitHolder holder) throws VectorOverflowException {
+      if (index >= MAX_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, holder);
+    }
+
+    public void setArrayItem(int index, NullableBitHolder holder) throws VectorOverflowException {
+      if (index >= MAX_CAPACITY) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, holder);
+    }
+
     @Override
     public final void setValueCount(int valueCount) {
       int currentValueCapacity = getValueCapacity();
@@ -441,7 +519,6 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
       }
       setValueCount(values);
     }
-
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
index c2781eb..a9a1631 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/FixedWidthVector.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,7 +18,7 @@
 package org.apache.drill.exec.vector;
 
 
-public interface FixedWidthVector extends ValueVector{
+public interface FixedWidthVector extends ValueVector {
 
   /**
    * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.
@@ -31,5 +31,4 @@ public interface FixedWidthVector extends ValueVector{
  * Zero out the underlying buffer backing this vector.
  */
   void zeroVector();
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
index f69dc98..bd8566d 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -32,11 +32,12 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
 public class ObjectVector extends BaseValueVector {
+  private final int ALLOCATION_SIZE = 4096;
+
   private final Accessor accessor = new Accessor();
   private final Mutator mutator = new Mutator();
   private int maxCount = 0;
   private int count = 0;
-  private int allocationSize = 4096;
 
   private List<Object[]> objectArrayList = new ArrayList<>();
 
@@ -45,8 +46,8 @@ public class ObjectVector extends BaseValueVector {
   }
 
   public void addNewArray() {
-    objectArrayList.add(new Object[allocationSize]);
-    maxCount += allocationSize;
+    objectArrayList.add(new Object[ALLOCATION_SIZE]);
+    maxCount += ALLOCATION_SIZE;
   }
 
   @Override
@@ -57,11 +58,11 @@ public class ObjectVector extends BaseValueVector {
   public final class Mutator implements ValueVector.Mutator {
 
     public void set(int index, Object obj) {
-      int listOffset = index / allocationSize;
+      int listOffset = index / ALLOCATION_SIZE;
       if (listOffset >= objectArrayList.size()) {
         addNewArray();
       }
-      objectArrayList.get(listOffset)[index % allocationSize] = obj;
+      objectArrayList.get(listOffset)[index % ALLOCATION_SIZE] = obj;
     }
 
     public boolean setSafe(int index, long value) {
@@ -94,6 +95,9 @@ public class ObjectVector extends BaseValueVector {
     @Override
     public void generateTestData(int values) {
     }
+
+    @Override
+    public void exchange(ValueVector.Mutator other) { }
   }
 
   @Override
@@ -161,6 +165,11 @@ public class ObjectVector extends BaseValueVector {
   }
 
   @Override
+  public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
+    throw new UnsupportedOperationException("ObjectVector does not support this");
+  }
+
+  @Override
   public int getValueCapacity() {
     return maxCount;
   }
@@ -198,11 +207,11 @@ public class ObjectVector extends BaseValueVector {
   public final class Accessor extends BaseAccessor {
     @Override
     public Object getObject(int index) {
-      int listOffset = index / allocationSize;
+      int listOffset = index / ALLOCATION_SIZE;
       if (listOffset >= objectArrayList.size()) {
         addNewArray();
       }
-      return objectArrayList.get(listOffset)[index % allocationSize];
+      return objectArrayList.get(listOffset)[index % ALLOCATION_SIZE];
     }
 
     @Override
@@ -230,4 +239,18 @@ public class ObjectVector extends BaseValueVector {
     // Values not stored in direct memory?
     return 0;
   }
+
+  @Override
+  public void exchange(ValueVector other) {
+    ObjectVector target = (ObjectVector) other;
+    List<Object[]> tempList = objectArrayList;
+    objectArrayList = target.objectArrayList;
+    target.objectArrayList = tempList;
+    int tempCount = count;
+    count = target.count;
+    target.count = tempCount;
+    tempCount = maxCount;
+    maxCount = target.maxCount;
+    target.maxCount = tempCount;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index f4c7935..3bc43fa 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -40,14 +40,17 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
  * There are a few "rules" around vectors:
  *
  * <ul>
- *   <li>values need to be written in order (e.g. index 0, 1, 2, 5)</li>
- *   <li>null vectors start with all values as null before writing anything</li>
- *   <li>for variable width types, the offset vector should be all zeros before writing</li>
- *   <li>you must call setValueCount before a vector can be read</li>
- *   <li>you should never write to a vector once it has been read.</li>
+ *   <li>Values need to be written in order (e.g. index 0, 1, 2, 5).</li>
+ *   <li>Null vectors start with all values as null before writing anything.</li>
+ *   <li>For variable width types, the offset vector should be all zeros before writing.</li>
+ *   <li>You must call setValueCount before a vector can be read.</li>
+ *   <li>You should never write to a vector once it has been read.</li>
+ *   <li>Vectors may not grow larger than the number of bytes specified
+ *   in {@link #MAX_BUFFER_SIZE} to prevent memory fragmentation. Use the
+ *   <tt>setScalar()</tt> and <tt>setArrayItem()</tt> methods in the mutator to enforce this rule.</li>
  * </ul>
  *
- * Please note that the current implementation doesn't enfore those rules, hence we may find few places that
+ * Please note that the current implementation doesn't enforce those rules, hence we may find a few places that
  * deviate from these rules (e.g. offset vectors in Variable Length and Repeated vector)
  *
  * This interface "should" strive to guarantee this order of operation:
@@ -56,6 +59,29 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
  * </blockquote>
  */
 public interface ValueVector extends Closeable, Iterable<ValueVector> {
+
+  /**
+   * Maximum allowed size of the buffer backing a value vector.
+   */
+
+  int MAX_BUFFER_SIZE = VectorUtils.maxSize();
+
+  /**
+   * Debug-time system option that artificially limits vector lengths
+   * for testing. Must be set prior to the first reference to this
+   * class. (Made deliberately difficult to prevent misuse...)
+   */
+
+  String MAX_BUFFER_SIZE_KEY = "drill.max_vector";
+
+  /**
+   * Maximum allowed row count in a vector. Repeated vectors
+   * may have more items, but can have no more than this number
+   * of arrays. Limited by 2-byte length in SV2: 65536 = 2<sup>16</sup>.
+   */
+
+  int MAX_ROW_COUNT = Character.MAX_VALUE + 1;
+
   /**
    * Allocate new buffers. ValueVector implements logic to determine how much to allocate.
    * @throws OutOfMemoryException Thrown if no memory can be allocated.
@@ -64,7 +90,7 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
 
   /**
    * Allocates new buffers. ValueVector implements logic to determine how much to allocate.
-   * @return Returns true if allocation was succesful.
+   * @return Returns true if allocation was successful.
    */
   boolean allocateNewSafe();
 
@@ -175,6 +201,8 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
    */
   void load(SerializedField metadata, DrillBuf buffer);
 
+  void copyEntry(int toIndex, ValueVector from, int fromIndex);
+
   /**
    * Return the total memory consumed by all buffers within this vector.
    */
@@ -188,6 +216,13 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
   int getPayloadByteCount();
 
   /**
+   * Exchange state with another value vector of the same type.
+   * Used to implement look-ahead writers.
+   */
+
+  void exchange(ValueVector other);
+
+  /**
    * An abstraction that is used to read from this vector instance.
    */
   interface Accessor {
@@ -211,7 +246,7 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
   }
 
   /**
-   * An abstractiong that is used to write into this vector instance.
+   * An abstraction that is used to write into this vector instance.
    */
   interface Mutator {
     /**
@@ -231,5 +266,12 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
      */
     @Deprecated
     void generateTestData(int values);
+
+    /**
+     * Exchanges state with the mutator of another vector. Used when exchanging
+     * state with another vector.
+     */
+
+    void exchange(Mutator other);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/java/org/apache/drill/exec/vector/VectorOverflowException.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/VectorOverflowException.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/VectorOverflowException.java
new file mode 100644
index 0000000..8b54966
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/VectorOverflowException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+/**
+ * Indicates that an attempt to write to a vector overflowed the vector
+ * bounds: either the limit on values or the size of the buffer backing
+ * the vector. This is an expected exception: code must catch it and
+ * properly handle the partially-written, incomplete last row.
+ */
+
+@SuppressWarnings("serial")
+public class VectorOverflowException extends Exception {
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/java/org/apache/drill/exec/vector/VectorUtils.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/VectorUtils.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/VectorUtils.java
new file mode 100644
index 0000000..6b29eb2
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/VectorUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+public class VectorUtils {
+
+  /**
+   * Vectors cannot be any larger than the Netty memory allocation
+   * block size.
+   */
+
+  private static final int ABSOLUTE_MAX_SIZE = 16 * 1024 * 1024;
+
+  /**
+   * Minimum size selected to prevent pathological performance if vectors
+   * are limited to an unusably small size. This limit is a judgment call,
+   * not based on any known limits.
+   */
+
+  private static final int ABSOLUTE_MIN_SIZE = 16 * 1024;
+
+  private VectorUtils() { }
+
+  /**
+   * Static function called once per run to compute the maximum
+   * vector size, in bytes. Normally uses the hard-coded limit,
+   * but allows setting a system property to override the limit
+   * for testing. The configured value must be within reasonable
+   * bounds.
+   * @return the maximum vector size, in bytes
+   */
+
+  static int maxSize() {
+    String prop = System.getProperty( ValueVector.MAX_BUFFER_SIZE_KEY );
+    int value = ABSOLUTE_MAX_SIZE;
+    if (prop != null) {
+      try {
+        value = Integer.parseInt(prop);
+        value = Math.max(value, ABSOLUTE_MIN_SIZE);
+        value = Math.min(value, ABSOLUTE_MAX_SIZE);
+      } catch (NumberFormatException e) {
+        // Ignore: a malformed property value leaves the default ABSOLUTE_MAX_SIZE in effect.
+      }
+    }
+    return value;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
index 9181f20..9a0b6be 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -53,19 +53,13 @@ public class ZeroVector implements ValueVector {
 
   private final Accessor defaultAccessor = new Accessor() {
     @Override
-    public Object getObject(int index) {
-      return null;
-    }
+    public Object getObject(int index) { return null; }
 
     @Override
-    public int getValueCount() {
-      return 0;
-    }
+    public int getValueCount() { return 0; }
 
     @Override
-    public boolean isNull(int index) {
-      return true;
-    }
+    public boolean isNull(int index) { return true; }
   };
 
   private final Mutator defaultMutator = new Mutator() {
@@ -77,6 +71,9 @@ public class ZeroVector implements ValueVector {
 
     @Override
     public void generateTestData(int values) { }
+
+    @Override
+    public void exchange(Mutator other) { }
   };
 
   public ZeroVector() { }
@@ -88,9 +85,7 @@ public class ZeroVector implements ValueVector {
   public void clear() { }
 
   @Override
-  public MaterializedField getField() {
-    return field;
-  }
+  public MaterializedField getField() { return field; }
 
   @Override
   public TransferPair getTransferPair(BufferAllocator allocator) {
@@ -112,14 +107,10 @@ public class ZeroVector implements ValueVector {
   }
 
   @Override
-  public int getBufferSize() {
-    return 0;
-  }
+  public int getBufferSize() { return 0; }
 
   @Override
-  public int getBufferSizeFor(final int valueCount) {
-    return 0;
-  }
+  public int getBufferSizeFor(final int valueCount) { return 0; }
 
   @Override
   public DrillBuf[] getBuffers(boolean clear) {
@@ -132,9 +123,7 @@ public class ZeroVector implements ValueVector {
   }
 
   @Override
-  public boolean allocateNewSafe() {
-    return true;
-  }
+  public boolean allocateNewSafe() { return true; }
 
   @Override
   public BufferAllocator getAllocator() {
@@ -145,45 +134,35 @@ public class ZeroVector implements ValueVector {
   public void setInitialCapacity(int numRecords) { }
 
   @Override
-  public int getValueCapacity() {
-    return 0;
-  }
+  public int getValueCapacity() { return 0; }
 
   @Override
-  public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
-    return defaultPair;
-  }
+  public TransferPair getTransferPair(String ref, BufferAllocator allocator) { return defaultPair; }
 
   @Override
-  public TransferPair makeTransferPair(ValueVector target) {
-    return defaultPair;
-  }
+  public TransferPair makeTransferPair(ValueVector target) { return defaultPair; }
 
   @Override
-  public Accessor getAccessor() {
-    return defaultAccessor;
-  }
+  public Accessor getAccessor() { return defaultAccessor; }
 
   @Override
-  public Mutator getMutator() {
-    return defaultMutator;
-  }
+  public Mutator getMutator() { return defaultMutator; }
 
   @Override
-  public FieldReader getReader() {
-    return NullReader.INSTANCE;
-  }
+  public FieldReader getReader() { return NullReader.INSTANCE; }
 
   @Override
   public void load(UserBitShared.SerializedField metadata, DrillBuf buffer) { }
 
   @Override
-  public int getAllocatedByteCount() {
-    return 0;
-  }
+  public void copyEntry(int toIndex, ValueVector from, int fromIndex) { }
 
   @Override
-  public int getPayloadByteCount() {
-    return 0;
-  }
+  public int getAllocatedByteCount() { return 0; }
+
+  @Override
+  public int getPayloadByteCount() { return 0; }
+
+  @Override
+  public void exchange(ValueVector other) { }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
index 8a54535..5b8f44d 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -209,7 +209,6 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements
     vector = v;
   }
 
-
   @Override
   public int getAllocatedByteCount() {
     return offsets.getAllocatedByteCount() + vector.getAllocatedByteCount();
@@ -220,6 +219,13 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements
     return offsets.getPayloadByteCount() + vector.getPayloadByteCount();
   }
 
+  @Override
+  public void exchange(ValueVector other) {
+    BaseRepeatedValueVector target = (BaseRepeatedValueVector) other;
+    vector.exchange(target.vector);
+    offsets.exchange(target.offsets);
+  }
+
   public abstract class BaseRepeatedAccessor extends BaseValueVector.BaseAccessor implements RepeatedAccessor {
 
     @Override
@@ -259,6 +265,14 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements
       setValueCount(index+1);
     }
 
+    public boolean startNewValueBounded(int index) {
+      if (index >= MAX_ROW_COUNT) {
+        return false;
+      }
+      startNewValue(index);
+      return true;
+    }
+
     @Override
     public void setValueCount(int valueCount) {
       // TODO: populate offset end points

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
index f71baa7..7f0e939 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
@@ -1,5 +1,4 @@
-/*******************************************************************************
-
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -95,6 +94,11 @@ public class ListVector extends BaseRepeatedValueVector {
   }
 
   @Override
+  public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
+    copyFromSafe(fromIndex, toIndex, (ListVector) from);
+  }
+
+  @Override
   public ValueVector getDataVector() {
     return vector;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
index af1ec8e..f755081 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -90,6 +90,11 @@ public class MapVector extends AbstractMapVector {
   }
 
   @Override
+  public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
+    copyFromSafe(fromIndex, toIndex, (MapVector) from);
+  }
+
+  @Override
   protected boolean supportsDirectRead() {
     return true;
   }
@@ -302,6 +307,13 @@ public class MapVector extends AbstractMapVector {
     return mutator;
   }
 
+  @Override
+  public void exchange(ValueVector other) {
+    // Exchange is used for look-ahead writers, but writers manage
+    // map member vectors directly.
+    throw new UnsupportedOperationException("Exchange() not supported for maps");
+  }
+
   public class Accessor extends BaseValueVector.BaseAccessor {
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index b5c97bf..969c141 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -118,7 +118,6 @@ public class RepeatedListVector extends AbstractContainerVector
       }
     }
 
-
     public class DelegateTransferPair implements TransferPair {
       private final DelegateRepeatedVector target;
       private final TransferPair[] children;
@@ -218,6 +217,10 @@ public class RepeatedListVector extends AbstractContainerVector
       ephPair.copyValueSafe(fromIndex, thisIndex);
     }
 
+    @Override
+    public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
+      copyFromSafe(fromIndex, toIndex, (DelegateRepeatedVector) from);
+    }
   }
 
   protected class RepeatedListTransferPair implements TransferPair {
@@ -428,6 +431,11 @@ public class RepeatedListVector extends AbstractContainerVector
   }
 
   @Override
+  public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
+    copyFromSafe(fromIndex, toIndex, (RepeatedListVector) from);
+  }
+
+  @Override
   public int getAllocatedByteCount() {
     return delegate.getAllocatedByteCount();
   }
@@ -436,4 +444,11 @@ public class RepeatedListVector extends AbstractContainerVector
   public int getPayloadByteCount() {
     return delegate.getPayloadByteCount();
   }
+
+  @Override
+  public void exchange(ValueVector other) {
+    // TODO: Figure out how to test this scenario, then what to do...
+    throw new UnsupportedOperationException("Exchange() not yet supported for repeated lists");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index d930728..7ff36a7 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -390,7 +390,6 @@ public class RepeatedMapVector extends AbstractMapVector
     }
   }
 
-
   transient private RepeatedMapTransferPair ephPair;
 
   public void copyFromSafe(int fromIndex, int thisIndex, RepeatedMapVector from) {
@@ -401,6 +400,11 @@ public class RepeatedMapVector extends AbstractMapVector
   }
 
   @Override
+  public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
+    copyFromSafe(fromIndex, toIndex, (RepeatedMapVector) from);
+  }
+
+  @Override
   public int getValueCapacity() {
     return Math.max(offsets.getValueCapacity() - 1, 0);
   }
@@ -411,6 +415,13 @@ public class RepeatedMapVector extends AbstractMapVector
   }
 
   @Override
+  public void exchange(ValueVector other) {
+    // Exchange is used for look-ahead writers, but writers manage
+    // map member vectors directly.
+    throw new UnsupportedOperationException("Exchange() not supported for maps");
+  }
+
+  @Override
   public DrillBuf[] getBuffers(boolean clear) {
     //final int expectedBufferSize = getBufferSize();
     //final int actualBufferSize = super.getBufferSize();
@@ -418,7 +429,6 @@ public class RepeatedMapVector extends AbstractMapVector
     return ArrayUtils.addAll(offsets.getBuffers(clear), super.getBuffers(clear));
   }
 
-
   @Override
   public void load(SerializedField metadata, DrillBuf buffer) {
     final List<SerializedField> children = metadata.getChildList();
@@ -572,6 +582,9 @@ public class RepeatedMapVector extends AbstractMapVector
       offsets.getMutator().setSafe(index + 1, prevEnd + 1);
       return prevEnd;
     }
+
+    @Override
+    public void exchange(ValueVector.Mutator other) { }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java
index 05fe63f..fdcfcd6 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java
@@ -38,18 +38,12 @@ abstract class AbstractBaseReader implements FieldReader{
     super();
   }
 
-  public void setPosition(int index){
-    this.index = index;
-  }
+  public void setPosition(int index) { this.index = index; }
 
-  int idx(){
-    return index;
-  }
+  int idx() { return index; }
 
   @Override
-  public void reset() {
-    index = 0;
-  }
+  public void reset() { index = 0; }
 
   @Override
   public Iterator<String> iterator() {


[2/3] drill git commit: DRILL-5517: Size-aware set methods in value vectors

Posted by pr...@apache.org.
DRILL-5517: Size-aware set methods in value vectors

Please see DRILL-5517 for an explanation.

Also includes a workaround for DRILL-5529.
Implements a setEmpties method for repeated and non-nullable
variable-width types in support of the revised column accessors.

Unit test included. Without the setEmpties call, the tests fail with
vector corruption. With the call, things work properly.

closes #840


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/92c9304f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/92c9304f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/92c9304f

Branch: refs/heads/master
Commit: 92c9304f77a515d8f6747514bd295613738f611e
Parents: 6446e56
Author: Paul Rogers <pr...@maprtech.com>
Authored: Tue May 16 13:20:32 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Mon Jul 3 11:39:16 2017 -0700

----------------------------------------------------------------------
 .../exec/record/vector/TestValueVector.java     |   7 +-
 .../apache/drill/vector/TestFillEmpties.java    | 242 ++++++++
 .../apache/drill/vector/TestVectorLimits.java   | 484 ++++++++++++++++
 .../org/apache/drill/vector/package-info.java   |  22 +
 .../src/main/java/io/netty/buffer/DrillBuf.java |  19 +
 .../netty/buffer/UnsafeDirectLittleEndian.java  |  75 ++-
 .../codegen/templates/FixedValueVectors.java    | 577 +++++++++++++------
 .../codegen/templates/NullableValueVectors.java | 163 +++++-
 .../codegen/templates/RepeatedValueVectors.java | 132 ++++-
 .../src/main/codegen/templates/UnionVector.java |  13 +-
 .../templates/VariableLengthVectors.java        | 218 ++++++-
 .../drill/exec/vector/BaseDataValueVector.java  |  21 +-
 .../drill/exec/vector/BaseValueVector.java      |  13 +-
 .../org/apache/drill/exec/vector/BitVector.java |  85 ++-
 .../drill/exec/vector/FixedWidthVector.java     |   5 +-
 .../apache/drill/exec/vector/ObjectVector.java  |  39 +-
 .../apache/drill/exec/vector/ValueVector.java   |  60 +-
 .../exec/vector/VectorOverflowException.java    |  30 +
 .../apache/drill/exec/vector/VectorUtils.java   |  63 ++
 .../apache/drill/exec/vector/ZeroVector.java    |  71 +--
 .../vector/complex/BaseRepeatedValueVector.java |  18 +-
 .../drill/exec/vector/complex/ListVector.java   |   8 +-
 .../drill/exec/vector/complex/MapVector.java    |  14 +-
 .../exec/vector/complex/RepeatedListVector.java |  19 +-
 .../exec/vector/complex/RepeatedMapVector.java  |  19 +-
 .../vector/complex/impl/AbstractBaseReader.java |  12 +-
 26 files changed, 2059 insertions(+), 370 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
index f6044e1..5191f5f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -25,7 +25,6 @@ import java.nio.charset.Charset;
 
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.types.MinorType;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecTest;
@@ -47,7 +46,6 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.RootAllocatorFactory;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.BitVector;
 import org.apache.drill.exec.vector.NullableFloat4Vector;
@@ -97,6 +95,7 @@ public class TestValueVector extends ExecTest {
   @Test(expected = OversizedAllocationException.class)
   public void testFixedVectorReallocation() {
     final MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, UInt4Holder.TYPE);
+    @SuppressWarnings("resource")
     final UInt4Vector vector = new UInt4Vector(field, allocator);
     // edge case 1: buffer size = max value capacity
     final int expectedValueCapacity = BaseValueVector.MAX_ALLOCATION_SIZE / 4;
@@ -149,10 +148,10 @@ public class TestValueVector extends ExecTest {
     }
   }
 
-
   @Test(expected = OversizedAllocationException.class)
   public void testVariableVectorReallocation() {
     final MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, UInt4Holder.TYPE);
+    @SuppressWarnings("resource")
     final VarCharVector vector = new VarCharVector(field, allocator);
     // edge case 1: value count = MAX_VALUE_ALLOCATION
     final int expectedAllocationInBytes = BaseValueVector.MAX_ALLOCATION_SIZE;

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/java-exec/src/test/java/org/apache/drill/vector/TestFillEmpties.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/vector/TestFillEmpties.java b/exec/java-exec/src/test/java/org/apache/drill/vector/TestFillEmpties.java
new file mode 100644
index 0000000..266bff2
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/vector/TestFillEmpties.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.vector;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.RepeatedVarCharVector;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.vector.VectorOverflowException;
+import org.apache.drill.test.DrillTest;
+import org.apache.drill.test.OperatorFixture;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.DrillBuf;
+
+public class TestFillEmpties extends DrillTest {
+
+  public static OperatorFixture fixture;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    fixture = OperatorFixture.builder().build();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    fixture.close();
+  }
+
+  // To be replaced by a test method in a separate commit.
+
+  public static MaterializedField makeField(String name, MinorType dataType, DataMode mode) {
+    MajorType type = MajorType.newBuilder()
+        .setMinorType(dataType)
+        .setMode(mode)
+        .build();
+
+    return MaterializedField.create(name, type);
+  }
+
+  @Test
+  public void testNullableVarChar() {
+    @SuppressWarnings("resource")
+    NullableVarCharVector vector = new NullableVarCharVector(makeField("a", MinorType.VARCHAR, DataMode.OPTIONAL), fixture.allocator());
+    vector.allocateNew( );
+
+    // Create "foo", null, "bar", but omit the null.
+
+    NullableVarCharVector.Mutator mutator = vector.getMutator();
+    byte[] value = makeValue( "foo" );
+    mutator.setSafe(0, value, 0, value.length);
+
+    value = makeValue("bar");
+    mutator.setSafe(2, value, 0, value.length);
+
+    visualize(vector, 3);
+    verifyOffsets(vector.getValuesVector().getOffsetVector(), new int[] {0, 3, 3, 6});
+    vector.close();
+  }
+
+  @Test
+  public void testVarChar() {
+    @SuppressWarnings("resource")
+    VarCharVector vector = new VarCharVector(makeField("a", MinorType.VARCHAR, DataMode.REQUIRED), fixture.allocator());
+    vector.allocateNew( );
+
+    // Create "foo", "", "bar", but omit the empty middle value (a required vector has no nulls).
+
+    VarCharVector.Mutator mutator = vector.getMutator();
+    byte[] value = makeValue( "foo" );
+    mutator.setSafe(0, value, 0, value.length);
+
+    // Work around: test fails without this. But, only the new column writers
+    // call this method.
+
+    try {
+      mutator.fillEmptiesBounded(0, 2);
+    } catch (VectorOverflowException e) {
+      fail();
+    }
+    value = makeValue("bar");
+    mutator.setSafe(2, value, 0, value.length);
+
+    visualize(vector, 3);
+    verifyOffsets(vector.getOffsetVector(), new int[] {0, 3, 3, 6});
+    vector.close();
+  }
+
+  @Test
+  public void testInt() {
+    @SuppressWarnings("resource")
+    IntVector vector = new IntVector(makeField("a", MinorType.INT, DataMode.REQUIRED), fixture.allocator());
+    vector.allocateNew( );
+
+    // Create 1, 0, 3, but omit the 0.
+
+    IntVector.Mutator mutator = vector.getMutator();
+    mutator.setSafe(0, 1);
+
+    mutator.setSafe(2, 3);
+
+    visualize(vector, 3);
+    vector.close();
+  }
+
+  @Test
+  public void testRepeatedVarChar() {
+    @SuppressWarnings("resource")
+    RepeatedVarCharVector vector = new RepeatedVarCharVector(makeField("a", MinorType.VARCHAR, DataMode.REPEATED), fixture.allocator());
+    vector.allocateNew( );
+
+    // Create ["a", "b"], [], ["c", "d"], but omit the empty middle array.
+
+    RepeatedVarCharVector.Mutator mutator = vector.getMutator();
+    mutator.startNewValue(0);
+    byte[] value = makeValue( "a" );
+    mutator.addSafe(0, value, 0, value.length);
+    value = makeValue( "b" );
+    mutator.addSafe(0, value, 0, value.length);
+
+    // Work around: test fails without this. But, only the new column writers
+    // call this method.
+
+    try {
+      mutator.fillEmptiesBounded(0, 2);
+    } catch (VectorOverflowException e) {
+      fail();
+    }
+    mutator.startNewValue(2);
+    value = makeValue( "c" );
+    mutator.addSafe(2, value, 0, value.length);
+    value = makeValue( "d" );
+    mutator.addSafe(2, value, 0, value.length);
+
+    visualize(vector, 3);
+    verifyOffsets(vector.getOffsetVector(), new int[] {0, 2, 2, 4});
+    verifyOffsets(vector.getDataVector().getOffsetVector(), new int[] {0, 1, 2, 3, 4});
+    vector.close();
+  }
+
+  private void visualize(RepeatedVarCharVector vector, int valueCount) {
+    visualize("Array Offsets", vector.getOffsetVector(), valueCount + 1);
+    visualize(vector.getDataVector(), vector.getOffsetVector().getAccessor().get(valueCount));
+  }
+
+  private void visualize(IntVector vector, int valueCount) {
+    System.out.print("Values: [");
+    IntVector.Accessor accessor = vector.getAccessor();
+    for (int i = 0; i < valueCount; i++) {
+      if (i > 0) { System.out.print(" "); }
+      System.out.print(accessor.get(i));
+    }
+    System.out.println("]");
+  }
+
+  private void visualize(NullableVarCharVector vector, int valueCount) {
+    visualize("Is-set", vector.getAccessor(), valueCount);
+    visualize(vector.getValuesVector(), valueCount);
+  }
+
+  private void visualize(VarCharVector vector, int valueCount) {
+    visualize("Offsets", vector.getOffsetVector(), valueCount + 1);
+    visualize("Data", vector.getBuffer(), vector.getOffsetVector().getAccessor().get(valueCount));
+  }
+
+  private void visualize(String label, UInt4Vector offsetVector,
+      int valueCount) {
+    System.out.print(label + ": [");
+    UInt4Vector.Accessor accessor = offsetVector.getAccessor();
+    for (int i = 0; i < valueCount; i++) {
+      if (i > 0) { System.out.print(" "); }
+      System.out.print(accessor.get(i));
+    }
+    System.out.println("]");
+  }
+
+  private void visualize(String label, DrillBuf buffer, int valueCount) {
+    System.out.print(label + ": [");
+    for (int i = 0; i < valueCount; i++) {
+      if (i > 0) { System.out.print(" "); }
+      System.out.print((char) buffer.getByte(i));
+    }
+    System.out.println("]");
+  }
+
+  private void visualize(String label, BaseDataValueVector.BaseAccessor accessor, int valueCount) {
+    System.out.print(label + ": [");
+    for (int i = 0; i < valueCount; i++) {
+      if (i > 0) { System.out.print(" "); }
+      System.out.print(accessor.isNull(i) ? 0 : 1);
+    }
+    System.out.println("]");
+  }
+
+  private void verifyOffsets(UInt4Vector offsetVector, int[] expected) {
+    UInt4Vector.Accessor accessor = offsetVector.getAccessor();
+    for (int i = 0; i < expected.length; i++) {
+      assertEquals(expected[i], accessor.get(i));
+    }
+  }
+
+  /**
+   * Create a test value. Works only for the ASCII subset of characters, obviously.
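+   * @param string the text to convert; each character is narrowed to a single byte
+   * @return a byte array with one entry per character of {@code string}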
+   */
+  private byte[] makeValue(String string) {
+    byte value[] = new byte[string.length()];
+    for (int i = 0; i < value.length; i++) {
+      value[i] = (byte) string.charAt(i);
+    }
+    return value;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/java-exec/src/test/java/org/apache/drill/vector/TestVectorLimits.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/vector/TestVectorLimits.java b/exec/java-exec/src/test/java/org/apache/drill/vector/TestVectorLimits.java
new file mode 100644
index 0000000..86bd206
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/vector/TestVectorLimits.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.vector;
+
+import static org.junit.Assert.*;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.RepeatedIntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.vector.VectorOverflowException;
+import org.apache.drill.test.DrillTest;
+import org.apache.drill.test.OperatorFixture;
+import org.bouncycastle.util.Arrays;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Test the setScalar() methods in the various generated vector
+ * classes. Rather than test all 100+ vectors, we sample a few and
+ * rely on the fact that code is generated from a common template.
+ */
+
+public class TestVectorLimits extends DrillTest {
+
+  public static OperatorFixture fixture;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    fixture = OperatorFixture.builder().build(); // one fixture shared by every test in this class
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    fixture.close(); // closes the fixture built in setUpBeforeClass()
+  }
+
+  /**
+   * Test a vector directly using the vector mutator to ensure
+   * that the <tt>setScalar</tt> method works for the maximum
+   * row count.
+   * <p>
+   * This test is a proxy for all the other fixed types, since all
+   * share the same code template.
+   */
+
+  @Test
+  public void testFixedVector() {
+
+    // A required int vector stands in for every fixed-width type.
+
+    @SuppressWarnings("resource")
+    IntVector intVector = new IntVector(makeField(MinorType.INT, DataMode.REQUIRED), fixture.allocator());
+
+    // Check the generated constants before relying on them.
+
+    assertTrue(IntVector.MAX_SCALAR_COUNT <= ValueVector.MAX_ROW_COUNT);
+    assertEquals(4, IntVector.VALUE_WIDTH);
+    assertTrue(IntVector.NET_MAX_SCALAR_SIZE <= ValueVector.MAX_BUFFER_SIZE);
+
+    // Begin with the default (small) allocation so that setScalar()
+    // must grow the vector as the writes proceed.
+
+    intVector.allocateNew();
+
+    // Write until setScalar() reports overflow. The first rejected
+    // index must equal the per-type scalar limit. The loop bound
+    // only stops a broken setScalar() from running on indefinitely.
+
+    final IntVector.Mutator writer = intVector.getMutator();
+    final int attempts = 2 * ValueVector.MAX_ROW_COUNT;
+    for (int row = 0; row < attempts; row++) {
+      try {
+        writer.setScalar(row, row);
+      } catch (VectorOverflowException overflow) {
+        assertEquals(IntVector.MAX_SCALAR_COUNT, row);
+        break;
+      }
+    }
+
+    // The filled vector should sit exactly at the type-specific
+    // size limit, which for ints may be well under the overall
+    // buffer limit checked by the constants above.
+
+    assertEquals(IntVector.NET_MAX_SCALAR_SIZE, intVector.getBuffer().getActualMemoryConsumed());
+    intVector.close();
+  }
+
+  @Test
+  public void testNullableFixedVector() {
+
+    @SuppressWarnings("resource")
+    NullableIntVector nullableVector = new NullableIntVector(makeField(MinorType.INT, DataMode.OPTIONAL), fixture.allocator());
+    nullableVector.allocateNew();
+
+    // Overflow is expected at the scalar count limit of the int vector.
+    final NullableIntVector.Mutator writer = nullableVector.getMutator();
+    for (int row = 0; row < 2 * ValueVector.MAX_ROW_COUNT; row++) {
+      try {
+        writer.setScalar(row, row);
+      } catch (VectorOverflowException overflow) {
+        assertEquals(IntVector.MAX_SCALAR_COUNT, row);
+        break;
+      }
+    }
+    nullableVector.close();
+  }
+
+  /**
+   * Repeated fixed vector. Using an int vector, each column array can hold
+   * 256 / 4 = 64 values. We write only 10. The vector becomes full when we
+   * exceed 64K items.
+   */
+
+  @Test
+  public void testRepeatedFixedVectorCountLimit() {
+
+    @SuppressWarnings("resource")
+    RepeatedIntVector arrayVector = new RepeatedIntVector(makeField(MinorType.INT, DataMode.REPEATED), fixture.allocator());
+    arrayVector.allocateNew();
+
+    final RepeatedIntVector.Mutator writer = arrayVector.getMutator();
+    rows:
+    for (int row = 0; row < 2 * ValueVector.MAX_ROW_COUNT; row++) {
+      final boolean started = writer.startNewValueBounded(row);
+      if (! started) {
+        // No break here: fall through so addEntry() is also checked.
+        assertEquals(ValueVector.MAX_ROW_COUNT, row);
+      }
+      for (int item = 0; item < 10; item++) {
+        try {
+          writer.addEntry(row, row * 100 + item);
+        } catch (VectorOverflowException overflow) {
+          assertEquals(ValueVector.MAX_ROW_COUNT, row);
+          writer.setValueCount(row);
+          break rows;
+        }
+      }
+    }
+    arrayVector.close();
+  }
+
+  /**
+   * Repeated fixed vector. Using an int vector, each column array can hold
+   * 256 / 4 = 64 values. We write 100. The vector becomes full when we
+   * exceed the 16 MB size limit.
+   */
+
+  @Test
+  public void testRepeatedFixedVectorBufferLimit() {
+
+    @SuppressWarnings("resource")
+    RepeatedIntVector arrayVector = new RepeatedIntVector(makeField(MinorType.INT, DataMode.REPEATED), fixture.allocator());
+    arrayVector.allocateNew();
+
+    final RepeatedIntVector.Mutator writer = arrayVector.getMutator();
+    rows:
+    for (int row = 0; row < 2 * ValueVector.MAX_ROW_COUNT; row++) {
+      // With 100 entries per row the row-count limit is never reached.
+      final boolean started = writer.startNewValueBounded(row);
+      assertTrue(started);
+      for (int item = 0; item < 100; item++) {
+        try {
+          writer.addEntry(row, row * 100 + item);
+        } catch (VectorOverflowException overflow) {
+          // The buffer limit must trip before the row-count limit.
+          assertTrue(row < ValueVector.MAX_ROW_COUNT);
+          writer.setValueCount(row);
+          break rows;
+        }
+      }
+    }
+    arrayVector.close();
+  }
+
+  // To be replaced by a test method in a separate commit.
+
+  public static MaterializedField makeField(MinorType dataType, DataMode mode) {
+    // Every field gets the same fixed name; only type and mode vary.
+    final MajorType.Builder typeBuilder = MajorType.newBuilder();
+    typeBuilder.setMinorType(dataType);
+    typeBuilder.setMode(mode);
+    final MajorType columnType = typeBuilder.build();
+    return MaterializedField.create("foo", columnType);
+  }
+
+  /**
+   * Baseline test for a variable-width vector using <tt>setSafe</tt> and
+   * loading the vector up to the maximum size. Doing so will cause the vector
+   * to have a buffer that exceeds the maximum size, demonstrating the
+   * need for <tt>setScalar()</tt>.
+   */
+
+  @Test
+  public void variableVectorBaseline() {
+
+    // Create a non-nullable VarChar vector: a typical variable-size vector
+
+    @SuppressWarnings("resource")
+    VarCharVector vector = new VarCharVector(makeField(MinorType.VARCHAR, DataMode.REQUIRED), fixture.allocator() );
+    vector.allocateNew( );
+
+    // A 16 MB vector can hold 64K values of up to 256 bytes each.
+    // To force a size overflow, write values much larger (512 bytes).
+    // setSafe() enforces no limit, so writing twice the maximum row
+    // count silently grows the vector beyond the critical size of 16 MB.
+    // Doing this in production would lead to memory fragmentation.
+    // So, this is what the setScalar() method assures we don't do.
+
+    // Shared helper: same 'X'-filled value the other tests use.
+    byte dummyValue[] = makeVarCharValue(512);
+    VarCharVector.Mutator mutator = vector.getMutator();
+    for (int i = 0; i < 2 * ValueVector.MAX_ROW_COUNT; i++) {
+      mutator.setSafe(i, dummyValue, 0, dummyValue.length);
+    }
+
+    // The vector should be above the allocation limit.
+    // This is why code must migrate to the setScalar() call
+    // away from the setSafe() call.
+
+    assertTrue(ValueVector.MAX_BUFFER_SIZE < vector.getBuffer().getActualMemoryConsumed());
+    vector.close();
+  }
+
+  /**
+   * Test a vector directly using the vector mutator to ensure
+   * that the <tt>setScalar</tt> method works for the maximum
+   * vector size.
+   */
+
+  @Test
+  public void testWideVariableVector() {
+
+    @SuppressWarnings("resource")
+    VarCharVector varChars = new VarCharVector(makeField(MinorType.VARCHAR, DataMode.REQUIRED), fixture.allocator());
+    varChars.allocateNew();
+
+    // At 64K values, a 16 MB vector has room for values of up to
+    // 256 bytes each. Values of 512 bytes therefore exhaust the
+    // buffer first: keep writing until setScalar() reports overflow,
+    // which should happen once the buffer is at its maximum size.
+
+    final byte[] wideValue = makeVarCharValue(512);
+    final VarCharVector.Mutator writer = varChars.getMutator();
+    int written = 0;
+    while (written < 2 * ValueVector.MAX_ROW_COUNT) {
+      try {
+        writer.setScalar(written, wideValue, 0, wideValue.length);
+      } catch (VectorOverflowException overflow) {
+        break;
+      }
+      written++;
+    }
+    // Expect the buffer to be exactly at the allocation limit (had
+    // it been smaller, it would have grown to take more data) and
+    // the number of values written to be short of the row limit.
+
+    writer.setValueCount(written);
+    assertEquals(ValueVector.MAX_BUFFER_SIZE, varChars.getBuffer().getActualMemoryConsumed());
+    assertTrue(written < ValueVector.MAX_ROW_COUNT);
+    varChars.close();
+  }
+
+  private byte[] makeVarCharValue(int n) {
+    final byte[] filler = new byte[n];
+    for (int posn = 0; posn < n; posn++) { filler[posn] = (byte) 'X'; }
+    return filler; // n bytes, each 'X'
+  }
+
+  @Test
+  public void testNullableWideVariableVector() {
+    // Nullable twin of testWideVariableVector(); size is read from the values vector.
+    @SuppressWarnings("resource")
+    NullableVarCharVector varChars = new NullableVarCharVector(makeField(MinorType.VARCHAR, DataMode.OPTIONAL), fixture.allocator());
+    varChars.allocateNew();
+
+    final byte[] wideValue = makeVarCharValue(512);
+    final NullableVarCharVector.Mutator writer = varChars.getMutator();
+    int written = 0;
+    while (written < 2 * ValueVector.MAX_ROW_COUNT) {
+      try {
+        writer.setScalar(written, wideValue, 0, wideValue.length);
+      } catch (VectorOverflowException overflow) {
+        break;
+      }
+      written++;
+    }
+    writer.setValueCount(written);
+    assertEquals(ValueVector.MAX_BUFFER_SIZE, varChars.getValuesVector().getBuffer().getActualMemoryConsumed());
+    assertTrue(written < ValueVector.MAX_ROW_COUNT);
+    varChars.close();
+  }
+
+  /**
+   * Test a vector directly using the vector mutator to ensure
+   * that the <tt>setScalar</tt> method works for the maximum
+   * value count.
+   */
+
+  @Test
+  public void testNarrowVariableVector() {
+
+    @SuppressWarnings("resource")
+    VarCharVector varChars = new VarCharVector(makeField(MinorType.VARCHAR, DataMode.REQUIRED), fixture.allocator());
+    varChars.allocateNew();
+
+    // Values this small all fit within 16 MB, so the value count
+    // limit, not the buffer size, is what should stop the writes.
+
+    final byte[] narrowValue = makeVarCharValue(254);
+    final VarCharVector.Mutator writer = varChars.getMutator();
+    int written = 0;
+    while (written < 2 * ValueVector.MAX_ROW_COUNT) {
+      try {
+        writer.setScalar(written, narrowValue, 0, narrowValue.length);
+      } catch (VectorOverflowException overflow) {
+        break;
+      }
+      written++;
+    }
+    // Count must be exactly at its maximum while the buffer stays
+    // at or below the size limit.
+
+    writer.setValueCount(written);
+    assertTrue(varChars.getBuffer().getActualMemoryConsumed() <= ValueVector.MAX_BUFFER_SIZE);
+    assertEquals(ValueVector.MAX_ROW_COUNT, written);
+    varChars.close();
+  }
+
+  /**
+   * Test a vector directly using the vector mutator to ensure
+   * that the <tt>setScalar</tt> method works for the maximum
+   * vector size. Uses a DrillBuf as input.
+   */
+
+  @Test
+  public void testDirectVariableVector() {
+
+    @SuppressWarnings("resource")
+    VarCharVector varChars = new VarCharVector(makeField(MinorType.VARCHAR, DataMode.REQUIRED), fixture.allocator());
+    varChars.allocateNew();
+
+    // Same as the big-value test, except that the source bytes live
+    // in a DrillBuf (direct memory) instead of a heap array.
+
+    final int valueLength = 260;
+    @SuppressWarnings("resource")
+    DrillBuf source = makeVarCharValueDirect(valueLength);
+    final VarCharVector.Mutator writer = varChars.getMutator();
+    int written = 0;
+    while (written < 2 * ValueVector.MAX_ROW_COUNT) {
+      try {
+        writer.setScalar(written, source, 0, valueLength);
+      } catch (VectorOverflowException overflow) {
+        break;
+      }
+      written++;
+    }
+    source.close();
+
+    // Expect the size limit to be reached before the count limit.
+    writer.setValueCount(written);
+    assertEquals(ValueVector.MAX_BUFFER_SIZE, varChars.getBuffer().getActualMemoryConsumed());
+    assertTrue(written < ValueVector.MAX_ROW_COUNT);
+    varChars.close();
+  }
+
+  private DrillBuf makeVarCharValueDirect(int n) {
+    final byte[] heapValue = makeVarCharValue(n);
+    final DrillBuf direct = fixture.allocator().buffer(heapValue.length);
+    direct.setBytes(0, heapValue);
+    return direct; // caller closes it, as the tests using it do
+  }
+
+  @Test
+  public void testDirectNullableVariableVector() {
+    // Nullable twin of testDirectVariableVector().
+    @SuppressWarnings("resource")
+    NullableVarCharVector varChars = new NullableVarCharVector(makeField(MinorType.VARCHAR, DataMode.OPTIONAL), fixture.allocator());
+    varChars.allocateNew();
+    final int valueLength = 260;
+    @SuppressWarnings("resource")
+    DrillBuf source = makeVarCharValueDirect(valueLength);
+    final NullableVarCharVector.Mutator writer = varChars.getMutator();
+    int written = 0;
+    while (written < 2 * ValueVector.MAX_ROW_COUNT) {
+      try {
+        writer.setScalar(written, source, 0, valueLength);
+      } catch (VectorOverflowException overflow) {
+        break;
+      }
+      written++;
+    }
+    source.close();
+    writer.setValueCount(written);
+    assertEquals(ValueVector.MAX_BUFFER_SIZE, varChars.getValuesVector().getBuffer().getActualMemoryConsumed());
+    assertTrue(written < ValueVector.MAX_ROW_COUNT);
+    varChars.close();
+  }
+
+  public static void main(String args[]) throws Exception {
+    // Manual entry point for the timing run; not part of the JUnit suite.
+    setUpBeforeClass();
+    try {
+      new TestVectorLimits().performanceTest();
+    } finally {
+      // Close the fixture even when the timing run fails.
+      tearDownAfterClass();
+    }
+  }
+
+  private void performanceTest() {
+    // Warm up with short runs, then time setSafe() against setScalar().
+    @SuppressWarnings("resource")
+    VarCharVector vector = new VarCharVector(makeField(MinorType.VARCHAR, DataMode.REQUIRED), fixture.allocator() );
+    byte value[] = makeVarCharValue(1);
+    final int warmCount = 100, runCount = 1000;
+    timeSetSafe(vector, value, warmCount);
+    runSetBounded(vector, value, warmCount);
+    timeSetSafe(vector, value, runCount);
+    runSetBounded(vector, value, runCount);
+    timeSetSafe(vector, value, runCount);
+    vector.close();
+  }
+
+  private void timeSetSafe(VarCharVector vector, byte[] value, int iterCount) {
+    // Times iterCount passes that each refill a fresh vector via setSafe().
+    final long startMs = System.currentTimeMillis();
+    for (int pass = 0; pass < iterCount; pass++) {
+      vector.clear();
+      vector.allocateNew();
+      final VarCharVector.Mutator writer = vector.getMutator();
+      for (int row = 0; row < ValueVector.MAX_ROW_COUNT; row++) {
+        writer.setSafe(row, value, 0, value.length);
+      }
+    }
+    final long elapsedMs = System.currentTimeMillis() - startMs;
+    System.out.println(iterCount + " runs of setSafe: " + elapsedMs + " ms.");
+  }
+
+  private void runSetBounded(VarCharVector vector, byte[] value, int iterCount) {
+    // Times iterCount passes of setScalar(), each ending at the first overflow.
+    final long startMs = System.currentTimeMillis();
+    for (int pass = 0; pass < iterCount; pass++) {
+      vector.clear();
+      vector.allocateNew();
+      final VarCharVector.Mutator writer = vector.getMutator();
+      boolean full = false;
+      for (int row = 0; ! full; row++) {
+        try {
+          writer.setScalar(row, value, 0, value.length);
+        } catch (VectorOverflowException overflow) {
+          full = true;
+        }
+      }
+    }
+    final long elapsedMs = System.currentTimeMillis() - startMs;
+    System.out.println(iterCount + " runs of setScalar: " + elapsedMs + " ms.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/java-exec/src/test/java/org/apache/drill/vector/package-info.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/vector/package-info.java b/exec/java-exec/src/test/java/org/apache/drill/vector/package-info.java
new file mode 100644
index 0000000..c858814
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/vector/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Tests for value vectors. Is in this module to allow use of
+ * the test tools which are available only in this module.
+ */
+package org.apache.drill.vector;

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
index 3793f25..5139086 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
@@ -748,6 +748,25 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
     return this;
   }
 
+  // Copies bytes from a heap array into this buffer. Returns false, writing
+  // nothing, when the UDLE-level bounds check rejects the write.
+
+  public boolean setBytesBounded(int index, byte[] src, int srcIndex, int length) {
+    // Must do here because Drill's UDLE is not ref counted.
+    // Done as an assert to avoid production overhead: a released buffer is a
+    // programming error. Index is relative to this buffer, so add the offset.
+    assert refCnt() > 0;
+    return udle.setBytesBounded(index + offset, src, srcIndex, length);
+  }
+
+  // Direct-memory variant: copies from another DrillBuf; false if no room.
+
+  public boolean setBytesBounded(int index, DrillBuf src, int srcIndex, int length) {
+    // Both indexes are buffer-relative; translate each by its own offset.
+    assert refCnt() > 0;
+    return udle.setBytesBounded(index + offset, src.udle, srcIndex + src.offset, length);
+  }
+
   @Override
   public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
     udle.setBytes(index + offset, src, srcIndex, length);

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
index 6495d5d..c91944f 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -18,15 +18,17 @@
 
 package io.netty.buffer;
 
-import io.netty.util.internal.PlatformDependent;
-
 import java.nio.ByteOrder;
 import java.util.concurrent.atomic.AtomicLong;
 
+import io.netty.util.internal.PlatformDependent;
+
 /**
- * The underlying class we use for little-endian access to memory. Is used underneath DrillBufs to abstract away the
- * Netty classes and underlying Netty memory management.
+ * The underlying class we use for little-endian access to memory. Is used
+ * underneath DrillBufs to abstract away the Netty classes and underlying Netty
+ * memory management.
  */
+
 public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
   private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
   private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
@@ -49,7 +51,6 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
 
   UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong bufferCount, AtomicLong bufferSize) {
     this(buf, true, bufferCount, bufferSize);
-
   }
 
   private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake, AtomicLong bufferCount, AtomicLong bufferSize) {
@@ -67,21 +68,21 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
     this.wrapped = buf;
     this.memoryAddress = buf.memoryAddress();
   }
-    private long addr(int index) {
-        return memoryAddress + index;
-    }
 
-    @Override
-    public long getLong(int index) {
-//        wrapped.checkIndex(index, 8);
-        long v = PlatformDependent.getLong(addr(index));
-        return v;
-    }
+  private long addr(int index) {
+    return memoryAddress + index;
+  }
 
-    @Override
-    public float getFloat(int index) {
-        return Float.intBitsToFloat(getInt(index));
-    }
+  @Override
+  public long getLong(int index) {
+//  wrapped.checkIndex(index, 8);
+    return PlatformDependent.getLong(addr(index));
+  }
+
+  @Override
+  public float getFloat(int index) {
+    return Float.intBitsToFloat(getInt(index));
+  }
 
   @Override
   public ByteBuf slice() {
@@ -174,6 +175,43 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
     return this;
   }
 
+  // Boolean counterpart of the super class checkIndex: reports whether the
+  // field fits in this buffer instead of throwing an exception.
+
+  protected boolean hasCapacity(int index, int fieldLength) {
+    assert fieldLength >= 0;
+    return index >= 0 && index <= capacity() - fieldLength;
+  }
+
+  /**
+   * Copy bytes from a heap array into this buffer, provided they fit.
+   * @param index position in this buffer at which the copy starts
+   * @param src array holding the bytes to copy
+   * @param srcIndex position in the array of the first byte to copy
+   * @param length number of bytes to copy
+   * @return true if the bytes were copied, false if nothing was copied
+   * because the bytes would not fit at the given position
+   */
+
+  public boolean setBytesBounded(int index, byte[] src, int srcIndex, int length) {
+    final boolean fits = hasCapacity(index, length);
+    if (fits) {
+      PlatformDependent.copyMemory(src, srcIndex, addr(index), length);
+    }
+    return fits;
+  }
+
+  // Copy bytes from another direct buffer into this one if they fit at
+  // index; the source range itself is not checked here.
+
+  public boolean setBytesBounded(int index, UnsafeDirectLittleEndian src, int srcIndex, int length) {
+    final boolean fits = hasCapacity(index, length);
+    if (fits) {
+      PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, addr(index), length);
+    }
+    return fits;
+  }
+
   @Override
   public ByteBuf writeShort(int value) {
     wrapped.ensureWritable(2);
@@ -266,5 +304,4 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
     assert isAssertEnabled = true;
     ASSERT_ENABLED = isAssertEnabled;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/92c9304f/exec/vector/src/main/codegen/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/FixedValueVectors.java b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
index 23188ce..1e83a4f 100644
--- a/exec/vector/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
@@ -31,22 +31,60 @@ package org.apache.drill.exec.vector;
 import org.apache.drill.exec.util.DecimalUtility;
 
 /**
- * ${minor.class} implements a vector of fixed width values.  Elements in the vector are accessed
- * by position, starting from the logical start of the vector.  Values should be pushed onto the
- * vector sequentially, but may be randomly accessed.
- *   The width of each element is ${type.width} byte(s)
- *   The equivalent Java primitive is '${minor.javaType!type.javaType}'
+ * ${minor.class} implements a vector of fixed width values. Elements in the vector are accessed
+ * by position, starting from the logical start of the vector. Values should be pushed onto the
+ * vector sequentially, but may be accessed randomly.
+ * <ul>
+ * <li>The width of each element is {@link #VALUE_WIDTH} (= ${type.width}) byte<#if type.width != 1>s</#if>.</li>
+ * <li>The equivalent Java primitive is '${minor.javaType!type.javaType}'.</li>
+ * </ul>
  *
  * NB: this class is automatically generated from ${.template_name} and ValueVectorTypes.tdd using FreeMarker.
  */
-public final class ${minor.class}Vector extends BaseDataValueVector implements FixedWidthVector{
+public final class ${minor.class}Vector extends BaseDataValueVector implements FixedWidthVector {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${minor.class}Vector.class);
 
+  /**
+   * Width of each fixed-width value.
+   */
+
+  public static final int VALUE_WIDTH = ${type.width};
+
+  /**
+   * Maximum number of values that this fixed-width vector can hold
+   * and stay below the maximum vector size limit. This is the limit
+   * enforced when the vector is used to hold values in a repeated
+   * vector.
+   */
+
+  public static final int MAX_VALUE_COUNT = MAX_BUFFER_SIZE / VALUE_WIDTH;
+
+  /**
+   * Maximum number of values that this fixed-width vector can hold
+   * and stay below the maximum vector size limit and/or stay below
+   * the maximum row count. This is the limit enforced when the
+   * vector is used to hold scalar (required or nullable) values.
+   * <p>
+   * Note: <tt>MAX_ROW_COUNT</tt> is defined in the parent <tt>ValueVector</tt>
+   * class as the maximum number of rows in a record batch (64K). Use this
+   * in place of the <tt>Character.MAX_VALUE</tt> value previously used.
+   */
+
+  public static final int MAX_SCALAR_COUNT = Math.min(MAX_ROW_COUNT, MAX_VALUE_COUNT);
+
+  /**
+   * Actual maximum vector size, in bytes, given the number of fixed-width
+   * values that either fit in the maximum overall vector size, or that
+   * is no larger than the maximum vector item count.
+   */
+
+  public static final int NET_MAX_SCALAR_SIZE = VALUE_WIDTH * MAX_SCALAR_COUNT;
+
   private final FieldReader reader = new ${minor.class}ReaderImpl(${minor.class}Vector.this);
   private final Accessor accessor = new Accessor();
   private final Mutator mutator = new Mutator();
 
-  private int allocationSizeInBytes = INITIAL_VALUE_ALLOCATION * ${type.width};
+  private int allocationSizeInBytes = Math.min(INITIAL_VALUE_ALLOCATION * VALUE_WIDTH, MAX_BUFFER_SIZE);
   private int allocationMonitor = 0;
 
   public ${minor.class}Vector(MaterializedField field, BufferAllocator allocator) {
@@ -54,45 +92,41 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
   }
 
   @Override
-  public FieldReader getReader(){
-    return reader;
-  }
+  public FieldReader getReader() { return reader; }
 
   @Override
   public int getBufferSizeFor(final int valueCount) {
     if (valueCount == 0) {
       return 0;
     }
-    return valueCount * ${type.width};
+    return valueCount * VALUE_WIDTH;
   }
 
   @Override
   public int getValueCapacity(){
-    return data.capacity() / ${type.width};
+    return data.capacity() / VALUE_WIDTH;
   }
 
   @Override
-  public Accessor getAccessor(){
-    return accessor;
-  }
+  public Accessor getAccessor() { return accessor; }
 
   @Override
-  public Mutator getMutator(){
-    return mutator;
-  }
+  public Mutator getMutator() { return mutator; }
 
   @Override
   public void setInitialCapacity(final int valueCount) {
-    final long size = 1L * valueCount * ${type.width};
+    final long size = (long) valueCount * VALUE_WIDTH;
+    // TODO: Replace this with MAX_BUFFER_SIZE once all
+    // code is aware of the maximum vector size.
     if (size > MAX_ALLOCATION_SIZE) {
       throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
     }
-    allocationSizeInBytes = (int)size;
+    allocationSizeInBytes = (int) size;
   }
 
   @Override
   public void allocateNew() {
-    if(!allocateNewSafe()){
+    if (!allocateNewSafe()){
       throw new OutOfMemoryException("Failure while allocating buffer.");
     }
   }
@@ -123,11 +157,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
    * Note that the maximum number of values a vector can allocate is Integer.MAX_VALUE / value width.
    *
    * @param valueCount
-   * @throws org.apache.drill.exec.memory.OutOfMemoryException if it can't allocate the new buffer
+   * @throws OutOfMemoryException if it can't allocate the new buffer
    */
   @Override
   public void allocateNew(final int valueCount) {
-    allocateBytes(valueCount * ${type.width});
+    allocateBytes((long) valueCount * VALUE_WIDTH); // long math, as in setInitialCapacity()
   }
 
   @Override
@@ -139,6 +173,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
   }
 
   private void allocateBytes(final long size) {
+    // TODO: Replace this with MAX_BUFFER_SIZE once all
+    // code is aware of the maximum vector size.
     if (size > MAX_ALLOCATION_SIZE) {
       throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
     }
@@ -150,13 +186,15 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     allocationSizeInBytes = curSize;
   }
 
-/**
- * Allocate new buffer with double capacity, and copy data into the new buffer. Replace vector's buffer with new buffer, and release old one
- *
- * @throws org.apache.drill.exec.memory.OutOfMemoryException if it can't allocate the new buffer
- */
+  /**
+   * Allocate new buffer with double capacity, and copy data into the new buffer. Replace vector's buffer with new buffer, and release old one
+   *
+   * @throws org.apache.drill.exec.memory.OutOfMemoryException if it can't allocate the new buffer
+   */
   public void reAlloc() {
     final long newAllocationSize = allocationSizeInBytes * 2L;
+    // TODO: Replace this with MAX_BUFFER_SIZE once all
+    // code is aware of the maximum vector size.
     if (newAllocationSize > MAX_ALLOCATION_SIZE)  {
       throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached.");
     }
@@ -185,7 +223,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     Preconditions.checkArgument(this.field.getPath().equals(metadata.getNamePart().getName()), "The field %s doesn't match the provided metadata %s.", this.field, metadata);
     final int actualLength = metadata.getBufferLength();
     final int valueCount = metadata.getValueCount();
-    final int expectedLength = valueCount * ${type.width};
+    final int expectedLength = valueCount * VALUE_WIDTH;
     assert actualLength == expectedLength : String.format("Expected to load %d bytes but actually loaded %d bytes", expectedLength, actualLength);
 
     clear();
@@ -220,8 +258,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
   }
 
   public void splitAndTransferTo(int startIndex, int length, ${minor.class}Vector target) {
-    final int startPoint = startIndex * ${type.width};
-    final int sliceLength = length * ${type.width};
+    final int startPoint = startIndex * VALUE_WIDTH;
+    final int sliceLength = length * VALUE_WIDTH;
     target.clear();
     target.data = data.slice(startPoint, sliceLength).transferOwnership(target.allocator).buffer;
     target.data.writerIndex(sliceLength);
@@ -229,7 +267,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
 
   @Override
   public int getPayloadByteCount() {
-    return getAccessor().getValueCount() * ${type.width};
+    return getAccessor().getValueCount() * VALUE_WIDTH;
   }
 
   private class TransferImpl implements TransferPair{
@@ -266,10 +304,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
 
   public void copyFrom(int fromIndex, int thisIndex, ${minor.class}Vector from){
     <#if (type.width > 8)>
-    from.data.getBytes(fromIndex * ${type.width}, data, thisIndex * ${type.width}, ${type.width});
+    from.data.getBytes(fromIndex * VALUE_WIDTH, data, thisIndex * VALUE_WIDTH, VALUE_WIDTH);
     <#else> <#-- type.width <= 8 -->
-    data.set${(minor.javaType!type.javaType)?cap_first}(thisIndex * ${type.width},
-        from.data.get${(minor.javaType!type.javaType)?cap_first}(fromIndex * ${type.width})
+    data.set${(minor.javaType!type.javaType)?cap_first}(thisIndex * VALUE_WIDTH,
+        from.data.get${(minor.javaType!type.javaType)?cap_first}(fromIndex * VALUE_WIDTH)
     );
     </#if> <#-- type.width -->
   }
@@ -281,6 +319,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     copyFrom(fromIndex, thisIndex, from);
   }
 
+  @Override
+  public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
+    ((${minor.class}Vector) from).data.getBytes(fromIndex * ${type.width}, data, toIndex * ${type.width}, ${type.width});
+  }
+
   public void decrementAllocationMonitor() {
     if (allocationMonitor > 0) {
       allocationMonitor = 0;
@@ -295,7 +338,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
   public final class Accessor extends BaseDataValueVector.BaseAccessor {
     @Override
     public int getValueCount() {
-      return data.writerIndex() / ${type.width};
+      return data.writerIndex() / VALUE_WIDTH;
     }
 
     @Override
@@ -305,20 +348,19 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
 
     <#if (type.width > 8)>
     public ${minor.javaType!type.javaType} get(int index) {
-      return data.slice(index * ${type.width}, ${type.width});
+      return data.slice(index * VALUE_WIDTH, VALUE_WIDTH);
     }
 
     <#if (minor.class == "Interval")>
     public void get(int index, ${minor.class}Holder holder){
-
-      final int offsetIndex = index * ${type.width};
+      final int offsetIndex = index * VALUE_WIDTH;
       holder.months = data.getInt(offsetIndex);
       holder.days = data.getInt(offsetIndex + ${minor.daysOffset});
       holder.milliseconds = data.getInt(offsetIndex + ${minor.millisecondsOffset});
     }
 
     public void get(int index, Nullable${minor.class}Holder holder){
-      final int offsetIndex = index * ${type.width};
+      final int offsetIndex = index * VALUE_WIDTH;
       holder.isSet = 1;
       holder.months = data.getInt(offsetIndex);
       holder.days = data.getInt(offsetIndex + ${minor.daysOffset});
@@ -327,7 +369,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
 
     @Override
     public ${friendlyType} getObject(int index) {
-      final int offsetIndex = index * ${type.width};
+      final int offsetIndex = index * VALUE_WIDTH;
       final int months  = data.getInt(offsetIndex);
       final int days    = data.getInt(offsetIndex + ${minor.daysOffset});
       final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
@@ -337,7 +379,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
 
     public StringBuilder getAsStringBuilder(int index) {
 
-      final int offsetIndex = index * ${type.width};
+      final int offsetIndex = index * VALUE_WIDTH;
 
       int months  = data.getInt(offsetIndex);
       final int days    = data.getInt(offsetIndex + ${minor.daysOffset});
@@ -372,14 +414,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
 
     <#elseif (minor.class == "IntervalDay")>
     public void get(int index, ${minor.class}Holder holder){
-
-      final int offsetIndex = index * ${type.width};
+      final int offsetIndex = index * VALUE_WIDTH;
       holder.days = data.getInt(offsetIndex);
       holder.milliseconds = data.getInt(offsetIndex + ${minor.millisecondsOffset});
     }
 
     public void get(int index, Nullable${minor.class}Holder holder){
-      final int offsetIndex = index * ${type.width};
+      final int offsetIndex = index * VALUE_WIDTH;
       holder.isSet = 1;
       holder.days = data.getInt(offsetIndex);
       holder.milliseconds = data.getInt(offsetIndex + ${minor.millisecondsOffset});
@@ -387,7 +428,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
 
     @Override
     public ${friendlyType} getObject(int index) {
-      final int offsetIndex = index * ${type.width};
+      final int offsetIndex = index * VALUE_WIDTH;
       final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
       final int  days   = data.getInt(offsetIndex);
       final Period p = new Period();
@@ -395,7 +436,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     }
 
     public StringBuilder getAsStringBuilder(int index) {
-      final int offsetIndex = index * ${type.width};
+      final int offsetIndex = index * VALUE_WIDTH;
 
       int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
       final int  days   = data.getInt(offsetIndex);
@@ -419,10 +460,9 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
               append(millis));
     }
 
-    <#elseif (minor.class == "Decimal28Sparse") || (minor.class == "Decimal38Sparse") || (minor.class == "Decimal28Dense") || (minor.class == "Decimal38Dense")>
-
+    <#elseif minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense">
     public void get(int index, ${minor.class}Holder holder) {
-      holder.start = index * ${type.width};
+      holder.start = index * VALUE_WIDTH;
       holder.buffer = data;
       holder.scale = getField().getScale();
       holder.precision = getField().getPrecision();
@@ -430,7 +470,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
 
     public void get(int index, Nullable${minor.class}Holder holder) {
       holder.isSet = 1;
-      holder.start = index * ${type.width};
+      holder.start = index * VALUE_WIDTH;
       holder.buffer = data;
       holder.scale = getField().getScale();
       holder.precision = getField().getPrecision();
@@ -440,65 +480,61 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     public ${friendlyType} getObject(int index) {
       <#if (minor.class == "Decimal28Sparse") || (minor.class == "Decimal38Sparse")>
       // Get the BigDecimal object
-      return DecimalUtility.getBigDecimalFromSparse(data, index * ${type.width}, ${minor.nDecimalDigits}, getField().getScale());
+      return DecimalUtility.getBigDecimalFromSparse(data, index * VALUE_WIDTH, ${minor.nDecimalDigits}, getField().getScale());
       <#else>
-      return DecimalUtility.getBigDecimalFromDense(data, index * ${type.width}, ${minor.nDecimalDigits}, getField().getScale(), ${minor.maxPrecisionDigits}, ${type.width});
+      return DecimalUtility.getBigDecimalFromDense(data, index * VALUE_WIDTH, ${minor.nDecimalDigits}, getField().getScale(), ${minor.maxPrecisionDigits}, VALUE_WIDTH);
       </#if>
     }
 
     <#else>
     public void get(int index, ${minor.class}Holder holder){
       holder.buffer = data;
-      holder.start = index * ${type.width};
+      holder.start = index * VALUE_WIDTH;
     }
 
     public void get(int index, Nullable${minor.class}Holder holder){
       holder.isSet = 1;
       holder.buffer = data;
-      holder.start = index * ${type.width};
+      holder.start = index * VALUE_WIDTH;
     }
 
     @Override
     public ${friendlyType} getObject(int index) {
-      return data.slice(index * ${type.width}, ${type.width})
+      return data.slice(index * VALUE_WIDTH, VALUE_WIDTH)
     }
 
     </#if>
     <#else> <#-- type.width <= 8 -->
-
     public ${minor.javaType!type.javaType} get(int index) {
-      return data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
+      return data.get${(minor.javaType!type.javaType)?cap_first}(index * VALUE_WIDTH);
     }
 
     <#if type.width == 4>
     public long getTwoAsLong(int index) {
-      return data.getLong(index * ${type.width});
+      return data.getLong(index * VALUE_WIDTH);
     }
 
     </#if>
-
     <#if minor.class == "Date">
     @Override
     public ${friendlyType} getObject(int index) {
-        org.joda.time.DateTime date = new org.joda.time.DateTime(get(index), org.joda.time.DateTimeZone.UTC);
-        date = date.withZoneRetainFields(org.joda.time.DateTimeZone.getDefault());
-        return date;
+      org.joda.time.DateTime date = new org.joda.time.DateTime(get(index), org.joda.time.DateTimeZone.UTC);
+      date = date.withZoneRetainFields(org.joda.time.DateTimeZone.getDefault());
+      return date;
     }
 
     <#elseif minor.class == "TimeStamp">
     @Override
     public ${friendlyType} getObject(int index) {
-        org.joda.time.DateTime date = new org.joda.time.DateTime(get(index), org.joda.time.DateTimeZone.UTC);
-        date = date.withZoneRetainFields(org.joda.time.DateTimeZone.getDefault());
-        return date;
+      org.joda.time.DateTime date = new org.joda.time.DateTime(get(index), org.joda.time.DateTimeZone.UTC);
+      date = date.withZoneRetainFields(org.joda.time.DateTimeZone.getDefault());
+      return date;
     }
 
     <#elseif minor.class == "IntervalYear">
     @Override
     public ${friendlyType} getObject(int index) {
-
       final int value = get(index);
-
       final int years  = (value / org.apache.drill.exec.expr.fn.impl.DateUtility.yearsToMonths);
       final int months = (value % org.apache.drill.exec.expr.fn.impl.DateUtility.yearsToMonths);
       final Period p = new Period();
@@ -523,18 +559,16 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     <#elseif minor.class == "Time">
     @Override
     public DateTime getObject(int index) {
-
-        org.joda.time.DateTime time = new org.joda.time.DateTime(get(index), org.joda.time.DateTimeZone.UTC);
-        time = time.withZoneRetainFields(org.joda.time.DateTimeZone.getDefault());
-        return time;
+      org.joda.time.DateTime time = new org.joda.time.DateTime(get(index), org.joda.time.DateTimeZone.UTC);
+      time = time.withZoneRetainFields(org.joda.time.DateTimeZone.getDefault());
+      return time;
     }
 
     <#elseif minor.class == "Decimal9" || minor.class == "Decimal18">
     @Override
     public ${friendlyType} getObject(int index) {
-
-        final BigInteger value = BigInteger.valueOf(((${type.boxedType})get(index)).${type.javaType}Value());
-        return new BigDecimal(value, getField().getScale());
+      final BigInteger value = BigInteger.valueOf(((${type.boxedType})get(index)).${type.javaType}Value());
+      return new BigDecimal(value, getField().getScale());
     }
 
     <#else>
@@ -546,191 +580,298 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     public ${minor.javaType!type.javaType} getPrimitiveObject(int index) {
       return get(index);
     }
-    </#if>
 
+    </#if>
     public void get(int index, ${minor.class}Holder holder){
       <#if minor.class.startsWith("Decimal")>
       holder.scale = getField().getScale();
       holder.precision = getField().getPrecision();
       </#if>
 
-      holder.value = data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
+      holder.value = data.get${(minor.javaType!type.javaType)?cap_first}(index * VALUE_WIDTH);
     }
 
     public void get(int index, Nullable${minor.class}Holder holder){
       holder.isSet = 1;
-      holder.value = data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
+      holder.value = data.get${(minor.javaType!type.javaType)?cap_first}(index * VALUE_WIDTH);
     }
     </#if> <#-- type.width -->
- }
-
- /**
-  * ${minor.class}.Mutator implements a mutable vector of fixed width values.  Elements in the
-  * vector are accessed by position from the logical start of the vector.  Values should be pushed
-  * onto the vector sequentially, but may be randomly accessed.
-  *   The width of each element is ${type.width} byte(s)
-  *   The equivalent Java primitive is '${minor.javaType!type.javaType}'
-  *
-  * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker.
-  */
-  public final class Mutator extends BaseDataValueVector.BaseMutator {
-
-    private Mutator(){};
-   /**
-    * Set the element at the given index to the given value.  Note that widths smaller than
-    * 32 bits are handled by the DrillBuf interface.
-    *
-    * @param index   position of the bit to set
-    * @param value   value to set
-    */
+  }
+
+  /**
+   * ${minor.class}.Mutator implements a mutable vector of fixed width values.  Elements in the
+   * vector are accessed by position from the logical start of the vector.  Values should be pushed
+   * onto the vector sequentially, but may be randomly accessed.
+   * <ul>
+   * <li>The width of each element is {@link #VALUE_WIDTH} (= ${type.width}) byte(s).</li>
+   * <li>The equivalent Java primitive is '${minor.javaType!type.javaType}'</li>
+   * </ul>
+   *
+   * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker.
+   */
+   public final class Mutator extends BaseDataValueVector.BaseMutator {
+
+    private Mutator() {};
+
+    /**
+     * Set the element at the given index to the given value.  Note that widths smaller than
+     * 32 bits are handled by the DrillBuf interface.
+     *
+     * @param index   position of the value to set
+     * @param value   value to set
+     */
+
   <#if (type.width > 8)>
     public void set(int index, <#if (type.width > 4)>${minor.javaType!type.javaType}<#else>int</#if> value) {
-      data.setBytes(index * ${type.width}, value, 0, ${type.width});
+      data.setBytes(index * VALUE_WIDTH, value, 0, VALUE_WIDTH);
     }
 
     public void setSafe(int index, <#if (type.width > 4)>${minor.javaType!type.javaType}<#else>int</#if> value) {
       while(index >= getValueCapacity()) {
         reAlloc();
       }
-      data.setBytes(index * ${type.width}, value, 0, ${type.width});
+      data.setBytes(index * VALUE_WIDTH, value, 0, VALUE_WIDTH);
     }
 
-  <#if (minor.class == "Interval")>
-    public void set(int index, int months, int days, int milliseconds){
-      final int offsetIndex = index * ${type.width};
-      data.setInt(offsetIndex, months);
-      data.setInt((offsetIndex + ${minor.daysOffset}), days);
-      data.setInt((offsetIndex + ${minor.millisecondsOffset}), milliseconds);
+    /**
+     * Set the value of a required or nullable vector. Enforces the value
+     * and size limits.
+     * @param index item to write
+     * @param value value to set
+     * @throws VectorOverflowException if the index is at or beyond
+     * MAX_SCALAR_COUNT, meaning the write would overfill the vector
+     */
+
+    public void setScalar(int index, <#if (type.width > 4)>${minor.javaType!type.javaType}<#else>int</#if> value) throws VectorOverflowException {
+      if (index >= MAX_SCALAR_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, value);
     }
 
-    protected void set(int index, ${minor.class}Holder holder){
-      set(index, holder.months, holder.days, holder.milliseconds);
+    /**
+     * Set the value of a repeated vector. Enforces only the size limit.
+     * @param index item to write
+     * @param value value to set
+     * @throws VectorOverflowException if the index is at or beyond
+     * MAX_VALUE_COUNT, meaning the write would overfill the vector
+     */
+
+    public void setArrayItem(int index, <#if (type.width > 4)>${minor.javaType!type.javaType}<#else>int</#if> value) throws VectorOverflowException {
+      if (index >= MAX_VALUE_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, value);
     }
 
-    protected void set(int index, Nullable${minor.class}Holder holder){
-      set(index, holder.months, holder.days, holder.milliseconds);
+    <#if minor.class == "Interval">
+    public void set(int index, int months, int days, int milliseconds) {
+      final int offsetIndex = index * VALUE_WIDTH;
+      data.setInt(offsetIndex, months);
+      data.setInt((offsetIndex + ${minor.daysOffset}), days);
+      data.setInt((offsetIndex + ${minor.millisecondsOffset}), milliseconds);
     }
 
-    public void setSafe(int index, int months, int days, int milliseconds){
+    public void setSafe(int index, int months, int days, int milliseconds) {
       while(index >= getValueCapacity()) {
         reAlloc();
       }
       set(index, months, days, milliseconds);
     }
 
-    public void setSafe(int index, Nullable${minor.class}Holder holder){
+    public void setScalar(int index, int months, int days, int milliseconds) throws VectorOverflowException {
+      if (index >= MAX_SCALAR_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, months, days, milliseconds);
+    }
+
+    public void setArrayItem(int index, int months, int days, int milliseconds) throws VectorOverflowException {
+      if (index >= MAX_VALUE_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, months, days, milliseconds);
+    }
+
+    protected void set(int index, ${minor.class}Holder holder) {
+      set(index, holder.months, holder.days, holder.milliseconds);
+    }
+
+    public void setSafe(int index, ${minor.class}Holder holder) {
       setSafe(index, holder.months, holder.days, holder.milliseconds);
     }
 
-    public void setSafe(int index, ${minor.class}Holder holder){
+    public void setScalar(int index, ${minor.class}Holder holder) throws VectorOverflowException {
+      setScalar(index, holder.months, holder.days, holder.milliseconds);
+    }
+
+    public void setArrayItem(int index, ${minor.class}Holder holder) throws VectorOverflowException {
+      setArrayItem(index, holder.months, holder.days, holder.milliseconds);
+    }
+
+    protected void set(int index, Nullable${minor.class}Holder holder) {
+      set(index, holder.months, holder.days, holder.milliseconds);
+    }
+
+    public void setSafe(int index, Nullable${minor.class}Holder holder) {
       setSafe(index, holder.months, holder.days, holder.milliseconds);
     }
 
-  <#elseif (minor.class == "IntervalDay")>
-    public void set(int index, int days, int milliseconds){
-      final int offsetIndex = index * ${type.width};
-      data.setInt(offsetIndex, days);
-      data.setInt((offsetIndex + ${minor.millisecondsOffset}), milliseconds);
+    public void setScalar(int index, Nullable${minor.class}Holder holder) throws VectorOverflowException {
+      setScalar(index, holder.months, holder.days, holder.milliseconds);
     }
 
-    protected void set(int index, ${minor.class}Holder holder){
-      set(index, holder.days, holder.milliseconds);
+    public void setArrayItem(int index, Nullable${minor.class}Holder holder) throws VectorOverflowException {
+      setArrayItem(index, holder.months, holder.days, holder.milliseconds);
     }
 
-    protected void set(int index, Nullable${minor.class}Holder holder){
-      set(index, holder.days, holder.milliseconds);
+    <#elseif minor.class == "IntervalDay">
+    public void set(int index, int days, int milliseconds) {
+      final int offsetIndex = index * VALUE_WIDTH;
+      data.setInt(offsetIndex, days);
+      data.setInt((offsetIndex + ${minor.millisecondsOffset}), milliseconds);
     }
 
-    public void setSafe(int index, int days, int milliseconds){
+    public void setSafe(int index, int days, int milliseconds) {
       while(index >= getValueCapacity()) {
         reAlloc();
       }
       set(index, days, milliseconds);
     }
 
-    public void setSafe(int index, ${minor.class}Holder holder){
-      setSafe(index, holder.days, holder.milliseconds);
+    public void setScalar(int index, int days, int milliseconds) throws VectorOverflowException {
+      if (index >= MAX_SCALAR_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, days, milliseconds);
     }
 
-    public void setSafe(int index, Nullable${minor.class}Holder holder){
+    public void setArrayItem(int index, int days, int milliseconds) throws VectorOverflowException {
+      if (index >= MAX_VALUE_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, days, milliseconds);
+    }
+
+    protected void set(int index, ${minor.class}Holder holder) {
+      set(index, holder.days, holder.milliseconds);
+    }
+
+    public void setSafe(int index, ${minor.class}Holder holder) {
       setSafe(index, holder.days, holder.milliseconds);
     }
 
-  <#elseif (minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse") || (minor.class == "Decimal28Dense") || (minor.class == "Decimal38Dense")>
-    public void set(int index, ${minor.class}Holder holder){
-      set(index, holder.start, holder.buffer);
+    public void setScalar(int index, ${minor.class}Holder holder) throws VectorOverflowException {
+      setScalar(index, holder.days, holder.milliseconds);
     }
 
-    void set(int index, Nullable${minor.class}Holder holder){
-      set(index, holder.start, holder.buffer);
+    public void setArrayItem(int index, ${minor.class}Holder holder) throws VectorOverflowException {
+      setArrayItem(index, holder.days, holder.milliseconds);
     }
 
-    public void setSafe(int index,  Nullable${minor.class}Holder holder){
-      setSafe(index, holder.start, holder.buffer);
+    protected void set(int index, Nullable${minor.class}Holder holder) {
+      set(index, holder.days, holder.milliseconds);
     }
 
-    public void setSafe(int index,  ${minor.class}Holder holder){
-      setSafe(index, holder.start, holder.buffer);
+    public void setSafe(int index, Nullable${minor.class}Holder holder){
+      setSafe(index, holder.days, holder.milliseconds);
+    }
+
+    public void setScalar(int index, Nullable${minor.class}Holder holder) throws VectorOverflowException {
+      setScalar(index, holder.days, holder.milliseconds);
     }
 
-    public void setSafe(int index, int start, DrillBuf buffer){
+    public void setArrayItem(int index, Nullable${minor.class}Holder holder) throws VectorOverflowException {
+      setArrayItem(index, holder.days, holder.milliseconds);
+    }
+
+    <#elseif minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense">
+    public void setSafe(int index, int start, DrillBuf buffer) {
       while(index >= getValueCapacity()) {
         reAlloc();
       }
       set(index, start, buffer);
     }
 
-  <#if minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse">
-    public void set(int index, BigDecimal value) {
-      DecimalUtility.getSparseFromBigDecimal(value, data, index * ${type.width},
-           field.getScale(), field.getPrecision(), ${minor.nDecimalDigits});
+    public void setScalar(int index, int start, DrillBuf buffer) throws VectorOverflowException {
+      if (index >= MAX_SCALAR_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, start, buffer);
     }
 
-    public void setSafe(int index, BigDecimal value) {
-      while(index >= getValueCapacity()) {
-        reAlloc();
+    public void setArrayItem(int index, int start, DrillBuf buffer) throws VectorOverflowException {
+      if (index >= MAX_VALUE_COUNT) {
+        throw new VectorOverflowException();
       }
-      set(index, value);
+      setSafe(index, start, buffer);
     }
 
-  </#if>
-    public void set(int index, int start, DrillBuf buffer){
-      data.setBytes(index * ${type.width}, buffer, start, ${type.width});
+    public void set(int index, ${minor.class}Holder holder) {
+      set(index, holder.start, holder.buffer);
     }
 
-  <#else>
-    protected void set(int index, ${minor.class}Holder holder){
-      set(index, holder.start, holder.buffer);
+    public void setSafe(int index, ${minor.class}Holder holder) {
+      setSafe(index, holder.start, holder.buffer);
     }
 
-    public void set(int index, Nullable${minor.class}Holder holder){
-      set(index, holder.start, holder.buffer);
+    public void setScalar(int index, ${minor.class}Holder holder) throws VectorOverflowException {
+      setScalar(index, holder.start, holder.buffer);
     }
 
-    public void set(int index, int start, DrillBuf buffer){
-      data.setBytes(index * ${type.width}, buffer, start, ${type.width});
+    public void setArrayItem(int index, ${minor.class}Holder holder) throws VectorOverflowException {
+      setArrayItem(index, holder.start, holder.buffer);
     }
 
-    public void setSafe(int index, ${minor.class}Holder holder){
-      setSafe(index, holder.start, holder.buffer);
+    void set(int index, Nullable${minor.class}Holder holder) {
+      set(index, holder.start, holder.buffer);
     }
 
-    public void setSafe(int index, Nullable${minor.class}Holder holder){
+    public void setSafe(int index, Nullable${minor.class}Holder holder) {
       setSafe(index, holder.start, holder.buffer);
     }
 
-    public void setSafe(int index, int start, DrillBuf buffer){
+    public void setScalar(int index, Nullable${minor.class}Holder holder) throws VectorOverflowException {
+      setScalar(index, holder.start, holder.buffer);
+    }
+
+    public void setArrayItem(int index, Nullable${minor.class}Holder holder) throws VectorOverflowException {
+      setArrayItem(index, holder.start, holder.buffer);
+    }
+
+      <#if minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse">
+    public void set(int index, BigDecimal value) {
+      DecimalUtility.getSparseFromBigDecimal(value, data, index * VALUE_WIDTH,
+           field.getScale(), field.getPrecision(), ${minor.nDecimalDigits});
+    }
+
+    public void setSafe(int index, BigDecimal value) {
       while(index >= getValueCapacity()) {
         reAlloc();
       }
-      set(index, holder);
+      set(index, value);
     }
 
-    public void set(int index, Nullable${minor.class}Holder holder){
-      data.setBytes(index * ${type.width}, holder.buffer, holder.start, ${type.width});
+    public void setScalar(int index, BigDecimal value) throws VectorOverflowException {
+      if (index >= MAX_SCALAR_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, value);
     }
-   </#if>
 
+    public void setArrayItem(int index, BigDecimal value) throws VectorOverflowException {
+      if (index >= MAX_VALUE_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, value);
+    }
+
+      </#if>
+    public void set(int index, int start, DrillBuf buffer){
+      data.setBytes(index * VALUE_WIDTH, buffer, start, VALUE_WIDTH);
+    }
+
+    </#if>
     @Override
     public void generateTestData(int count) {
       setValueCount(count);
@@ -738,46 +879,105 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
       final int valueCount = getAccessor().getValueCount();
       for(int i = 0; i < valueCount; i++, even = !even) {
         final byte b = even ? Byte.MIN_VALUE : Byte.MAX_VALUE;
-        for(int w = 0; w < ${type.width}; w++){
+        for(int w = 0; w < VALUE_WIDTH; w++){
           data.setByte(i + w, b);
         }
       }
     }
 
-   <#else> <#-- type.width <= 8 -->
+  <#else> <#-- type.width <= 8 -->
     public void set(int index, <#if (type.width >= 4)>${minor.javaType!type.javaType}<#else>int</#if> value) {
-      data.set${(minor.javaType!type.javaType)?cap_first}(index * ${type.width}, value);
+      data.set${(minor.javaType!type.javaType)?cap_first}(index * VALUE_WIDTH, value);
     }
 
-   public void setSafe(int index, <#if (type.width >= 4)>${minor.javaType!type.javaType}<#else>int</#if> value) {
-     while(index >= getValueCapacity()) {
+    public void setSafe(int index, <#if (type.width >= 4)>${minor.javaType!type.javaType}<#else>int</#if> value) {
+      while(index >= getValueCapacity()) {
         reAlloc();
       }
       set(index, value);
     }
 
-    protected void set(int index, ${minor.class}Holder holder){
-      data.set${(minor.javaType!type.javaType)?cap_first}(index * ${type.width}, holder.value);
+    /**
+     * Set the value of a required or nullable vector. Enforces the value
+     * and size limits.
+     * @param index item to write
+     * @param value value to set
+     * @throws VectorOverflowException if the index is at or beyond
+     * MAX_SCALAR_COUNT, meaning the write would overfill the vector
+     */
+
+    public void setScalar(int index, <#if (type.width >= 4)>${minor.javaType!type.javaType}<#else>int</#if> value) throws VectorOverflowException {
+      if (index >= MAX_SCALAR_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, value);
+    }
+
+    /**
+     * Set the value of a repeated vector. Enforces only the size limit.
+     * @param index item to write
+     * @param value value to set
+     * @throws VectorOverflowException if the index is at or beyond
+     * MAX_VALUE_COUNT, meaning the write would overfill the vector
+     */
+
+    public void setArrayItem(int index, <#if (type.width >= 4)>${minor.javaType!type.javaType}<#else>int</#if> value) throws VectorOverflowException {
+      if (index >= MAX_VALUE_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, value);
+    }
+
+    protected void set(int index, ${minor.class}Holder holder) {
+      data.set${(minor.javaType!type.javaType)?cap_first}(index * VALUE_WIDTH, holder.value);
     }
 
-    public void setSafe(int index, ${minor.class}Holder holder){
+    public void setSafe(int index, ${minor.class}Holder holder) {
       while(index >= getValueCapacity()) {
         reAlloc();
       }
       set(index, holder);
     }
 
-    protected void set(int index, Nullable${minor.class}Holder holder){
-      data.set${(minor.javaType!type.javaType)?cap_first}(index * ${type.width}, holder.value);
+    public void setScalar(int index, ${minor.class}Holder holder) throws VectorOverflowException {
+      if (index >= MAX_SCALAR_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, holder);
     }
 
-    public void setSafe(int index, Nullable${minor.class}Holder holder){
+    public void setArrayItem(int index, ${minor.class}Holder holder) throws VectorOverflowException {
+      if (index >= MAX_VALUE_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, holder);
+    }
+
+    protected void set(int index, Nullable${minor.class}Holder holder) {
+      data.set${(minor.javaType!type.javaType)?cap_first}(index * VALUE_WIDTH, holder.value);
+    }
+
+    public void setSafe(int index, Nullable${minor.class}Holder holder) {
       while(index >= getValueCapacity()) {
         reAlloc();
       }
       set(index, holder);
     }
 
+    public void setScalar(int index, Nullable${minor.class}Holder holder) throws VectorOverflowException {
+      if (index >= MAX_SCALAR_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, holder);
+    }
+
+    public void setArrayItem(int index, Nullable${minor.class}Holder holder) throws VectorOverflowException {
+      if (index >= MAX_VALUE_COUNT) {
+        throw new VectorOverflowException();
+      }
+      setSafe(index, holder);
+    }
+
     @Override
     public void generateTestData(int size) {
       setValueCount(size);
@@ -806,10 +1006,34 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     }
 
   </#if> <#-- type.width -->
+    /**
+     * Backfill missing offsets from the given last written position to the
+     * given current write position. Used by the "new" size-safe column
+     * writers to allow skipping values. The <tt>set()</tt> and <tt>setSafe()</tt>
+     * <b>do not</b> fill empties. See DRILL-5529 and DRILL-5530.
+     * @param lastWrite the position of the last valid write: the offset
+     * to be copied forward
+     * @param index the current write position filling occurs up to,
+     * but not including, this position
+     * @throws VectorOverflowException if the index would overfill
+     * the vector
+     */
+
+    public void fillEmptiesBounded(int lastWrite, int index)
+            throws VectorOverflowException {
+  <#if type.width <= 8>
+      for (int i = lastWrite + 1; i <= index; i++) {
+        setSafe(i, <#if (type.width >= 4)>(${minor.javaType!type.javaType})</#if> 0);
+      }
+  <#else>
+      throw new UnsupportedOperationException("Cannot zero-fill ${minor.class} vectors.");
+  </#if>
+    }
+
     @Override
     public void setValueCount(int valueCount) {
       final int currentValueCapacity = getValueCapacity();
-      final int idx = (${type.width} * valueCount);
+      final int idx = (VALUE_WIDTH * valueCount);
       while(valueCount > getValueCapacity()) {
         reAlloc();
       }
@@ -819,11 +1043,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
         allocationMonitor = 0;
       }
       VectorTrimmer.trim(data, idx);
-      data.writerIndex(valueCount * ${type.width});
+      data.writerIndex(valueCount * VALUE_WIDTH);
     }
   }
 }
-
 </#if> <#-- type.major -->
 </#list>
 </#list>


[3/3] drill git commit: DRILL-5518: Test framework enhancements

Posted by pr...@apache.org.
DRILL-5518: Test framework enhancements

* Create a SubOperatorTest base class to do routine setup and shutdown.
* Additional methods to simplify creating complex schemas with field
widths.
* Define a test workspace with plugin-specific options (as for the CSV
storage plugin)
* When verifying row sets, add methods to verify and release just the
"actual" batch in addition to the existing method to verify and free
both the actual and expected batches.
* Allow reading of row set values as object for generic comparisons.
* "Column builder" within schema builder to simplify building a single
MaterializedField for tests.
* Misc. code cleanup.

closes #851


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/63e24337
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/63e24337
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/63e24337

Branch: refs/heads/master
Commit: 63e243378f3be125f1e8bfb52c74b8211c87bfc3
Parents: 92c9304
Author: Paul Rogers <pr...@maprtech.com>
Authored: Tue May 16 15:55:41 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Mon Jul 3 11:41:20 2017 -0700

----------------------------------------------------------------------
 .../exec/ops/AbstractOperatorExecContext.java   |  17 +--
 .../drill/exec/record/VectorContainer.java      |   2 -
 .../exec/cache/TestBatchSerialization.java      |   2 +-
 .../impl/xsort/managed/SortTestUtilities.java   |   2 +-
 .../physical/impl/xsort/managed/TestSorter.java |  13 +--
 .../drill/exec/record/TestVectorContainer.java  |   2 +-
 .../exec/store/easy/text/compliant/TestCsv.java |  10 +-
 .../org/apache/drill/test/ClientFixture.java    |   6 ++
 .../java/org/apache/drill/test/ClusterTest.java |   1 -
 .../org/apache/drill/test/OperatorFixture.java  |   9 +-
 .../org/apache/drill/test/SubOperatorTest.java  |  36 +++++++
 .../apache/drill/test/rowSet/RowSetBuilder.java |  33 ++++++
 .../drill/test/rowSet/RowSetComparison.java     |  18 +++-
 .../apache/drill/test/rowSet/SchemaBuilder.java |  98 +++++++++++++++--
 .../drill/test/rowSet/test/RowSetTest.java      | 108 +++++++++++++------
 .../exec/vector/accessor/ColumnReader.java      |   1 +
 .../accessor/impl/AbstractColumnReader.java     |  28 +++++
 .../vector/accessor/impl/TupleReaderImpl.java   |  73 ++++++++++---
 18 files changed, 367 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/63e24337/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java
index a517fdf..ebef55c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java
@@ -36,15 +36,16 @@ public class AbstractOperatorExecContext implements OperatorExecContext {
   protected final ExecutionControls executionControls;
   protected final PhysicalOperator popConfig;
   protected final BufferManager manager;
-  protected final OperatorStatReceiver statsWriter;
+  protected OperatorStatReceiver statsWriter;
 
   public AbstractOperatorExecContext(BufferAllocator allocator, PhysicalOperator popConfig,
                                      ExecutionControls executionControls,
                                      OperatorStatReceiver stats) {
     this.allocator = allocator;
     this.popConfig = popConfig;
-    manager = new BufferManagerImpl(allocator);
+    this.manager = new BufferManagerImpl(allocator);
     statsWriter = stats;
+
     this.executionControls = executionControls;
   }
 
@@ -64,10 +65,9 @@ public class AbstractOperatorExecContext implements OperatorExecContext {
   }
 
   @Override
-  public ExecutionControls getExecutionControls() { return executionControls; }
-
-  @Override
-  public OperatorStatReceiver getStatsWriter() { return statsWriter; }
+  public ExecutionControls getExecutionControls() {
+    return executionControls;
+  }
 
   @Override
   public BufferAllocator getAllocator() {
@@ -87,4 +87,9 @@ public class AbstractOperatorExecContext implements OperatorExecContext {
       }
     }
   }
+
+  @Override
+  public OperatorStatReceiver getStatsWriter() {
+    return statsWriter;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/63e24337/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 54a04bc..99353ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -41,7 +41,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 public class VectorContainer implements VectorAccessible {
-  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainer.class);
 
   protected final List<VectorWrapper<?>> wrappers = Lists.newArrayList();
   private BatchSchema schema;
@@ -148,7 +147,6 @@ public class VectorContainer implements VectorAccessible {
         return (T) newVector;
       }
     } else {
-
       vector = TypeHelper.getNewVector(field, this.getAllocator(), callBack);
       add(vector);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/63e24337/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
index 363c08c..05670c5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
@@ -137,7 +137,7 @@ public class TestBatchSerialization extends DrillTest {
 
     assertTrue(origSize >= result.size());
     new RowSetComparison(expected)
-      .verifyAndClear(result);
+      .verifyAndClearAll(result);
     outFile.delete();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/63e24337/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
index 034da2c..1a4d4b2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
@@ -123,7 +123,7 @@ public class SortTestUtilities {
         assertTrue(merger.next());
         RowSet rowSet = new DirectRowSet(fixture.allocator(), dest);
         new RowSetComparison(expectedSet)
-              .verifyAndClear(rowSet);
+              .verifyAndClearAll(rowSet);
       }
       assertFalse(merger.next());
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/63e24337/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
index dd371d7..9da8968 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
@@ -85,7 +85,7 @@ public class TestSorter extends DrillTest {
     sorter.sortBatch(rowSet.container(), rowSet.getSv2());
 
     new RowSetComparison(expected)
-        .verifyAndClear(rowSet);
+        .verifyAndClearAll(rowSet);
     sorter.close();
   }
 
@@ -219,16 +219,12 @@ public class TestSorter extends DrillTest {
       DataItem expected[] = Arrays.copyOf(data, data.length);
       doSort(expected);
       RowSet expectedRows = makeDataSet(actual.allocator(), actual.schema().batch(), expected);
-//      System.out.println("Expected:");
-//      expectedRows.print();
-//      System.out.println("Actual:");
-//      actual.print();
       doVerify(expected, expectedRows, actual);
     }
 
     protected void doVerify(DataItem[] expected, RowSet expectedRows, RowSet actual) {
       new RowSetComparison(expectedRows)
-            .verifyAndClear(actual);
+            .verifyAndClearAll(actual);
     }
 
     protected abstract void doSort(DataItem[] expected);
@@ -300,7 +296,7 @@ public class TestSorter extends DrillTest {
             .offset(offset)
             .span(nullCount)
             .withMask(true, false)
-            .verifyAndClear(actual);
+            .verifyAndClearAll(actual);
     }
   }
 
@@ -370,7 +366,7 @@ public class TestSorter extends DrillTest {
         int mo = rand.nextInt(12);
         int yr = rand.nextInt(10);
         Period period = makePeriod(yr, mo, day, hr, min, sec, ms);
-         builder.add(period);
+        builder.add(period);
       }
       return builder.build();
     }
@@ -385,7 +381,6 @@ public class TestSorter extends DrillTest {
       long prevMs = 0;
       while (reader.next()) {
         Period period = reader.column(0).getPeriod().normalizedStandard();
-//        System.out.println(period);
         int years = period.getYears();
         assertTrue(prevYears <= years);
         if (prevYears != years) {

http://git-wip-us.apache.org/repos/asf/drill/blob/63e24337/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
index d7a59bf..930e2a8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java
@@ -108,7 +108,7 @@ public class TestVectorContainer extends DrillTest {
     // Merge containers via row set facade
 
     RowSet mergedRs = left.merge(right);
-    comparison.verifyAndClear(mergedRs);
+    comparison.verifyAndClearAll(mergedRs);
 
     // Add a selection vector. Merging is forbidden, in the present code,
     // for batches that have a selection vector.

http://git-wip-us.apache.org/repos/asf/drill/blob/63e24337/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
index c18adc9..b7bc9fd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
@@ -98,13 +98,11 @@ public class TestCsv extends ClusterTest {
         .add("b", MinorType.VARCHAR)
         .add("c", MinorType.VARCHAR)
         .build();
-    assertEquals(expectedSchema, actual.batchSchema());
-
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
         .add("10", "foo", "bar")
         .build();
     new RowSetComparison(expected)
-      .verifyAndClear(actual);
+      .verifyAndClearAll(actual);
   }
 
   String invalidHeaders[] = {
@@ -126,13 +124,11 @@ public class TestCsv extends ClusterTest {
         .add("c_2", MinorType.VARCHAR)
         .add("c_2_2", MinorType.VARCHAR)
         .build();
-    assertEquals(expectedSchema, actual.batchSchema());
-
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
         .add("10", "foo", "bar", "fourth", "fifth", "sixth")
         .build();
     new RowSetComparison(expected)
-      .verifyAndClear(actual);
+      .verifyAndClearAll(actual);
   }
 
   // Test fix for DRILL-5590
@@ -154,7 +150,7 @@ public class TestCsv extends ClusterTest {
         .add("10", "foo", "bar")
         .build();
     new RowSetComparison(expected)
-      .verifyAndClear(actual);
+      .verifyAndClearAll(actual);
   }
 
   private String makeStatement(String fileName) {

http://git-wip-us.apache.org/repos/asf/drill/blob/63e24337/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
index a63a287..5a06ec2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
@@ -27,12 +27,14 @@ import org.apache.drill.TestBuilder;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.testing.Controls;
 import org.apache.drill.exec.testing.ControlsInjectionUtil;
 import org.apache.drill.test.ClusterFixture.FixtureTestServices;
 import org.apache.drill.test.QueryBuilder.QuerySummary;
+import org.apache.drill.test.rowSet.RowSetBuilder;
 
 /**
  * Represents a Drill client. Provides many useful test-specific operations such
@@ -225,4 +227,8 @@ public class ClientFixture implements AutoCloseable {
     ControlsInjectionUtil.validateControlsString(controls);
     alterSession(ExecConstants.DRILLBIT_CONTROL_INJECTIONS, controls);
   }
+
+  public RowSetBuilder rowSetBuilder(BatchSchema schema) {
+    return new RowSetBuilder(allocator(), schema);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/63e24337/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
index 62beedd..e204fde 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
@@ -118,5 +118,4 @@ public class ClusterTest extends DrillTest {
   public QueryBuilder queryBuilder( ) {
     return client.queryBuilder();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/63e24337/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index 2c72c3c..976812c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -32,10 +32,12 @@ import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.drill.exec.ops.AbstractOperatorExecContext;
 import org.apache.drill.exec.ops.FragmentExecContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperExecContext;
 import org.apache.drill.exec.ops.OperExecContextImpl;
+import org.apache.drill.exec.ops.OperatorExecContext;
 import org.apache.drill.exec.ops.OperatorStatReceiver;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.record.BatchSchema;
@@ -50,7 +52,6 @@ import org.apache.drill.test.rowSet.HyperRowSetImpl;
 import org.apache.drill.test.rowSet.IndirectRowSet;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
-import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 
 /**
@@ -222,7 +223,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
   }
 
   /**
-   * Implements a write-only version of the stats collector for use by opeators,
+   * Implements a write-only version of the stats collector for use by operators,
    * then provides simplified test-time accessors to get the stats values when
    * validating code in tests.
    */
@@ -328,4 +329,8 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
       throw new IllegalStateException( "Unexpected selection mode" );
     }
   }
+
+  public OperatorExecContext operatorContext(PhysicalOperator config) {
+    return new AbstractOperatorExecContext(allocator(), config, context.getExecutionControls(), stats);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/63e24337/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java
new file mode 100644
index 0000000..6bc2afc
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class SubOperatorTest extends DrillTest {
+
+  protected static OperatorFixture fixture;
+
+  @BeforeClass
+  public static void classSetup() throws Exception {
+    fixture = OperatorFixture.standardFixture();
+  }
+
+  @AfterClass
+  public static void classTeardown() throws Exception {
+    fixture.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/63e24337/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
index 74e9356..80e8ae4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetBuilder.java
@@ -56,6 +56,8 @@ public final class RowSetBuilder {
    * <tt>add(10, new int[] {100, 200});</tt><br>
    * @param values column values in column index order
    * @return this builder
+   * @see {@link #addSingleCol(Object)} to create a row of a single column when
+   * the value to <tt>add()</tt> is ambiguous
    */
 
   public RowSetBuilder add(Object...values) {
@@ -64,6 +66,37 @@ public final class RowSetBuilder {
   }
 
   /**
+   * The {@link #add(Object...)} method uses Java variable-length arguments to
+   * pass a row of values. But, when the row consists of a single array, Java
+   * gets confused: is that an array for variable-arguments or is it the value
+   * of the first argument? This method clearly states that the single value
+   * (including an array) is meant to be the value of the first (and only)
+   * column.
+   * <p>
+   * Examples:<code><pre>
+   *     RowSetBuilder twoColsBuilder = ...
+   *     // Fine, second item is an array of strings for a repeated Varchar
+   *     // column.
+   *     twoColsBuilder.add("First", new String[] {"a", "b", "c"});
+   *     ...
+   *     RowSetBuilder oneColBuilder = ...
+   *     // Ambiguous: is this a varargs array of three items?
+   *     // That is how Java will perceive it.
+   *     oneColBuilder.add(new String[] {"a", "b", "c"});
+   *     // Unambiguous: this is a single column value for the
+   *     // repeated Varchar column.
+   *     oneColBuilder.addSingleCol(new String[] {"a", "b", "c"});
+   * </pre></code>
+   * @param value value of the first column, which may be an array for a
+   * repeated column
+   * @return this builder
+   */
+
+  public RowSetBuilder addSingleCol(Object value) {
+    return add(new Object[] { value });
+  }
+
+  /**
    * Build the row set with a selection vector 2. The SV2 is
    * initialized to have a 1:1 index to the rows: SV2 0 points
    * to row 1, SV2 position 1 points to row 1 and so on.

http://git-wip-us.apache.org/repos/asf/drill/blob/63e24337/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
index 3ba7471..ea50074 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetComparison.java
@@ -138,7 +138,7 @@ public class RowSetComparison {
 
   /**
    * Convenience method to verify the actual results, then free memory
-   * for both the expected and actual result sets.
+   * for the actual result sets.
    * @param actual the actual results to verify
    */
 
@@ -146,6 +146,20 @@ public class RowSetComparison {
     try {
       verify(actual);
     } finally {
+      actual.clear();
+    }
+  }
+
+  /**
+   * Convenience method to verify the actual results, then free memory
+   * for both the expected and actual result sets.
+   * @param actual the actual results to verify
+   */
+
+  public void verifyAndClearAll(RowSet actual) {
+    try {
+      verify(actual);
+    } finally {
       expected.clear();
       actual.clear();
     }
@@ -158,7 +172,7 @@ public class RowSetComparison {
       }
       ColumnReader ec = er.column(i);
       ColumnReader ac = ar.column(i);
-      String label = er.index() + ":" + i;
+      String label = (er.index() + 1) + ":" + i;
       assertEquals(label, ec.valueType(), ac.valueType());
       if (ec.isNull()) {
         assertTrue(label + " - column not null", ac.isNull());

http://git-wip-us.apache.org/repos/asf/drill/blob/63e24337/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java
index b946ab9..39b0128 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java
@@ -53,6 +53,47 @@ import org.apache.drill.exec.record.MaterializedField;
 public class SchemaBuilder {
 
   /**
+   * Build a column schema (AKA "materialized field") based on name and a
+   * variety of schema options. Every column needs a name and (minor) type,
+   * some may need a mode other than required, may need a width, may
+   * need scale and precision, and so on.
+   */
+
+  // TODO: Add map methods
+
+  public static class ColumnBuilder {
+    private final String name;
+    private final MajorType.Builder typeBuilder;
+
+    public ColumnBuilder(String name, MinorType type) {
+      this.name = name;
+      typeBuilder = MajorType.newBuilder()
+          .setMinorType(type)
+          .setMode(DataMode.REQUIRED);
+    }
+
+    public ColumnBuilder setMode(DataMode mode) {
+      typeBuilder.setMode(mode);
+      return this;
+    }
+
+    public ColumnBuilder setWidth(int width) {
+      typeBuilder.setPrecision(width);
+      return this;
+    }
+
+    public ColumnBuilder setScale(int scale, int precision) {
+      typeBuilder.setScale(scale);
+      typeBuilder.setPrecision(precision);
+      return this;
+    }
+
+    public MaterializedField build() {
+      return MaterializedField.create(name, typeBuilder.build());
+    }
+  }
+
+  /**
    * Internal structure for building a map. A map is just a schema,
    * but one that is part of a parent column.
    */
@@ -73,11 +114,7 @@ public class SchemaBuilder {
 
     @Override
     public SchemaBuilder buildMap() {
-      MaterializedField col = MaterializedField.create(memberName,
-          MajorType.newBuilder()
-            .setMinorType(MinorType.MAP)
-            .setMode(DataMode.REQUIRED)
-            .build());
+      MaterializedField col = columnSchema(memberName, MinorType.MAP, DataMode.REQUIRED);
       for (MaterializedField childCol : columns) {
         col.addChild(childCol);
       }
@@ -96,27 +133,68 @@ public class SchemaBuilder {
 
   public SchemaBuilder() { }
 
+  /**
+   * Create a new schema starting with the base schema. Allows appending
+   * additional columns to an additional schema.
+   */
+
+  public SchemaBuilder(BatchSchema baseSchema) {
+    for (MaterializedField field : baseSchema) {
+      add(field);
+    }
+  }
+
   public SchemaBuilder add(String pathName, MajorType type) {
-    MaterializedField col = MaterializedField.create(pathName, type);
+    return add(MaterializedField.create(pathName, type));
+  }
+
+  public SchemaBuilder add(MaterializedField col) {
     columns.add(col);
     return this;
   }
 
+  /**
+   * Create a column schema using the "basic three" properties of name, type and
+   * cardinality (AKA "data mode.") Use the {@link ColumnBuilder} to set
+   * other schema attributes.
+   */
+
+  public static MaterializedField columnSchema(String pathName, MinorType type, DataMode mode) {
+    return MaterializedField.create(pathName,
+        MajorType.newBuilder()
+          .setMinorType(type)
+          .setMode(mode)
+          .build());
+  }
+
   public SchemaBuilder add(String pathName, MinorType type, DataMode mode) {
-    return add(pathName, MajorType.newBuilder()
-        .setMinorType(type)
-        .setMode(mode)
-        .build());
+    return add(columnSchema(pathName, type, mode));
   }
 
   public SchemaBuilder add(String pathName, MinorType type) {
     return add(pathName, type, DataMode.REQUIRED);
   }
 
+  public SchemaBuilder add(String pathName, MinorType type, int width) {
+    MaterializedField field = new SchemaBuilder.ColumnBuilder(pathName, type)
+        .setMode(DataMode.REQUIRED)
+        .setWidth(width)
+        .build();
+    return add(field);
+  }
+
   public SchemaBuilder addNullable(String pathName, MinorType type) {
     return add(pathName, type, DataMode.OPTIONAL);
   }
 
+  public SchemaBuilder addNullable(String pathName, MinorType type, int width) {
+    MaterializedField field = new SchemaBuilder.ColumnBuilder(pathName, type)
+        .setMode(DataMode.OPTIONAL)
+        .setWidth(width)
+        .build();
+    return add(field);
+  }
+
   public SchemaBuilder addArray(String pathName, MinorType type) {
     return add(pathName, type, DataMode.REPEATED);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/63e24337/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/RowSetTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/RowSetTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/RowSetTest.java
index 8d9179b..03417ff 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/RowSetTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/RowSetTest.java
@@ -30,7 +30,7 @@ import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.vector.accessor.ArrayReader;
 import org.apache.drill.exec.vector.accessor.ArrayWriter;
 import org.apache.drill.exec.vector.accessor.TupleAccessor.TupleSchema;
-import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
 import org.apache.drill.test.rowSet.RowSet.RowSetReader;
 import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
@@ -40,25 +40,11 @@ import org.apache.drill.test.rowSet.RowSetSchema;
 import org.apache.drill.test.rowSet.RowSetSchema.FlattenedSchema;
 import org.apache.drill.test.rowSet.RowSetSchema.PhysicalSchema;
 import org.apache.drill.test.rowSet.SchemaBuilder;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.base.Splitter;
 
-public class RowSetTest {
-
-  private static OperatorFixture fixture;
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    fixture = OperatorFixture.standardFixture();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    fixture.close();
-  }
+public class RowSetTest extends SubOperatorTest {
 
   /**
    * Test a simple physical schema with no maps.
@@ -105,6 +91,17 @@ public class RowSetTest {
     assertEquals("b", physical.column(2).field().getName());
   }
 
+  /**
+   * Validate that the actual column metadata is as expected by
+   * cross-checking: the column found at the given index and the
+   * column found by its name must both match the expectations.
+   *
+   * @param schema the schema for the row set
+   * @param index column index
+   * @param fullName expected column name
+   * @param type expected type
+   */
+
   public void crossCheck(TupleSchema schema, int index, String fullName, MinorType type) {
     String name = null;
     for (String part : Splitter.on(".").split(fullName)) {
@@ -116,6 +113,10 @@ public class RowSetTest {
     assertEquals(type, schema.column(index).getType().getMinorType());
   }
 
+  /**
+   * Verify that a nested map schema works as expected.
+   */
+
   @Test
   public void testMapSchema() {
     BatchSchema batchSchema = new SchemaBuilder()
@@ -185,17 +186,13 @@ public class RowSetTest {
     assertEquals("a.e.f", eSchema.column(0).fullName());
   }
 
-  @Test
-  public void testScalarReaderWriter() {
-    testTinyIntRW();
-    testSmallIntRW();
-    testIntRW();
-    testLongRW();
-    testFloatRW();
-    testDoubleRW();
-  }
+  /**
+   * Verify that simple scalar (non-repeated) column readers
+   * and writers work as expected. This is for tiny ints.
+   */
 
-  private void testTinyIntRW() {
+  @Test
+  public void testTinyIntRW() {
     BatchSchema batchSchema = new SchemaBuilder()
         .add("col", MinorType.TINYINT)
         .build();
@@ -204,18 +201,21 @@ public class RowSetTest {
         .add(Byte.MAX_VALUE)
         .add(Byte.MIN_VALUE)
         .build();
+    assertEquals(3, rs.rowCount());
     RowSetReader reader = rs.reader();
     assertTrue(reader.next());
     assertEquals(0, reader.column(0).getInt());
     assertTrue(reader.next());
     assertEquals(Byte.MAX_VALUE, reader.column(0).getInt());
+    assertEquals((int) Byte.MAX_VALUE, reader.column(0).getObject());
     assertTrue(reader.next());
     assertEquals(Byte.MIN_VALUE, reader.column(0).getInt());
     assertFalse(reader.next());
     rs.clear();
   }
 
-  private void testSmallIntRW() {
+  @Test
+  public void testSmallIntRW() {
     BatchSchema batchSchema = new SchemaBuilder()
         .add("col", MinorType.SMALLINT)
         .build();
@@ -229,13 +229,15 @@ public class RowSetTest {
     assertEquals(0, reader.column(0).getInt());
     assertTrue(reader.next());
     assertEquals(Short.MAX_VALUE, reader.column(0).getInt());
+    assertEquals((int) Short.MAX_VALUE, reader.column(0).getObject());
     assertTrue(reader.next());
     assertEquals(Short.MIN_VALUE, reader.column(0).getInt());
     assertFalse(reader.next());
     rs.clear();
   }
 
-  private void testIntRW() {
+  @Test
+  public void testIntRW() {
     BatchSchema batchSchema = new SchemaBuilder()
         .add("col", MinorType.INT)
         .build();
@@ -249,13 +251,15 @@ public class RowSetTest {
     assertEquals(0, reader.column(0).getInt());
     assertTrue(reader.next());
     assertEquals(Integer.MAX_VALUE, reader.column(0).getInt());
+    assertEquals(Integer.MAX_VALUE, reader.column(0).getObject());
     assertTrue(reader.next());
     assertEquals(Integer.MIN_VALUE, reader.column(0).getInt());
     assertFalse(reader.next());
     rs.clear();
   }
 
-  private void testLongRW() {
+  @Test
+  public void testLongRW() {
     BatchSchema batchSchema = new SchemaBuilder()
         .add("col", MinorType.BIGINT)
         .build();
@@ -269,13 +273,15 @@ public class RowSetTest {
     assertEquals(0, reader.column(0).getLong());
     assertTrue(reader.next());
     assertEquals(Long.MAX_VALUE, reader.column(0).getLong());
+    assertEquals(Long.MAX_VALUE, reader.column(0).getObject());
     assertTrue(reader.next());
     assertEquals(Long.MIN_VALUE, reader.column(0).getLong());
     assertFalse(reader.next());
     rs.clear();
   }
 
-  private void testFloatRW() {
+  @Test
+  public void testFloatRW() {
     BatchSchema batchSchema = new SchemaBuilder()
         .add("col", MinorType.FLOAT4)
         .build();
@@ -288,14 +294,16 @@ public class RowSetTest {
     assertTrue(reader.next());
     assertEquals(0, reader.column(0).getDouble(), 0.000001);
     assertTrue(reader.next());
-    assertEquals(Float.MAX_VALUE, reader.column(0).getDouble(), 0.000001);
+    assertEquals((double) Float.MAX_VALUE, reader.column(0).getDouble(), 0.000001);
+    assertEquals((double) Float.MAX_VALUE, (double) reader.column(0).getObject(), 0.000001);
     assertTrue(reader.next());
-    assertEquals(Float.MIN_VALUE, reader.column(0).getDouble(), 0.000001);
+    assertEquals((double) Float.MIN_VALUE, reader.column(0).getDouble(), 0.000001);
     assertFalse(reader.next());
     rs.clear();
   }
 
-  private void testDoubleRW() {
+  @Test
+  public void testDoubleRW() {
     BatchSchema batchSchema = new SchemaBuilder()
         .add("col", MinorType.FLOAT8)
         .build();
@@ -309,6 +317,7 @@ public class RowSetTest {
     assertEquals(0, reader.column(0).getDouble(), 0.000001);
     assertTrue(reader.next());
     assertEquals(Double.MAX_VALUE, reader.column(0).getDouble(), 0.000001);
+    assertEquals(Double.MAX_VALUE, (double) reader.column(0).getObject(), 0.000001);
     assertTrue(reader.next());
     assertEquals(Double.MIN_VALUE, reader.column(0).getDouble(), 0.000001);
     assertFalse(reader.next());
@@ -316,6 +325,30 @@ public class RowSetTest {
   }
 
   @Test
+  public void testStringRW() {
+    BatchSchema batchSchema = new SchemaBuilder()
+        .add("col", MinorType.VARCHAR)
+        .build();
+    SingleRowSet rs = fixture.rowSetBuilder(batchSchema)
+        .add("")
+        .add("abcd")
+        .build();
+    RowSetReader reader = rs.reader();
+    assertTrue(reader.next());
+    assertEquals("", reader.column(0).getString());
+    assertTrue(reader.next());
+    assertEquals("abcd", reader.column(0).getString());
+    assertEquals("abcd", reader.column(0).getObject());
+    assertFalse(reader.next());
+    rs.clear();
+  }
+
+  /**
+   * Test writing to and reading from a row set with nested maps.
+   * Map fields are flattened into a logical schema.
+   */
+
+  @Test
   public void testMap() {
     BatchSchema batchSchema = new SchemaBuilder()
         .add("a", MinorType.INT)
@@ -343,8 +376,13 @@ public class RowSetTest {
     rs.clear();
   }
 
+  /**
+   * Test an array of ints (as an example fixed-width type)
+   * at the top level of a schema.
+   */
+
   @Test
-  public void TestTopScalarArray() {
+  public void TestTopFixedWidthArray() {
     BatchSchema batchSchema = new SchemaBuilder()
         .add("c", MinorType.INT)
         .addArray("a", MinorType.INT)
@@ -394,7 +432,7 @@ public class RowSetTest {
       .build();
 
     new RowSetComparison(rs1)
-      .verifyAndClear(rs2);
+      .verifyAndClearAll(rs2);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/63e24337/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnReader.java
index 860a866..4932567 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnReader.java
@@ -58,6 +58,7 @@ public interface ColumnReader extends ColumnAccessor {
   byte[] getBytes();
   BigDecimal getDecimal();
   Period getPeriod();
+  Object getObject();
   TupleReader map();
   ArrayReader array();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/63e24337/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/AbstractColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/AbstractColumnReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/AbstractColumnReader.java
index 1ef2243..b88b08b 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/AbstractColumnReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/AbstractColumnReader.java
@@ -47,6 +47,34 @@ public abstract class AbstractColumnReader extends AbstractColumnAccessor implem
   }
 
   @Override
+  public Object getObject() {
+    switch (valueType()) {
+    case ARRAY:
+      // TODO: build an array. Just a bit tedious...
+      throw new UnsupportedOperationException();
+    case BYTES:
+      return getBytes();
+    case DECIMAL:
+      return getDecimal();
+    case DOUBLE:
+      return getDouble();
+    case INTEGER:
+      return getInt();
+    case LONG:
+      return getLong();
+    case MAP:
+      // TODO: build a map. Just a bit tedious...
+      throw new UnsupportedOperationException();
+    case PERIOD:
+      return getPeriod();
+    case STRING:
+      return getString();
+    default:
+      throw new IllegalStateException("Unexpected type: " + valueType());
+    }
+  }
+
+  @Override
   public boolean isNull() {
     return false;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/63e24337/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/TupleReaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/TupleReaderImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/TupleReaderImpl.java
index 041023b..97a6e3c 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/TupleReaderImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/TupleReaderImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.vector.accessor.impl;
 
+import org.apache.drill.exec.vector.accessor.ArrayReader;
 import org.apache.drill.exec.vector.accessor.ColumnReader;
 import org.apache.drill.exec.vector.accessor.TupleReader;
 
@@ -76,21 +77,7 @@ public class TupleReaderImpl extends AbstractTupleAccessor implements TupleReade
     }
     switch (colReader.valueType()) {
     case BYTES:
-      StringBuilder buf = new StringBuilder()
-          .append("[");
-      byte value[] = colReader.getBytes();
-      int len = Math.min(value.length, 20);
-      for (int i = 0; i < len;  i++) {
-        if (i > 0) {
-          buf.append(", ");
-        }
-        buf.append((int) value[i]);
-      }
-      if (value.length > len) {
-        buf.append("...");
-      }
-      buf.append("]");
-      return buf.toString();
+      return bytesToString(colReader.getBytes());
     case DOUBLE:
       return Double.toString(colReader.getDouble());
     case INTEGER:
@@ -101,8 +88,64 @@ public class TupleReaderImpl extends AbstractTupleAccessor implements TupleReade
       return "\"" + colReader.getString() + "\"";
     case DECIMAL:
       return colReader.getDecimal().toPlainString();
+    case ARRAY:
+      return getArrayAsString(colReader.array());
     default:
       throw new IllegalArgumentException("Unsupported type " + colReader.valueType());
     }
   }
+
+  private String bytesToString(byte[] value) {
+    StringBuilder buf = new StringBuilder()
+        .append("[");
+    int len = Math.min(value.length, 20);
+    for (int i = 0; i < len;  i++) {
+      if (i > 0) {
+        buf.append(", ");
+      }
+      buf.append((int) value[i]);
+    }
+    if (value.length > len) {
+      buf.append("...");
+    }
+    buf.append("]");
+    return buf.toString();
+  }
+
+  private String getArrayAsString(ArrayReader array) {
+    StringBuilder buf = new StringBuilder();
+    buf.append("[");
+    for (int i = 0; i < array.size(); i++) {
+      if (i > 0) {
+        buf.append( ", " );
+      }
+      switch (array.valueType()) {
+      case BYTES:
+        buf.append(bytesToString(array.getBytes(i)));
+        break;
+      case DOUBLE:
+        buf.append(Double.toString(array.getDouble(i)));
+        break;
+      case INTEGER:
+        buf.append(Integer.toString(array.getInt(i)));
+        break;
+      case LONG:
+        buf.append(Long.toString(array.getLong(i)));
+        break;
+      case STRING:
+        buf.append("\"" + array.getString(i) + "\"");
+        break;
+      case DECIMAL:
+        buf.append(array.getDecimal(i).toPlainString());
+        break;
+      case MAP:
+      case ARRAY:
+        throw new UnsupportedOperationException("Unsupported type " + array.valueType());
+      default:
+        throw new IllegalArgumentException("Unexpected type " + array.valueType());
+      }
+    }
+    buf.append("]");
+    return buf.toString();
+  }
 }