You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by bo...@apache.org on 2018/03/08 02:55:15 UTC

[6/8] drill git commit: DRILL-6162: Enhance record batch sizer to retain nesting information. Refactor record batch sizer and add unit tests for sizing and vector allocation.

DRILL-6162: Enhance record batch sizer to retain nesting information.
Refactor record batch sizer and add unit tests for sizing and vector allocation.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/47c5d1fe
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/47c5d1fe
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/47c5d1fe

Branch: refs/heads/master
Commit: 47c5d1feaaaf9b6384ed8ef1011fa58b9272b362
Parents: 0a8a3f1
Author: Padma Penumarthy <pp...@yahoo.com>
Authored: Tue Mar 6 16:05:48 2018 -0800
Committer: Ben-Zvi <bb...@mapr.com>
Committed: Wed Mar 7 15:41:11 2018 -0800

----------------------------------------------------------------------
 .../drill/exec/record/RecordBatchSizer.java     | 479 ++++++++---
 .../drill/exec/record/VectorInitializer.java    |  23 +-
 .../impl/xsort/managed/TestShortArrays.java     |   6 +-
 .../exec/physical/unit/TestOutputBatchSize.java | 118 ++-
 .../drill/exec/record/TestRecordBatchSizer.java | 806 +++++++++++++++++++
 .../apache/drill/exec/vector/ValueVector.java   |   1 +
 6 files changed, 1296 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/47c5d1fe/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
index 536c8bc..9525c91 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
@@ -27,7 +27,11 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.NullableVector;
+import org.apache.drill.exec.vector.UInt1Vector;
 import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.UntypedNullVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractMapVector;
 import org.apache.drill.exec.vector.complex.RepeatedListVector;
@@ -36,6 +40,7 @@ import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 
 import com.google.common.collect.Sets;
+import org.apache.drill.exec.vector.complex.RepeatedVariableWidthVectorLike;
 
 /**
  * Given a record batch or vector container, determines the actual memory
@@ -43,6 +48,9 @@ import com.google.common.collect.Sets;
  */
 
 public class RecordBatchSizer {
+  private static final int OFFSET_VECTOR_WIDTH = UInt4Vector.VALUE_WIDTH;
+  private static final int BIT_VECTOR_WIDTH = UInt1Vector.VALUE_WIDTH;
+  private static final int STD_REPETITION_FACTOR = 10;
 
   /**
    * Column size information.
@@ -52,20 +60,19 @@ public class RecordBatchSizer {
     public final MaterializedField metadata;
 
     /**
-     * Assumed size from Drill metadata. Note that this information is
-     * 100% bogus for variable-width columns. Do not use it for such
-     * columns.
+     * This is the total size of just pure data for the column
+     * for all entries.
      */
 
-    public int stdSize;
+    private int totalDataSize;
 
     /**
-     * Actual average column width as determined from actual memory use. This
-     * size is larger than the actual data size since this size includes per-
-     * column overhead such as any unused vector space, etc.
+     * This is the total size of data for the column + additional
+     * metadata vector overhead we add for cardinality, variable length etc.
+     * for all entries.
      */
 
-    public final int estSize;
+    private int totalNetSize;
 
     /**
      * Number of occurrences of the value in the batch. This is trivial
@@ -76,104 +83,324 @@ public class RecordBatchSizer {
      * greater than (but unlikely) same as the row count.
      */
 
-    public final int valueCount;
+    private final int valueCount;
 
     /**
-     * Total number of elements for a repeated type, or 1 if this is
-     * a non-repeated type. That is, a batch of 100 rows may have an
-     * array with 10 elements per row. In this case, the element count
-     * is 1000.
+     * Total number of elements for a repeated type, or same as
+     * valueCount if this is a non-repeated type. That is, a batch
+     * of 100 rows may have an array with 10 elements per row.
+     * In this case, the element count is 1000.
      */
 
-    public final int elementCount;
+    private int elementCount;
 
     /**
-     * Size of the top level value vector. For map and repeated list,
-     * this is just size of offset vector.
+     * The estimated, average number of elements per parent value.
+     * Always 1 for a non-repeated type. For a repeated type,
+     * this is the average entries per array (per repeated element).
      */
-    public int dataSize;
+
+    private float cardinality;
 
     /**
-     * Total size of the column includes the sum total of memory for all
-     * value vectors representing the column.
+     * Indicates if it is variable width column.
+     * For map columns, this is true if any of the children is variable
+     * width column.
      */
-    public int netSize;
+
+    private boolean isVariableWidth;
 
     /**
-     * The estimated, average number of elements per parent value.
-     * Always 1 for a non-repeated type. For a repeated type,
-     * this is the average entries per array (per repeated element).
+     * Indicates if cardinality is repeated(top level only).
+     */
+
+    private boolean isRepeated;
+
+    /**
+     * Indicates if cardinality is optional i.e. nullable(top level only).
+     */
+    private boolean isOptional;
+
+    /**
+     * Child columns if this is a map column.
+     */
+    private Map<String, ColumnSize> children = CaseInsensitiveMap.newHashMap();
+
+    /**
+     * std pure data size per entry from Drill metadata, based on type.
+     * Does not include metadata vector overhead we add for cardinality,
+     * variable length etc.
+     * For variable-width columns, we use 50 as std size for entry width.
+     * For repeated column, we assume repetition of 10.
+     */
+    public int getStdDataSizePerEntry() {
+      int stdDataSize;
+
+      try {
+        stdDataSize = TypeHelper.getSize(metadata.getType());
+
+        // For variable width, typeHelper includes offset vector width. Adjust for that.
+        if (isVariableWidth) {
+          stdDataSize -= OFFSET_VECTOR_WIDTH;
+        }
+
+        if (isRepeated) {
+          stdDataSize = stdDataSize * STD_REPETITION_FACTOR;
+        }
+      } catch (Exception e) {
+        // For unsupported types, just set stdSize to 0.
+        // Map, Union, List etc.
+        stdDataSize = 0;
+      }
+
+      // Add sizes of children.
+      for (ColumnSize columnSize : children.values()) {
+        stdDataSize += columnSize.getStdDataSizePerEntry();
+      }
+
+      if (isRepeatedList()) {
+        stdDataSize = stdDataSize * STD_REPETITION_FACTOR;
+      }
+
+      return stdDataSize;
+    }
+
+    /**
+     * std net size per entry taking into account additional metadata vectors
+     * we add on top for variable length, cardinality etc.
+     * For variable-width columns, we use 50 as std data size for entry width.
+     * For repeated column, we assume repetition of 10.
      */
+    public int getStdNetSizePerEntry() {
+      int stdNetSize;
+      try {
+        stdNetSize = TypeHelper.getSize(metadata.getType());
+      } catch (Exception e) {
+        stdNetSize = 0;
+      }
 
-    public final float estElementCountPerArray;
-    public final boolean isVariableWidth;
+      if (isOptional) {
+        stdNetSize += BIT_VECTOR_WIDTH;
+      }
+
+      if (isRepeated) {
+        stdNetSize = (stdNetSize * STD_REPETITION_FACTOR) + OFFSET_VECTOR_WIDTH;
+      }
+
+      for (ColumnSize columnSize : children.values()) {
+        stdNetSize += columnSize.getStdNetSizePerEntry();
+      }
+
+      if (isRepeatedList()) {
+        stdNetSize = (stdNetSize * STD_REPETITION_FACTOR) + OFFSET_VECTOR_WIDTH;
+      }
+
+      return stdNetSize;
+    }
+
+    /**
+     * This is the average actual per entry data size in bytes. Does not
+     * include any overhead of metadata vectors.
+     * For repeated columns, it is average for the repeated array, not
+     * individual entry in the array.
+     */
+    public int getDataSizePerEntry() {
+      return safeDivide(getTotalDataSize(), getValueCount());
+    }
+
+    /**
+     * This is the average per entry size of just pure data plus
+     * overhead of additional vectors we add on top like bits vector,
+     * offset vector etc. This
+     * size is larger than the actual data size since this size includes per-
+     * column overhead for additional vectors we add for
+     * cardinality, variable length etc.
+     */
+    public int getNetSizePerEntry() {
+      return safeDivide(getTotalNetSize(), getValueCount());
+    }
+
+    /**
+     * This is the total data size for the column, including children for map
+     * columns. Does not include any overhead of metadata vectors.
+     */
+    public int getTotalDataSize() {
+      int dataSize = this.totalDataSize;
+      for (ColumnSize columnSize : children.values()) {
+        dataSize += columnSize.getTotalDataSize();
+      }
+      return dataSize;
+    }
+
+    /**
+     * This is the total net size for the column, including children for map
+     * columns. Includes overhead of metadata vectors.
+     */
+    public int getTotalNetSize() {
+      return this.totalNetSize;
+    }
+
+    public int getValueCount() {
+      return valueCount;
+    }
+
+    public int getElementCount() {
+      return elementCount;
+    }
+
+    public float getCardinality() {
+      return cardinality;
+    }
+
+    public boolean isVariableWidth() {
+      return isVariableWidth;
+    }
+
+    public Map<String, ColumnSize> getChildren() {
+      return children;
+    }
+
+    public boolean isComplex() {
+      return metadata.getType().getMinorType() == MinorType.MAP ||
+        metadata.getType().getMinorType() == MinorType.UNION ||
+        metadata.getType().getMinorType() == MinorType.LIST;
+    }
+
+    public boolean isRepeatedList() {
+      if (metadata.getType().getMinorType() == MinorType.LIST &&
+        metadata.getDataMode() == DataMode.REPEATED) {
+        return true;
+      }
+      return false;
+    }
+
+    /**
+     * This is the average per entry width, used for vector allocation.
+     */
+    public int getEntryWidth() {
+      int width = 0;
+      if (isVariableWidth) {
+        width = getNetSizePerEntry() - OFFSET_VECTOR_WIDTH;
+
+        // Subtract out the bits (is-set) vector width
+        if (metadata.getDataMode() == DataMode.OPTIONAL) {
+          width -= BIT_VECTOR_WIDTH;
+        }
+      }
+
+      return (safeDivide(width, cardinality));
+    }
 
     public ColumnSize(ValueVector v, String prefix) {
       this.prefix = prefix;
       valueCount = v.getAccessor().getValueCount();
       metadata = v.getField();
-      isVariableWidth = v instanceof VariableWidthVector;
-
-      // The amount of memory consumed by the payload: the actual
-      // data stored in the vectors.
-
-      if (v.getField().getDataMode() == DataMode.REPEATED) {
-        elementCount = buildRepeated(v);
-        estElementCountPerArray = valueCount == 0 ? 0 : elementCount * 1.0f / valueCount;
-      } else {
-        elementCount = 1;
-        estElementCountPerArray = 1;
+      isVariableWidth = (v instanceof VariableWidthVector || v instanceof RepeatedVariableWidthVectorLike);
+      elementCount = valueCount;
+      cardinality = 1;
+      totalNetSize = v.getPayloadByteCount(valueCount);
+
+      // Special case. For union and list vectors, it is very complex
+      // to figure out raw data size. Make it same as net size.
+      if (metadata.getType().getMinorType() == MinorType.UNION ||
+        (metadata.getType().getMinorType() == MinorType.LIST && v.getField().getDataMode() != DataMode.REPEATED)) {
+        totalDataSize = totalNetSize;
       }
-      switch (metadata.getType().getMinorType()) {
-      case LIST:
-        buildList(v);
-        break;
-      case MAP:
-      case UNION:
-        // No standard size for Union type
-        dataSize = v.getPayloadByteCount(valueCount);
-        break;
-      default:
-        dataSize = v.getPayloadByteCount(valueCount);
-        try {
-          stdSize = TypeHelper.getSize(metadata.getType()) * elementCount;
-        } catch (Exception e) {
-          // For unsupported types, just set stdSize to 0.
-          stdSize = 0;
-        }
+
+      switch(v.getField().getDataMode()) {
+        case REPEATED:
+          isRepeated = true;
+          elementCount  = getElementCount(v);
+          cardinality = valueCount == 0 ? 0 : elementCount * 1.0f / valueCount;
+
+          // For complex types, there is nothing more to do for top columns.
+          // Data size is calculated recursively for children later.
+          if (isComplex()) {
+            return;
+          }
+
+          // Calculate pure data size.
+          if (isVariableWidth) {
+            UInt4Vector offsetVector = ((RepeatedValueVector) v).getOffsetVector();
+            int innerValueCount = offsetVector.getAccessor().get(valueCount);
+            VariableWidthVector dataVector = ((VariableWidthVector) ((RepeatedValueVector) v).getDataVector());
+            totalDataSize = dataVector.getOffsetVector().getAccessor().get(innerValueCount);
+          } else {
+            ValueVector dataVector = ((RepeatedValueVector) v).getDataVector();
+            totalDataSize = dataVector.getPayloadByteCount(elementCount);
+          }
+
+          break;
+        case OPTIONAL:
+          isOptional = true;
+
+          // For complex types, there is nothing more to do for top columns.
+          // Data size is calculated recursively for children later.
+          if (isComplex()) {
+            return;
+          }
+
+          // Calculate pure data size.
+          if (isVariableWidth) {
+            VariableWidthVector variableWidthVector = ((VariableWidthVector) ((NullableVector) v).getValuesVector());
+            totalDataSize = variableWidthVector.getOffsetVector().getAccessor().get(valueCount);
+          } else {
+            // Another special case.
+            if (v instanceof UntypedNullVector) {
+              return;
+            }
+            totalDataSize = ((NullableVector) v).getValuesVector().getPayloadByteCount(valueCount);
+          }
+          break;
+
+        case REQUIRED:
+          // For complex types, there is nothing more to do for top columns.
+          // Data size is calculated recursively for children later.
+          if (isComplex()) {
+            return;
+          }
+
+          // Calculate pure data size.
+          if (isVariableWidth) {
+            UInt4Vector  offsetVector = ((VariableWidthVector)v).getOffsetVector();
+            totalDataSize = offsetVector.getAccessor().get(valueCount);
+          } else {
+            totalDataSize = v.getPayloadByteCount(valueCount);
+          }
+          break;
+
+        default:
+            break;
       }
-      estSize = safeDivide(dataSize, valueCount);
-      netSize = v.getPayloadByteCount(valueCount);
     }
 
     @SuppressWarnings("resource")
-    private int buildRepeated(ValueVector v) {
-
+    private int getElementCount(ValueVector v) {
       // Repeated vectors are special: they have an associated offset vector
       // that changes the value count of the contained vectors.
-
       UInt4Vector offsetVector = ((RepeatedValueVector) v).getOffsetVector();
       int childCount = valueCount == 0 ? 0 : offsetVector.getAccessor().get(valueCount);
-      if (metadata.getType().getMinorType() == MinorType.MAP) {
 
-        // For map, the only data associated with the map vector
-        // itself is the offset vector, if any.
+      return childCount;
+    }
+
+    private void allocateMap(AbstractMapVector map, int recordCount) {
+      if (map instanceof RepeatedMapVector) {
+        ((RepeatedMapVector) map).allocateOffsetsNew(recordCount);
+          recordCount *= getCardinality();
+        }
 
-        dataSize = offsetVector.getPayloadByteCount(valueCount);
+      for (ValueVector vector : map) {
+        children.get(vector.getField().getName()).allocateVector(vector, recordCount);
       }
-      return childCount;
     }
 
-    @SuppressWarnings("resource")
-    private void buildList(ValueVector v) {
-      // complex ListVector cannot be casted to RepeatedListVector.
-      // check the mode.
-      if (v.getField().getDataMode() != DataMode.REPEATED) {
-        dataSize = v.getPayloadByteCount(valueCount);
+    public void allocateVector(ValueVector vector, int recordCount) {
+      if (vector instanceof AbstractMapVector) {
+        allocateMap((AbstractMapVector) vector, recordCount);
         return;
       }
-      UInt4Vector offsetVector = ((RepeatedListVector) v).getOffsetVector();
-      dataSize = offsetVector.getPayloadByteCount(valueCount);
+      AllocationHelper.allocate(vector, recordCount, getEntryWidth(), getCardinality());
     }
 
     @Override
@@ -191,15 +418,21 @@ public class RecordBatchSizer {
         buf.append(", elements: ")
            .append(elementCount)
            .append(", per-array: ")
-           .append(estElementCountPerArray);
+           .append(cardinality);
       }
-      buf .append(", std size: ")
-          .append(stdSize)
-          .append(", actual size: ")
-          .append(estSize)
-          .append(", data size: ")
-          .append(dataSize)
-          .append(")");
+      buf.append("Per entry: std data size: ")
+         .append(getStdDataSizePerEntry())
+         .append(", std net size: ")
+         .append(getStdNetSizePerEntry())
+         .append(", actual data size: ")
+         .append(getDataSizePerEntry())
+         .append(", actual net size: ")
+         .append(getNetSizePerEntry())
+         .append("  Totals: data size: ")
+         .append(getTotalDataSize())
+         .append(", net size: ")
+         .append(getTotalNetSize())
+         .append(")");
       return buf.toString();
     }
 
@@ -214,7 +447,7 @@ public class RecordBatchSizer {
      * for this column
      */
 
-    private void buildVectorInitializer(VectorInitializer initializer) {
+    public void buildVectorInitializer(VectorInitializer initializer) {
       int width = 0;
       switch(metadata.getType().getMinorType()) {
       case VAR16CHAR:
@@ -222,11 +455,11 @@ public class RecordBatchSizer {
       case VARCHAR:
 
         // Subtract out the offset vector width
-        width = estSize - 4;
+        width = getNetSizePerEntry() - OFFSET_VECTOR_WIDTH;
 
         // Subtract out the bits (is-set) vector width
         if (metadata.getDataMode() == DataMode.OPTIONAL) {
-          width -= 1;
+          width -= BIT_VECTOR_WIDTH;
         }
         break;
       default:
@@ -237,23 +470,32 @@ public class RecordBatchSizer {
         if (width > 0) {
           // Estimated width is width of entire column. Divide
           // by element count to get per-element size.
-          initializer.variableWidthArray(name, width / estElementCountPerArray, estElementCountPerArray);
+          initializer.variableWidthArray(name, width / cardinality, cardinality);
         } else {
-          initializer.fixedWidthArray(name, estElementCountPerArray);
+          initializer.fixedWidthArray(name, cardinality);
         }
       }
       else if (width > 0) {
         initializer.variableWidth(name, width);
       }
+
+      for (ColumnSize columnSize : children.values()) {
+        columnSize.buildVectorInitializer(initializer);
+      }
     }
+
   }
 
   public static ColumnSize getColumn(ValueVector v, String prefix) {
     return new ColumnSize(v, prefix);
   }
 
-  public static final int MAX_VECTOR_SIZE = ValueVector.MAX_BUFFER_SIZE; // 16 MiB
+  public ColumnSize getColumn(String name) {
+    return columnSizes.get(name);
+  }
 
+  // This keeps information for only top level columns. Information for nested
+  // columns can be obtained from children of topColumns.
   private Map<String, ColumnSize> columnSizes = CaseInsensitiveMap.newHashMap();
 
   /**
@@ -334,7 +576,15 @@ public class RecordBatchSizer {
   public RecordBatchSizer(VectorAccessible va, SelectionVector2 sv2) {
     rowCount = va.getRecordCount();
     for (VectorWrapper<?> vw : va) {
-      measureColumn(vw.getValueVector(), "");
+      ColumnSize colSize = measureColumn(vw.getValueVector(), "");
+      columnSizes.put(vw.getField().getName(), colSize);
+      stdRowWidth += colSize.getStdDataSizePerEntry();
+      netBatchSize += colSize.getTotalNetSize();
+      maxSize = Math.max(maxSize, colSize.getTotalDataSize());
+      if (colSize.metadata.isNullable()) {
+        nullableCount++;
+      }
+      netRowWidth += colSize.getNetSizePerEntry();
     }
 
     for (BufferLedger ledger : ledgers) {
@@ -385,44 +635,35 @@ public class RecordBatchSizer {
     return 64;
   }
 
-  private void measureColumn(ValueVector v, String prefix) {
-
+  private ColumnSize measureColumn(ValueVector v, String prefix) {
     ColumnSize colSize = new ColumnSize(v, prefix);
-    columnSizes.put(v.getField().getName(), colSize);
-    stdRowWidth += colSize.stdSize;
-    netBatchSize += colSize.dataSize;
-    maxSize = Math.max(maxSize, colSize.dataSize);
-    if (colSize.metadata.isNullable()) {
-      nullableCount++;
-    }
-
-    // Maps consume no size themselves. However, their contained
-    // vectors do consume space, so visit columns recursively.
-
     switch (v.getField().getType().getMinorType()) {
       case MAP:
-        expandMap((AbstractMapVector) v, prefix + v.getField().getName() + ".");
+        // Maps consume no size themselves. However, their contained
+        // vectors do consume space, so visit columns recursively.
+        expandMap(colSize, (AbstractMapVector) v, prefix + v.getField().getName() + ".");
         break;
       case LIST:
         // complex ListVector cannot be casted to RepeatedListVector.
         // do not expand the list if it is not repeated mode.
         if (v.getField().getDataMode() == DataMode.REPEATED) {
-          expandList((RepeatedListVector) v, prefix + v.getField().getName() + ".");
+          expandList(colSize, (RepeatedListVector) v, prefix + v.getField().getName() + ".");
         }
         break;
       default:
         v.collectLedgers(ledgers);
     }
 
-    netRowWidth += colSize.estSize;
-    netRowWidthCap50 += ! colSize.isVariableWidth ? colSize.estSize :
-        8 /* offset vector */ + roundUpToPowerOf2(Math.min(colSize.estSize,50));
+    netRowWidthCap50 += ! colSize.isVariableWidth ? colSize.getNetSizePerEntry() :
+        8 /* offset vector */ + roundUpToPowerOf2(Math.min(colSize.getNetSizePerEntry(),50));
         // above change 8 to 4 after DRILL-5446 is fixed
+
+    return colSize;
   }
 
-  private void expandMap(AbstractMapVector mapVector, String prefix) {
+  private void expandMap(ColumnSize colSize, AbstractMapVector mapVector, String prefix) {
     for (ValueVector vector : mapVector) {
-      measureColumn(vector, prefix);
+      colSize.children.put(vector.getField().getName(), measureColumn(vector, prefix));
     }
 
     // For a repeated map, we need the memory for the offset vector (only).
@@ -433,11 +674,10 @@ public class RecordBatchSizer {
     }
   }
 
-  private void expandList(RepeatedListVector vector, String prefix) {
-    measureColumn(vector.getDataVector(), prefix);
+  private void expandList(ColumnSize colSize, RepeatedListVector vector, String prefix) {
+    colSize.children.put(vector.getField().getName(), measureColumn(vector.getDataVector(), prefix));
 
     // Determine memory for the offset vector (only).
-
     vector.collectLedgers(ledgers);
   }
 
@@ -448,6 +688,20 @@ public class RecordBatchSizer {
     return (int) Math.ceil((double) num / denom);
   }
 
+  public static int safeDivide(int num, int denom) {
+    if (denom == 0) {
+      return 0;
+    }
+    return (int) Math.ceil((double) num / denom);
+  }
+
+  public static int safeDivide(int num, float denom) {
+    if (denom == 0) {
+      return 0;
+    }
+    return (int) Math.ceil((double) num / denom);
+  }
+
   public int rowCount() { return rowCount; }
   public int stdRowWidth() { return stdRowWidth; }
   public int grossRowWidth() { return grossRowWidth; }
@@ -507,4 +761,11 @@ public class RecordBatchSizer {
     }
     return initializer;
   }
+
+  public void allocateVectors(VectorContainer container, int recordCount) {
+    for (VectorWrapper w : container) {
+      ColumnSize colSize = columnSizes.get(w.getField().getName());
+      colSize.allocateVector(w.getValueVector(), recordCount);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/47c5d1fe/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorInitializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorInitializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorInitializer.java
index 30c2a7a..90f507d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorInitializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorInitializer.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.record;
 
-import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -25,6 +25,7 @@ import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractMapVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -56,24 +57,24 @@ public class VectorInitializer {
     @Override
     public String toString() {
       StringBuilder buf = new StringBuilder()
-          .append("{");
+        .append("{");
       String sep = "";
       if (entryWidth > 0) {
         buf.append("width=")
-           .append(entryWidth);
+          .append(entryWidth);
         sep = ", ";
       }
       if (elementCount > 0) {
         buf.append(sep)
-           .append("elements=")
-           .append(elementCount);
+          .append("elements=")
+          .append(elementCount);
       }
       buf.append("}");
       return buf.toString();
     }
   }
 
-  private Map<String, AllocationHint> hints = new HashMap<>();
+  private Map<String, AllocationHint> hints = CaseInsensitiveMap.newHashMap();
 
   public void variableWidth(String name, int width) {
     hints.put(name, new AllocationHint(width, 1));
@@ -97,7 +98,13 @@ public class VectorInitializer {
     }
   }
 
-  private void allocateVector(ValueVector vector, String prefix, int recordCount) {
+  public void allocateVectors(List<ValueVector> valueVectors, int recordCount) {
+    for (ValueVector v : valueVectors) {
+      allocateVector(v, v.getField().getName(), recordCount);
+    }
+  }
+
+  public void allocateVector(ValueVector vector, String prefix, int recordCount) {
     String key = prefix + vector.getField().getName();
     AllocationHint hint = hints.get(key);
     if (vector instanceof AbstractMapVector) {
@@ -117,7 +124,7 @@ public class VectorInitializer {
 //        ", " + size);
   }
 
-  private void allocateVector(ValueVector vector, int recordCount, AllocationHint hint) {
+  public void allocateVector(ValueVector vector, int recordCount, AllocationHint hint) {
     if (hint == null) {
       // Use hard-coded values. Same as ScanBatch
 

http://git-wip-us.apache.org/repos/asf/drill/blob/47c5d1fe/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestShortArrays.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestShortArrays.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestShortArrays.java
index 3c210f7..735c5ce 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestShortArrays.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestShortArrays.java
@@ -72,15 +72,15 @@ public class TestShortArrays extends SubOperatorTest {
     RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
     assertEquals(2, sizer.columns().size());
     ColumnSize bCol = sizer.columns().get("b");
-    assertEquals(0.1, bCol.estElementCountPerArray, 0.01);
-    assertEquals(1, bCol.elementCount);
+    assertEquals(0.1, bCol.getCardinality(), 0.01);
+    assertEquals(1, bCol.getElementCount());
 
     // Create a vector initializer using the sizer info.
 
     VectorInitializer vi = sizer.buildVectorInitializer();
     AllocationHint bHint = vi.hint("b");
     assertNotNull(bHint);
-    assertEquals(bHint.elementCount, bCol.estElementCountPerArray, 0.001);
+    assertEquals(bHint.elementCount, bCol.getCardinality(), 0.001);
 
     // Create a new batch, and new vector, using the sizer and
     // initializer inferred from the previous batch.

http://git-wip-us.apache.org/repos/asf/drill/blob/47c5d1fe/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
index 741164f..edd4cbf 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
@@ -31,15 +31,23 @@ import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.util.JsonStringArrayList;
 import org.apache.drill.exec.util.JsonStringHashMap;
 import org.apache.drill.exec.util.Text;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedListVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
 public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
   private static final long initReservation = AbstractBase.INIT_ALLOCATION;
   private static final long maxAllocation = AbstractBase.MAX_ALLOCATION;
@@ -111,14 +119,14 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     long totalSize = getExpectedSize(expectedJsonBatches);
 
     // set the output batch size to 1/2 of total size expected.
-    // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten.
+    // We will get approximately get 2 batches and max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
     OperatorTestBuilder opTestBuilder = opTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
-      .expectedNumBatches(4)  // verify number of batches
+      .expectedNumBatches(2)  // verify number of batches
       .expectedBatchSize(totalSize / 2); // verify batch size.
 
     for (int i = 0; i < numRows + 1; i++) {
@@ -179,14 +187,14 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
     long totalSize = getExpectedSize(expectedJsonBatches);
 
     // set the output batch size to 1/2 of total size expected.
-    // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten.
+    // We will get approximately get 2 batches and max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
     OperatorTestBuilder opTestBuilder = opTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
-      .expectedNumBatches(4) // verify number of batches
+      .expectedNumBatches(2) // verify number of batches
       .expectedBatchSize(totalSize / 2); // verify batch size.
 
     for (int i = 0; i < numRows + 1; i++) {
@@ -241,14 +249,14 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
 
     long totalSize = getExpectedSize(expectedJsonBatches);
     // set the output batch size to 1/2 of total size expected.
-    // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten.
+    // We will get approximately get 2 batches and max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
     OperatorTestBuilder opTestBuilder = opTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
-      .expectedNumBatches(4) // verify number of batches
+      .expectedNumBatches(2) // verify number of batches
       .expectedBatchSize(totalSize);  // verify batch size.
 
     for (int i = 0; i < numRows + 1; i++) {
@@ -302,14 +310,14 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
 
     long totalSize = getExpectedSize(expectedJsonBatches);
     // set the output batch size to 1/2 of total size expected.
-    // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten.
+    // We will get approximately get 2 batches and max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
     OperatorTestBuilder opTestBuilder = opTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
-      .expectedNumBatches(4) // verify number of batches
+      .expectedNumBatches(2) // verify number of batches
       .expectedBatchSize(totalSize);  // verify batch size.
 
     final JsonStringArrayList<Text> birds1 = new JsonStringArrayList<Text>() {{
@@ -390,14 +398,14 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
 
     long totalSize = getExpectedSize(expectedJsonBatches);
     // set the output batch size to 1/2 of total size expected.
-    // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten.
+    // We will get approximately get 2 batches and max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
     OperatorTestBuilder opTestBuilder = opTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
-      .expectedNumBatches(4) // verify number of batches
+      .expectedNumBatches(2) // verify number of batches
       .expectedBatchSize(totalSize / 2);  // verify batch size.
 
     JsonStringHashMap<String, Object> resultExpected1 = new JsonStringHashMap<>();
@@ -499,14 +507,14 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
 
     long totalSize = getExpectedSize(expectedJsonBatches);
     // set the output batch size to 1/2 of total size expected.
-    // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten.
+    // We will get approximately get 2 batches and max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
     OperatorTestBuilder opTestBuilder = opTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
-      .expectedNumBatches(4) // verify number of batches
+      .expectedNumBatches(2) // verify number of batches
       .expectedBatchSize(totalSize / 2);  // verify batch size.
 
     final JsonStringHashMap<String, Object> resultExpected1 = new JsonStringHashMap<>();
@@ -597,14 +605,14 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
 
     long totalSize = getExpectedSize(expectedJsonBatches);
     // set the output batch size to 1/2 of total size expected.
-    // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten.
+    // We will get approximately get 2 batches and max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
     OperatorTestBuilder opTestBuilder = opTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
-      .expectedNumBatches(4) // verify number of batches
+      .expectedNumBatches(2) // verify number of batches
       .expectedBatchSize(totalSize / 2);  // verify batch size.
 
     JsonStringHashMap<String, Object> innerMapResult = new JsonStringHashMap<>();
@@ -694,7 +702,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
 
     long totalSize = getExpectedSize(expectedJsonBatches);
     // set the output batch size to 1/2 of total size expected.
-    // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten.
+    // We will get 16 batches because of upper bound of 65535 rows.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
     // Here we expect 16 batches because each batch will be limited by upper limit of 65535 records.
@@ -863,14 +871,14 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
 
     long totalSize = getExpectedSize(expectedJsonBatches);
     // set the output batch size to 1/2 of total size expected.
-    // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten.
+    // We will get approximately get 2 batches and max of 4.
     fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
 
     OperatorTestBuilder opTestBuilder = opTestBuilder()
       .physicalOperator(flatten)
       .inputDataStreamJson(inputJsonBatches)
       .baselineColumns("a", "b", "c")
-      .expectedNumBatches(4) // verify number of batches
+      .expectedNumBatches(2) // verify number of batches
       .expectedBatchSize(totalSize / 2);  // verify batch size.
 
     for (long k = 0; k < ((numRows + 1)); k++) {
@@ -1126,4 +1134,80 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
 
     opTestBuilder.go();
   }
+
+  @Test
+  public void testSizerRepeatedList() throws Exception {
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+
+    StringBuilder newString = new StringBuilder();
+    newString.append("[ [1,2,3,4], [5,6,7,8] ]");
+
+    numRows = 9;
+    batchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      batchString.append("{\"c\" : " + newString);
+      batchString.append("},");
+    }
+    batchString.append("{\"c\" : " + newString);
+    batchString.append("}");
+
+    batchString.append("]");
+    inputJsonBatches.add(batchString.toString());
+
+    // Create a dummy scanBatch to figure out the size.
+    RecordBatch scanBatch = new ScanBatch(new MockPhysicalOperator(),
+      fragContext, getReaderListForJsonBatches(inputJsonBatches, fragContext));
+
+    VectorAccessible va = new BatchIterator(scanBatch).iterator().next();
+    RecordBatchSizer sizer = new RecordBatchSizer(va);
+
+    assertEquals(1, sizer.columns().size());
+    RecordBatchSizer.ColumnSize column = sizer.columns().get("c");
+    assertNotNull(column);
+
+    /**
+     * stdDataSize:8*10*10, stdNetSize:8*10*10 + 4*10 + 4*10 + 4,
+     * dataSizePerEntry:8*8, netSizePerEntry:8*8 + 4*2 + 4,
+     * totalDataSize:8*8*10, totalNetSize:netSizePerEntry*10, valueCount:10,
+     * elementCount:10, estElementCountPerArray:1, isVariableWidth:false
+     */
+    assertEquals(800, column.getStdDataSizePerEntry());
+    assertEquals(884, column.getStdNetSizePerEntry());
+    assertEquals(64, column.getDataSizePerEntry());
+    assertEquals(76, column.getNetSizePerEntry());
+    assertEquals(640, column.getTotalDataSize());
+    assertEquals(760, column.getTotalNetSize());
+    assertEquals(10, column.getValueCount());
+    assertEquals(20, column.getElementCount());
+    assertEquals(2, column.getCardinality(), 0.01);
+    assertEquals(false, column.isVariableWidth());
+
+    final int testRowCount = 1000;
+    final int testRowCountPowerTwo = 2048;
+
+    for (VectorWrapper<?> vw : va) {
+      ValueVector v = vw.getValueVector();
+      v.clear();
+
+      RecordBatchSizer.ColumnSize colSize = sizer.getColumn(v.getField().getName());
+
+      // Allocates to nearest power of two
+      colSize.allocateVector(v, testRowCount);
+      UInt4Vector offsetVector = ((RepeatedListVector) v).getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCount * 2) << 1), offsetVector.getValueCapacity());
+      ValueVector dataVector = ((RepeatedValueVector) v).getDataVector();
+      assertEquals(Integer.highestOneBit((testRowCount * 2)  << 1) - 1, dataVector.getValueCapacity());
+      v.clear();
+
+      // Allocates the same as value passed since it is already power of two.
+      // -1 is done for adjustment needed for offset vector.
+      colSize.allocateVector(v, testRowCountPowerTwo - 1);
+      offsetVector = ((RepeatedListVector) v).getOffsetVector();
+      assertEquals(testRowCountPowerTwo, offsetVector.getValueCapacity());
+      dataVector = ((RepeatedValueVector) v).getDataVector();
+      assertEquals(Integer.highestOneBit(testRowCountPowerTwo)-1, dataVector.getValueCapacity());
+      v.clear();
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/47c5d1fe/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordBatchSizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordBatchSizer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordBatchSizer.java
new file mode 100644
index 0000000..0490406
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordBatchSizer.java
@@ -0,0 +1,806 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.RecordBatchSizer.ColumnSize;
+import org.apache.drill.exec.vector.NullableVector;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.Test;
+
+public class TestRecordBatchSizer extends SubOperatorTest {
+  private final int testRowCount = 1000;
+  private final int testRowCountPowerTwo = 2048;
+
+
+  private void verifyColumnValues(ColumnSize column, int stdDataSizePerEntry, int stdNetSizePerEntry,
+                                  int dataSizePerEntry, int netSizePerEntry, int totalDataSize,
+                                  int totalNetSize, int valueCount, int elementCount,
+                                  int estElementCountPerArray, boolean isVariableWidth) {
+    assertNotNull(column);
+
+    assertEquals(stdDataSizePerEntry, column.getStdDataSizePerEntry());
+    assertEquals(stdNetSizePerEntry, column.getStdNetSizePerEntry());
+
+    assertEquals(dataSizePerEntry, column.getDataSizePerEntry());
+    assertEquals(netSizePerEntry, column.getNetSizePerEntry());
+
+    assertEquals(totalDataSize, column.getTotalDataSize());
+    assertEquals(totalNetSize, column.getTotalNetSize());
+
+    assertEquals(valueCount, column.getValueCount());
+    assertEquals(elementCount, column.getElementCount());
+
+    assertEquals(estElementCountPerArray, column.getCardinality(), 0.01);
+    assertEquals(isVariableWidth, column.isVariableWidth());
+  }
+
+  @Test
+  public void testSizerFixedWidth() {
+    BatchSchema schema = new SchemaBuilder().add("a", MinorType.BIGINT).add("b", MinorType.FLOAT8).build();
+    RowSetBuilder builder = fixture.rowSetBuilder(schema);
+
+    for (long i = 0; i < 10; i++) {
+      builder.addRow(i, (float) i * 0.1);
+    }
+    RowSet rows = builder.build();
+
+    // Run the record batch sizer on the resulting batch.
+    RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
+    assertEquals(2, sizer.columns().size());
+
+    ColumnSize aColumn = sizer.columns().get("a");
+
+    /**
+     * stdDataSize:8, stdNetSize:8, dataSizePerEntry:8, netSizePerEntry:8,
+     * totalDataSize:8*10, totalNetSize:8*10, valueCount:10,
+     * elementCount:10, estElementCountPerArray:1, isVariableWidth:false
+     */
+    verifyColumnValues(aColumn, 8, 8, 8, 8, 80, 80, 10, 10, 1, false);
+
+    ColumnSize bColumn = sizer.columns().get("b");
+    verifyColumnValues(bColumn, 8, 8, 8, 8, 80, 80, 10, 10, 1,false);
+
+    SingleRowSet empty = fixture.rowSet(schema);
+    VectorAccessible accessible = empty.vectorAccessible();
+
+    for (VectorWrapper<?> vw : accessible) {
+      ValueVector v = vw.getValueVector();
+      RecordBatchSizer.ColumnSize colSize = sizer.getColumn(v.getField().getName());
+
+      // Allocates to nearest power of two
+      colSize.allocateVector(v, testRowCount);
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), v.getValueCapacity());
+      v.clear();
+
+      // Allocates the same as value passed since it is already power of two.
+      colSize.allocateVector(v, testRowCountPowerTwo);
+      assertEquals(testRowCountPowerTwo, v.getValueCapacity());
+      v.clear();
+
+      // Allocate for max rows.
+      colSize.allocateVector(v, ValueVector.MAX_ROW_COUNT);
+      assertEquals(ValueVector.MAX_ROW_COUNT, v.getValueCapacity());
+      v.clear();
+
+      // Allocate for 0 rows. should atleast do allocation for 1 row.
+      colSize.allocateVector(v,  0);
+      assertEquals(ValueVector.MIN_ROW_COUNT, v.getValueCapacity());
+      v.clear();
+    }
+
+    rows.clear();
+    empty.clear();
+  }
+
+
+  @Test
+  public void testSizerRepeatedFixedWidth() {
+    BatchSchema schema = new SchemaBuilder().addArray("a", MinorType.BIGINT).addArray("b", MinorType.FLOAT8).build();
+    RowSetBuilder builder = fixture.rowSetBuilder(schema);
+
+    for (long i = 0; i < 10; i++) {
+      builder.addRow(new long[] {1, 2, 3, 4, 5}, new double[] {(double)i*0.1, (double)i*0.1, (double)i*0.1, (double)i*0.2, (double)i*0.3});
+    }
+    RowSet rows = builder.build();
+
+    // Run the record batch sizer on the resulting batch.
+    RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
+    assertEquals(2, sizer.columns().size());
+
+    /**
+     * stdDataSize:8*10, stdNetSize:8*10+4, dataSizePerEntry:5*8, netSizePerEntry:5*8+4,
+     * totalDataSize:5*8*10, totalNetSize:5*8*10+5*8, valueCount:10,
+     * elementCount:50, estElementCountPerArray:5, isVariableWidth:false
+     */
+    verifyColumnValues(sizer.columns().get("a"),
+      80, 84, 40, 44, 400, 440, 10, 50, 5, false);
+
+    verifyColumnValues(sizer.columns().get("b"),
+      80, 84, 40, 44, 400, 440, 10, 50, 5, false);
+
+    SingleRowSet empty = fixture.rowSet(schema);
+    VectorAccessible accessible = empty.vectorAccessible();
+
+    UInt4Vector offsetVector;
+    ValueVector dataVector;
+
+    for (VectorWrapper<?> vw : accessible) {
+      ValueVector v = vw.getValueVector();
+      RecordBatchSizer.ColumnSize colSize = sizer.getColumn(v.getField().getName());
+
+      // Allocates to nearest power of two
+      colSize.allocateVector(v, testRowCount);
+      offsetVector = ((RepeatedValueVector) v).getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), offsetVector.getValueCapacity());
+      dataVector = ((RepeatedValueVector) v).getDataVector();
+      assertEquals(Integer.highestOneBit((testRowCount * 5)  << 1), dataVector.getValueCapacity());
+      v.clear();
+
+      // Allocates the same as value passed since it is already power of two.
+      // -1 is done for adjustment needed for offset vector.
+      colSize.allocateVector(v, testRowCountPowerTwo - 1);
+      offsetVector = ((RepeatedValueVector) v).getOffsetVector();
+      assertEquals(testRowCountPowerTwo, offsetVector.getValueCapacity());
+      dataVector = ((RepeatedValueVector) v).getDataVector();
+      assertEquals(Integer.highestOneBit((testRowCountPowerTwo -1) * 5) << 1, dataVector.getValueCapacity());
+      v.clear();
+
+      // Allocate for max rows.
+      colSize.allocateVector(v, ValueVector.MAX_ROW_COUNT - 1);
+      offsetVector = ((RepeatedValueVector) v).getOffsetVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT, offsetVector.getValueCapacity());
+      dataVector = ((RepeatedValueVector) v).getDataVector();
+      assertEquals(Integer.highestOneBit(((ValueVector.MAX_ROW_COUNT - 1)* 5) << 1), dataVector.getValueCapacity());
+      v.clear();
+
+      // Allocate for 0 rows. should atleast do allocation for 1 row.
+      colSize.allocateVector(v, 0);
+      offsetVector = ((RepeatedValueVector) v).getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT + 1, offsetVector.getValueCapacity());
+      dataVector = ((RepeatedValueVector) v).getDataVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT, dataVector.getValueCapacity());
+      v.clear();
+    }
+
+    empty.clear();
+    rows.clear();
+  }
+
+  @Test
+  public void testSizerNullableFixedWidth() {
+    BatchSchema schema = new SchemaBuilder().addNullable("a", MinorType.BIGINT).addNullable("b", MinorType.FLOAT8).build();
+    RowSetBuilder builder = fixture.rowSetBuilder(schema);
+
+    for (long i = 0; i < 10; i++) {
+      builder.addRow(i, (float)i*0.1);
+    }
+
+    RowSet rows = builder.build();
+
+    // Run the record batch sizer on the resulting batch.
+    RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
+    assertEquals(2, sizer.columns().size());
+
+    ColumnSize aColumn = sizer.columns().get("a");
+    ColumnSize bColumn = sizer.columns().get("b");
+
+    /**
+     * stdDataSize:8, stdNetSize:8+1, dataSizePerEntry:8, netSizePerEntry:8+1,
+     * totalDataSize:8*10, totalNetSize:(8+1)*10, valueCount:10,
+     * elementCount:10, estElementCountPerArray:1, isVariableWidth:false
+     */
+    verifyColumnValues(aColumn,
+      8, 9, 8, 9, 80, 90, 10, 10, 1, false);
+
+    verifyColumnValues(bColumn,
+      8, 9, 8, 9, 80, 90, 10, 10, 1, false);
+
+    SingleRowSet empty = fixture.rowSet(schema);
+    VectorAccessible accessible = empty.vectorAccessible();
+
+    ValueVector bitVector, valueVector;
+
+    for (VectorWrapper<?> vw : accessible) {
+      ValueVector v = vw.getValueVector();
+      RecordBatchSizer.ColumnSize colSize = sizer.getColumn(v.getField().getName());
+
+      // Allocates to nearest power of two
+      colSize.allocateVector(v, testRowCount);
+      bitVector = ((NullableVector) v).getBitsVector();
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), bitVector.getValueCapacity());
+      valueVector = ((NullableVector) v).getValuesVector();
+      assertEquals(Integer.highestOneBit(testRowCount << 1), valueVector.getValueCapacity());
+      v.clear();
+
+      // Allocates the same as value passed since it is already power of two.
+      colSize.allocateVector(v, testRowCountPowerTwo);
+      bitVector = ((NullableVector) v).getBitsVector();
+      assertEquals(testRowCountPowerTwo, bitVector.getValueCapacity());
+      valueVector = ((NullableVector) v).getValuesVector();
+      assertEquals(testRowCountPowerTwo, valueVector.getValueCapacity());
+      v.clear();
+
+      // Allocate for max rows.
+      colSize.allocateVector(v, ValueVector.MAX_ROW_COUNT);
+      bitVector = ((NullableVector) v).getBitsVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT, bitVector.getValueCapacity());
+      valueVector = ((NullableVector) v).getValuesVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT, valueVector.getValueCapacity());
+      v.clear();
+
+      // Allocate for 0 rows. should atleast do allocation for 1 row.
+      colSize.allocateVector(v, 0);
+      bitVector = ((NullableVector) v).getBitsVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT, bitVector.getValueCapacity());
+      valueVector = ((NullableVector) v).getValuesVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT, valueVector.getValueCapacity());
+      v.clear();
+    }
+
+    empty.clear();
+    rows.clear();
+  }
+
+  @Test
+  public void testSizerVariableWidth() {
+    BatchSchema schema = new SchemaBuilder().add("a", MinorType.VARCHAR).build();
+    RowSetBuilder builder = fixture.rowSetBuilder(schema);
+
+    StringBuilder stringBuilder = new StringBuilder();
+
+    // a, aa, aaa, ... aaaaaaaaaa. totalSize = (10*11)/2 = 55
+    for (long i = 0; i < 10; i++) {
+      stringBuilder.append("a");
+      builder.addRow(stringBuilder.toString());
+    }
+    RowSet rows = builder.build();
+
+    // Run the record batch sizer on the resulting batch.
+    RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
+    assertEquals(1, sizer.columns().size());
+
+    ColumnSize aColumn = sizer.columns().get("a");
+
+    /**
+     * stdDataSize:50, stdNetSize:50+4, dataSizePerEntry:8, netSizePerEntry:8,
+     * totalDataSize:(10*11)/2, totalNetSize:(10*11)/2 + 4*10, valueCount:10,
+     * elementCount:10, estElementCountPerArray:1, isVariableWidth:true
+     */
+    verifyColumnValues(aColumn,
+      50, 54, 6, 10, 55, 95, 10, 10, 1, true);
+
+    SingleRowSet empty = fixture.rowSet(schema);
+    VectorAccessible accessible = empty.vectorAccessible();
+
+    UInt4Vector offsetVector;
+
+    for (VectorWrapper<?> vw : accessible) {
+      ValueVector v = vw.getValueVector();
+      RecordBatchSizer.ColumnSize colSize = sizer.getColumn(v.getField().getName());
+
+      // Allocates to nearest power of two
+      colSize.allocateVector(v, testRowCount);
+      offsetVector = ((VariableWidthVector)v).getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(testRowCount  << 1)-1, v.getValueCapacity());
+      v.clear();
+
+      // Allocates the same as value passed since it is already power of two.
+      // -1 is done for adjustment needed for offset vector.
+      colSize.allocateVector(v, testRowCountPowerTwo - 1);
+      offsetVector = ((VariableWidthVector)v).getOffsetVector();
+      assertEquals(testRowCountPowerTwo, offsetVector.getValueCapacity());
+      assertEquals(testRowCountPowerTwo - 1, v.getValueCapacity());
+      v.clear();
+
+      // Allocate for max rows.
+      colSize.allocateVector(v, ValueVector.MAX_ROW_COUNT - 1);
+      offsetVector = ((VariableWidthVector)v).getOffsetVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT, offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MAX_ROW_COUNT - 1, v.getValueCapacity());
+      v.clear();
+
+      // Allocate for 0 rows. should atleast do allocation for 1 row.
+      colSize.allocateVector(v, 0);
+      offsetVector = ((VariableWidthVector)v).getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT + 1, offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MIN_ROW_COUNT, v.getValueCapacity());
+      v.clear();
+    }
+
+    empty.clear();
+    rows.clear();
+  }
+
+
+  @Test
+  public void testSizerRepeatedVariableWidth() {
+    BatchSchema schema = new SchemaBuilder().addArray("b", MinorType.VARCHAR).build();
+    RowSetBuilder builder = fixture.rowSetBuilder(schema);
+
+    // size = (5*6)/2 = 15
+    String[] newString = new String [] {"a", "aa", "aaa", "aaaa", "aaaaa"};
+
+    for (long i = 0; i < 10; i++) {
+      builder.addRow((Object) (newString));
+    }
+    RowSet rows = builder.build();
+
+    // Run the record batch sizer on the resulting batch.
+    RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
+    assertEquals(1, sizer.columns().size());
+
+    ColumnSize bColumn = sizer.columns().get("b");
+
+    /**
+     * stdDataSize:50*10, stdNetSize:50*10+4*10+4, dataSizePerEntry:(5*6)/2, netSizePerEntry:(5*6)/2+5*4+4,
+     * totalDataSize:(5*6)/2 * 10, totalNetSize: ((5*6)/2+5*4+4)*10, valueCount:10,
+     * elementCount:50, estElementCountPerArray:5, isVariableWidth:true
+     */
+    verifyColumnValues(bColumn, 500, 544, 15, 39, 150, 390, 10, 50, 5,true);
+
+    SingleRowSet empty = fixture.rowSet(schema);
+    VectorAccessible accessible = empty.vectorAccessible();
+
+    for (VectorWrapper<?> vw : accessible) {
+      ValueVector v = vw.getValueVector();
+      RecordBatchSizer.ColumnSize colSize = sizer.getColumn(v.getField().getName());
+
+      // Allocates to nearest power of two
+      colSize.allocateVector(v, testRowCount - 1);
+      UInt4Vector offsetVector = ((RepeatedValueVector)v).getOffsetVector();
+      assertEquals(Integer.highestOneBit(testRowCount) << 1, offsetVector.getValueCapacity());
+      VariableWidthVector vwVector = ((VariableWidthVector) ((RepeatedValueVector) v).getDataVector());
+      offsetVector = vwVector.getOffsetVector();
+      assertEquals((Integer.highestOneBit((testRowCount-1) * 5) << 1), offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit((testRowCount-1) * 5  << 1)-1, vwVector.getValueCapacity());
+      v.clear();
+
+      // Allocates the same as value passed since it is already power of two.
+      colSize.allocateVector(v, testRowCountPowerTwo);
+      offsetVector = ((RepeatedValueVector)v).getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCountPowerTwo) << 1), offsetVector.getValueCapacity());
+      vwVector = ((VariableWidthVector) ((RepeatedValueVector) v).getDataVector());
+      offsetVector = vwVector.getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCountPowerTwo * 5) << 1), offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(testRowCountPowerTwo * 5  << 1)-1, vwVector.getValueCapacity());
+      v.clear();
+
+      // Allocate for max rows.
+      colSize.allocateVector(v, ValueVector.MAX_ROW_COUNT);
+      offsetVector = ((RepeatedValueVector)v).getOffsetVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT << 1, offsetVector.getValueCapacity());
+      vwVector = ((VariableWidthVector) ((RepeatedValueVector) v).getDataVector());
+      offsetVector = vwVector.getOffsetVector();
+      assertEquals((Integer.highestOneBit(ValueVector.MAX_ROW_COUNT * 5) << 1), offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(ValueVector.MAX_ROW_COUNT * 5  << 1)-1, vwVector.getValueCapacity());
+      v.clear();
+
+      // Allocate for 0 rows. should atleast do allocation for 1 row.
+      colSize.allocateVector(v,  0);
+      offsetVector = ((RepeatedValueVector)v).getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT + 1, offsetVector.getValueCapacity());
+      vwVector = ((VariableWidthVector) ((RepeatedValueVector) v).getDataVector());
+      offsetVector = vwVector.getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT + 1, offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MIN_ROW_COUNT, vwVector.getValueCapacity());
+      v.clear();
+    }
+
+    empty.clear();
+    rows.clear();
+  }
+
+
+  @Test
+  public void testSizerNullableVariableWidth() {
+    BatchSchema schema = new SchemaBuilder().addNullable("b", MinorType.VARCHAR).build();
+    RowSetBuilder builder = fixture.rowSetBuilder(schema);
+
+    StringBuilder stringBuilder = new StringBuilder();
+
+    for (long i = 0; i < 10; i++) {
+      stringBuilder.append("a");
+      builder.addRow( (Object) stringBuilder.toString());
+    }
+    RowSet rows = builder.build();
+
+    // Run the record batch sizer on the resulting batch.
+    RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
+    assertEquals(1, sizer.columns().size());
+
+    /**
+     * stdDataSize:50, stdNetSize:50+4+1, dataSizePerEntry:ceil((10*11)/2)*10),
+     * netSizePerEntry: dataSizePerEntry+4+1,
+     * totalDataSize:(10*11)/2, totalNetSize: (10*11)/2 + (4*10) + (1*10),
+     * valueCount:10,
+     * elementCount:10, estElementCountPerArray:1, isVariableWidth:true
+     */
+    verifyColumnValues(sizer.columns().get("b"),
+      50, 55, 6, 11, 55, 105, 10, 10, 1,true);
+
+    SingleRowSet empty = fixture.rowSet(schema);
+    VectorAccessible accessible = empty.vectorAccessible();
+
+    ValueVector bitVector, valueVector;
+    VariableWidthVector vwVector;
+    UInt4Vector offsetVector;
+
+    for (VectorWrapper<?> vw : accessible) {
+      ValueVector v = vw.getValueVector();
+      RecordBatchSizer.ColumnSize colSize = sizer.getColumn(v.getField().getName());
+
+      // Allocates to nearest power of two
+      colSize.allocateVector(v, testRowCount);
+      bitVector = ((NullableVector) v).getBitsVector();
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), bitVector.getValueCapacity());
+      vwVector = (VariableWidthVector) ((NullableVector) v).getValuesVector();
+      offsetVector = vwVector.getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(testRowCount  << 1)-1, vwVector.getValueCapacity());
+
+      // Allocates the same as value passed since it is already power of two.
+      colSize.allocateVector(v,  testRowCountPowerTwo-1);
+      bitVector = ((NullableVector) v).getBitsVector();
+      assertEquals(Integer.highestOneBit(testRowCountPowerTwo), bitVector.getValueCapacity());
+      vwVector = (VariableWidthVector) ((NullableVector) v).getValuesVector();
+      offsetVector = vwVector.getOffsetVector();
+      assertEquals(testRowCountPowerTwo, offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(testRowCountPowerTwo)-1, vwVector.getValueCapacity());
+
+      // Allocate for max rows.
+      colSize.allocateVector(v, ValueVector.MAX_ROW_COUNT-1);
+      bitVector = ((NullableVector) v).getBitsVector();
+      assertEquals((Integer.highestOneBit(ValueVector.MAX_ROW_COUNT)), bitVector.getValueCapacity());
+      vwVector = (VariableWidthVector) ((NullableVector) v).getValuesVector();
+      offsetVector = vwVector.getOffsetVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT, offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MAX_ROW_COUNT-1, vwVector.getValueCapacity());
+
+      // Allocate for 0 rows. should atleast do allocation for 1 row.
+      colSize.allocateVector(v,  0);
+      bitVector = ((NullableVector) v).getBitsVector();
+      assertEquals((Integer.highestOneBit(ValueVector.MIN_ROW_COUNT)), bitVector.getValueCapacity());
+      vwVector = (VariableWidthVector) ((NullableVector) v).getValuesVector();
+      offsetVector = vwVector.getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT+1, offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MIN_ROW_COUNT, vwVector.getValueCapacity());
+    }
+
+    empty.clear();
+    rows.clear();
+  }
+
+
+  @Test
+  public void testSizerMap() {
+    BatchSchema schema = new SchemaBuilder()
+      .addMap("map")
+        .add("key", MinorType.INT)
+        .add("value", MinorType.VARCHAR)
+      .buildMap()
+      .build();
+
+    RowSetBuilder builder = fixture.rowSetBuilder(schema);
+
+    for (int i = 0; i < 10; i++) {
+      builder.addRow((Object) (new Object[] {10, "a"}));
+    }
+    RowSet rows = builder.build();
+
+    // Run the record batch sizer on the resulting batch.
+    RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
+    assertEquals(1, sizer.columns().size());
+
+    /**
+     * stdDataSize:50+4, stdNetSize:50+4+4, dataSizePerEntry:4+1,
+     * netSizePerEntry: 4+1+4,
+     * totalDataSize:5*10, totalNetSize:4*10+4*10+1*10,
+     * valueCount:10,
+     * elementCount:10, estElementCountPerArray:1, isVariableWidth:true
+     */
+    verifyColumnValues(sizer.columns().get("map"), 54, 58, 5, 9, 50, 90, 10, 10, 1, false);
+
+    SingleRowSet empty = fixture.rowSet(schema);
+    VectorAccessible accessible = empty.vectorAccessible();
+
+    for (VectorWrapper<?> vw : accessible) {
+      ValueVector v = vw.getValueVector();
+      RecordBatchSizer.ColumnSize colSize = sizer.getColumn(v.getField().getName());
+
+      // Allocates to nearest power of two
+      colSize.allocateVector(v, testRowCount);
+      MapVector mapVector = (MapVector)v;
+      ValueVector keyVector = mapVector.getChild("key");
+      ValueVector valueVector1 = mapVector.getChild("value");
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), keyVector.getValueCapacity());
+      UInt4Vector offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(testRowCount  << 1)-1, valueVector1.getValueCapacity());
+
+      // Allocates the same as value passed since it is already power of two.
+      colSize.allocateVector(v, testRowCountPowerTwo-1);
+      mapVector = (MapVector)v;
+      keyVector = mapVector.getChild("key");
+      valueVector1 = mapVector.getChild("value");
+      assertEquals((Integer.highestOneBit(testRowCountPowerTwo -1) << 1), keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(testRowCountPowerTwo, offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(testRowCountPowerTwo)-1, valueVector1.getValueCapacity());
+
+      // Allocate for max rows.
+      colSize.allocateVector(v, ValueVector.MAX_ROW_COUNT -1);
+      mapVector = (MapVector)v;
+      keyVector = mapVector.getChild("key");
+      valueVector1 = mapVector.getChild("value");
+      assertEquals(ValueVector.MAX_ROW_COUNT, keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT, offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MAX_ROW_COUNT-1, valueVector1.getValueCapacity());
+
+      // Allocate for 0 rows. should atleast do allocation for 1 row.
+      colSize.allocateVector(v, 0);
+      mapVector = (MapVector)v;
+      keyVector = mapVector.getChild("key");
+      valueVector1 = mapVector.getChild("value");
+      assertEquals(ValueVector.MIN_ROW_COUNT, keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT+1, offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MIN_ROW_COUNT, valueVector1.getValueCapacity());
+
+      v.clear();
+    }
+
+    empty.clear();
+    rows.clear();
+
+  }
+
+  @Test
+  public void testSizerRepeatedMap() {
+    BatchSchema schema = new SchemaBuilder().addMapArray("map").
+      add("key", MinorType.INT).
+      add("value", MinorType.VARCHAR).
+      buildMap().build();
+
+    RowSetBuilder builder = fixture.rowSetBuilder(schema);
+
+    for (int i = 0; i < 10; i++) {
+      builder.addRow((Object) new Object[] {
+        new Object[] {110, "a"},
+        new Object[] {120, "b"}});
+    }
+    RowSet rows = builder.build();
+
+    // Run the record batch sizer on the resulting batch.
+    RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
+    assertEquals(1, sizer.columns().size());
+
+    /**
+     * stdDataSize:50+4, stdNetSize:50+4+4+4, dataSizePerEntry:(4+1)*2,
+     * netSizePerEntry: 4*2+1*2+4*2+4,
+     * totalDataSize:5*2*10, totalNetSize:netSizePerEntry*2,
+     * valueCount:10,
+     * elementCount:20, estElementCountPerArray:2, isVariableWidth:true
+     */
+    verifyColumnValues(sizer.columns().get("map"), 54,62, 10, 22, 100, 220, 10, 20, 2, false);
+
+    SingleRowSet empty = fixture.rowSet(schema);
+    VectorAccessible accessible = empty.vectorAccessible();
+
+    UInt4Vector offsetVector;
+
+    for (VectorWrapper<?> vw : accessible) {
+      ValueVector v = vw.getValueVector();
+      RecordBatchSizer.ColumnSize colSize = sizer.getColumn(v.getField().getName());
+
+      // Allocates to nearest power of two
+      colSize.allocateVector(v, testRowCount);
+      RepeatedMapVector mapVector = (RepeatedMapVector)v;
+
+      offsetVector = ((RepeatedValueVector)mapVector).getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), offsetVector.getValueCapacity());
+
+      ValueVector keyVector = mapVector.getChild("key");
+      ValueVector valueVector1 = mapVector.getChild("value");
+      assertEquals(((Integer.highestOneBit(testRowCount) << 1) * 2), keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCount) << 1)*2, offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(testRowCount  << 1)*2 - 1, valueVector1.getValueCapacity());
+
+      // Allocates the same as value passed since it is already power of two.
+      colSize.allocateVector(v, testRowCountPowerTwo-1);
+      mapVector = (RepeatedMapVector)v;
+
+      offsetVector = ((RepeatedValueVector)mapVector).getOffsetVector();
+      assertEquals(testRowCountPowerTwo, offsetVector.getValueCapacity());
+
+      keyVector = mapVector.getChild("key");
+      valueVector1 = mapVector.getChild("value");
+      assertEquals(testRowCountPowerTwo*2, keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(testRowCountPowerTwo*2, offsetVector.getValueCapacity());
+      assertEquals(testRowCountPowerTwo*2 - 1, valueVector1.getValueCapacity());
+
+      // Allocate for max rows.
+      colSize.allocateVector(v,  ValueVector.MAX_ROW_COUNT -1);
+      mapVector = (RepeatedMapVector)v;
+
+      offsetVector = ((RepeatedValueVector)mapVector).getOffsetVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT, offsetVector.getValueCapacity());
+
+      keyVector = mapVector.getChild("key");
+      valueVector1 = mapVector.getChild("value");
+      assertEquals(ValueVector.MAX_ROW_COUNT * 2, keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT * 2, offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MAX_ROW_COUNT * 2 - 1, valueVector1.getValueCapacity());
+
+      // Allocate for 0 rows. should atleast do allocation for 1 row.
+      colSize.allocateVector(v, 0);
+      mapVector = (RepeatedMapVector)v;
+
+      offsetVector = ((RepeatedValueVector)mapVector).getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT, offsetVector.getValueCapacity());
+
+      keyVector = mapVector.getChild("key");
+      valueVector1 = mapVector.getChild("value");
+      assertEquals(ValueVector.MIN_ROW_COUNT, keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT+1, offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MIN_ROW_COUNT, valueVector1.getValueCapacity());
+
+      v.clear();
+    }
+
+    empty.clear();
+    rows.clear();
+  }
+
+  @Test
+  public void testSizerNestedMap() {
+    BatchSchema schema = new SchemaBuilder()
+      .addMap("map")
+        .add("key", MinorType.INT)
+        .add("value", MinorType.VARCHAR)
+        .addMap("childMap")
+          .add("childKey", MinorType.INT)
+          .add("childValue", MinorType.VARCHAR)
+          .buildMap()
+       .buildMap()
+      .build();
+
+    RowSetBuilder builder = fixture.rowSetBuilder(schema);
+
+    for (int i = 0; i < 10; i++) {
+      builder.addRow((Object) (new Object[] {10, "a", new Object[] {5, "b"}}));
+    }
+    RowSet rows = builder.build();
+
+    // Run the record batch sizer on the resulting batch.
+    RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
+    assertEquals(1, sizer.columns().size());
+
+    /**
+     * stdDataSize:(50+4)*2, stdNetSize:(50+4)*2+4+4, dataSizePerEntry:(4+1)*2,
+     * netSizePerEntry: 4*2+1*2+4*2,
+     * totalDataSize:5*2*10, totalNetSize:netSizePerEntry*2,
+     * valueCount:10,
+     * elementCount:10, estElementCountPerArray:1, isVariableWidth:true
+     */
+    verifyColumnValues(sizer.columns().get("map"), 108, 116, 10, 18, 100, 180, 10, 10, 1, false);
+
+    SingleRowSet empty = fixture.rowSet(schema);
+    VectorAccessible accessible = empty.vectorAccessible();
+
+    UInt4Vector offsetVector;
+
+    for (VectorWrapper<?> vw : accessible) {
+      ValueVector v = vw.getValueVector();
+      RecordBatchSizer.ColumnSize colSize = sizer.getColumn(v.getField().getName());
+
+      // Allocates to nearest power of two
+      colSize.allocateVector(v, testRowCount);
+      MapVector mapVector = (MapVector)v;
+      ValueVector keyVector = mapVector.getChild("key");
+      ValueVector valueVector1 = mapVector.getChild("value");
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(testRowCount  << 1)-1, valueVector1.getValueCapacity());
+      MapVector childMapVector = (MapVector) mapVector.getChild("childMap");
+      ValueVector childKeyVector = childMapVector.getChild("childKey");
+      ValueVector childValueVector1 = childMapVector.getChild("childValue");
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), childKeyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(testRowCount  << 1)-1, childValueVector1.getValueCapacity());
+
+      // Allocates the same as value passed since it is already power of two.
+      colSize.allocateVector(v, testRowCountPowerTwo-1);
+      mapVector = (MapVector)v;
+      keyVector = mapVector.getChild("key");
+      valueVector1 = mapVector.getChild("value");
+      assertEquals(testRowCountPowerTwo, keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(testRowCountPowerTwo, offsetVector.getValueCapacity());
+      assertEquals(testRowCountPowerTwo-1, valueVector1.getValueCapacity());
+      childMapVector = (MapVector) mapVector.getChild("childMap");
+      childKeyVector = childMapVector.getChild("childKey");
+      childValueVector1 = childMapVector.getChild("childValue");
+      assertEquals(testRowCountPowerTwo, childKeyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(testRowCountPowerTwo, offsetVector.getValueCapacity());
+      assertEquals(testRowCountPowerTwo-1, childValueVector1.getValueCapacity());
+
+      // Allocate for max rows.
+      colSize.allocateVector(v, ValueVector.MAX_ROW_COUNT-1);
+      mapVector = (MapVector)v;
+      keyVector = mapVector.getChild("key");
+      valueVector1 = mapVector.getChild("value");
+      assertEquals(ValueVector.MAX_ROW_COUNT, keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT, offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MAX_ROW_COUNT-1, valueVector1.getValueCapacity());
+      childMapVector = (MapVector) mapVector.getChild("childMap");
+      childKeyVector = childMapVector.getChild("childKey");
+      childValueVector1 = childMapVector.getChild("childValue");
+      assertEquals(ValueVector.MAX_ROW_COUNT, childKeyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT, offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MAX_ROW_COUNT-1, childValueVector1.getValueCapacity());
+
+      // Allocate for 0 rows. should atleast do allocation for 1 row.
+      colSize.allocateVector(v,  0);
+      mapVector = (MapVector)v;
+      keyVector = mapVector.getChild("key");
+      valueVector1 = mapVector.getChild("value");
+      assertEquals(ValueVector.MIN_ROW_COUNT, keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT+1, offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MIN_ROW_COUNT, valueVector1.getValueCapacity());
+      childMapVector = (MapVector) mapVector.getChild("childMap");
+      childKeyVector = childMapVector.getChild("childKey");
+      childValueVector1 = childMapVector.getChild("childValue");
+      assertEquals(ValueVector.MIN_ROW_COUNT, childKeyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT+1, offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MIN_ROW_COUNT, childValueVector1.getValueCapacity());
+
+      v.clear();
+    }
+
+    empty.clear();
+    rows.clear();
+
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/47c5d1fe/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index 44a467e..58ed57b 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -84,6 +84,7 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
    */
 
   int MAX_ROW_COUNT = Character.MAX_VALUE + 1;
+  int MIN_ROW_COUNT = 1;
 
   // Commonly-used internal vector names