You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2018/07/02 07:45:28 UTC

[drill] branch master updated: DRILL-6310: limit batch size for hash aggregate

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new f1a3bd1  DRILL-6310: limit batch size for hash aggregate
f1a3bd1 is described below

commit f1a3bd12c2fd3ad525642933ace2d7b9cedaacc9
Author: Padma Penumarthy <pp...@yahoo.com>
AuthorDate: Tue Jun 26 14:00:07 2018 -0700

    DRILL-6310: limit batch size for hash aggregate
    
    closes #1324
---
 .../exec/physical/impl/aggregate/HashAggBatch.java | 137 ++++++++++++--
 .../physical/impl/aggregate/HashAggTemplate.java   | 139 ++++++++------
 .../physical/impl/aggregate/HashAggregator.java    |   2 +
 .../exec/physical/impl/common/HashPartition.java   |   4 +-
 .../drill/exec/physical/impl/common/HashTable.java |  10 +-
 .../impl/common/HashTableAllocationTracker.java    |  25 +--
 .../physical/impl/common/HashTableTemplate.java    | 132 ++++++++------
 .../exec/record/RecordBatchMemoryManager.java      |  26 ++-
 .../common/HashTableAllocationTrackerTest.java     |  46 ++---
 .../exec/physical/unit/TestOutputBatchSize.java    | 199 +++++++++++++++++++++
 10 files changed, 557 insertions(+), 163 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 57e9bd7..d37631b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -19,15 +19,19 @@ package org.apache.drill.exec.physical.impl.aggregate;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.FunctionHolderExpression;
 import org.apache.drill.common.expression.IfExpression;
 import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
@@ -49,11 +53,14 @@ import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.sun.codemodel.JExpr;
@@ -71,6 +78,12 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
   private BatchSchema incomingSchema;
   private boolean wasKilled;
 
+  private int numGroupByExprs, numAggrExprs;
+
+  // This map saves the mapping between outgoing column and incoming column.
+  private Map<String, String> columnMapping;
+  private final HashAggMemoryManager hashAggMemoryManager;
+
   private final GeneratorMapping UPDATE_AGGR_INSIDE =
       GeneratorMapping.create("setupInterior" /* setup method */, "updateAggrValuesInternal" /* eval method */,
           "resetValues" /* reset */, "cleanup" /* cleanup */);
@@ -84,6 +97,67 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
           "htRowIdx" /* workspace index */, "incoming" /* read container */, "outgoing" /* write container */,
           "aggrValuesContainer" /* workspace container */, UPDATE_AGGR_INSIDE, UPDATE_AGGR_OUTSIDE, UPDATE_AGGR_INSIDE);
 
+  public int getOutputRowCount() {
+    return hashAggMemoryManager.getOutputRowCount();
+  }
+
+  public RecordBatchMemoryManager getRecordBatchMemoryManager() {
+    return hashAggMemoryManager;
+  }
+
+  private class HashAggMemoryManager extends RecordBatchMemoryManager {
+    private int valuesRowWidth = 0;
+
+    HashAggMemoryManager(int outputBatchSize) {
+      super(outputBatchSize);
+    }
+
+    @Override
+    public void update() {
+      // Get sizing information for the batch.
+      setRecordBatchSizer(new RecordBatchSizer(incoming));
+
+      int fieldId = 0;
+      int newOutgoingRowWidth = 0;
+      for (VectorWrapper<?> w : container) {
+        if (w.getValueVector() instanceof FixedWidthVector) {
+          newOutgoingRowWidth += ((FixedWidthVector) w.getValueVector()).getValueWidth();
+          if (fieldId >= numGroupByExprs) {
+            valuesRowWidth += ((FixedWidthVector) w.getValueVector()).getValueWidth();
+          }
+        } else {
+          int columnWidth;
+          if (columnMapping.get(w.getValueVector().getField().getName()) == null) {
+             columnWidth = TypeHelper.getSize(w.getField().getType());
+          } else {
+            RecordBatchSizer.ColumnSize columnSize = getRecordBatchSizer().getColumn(columnMapping.get(w.getValueVector().getField().getName()));
+            if (columnSize == null) {
+              columnWidth = TypeHelper.getSize(w.getField().getType());
+            } else {
+              columnWidth = columnSize.getAllocSizePerEntry();
+            }
+          }
+          newOutgoingRowWidth += columnWidth;
+          if (fieldId >= numGroupByExprs) {
+            valuesRowWidth += columnWidth;
+          }
+        }
+        fieldId++;
+      }
+
+      if (updateIfNeeded(newOutgoingRowWidth)) {
+        // There is an update to outgoing row width.
+        // un comment this if we want to adjust the batch row count of in flight batches.
+        // To keep things simple, we are not doing this adjustment for now.
+        // aggregator.adjustOutputCount(getOutputBatchSize(), getOutgoingRowWidth(), newOutgoingRowWidth);
+      }
+
+      updateIncomingStats();
+      if (logger.isDebugEnabled()) {
+        logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
+      }
+    }
+  }
 
   public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentContext context) {
     super(popConfig, context);
@@ -103,6 +177,13 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
 
     boolean allowed = oContext.getAllocator().setLenient();
     logger.debug("Config: Is allocator lenient? {}", allowed);
+
+    // get the output batch size from config.
+    int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    hashAggMemoryManager = new HashAggMemoryManager(configuredBatchSize);
+    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+
+    columnMapping = CaseInsensitiveMap.newHashMap();
   }
 
   @Override
@@ -136,6 +217,9 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     for (VectorWrapper<?> w : container) {
       AllocationHelper.allocatePrecomputedChildCount(w.getValueVector(), 0, 0, 0);
     }
+    if (incoming.getRecordCount() > 0) {
+      hashAggMemoryManager.update();
+    }
   }
 
   @Override
@@ -239,8 +323,8 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     // top.saveCodeForDebugging(true);
     container.clear();
 
-    int numGroupByExprs = (popConfig.getGroupByExprs() != null) ? popConfig.getGroupByExprs().size() : 0;
-    int numAggrExprs = (popConfig.getAggrExprs() != null) ? popConfig.getAggrExprs().size() : 0;
+    numGroupByExprs = (popConfig.getGroupByExprs() != null) ? popConfig.getGroupByExprs().size() : 0;
+    numAggrExprs = (popConfig.getAggrExprs() != null) ? popConfig.getAggrExprs().size() : 0;
     aggrExprs = new LogicalExpression[numAggrExprs];
     groupByOutFieldIds = new TypedFieldId[numGroupByExprs];
     aggrOutFieldIds = new TypedFieldId[numAggrExprs];
@@ -263,13 +347,13 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
 
       // add this group-by vector to the output container
       groupByOutFieldIds[i] = container.add(vv);
+      columnMapping.put(outputField.getName(), ne.getExpr().toString().replace('`',' ').trim());
     }
 
     int extraNonNullColumns = 0; // each of SUM, MAX and MIN gets an extra bigint column
     for (i = 0; i < numAggrExprs; i++) {
       NamedExpression ne = popConfig.getAggrExprs().get(i);
-      final LogicalExpression expr =
-          ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
 
       if (expr instanceof IfExpression) {
         throw UserException.unsupportedError(new UnsupportedOperationException("Union type not supported in aggregate functions")).build(logger);
@@ -283,16 +367,28 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
         continue;
       }
 
-      if ( expr instanceof FunctionHolderExpression ) {
-         String funcName = ((FunctionHolderExpression) expr).getName();
-         if ( funcName.equals("sum") || funcName.equals("max") || funcName.equals("min") ) {extraNonNullColumns++;}
-      }
       final MaterializedField outputField = MaterializedField.create(ne.getRef().getAsNamePart().getName(), expr.getMajorType());
-      @SuppressWarnings("resource")
-      ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+      @SuppressWarnings("resource") ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
       aggrOutFieldIds[i] = container.add(vv);
 
       aggrExprs[i] = new ValueVectorWriteExpression(aggrOutFieldIds[i], expr, true);
+
+      if (expr instanceof FunctionHolderExpression) {
+        String funcName = ((FunctionHolderExpression) expr).getName();
+        if (funcName.equals("sum") || funcName.equals("max") || funcName.equals("min")) {
+          extraNonNullColumns++;
+        }
+        if (((FunctionCall) ne.getExpr()).args.get(0) instanceof SchemaPath) {
+          columnMapping.put(outputField.getName(), ((SchemaPath) ((FunctionCall) ne.getExpr()).args.get(0)).getAsNamePart().getName());
+        }  else if (((FunctionCall) ne.getExpr()).args.get(0) instanceof FunctionCall) {
+          FunctionCall functionCall = (FunctionCall) ((FunctionCall) ne.getExpr()).args.get(0);
+          if (functionCall.args.get(0) instanceof SchemaPath) {
+            columnMapping.put(outputField.getName(), ((SchemaPath) functionCall.args.get(0)).getAsNamePart().getName());
+          }
+        }
+      } else {
+        columnMapping.put(outputField.getName(), ne.getRef().getAsNamePart().getName());
+      }
     }
 
     setupUpdateAggrValues(cgInner);
@@ -345,11 +441,32 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     }
   }
 
+  private void updateStats() {
+    stats.setLongStat(HashAggTemplate.Metric.INPUT_BATCH_COUNT, hashAggMemoryManager.getNumIncomingBatches());
+    stats.setLongStat(HashAggTemplate.Metric.AVG_INPUT_BATCH_BYTES, hashAggMemoryManager.getAvgInputBatchSize());
+    stats.setLongStat(HashAggTemplate.Metric.AVG_INPUT_ROW_BYTES, hashAggMemoryManager.getAvgInputRowWidth());
+    stats.setLongStat(HashAggTemplate.Metric.INPUT_RECORD_COUNT, hashAggMemoryManager.getTotalInputRecords());
+    stats.setLongStat(HashAggTemplate.Metric.OUTPUT_BATCH_COUNT, hashAggMemoryManager.getNumOutgoingBatches());
+    stats.setLongStat(HashAggTemplate.Metric.AVG_OUTPUT_BATCH_BYTES, hashAggMemoryManager.getAvgOutputBatchSize());
+    stats.setLongStat(HashAggTemplate.Metric.AVG_OUTPUT_ROW_BYTES, hashAggMemoryManager.getAvgOutputRowWidth());
+    stats.setLongStat(HashAggTemplate.Metric.OUTPUT_RECORD_COUNT, hashAggMemoryManager.getTotalOutputRecords());
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+        hashAggMemoryManager.getNumIncomingBatches(), hashAggMemoryManager.getAvgInputBatchSize(),
+        hashAggMemoryManager.getAvgInputRowWidth(), hashAggMemoryManager.getTotalInputRecords());
+
+      logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+        hashAggMemoryManager.getNumOutgoingBatches(), hashAggMemoryManager.getAvgOutputBatchSize(),
+        hashAggMemoryManager.getAvgOutputRowWidth(), hashAggMemoryManager.getTotalOutputRecords());
+    }
+  }
   @Override
   public void close() {
     if (aggregator != null) {
       aggregator.cleanup();
     }
+    updateStats();
     super.close();
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 3b50471..2f3bc23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -79,6 +79,7 @@ import org.apache.drill.exec.vector.ValueVector;
 
 import org.apache.drill.exec.vector.VariableWidthVector;
 
+import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_MASK;
 import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_SIZE;
 
 public abstract class HashAggTemplate implements HashAggregator {
@@ -159,8 +160,6 @@ public abstract class HashAggTemplate implements HashAggregator {
   private int operatorId; // for the spill file name
 
   private IndexPointer htIdxHolder; // holder for the Hashtable's internal index returned by put()
-  private IndexPointer outStartIdxHolder;
-  private IndexPointer outNumRecordsHolder;
   private int numGroupByOutFields = 0; // Note: this should be <= number of group-by fields
   private TypedFieldId[] groupByOutFieldIds;
 
@@ -185,7 +184,15 @@ public abstract class HashAggTemplate implements HashAggregator {
                       // then later re-read. So, disk I/O is twice this amount.
                       // For first phase aggr -- this is an estimate of the amount of data
                       // returned early (analogous to a spill in the 2nd phase).
-    SPILL_CYCLE       // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY
+    SPILL_CYCLE,       // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY
+    INPUT_BATCH_COUNT,
+    AVG_INPUT_BATCH_BYTES,
+    AVG_INPUT_ROW_BYTES,
+    INPUT_RECORD_COUNT,
+    OUTPUT_BATCH_COUNT,
+    AVG_OUTPUT_BATCH_BYTES,
+    AVG_OUTPUT_ROW_BYTES,
+    OUTPUT_RECORD_COUNT;
     ;
 
     @Override
@@ -195,16 +202,29 @@ public abstract class HashAggTemplate implements HashAggregator {
   }
 
   public class BatchHolder {
-
     private VectorContainer aggrValuesContainer; // container for aggr values (workspace variables)
     private int maxOccupiedIdx = -1;
-    private int batchOutputCount = 0;
+    private int targetBatchRowCount = 0;
+
+    public int getTargetBatchRowCount() {
+      return targetBatchRowCount;
+    }
+
+    public void setTargetBatchRowCount(int batchRowCount) {
+      this.targetBatchRowCount = batchRowCount;
+    }
+
+    public int getCurrentRowCount() {
+      return (maxOccupiedIdx + 1);
+    }
 
     @SuppressWarnings("resource")
-    public BatchHolder() {
+    public BatchHolder(int batchRowCount) {
 
       aggrValuesContainer = new VectorContainer();
       boolean success = false;
+      this.targetBatchRowCount = batchRowCount;
+
       try {
         ValueVector vector;
 
@@ -220,12 +240,12 @@ public abstract class HashAggTemplate implements HashAggregator {
           // BatchHolder in HashTable, causing the HashTable to be space inefficient. So it is better to allocate space
           // to fit as close to as BATCH_SIZE records.
           if (vector instanceof FixedWidthVector) {
-            ((FixedWidthVector) vector).allocateNew(HashTable.BATCH_SIZE);
+            ((FixedWidthVector) vector).allocateNew(batchRowCount);
           } else if (vector instanceof VariableWidthVector) {
             // This case is never used .... a varchar falls under ObjectVector which is allocated on the heap !
-            ((VariableWidthVector) vector).allocateNew(maxColumnWidth, HashTable.BATCH_SIZE);
+            ((VariableWidthVector) vector).allocateNew(maxColumnWidth, batchRowCount);
           } else if (vector instanceof ObjectVector) {
-            ((ObjectVector) vector).allocateNew(HashTable.BATCH_SIZE);
+            ((ObjectVector) vector).allocateNew(batchRowCount);
           } else {
             vector.allocateNew();
           }
@@ -252,17 +272,12 @@ public abstract class HashAggTemplate implements HashAggregator {
       catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc);}
     }
 
-    private void outputValues(IndexPointer outStartIdxHolder, IndexPointer outNumRecordsHolder) {
-      outStartIdxHolder.value = batchOutputCount;
-      outNumRecordsHolder.value = 0;
-      for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
-        try { outputRecordValues(i, batchOutputCount); }
-        catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc);}
-        if (EXTRA_DEBUG_2) {
-          logger.debug("Outputting values to output index: {}", batchOutputCount);
+    private void outputValues() {
+      for (int i = 0; i <= maxOccupiedIdx; i++) {
+        try {
+          outputRecordValues(i, i);
         }
-        batchOutputCount++;
-        outNumRecordsHolder.value++;
+        catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc);}
       }
     }
 
@@ -275,7 +290,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
 
     private int getNumPendingOutput() {
-      return getNumGroups() - batchOutputCount;
+      return getNumGroups();
     }
 
     // Code-generated methods (implemented in HashAggBatch)
@@ -349,9 +364,6 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
 
     this.htIdxHolder = new IndexPointer();
-    this.outStartIdxHolder = new IndexPointer();
-    this.outNumRecordsHolder = new IndexPointer();
-
     materializedValueFields = new MaterializedField[valueFieldIds.size()];
 
     if (valueFieldIds.size() > 0) {
@@ -513,7 +525,7 @@ public abstract class HashAggTemplate implements HashAggregator {
   private void updateEstMaxBatchSize(RecordBatch incoming) {
     if ( estMaxBatchSize > 0 ) { return; }  // no handling of a schema (or varchar) change
     // Use the sizer to get the input row width and the length of the longest varchar column
-    RecordBatchSizer sizer = new RecordBatchSizer(incoming);
+    RecordBatchSizer sizer = outgoing.getRecordBatchMemoryManager().getRecordBatchSizer();
     logger.trace("Incoming sizer: {}",sizer);
     // An empty batch only has the schema, can not tell actual length of varchars
     // else use the actual varchars length, each capped at 50 (to match the space allocation)
@@ -654,6 +666,8 @@ public abstract class HashAggTemplate implements HashAggregator {
           // remember EMIT, but continue like handling OK
 
         case OK:
+          outgoing.getRecordBatchMemoryManager().update();
+
           currentBatchRecordCount = incoming.getRecordCount(); // size of next batch
 
           resetIndex(); // initialize index (a new batch needs to be processed)
@@ -790,6 +804,22 @@ public abstract class HashAggTemplate implements HashAggregator {
   }
 
   @Override
+  public void adjustOutputCount(int outputBatchSize, int oldRowWidth, int newRowWidth) {
+    for (int i = 0; i < numPartitions; i++ ) {
+      if (batchHolders[i] == null || batchHolders[i].size() == 0) {
+        continue;
+      }
+      BatchHolder bh = batchHolders[i].get(batchHolders[i].size()-1);
+      // Divide remaining memory by new row width.
+      final int remainingRows = RecordBatchSizer.safeDivide(Math.max((outputBatchSize - (bh.getCurrentRowCount() * oldRowWidth)), 0), newRowWidth);
+      // Do not go beyond the current target row count as this might cause reallocs for fixed width vectors.
+      final int newRowCount = Math.min(bh.getTargetBatchRowCount(), bh.getCurrentRowCount() + remainingRows);
+      bh.setTargetBatchRowCount(newRowCount);
+      htables[i].setTargetBatchRowCount(newRowCount);
+    }
+  }
+
+  @Override
   public void cleanup() {
     if ( schema == null ) { return; } // not set up; nothing to clean
     if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) {
@@ -836,8 +866,6 @@ public abstract class HashAggTemplate implements HashAggregator {
     spillSet.close(); // delete the spill directory(ies)
     htIdxHolder = null;
     materializedValueFields = null;
-    outStartIdxHolder = null;
-    outNumRecordsHolder = null;
   }
 
   // First free the memory used by the given (spilled) partition (i.e., hash table plus batches)
@@ -853,6 +881,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
     batchHolders[part] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
 
+    outBatchIndex[part] = 0;
     // in case the reserve memory was used, try to restore
     restoreReservedMemory();
   }
@@ -962,17 +991,14 @@ public abstract class HashAggTemplate implements HashAggregator {
     for (int currOutBatchIndex = 0; currOutBatchIndex < currPartition.size(); currOutBatchIndex++ ) {
 
       // get the number of records in the batch holder that are pending output
-      int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput();
+      int numOutputRecords = currPartition.get(currOutBatchIndex).getNumPendingOutput();
 
-      rowsInPartition += numPendingOutput;  // for logging
-      rowsSpilled += numPendingOutput;
+      rowsInPartition += numOutputRecords;  // for logging
+      rowsSpilled += numOutputRecords;
 
-      allocateOutgoing(numPendingOutput);
-
-      currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
-      int numOutputRecords = outNumRecordsHolder.value;
-
-      this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value, numPendingOutput);
+      allocateOutgoing(numOutputRecords);
+      currPartition.get(currOutBatchIndex).outputValues();
+      this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, numOutputRecords);
 
       // set the value count for outgoing batch value vectors
       /* int i = 0; */
@@ -992,8 +1018,8 @@ public abstract class HashAggTemplate implements HashAggregator {
         */
       }
 
-      outContainer.setRecordCount(numPendingOutput);
-      WritableBatch batch = WritableBatch.getBatchNoHVWrap(numPendingOutput, outContainer, false);
+      outContainer.setRecordCount(numOutputRecords);
+      WritableBatch batch = WritableBatch.getBatchNoHVWrap(numOutputRecords, outContainer, false);
       try {
         writers[part].write(batch, null);
       } catch (IOException ioe) {
@@ -1004,7 +1030,7 @@ public abstract class HashAggTemplate implements HashAggregator {
         batch.clear();
       }
       outContainer.zeroVectors();
-      logger.trace("HASH AGG: Took {} us to spill {} records", writers[part].time(TimeUnit.MICROSECONDS), numPendingOutput);
+      logger.trace("HASH AGG: Took {} us to spill {} records", writers[part].time(TimeUnit.MICROSECONDS), numOutputRecords);
     }
 
     spilledBatchesCount[part] += currPartition.size(); // update count of spilled batches
@@ -1012,9 +1038,9 @@ public abstract class HashAggTemplate implements HashAggregator {
     logger.trace("HASH AGG: Spilled {} rows from {} batches of partition {}", rowsInPartition, currPartition.size(), part);
   }
 
-  private void addBatchHolder(int part) {
+  private void addBatchHolder(int part, int batchRowCount) {
 
-    BatchHolder bh = newBatchHolder();
+    BatchHolder bh = newBatchHolder(batchRowCount);
     batchHolders[part].add(bh);
     if (EXTRA_DEBUG_1) {
       logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders[part].size());
@@ -1024,8 +1050,8 @@ public abstract class HashAggTemplate implements HashAggregator {
   }
 
   // These methods are overridden in the generated class when created as plain Java code.
-  protected BatchHolder newBatchHolder() {
-    return new BatchHolder();
+  protected BatchHolder newBatchHolder(int batchRowCount) {
+    return new BatchHolder(batchRowCount);
   }
 
   /**
@@ -1161,20 +1187,21 @@ public abstract class HashAggTemplate implements HashAggregator {
 
     allocateOutgoing(numPendingOutput);
 
-    currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
-    int numOutputRecords = outNumRecordsHolder.value;
-
-    if (EXTRA_DEBUG_1) {
-      logger.debug("After output values: outStartIdx = {}, outNumRecords = {}", outStartIdxHolder.value, outNumRecordsHolder.value);
-    }
-
-    this.htables[partitionToReturn].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value, numPendingOutput);
+    currPartition.get(currOutBatchIndex).outputValues();
+    int numOutputRecords = numPendingOutput;
+    this.htables[partitionToReturn].outputKeys(currOutBatchIndex, this.outContainer, numPendingOutput);
 
     // set the value count for outgoing batch value vectors
     for (VectorWrapper<?> v : outgoing) {
       v.getValueVector().getMutator().setValueCount(numOutputRecords);
     }
 
+    outgoing.getRecordBatchMemoryManager().updateOutgoingStats(numOutputRecords);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(outgoing));
+    }
+
     this.outcome = IterOutcome.OK;
 
     if ( EXTRA_DEBUG_SPILL && is2ndPhase ) {
@@ -1271,6 +1298,10 @@ public abstract class HashAggTemplate implements HashAggregator {
     return errmsg;
   }
 
+  private int getTargetBatchCount() {
+    return outgoing.getOutputRowCount();
+  }
+
   // Check if a group is present in the hash table; if not, insert it in the hash table.
   // The htIdxHolder contains the index of the group in the hash table container; this same
   // index is also used for the aggregation values maintained by the hash aggregate.
@@ -1338,7 +1369,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     // ==========================================
     try {
 
-      putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode);
+      putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode, getTargetBatchCount());
 
     } catch (RetryAfterSpillException re) {
       if ( ! canSpill ) { throw new OutOfMemoryException(getOOMErrorMsg("Can not spill")); }
@@ -1372,7 +1403,7 @@ public abstract class HashAggTemplate implements HashAggregator {
 
         useReservedValuesMemory(); // try to preempt an OOM by using the reserve
 
-        addBatchHolder(currentPartition);  // allocate a new (internal) values batch
+        addBatchHolder(currentPartition, getTargetBatchCount());  // allocate a new (internal) values batch
 
         restoreReservedMemory(); // restore the reserve, if possible
         // A reason to check for a spill - In case restore-reserve failed
@@ -1408,8 +1439,8 @@ public abstract class HashAggTemplate implements HashAggregator {
     // Locate the matching aggregate columns and perform the aggregation
     // =================================================================
     int currentIdx = htIdxHolder.value;
-    BatchHolder bh = batchHolders[currentPartition].get((currentIdx >>> 16) & HashTable.BATCH_MASK);
-    int idxWithinBatch = currentIdx & HashTable.BATCH_MASK;
+    BatchHolder bh = batchHolders[currentPartition].get((currentIdx >>> 16) & BATCH_MASK);
+    int idxWithinBatch = currentIdx & BATCH_MASK;
     if (bh.updateAggrValues(incomingRowIdx, idxWithinBatch)) {
       numGroupedRecords++;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 35e6d53..f58be89 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -68,4 +68,6 @@ public interface HashAggregator {
   boolean earlyOutput();
 
   RecordBatch getNewIncoming();
+
+  void adjustOutputCount(int outputBatchSize, int oldRowWidth, int newRowWidth);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index d80237c..eaccd33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -53,6 +53,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_SIZE;
+
 /**
  * <h2>Overview</h2>
  * <p>
@@ -498,7 +500,7 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
       for (int recInd = 0; recInd < currentRecordCount; recInd++) {
         int hashCode = HV_vector.getAccessor().get(recInd);
         try {
-          hashTable.put(recInd, htIndex, hashCode);
+          hashTable.put(recInd, htIndex, hashCode, BATCH_SIZE);
         } catch (RetryAfterSpillException RE) {
           throw new OutOfMemoryException("HT put");
         } // Hash Join does not retry
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 194c865..3bf4b86 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -82,7 +82,7 @@ public interface HashTable {
    */
   int getProbeHashCode(int incomingRowIdx) throws SchemaChangeException;
 
-  PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException;
+  PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode, int batchSize) throws SchemaChangeException, RetryAfterSpillException;
 
   /**
    * @param incomingRowIdx The index of the key in the probe batch.
@@ -130,12 +130,10 @@ public interface HashTable {
    * Retrieves the key columns and transfers them to the output container. Note this operation removes the key columns from the {@link HashTable}.
    * @param batchIdx The index of a {@link HashTableTemplate.BatchHolder} in the HashTable.
    * @param outContainer The destination container for the key columns.
-   * @param outStartIndex The start index of the key records to transfer.
    * @param numRecords The number of key recorts to transfer.
-   * @param numExpectedRecords
    * @return
    */
-  boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords);
+  boolean outputKeys(int batchIdx, VectorContainer outContainer, int numRecords);
 
   /**
    * Returns a message containing memory usage statistics. Intended to be used for printing debugging or error messages.
@@ -148,6 +146,10 @@ public interface HashTable {
    * @return
    */
   long getActualSize();
+
+  void setTargetBatchRowCount(int batchRowCount);
+
+  int getTargetBatchRowCount();
 }
 
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTracker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTracker.java
index d72278d..7f38ee6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTracker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTracker.java
@@ -32,42 +32,35 @@ class HashTableAllocationTracker
   }
 
   private final HashTableConfig config;
-  private final int maxBatchHolderSize;
-
   private State state = State.NO_ALLOCATION_IN_PROGRESS;
   private int remainingCapacity;
 
-  protected HashTableAllocationTracker(final HashTableConfig config,
-                                       final int maxBatchHolderSize)
+  protected HashTableAllocationTracker(final HashTableConfig config)
   {
     this.config = Preconditions.checkNotNull(config);
-    this.maxBatchHolderSize = maxBatchHolderSize;
-
     remainingCapacity = config.getInitialCapacity();
   }
 
-  public int getNextBatchHolderSize() {
+  public int getNextBatchHolderSize(int batchSize) {
     state = State.ALLOCATION_IN_PROGRESS;
 
     if (!config.getInitialSizeIsFinal()) {
-      // We don't know the final size of the hash table, so return the default max batch holder size
-      return maxBatchHolderSize;
+      // We don't know the final size of the hash table, so just return the batch size.
+      return batchSize;
     } else {
       // We know the final size of the hash table so we need to compute the next batch holder size.
-
       Preconditions.checkState(remainingCapacity > 0);
-      return computeNextBatchHolderSize();
+      return computeNextBatchHolderSize(batchSize);
     }
   }
 
-  private int computeNextBatchHolderSize() {
-    return Math.min(remainingCapacity, maxBatchHolderSize);
+  private int computeNextBatchHolderSize(int batchSize) {
+    return Math.min(batchSize, remainingCapacity);
   }
 
-  public void commit() {
+  public void commit(int batchSize) {
     Preconditions.checkState(state.equals(State.ALLOCATION_IN_PROGRESS));
-
-    remainingCapacity -= computeNextBatchHolderSize();
+    remainingCapacity -= batchSize;
     state = State.NO_ALLOCATION_IN_PROGRESS;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index da916f3..756b3f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -65,7 +65,9 @@ public abstract class HashTableTemplate implements HashTable {
   // Array of batch holders..each batch holder can hold up to BATCH_SIZE entries
   private ArrayList<BatchHolder> batchHolders;
 
-  private int totalBatchHoldersSize; // the size of all batchHolders
+  private int totalIndexSize; // index size of all batchHolders including current batch
+  private int prevIndexSize; // index size of all batchHolders not including current batch
+  private int currentIndexSize; // prevIndexSize + current batch count.
 
   // Current size of the hash table in terms of number of buckets
   private int tableSize = 0;
@@ -127,13 +129,21 @@ public abstract class HashTableTemplate implements HashTable {
     private IntVector hashValues;
 
     private int maxOccupiedIdx = -1;
-//    private int batchOutputCount = 0;
-
+    private int targetBatchRowCount;
     private int batchIndex = 0;
 
+    public void setTargetBatchRowCount(int targetBatchRowCount) {
+      this.targetBatchRowCount = targetBatchRowCount;
+    }
+
+    public int getTargetBatchRowCount() {
+      return targetBatchRowCount;
+    }
+
     public BatchHolder(int idx, int newBatchHolderSize) {
 
       this.batchIndex = idx;
+      this.targetBatchRowCount = newBatchHolderSize;
 
       htContainer = new VectorContainer();
       boolean success = false;
@@ -152,7 +162,7 @@ public abstract class HashTableTemplate implements HashTable {
           } else if (vv instanceof VariableWidthVector) {
             long beforeMem = allocator.getAllocatedMemory();
             ((VariableWidthVector) vv).allocateNew(MAX_VARCHAR_SIZE * newBatchHolderSize, newBatchHolderSize);
-            logger.trace("HT allocated {} for varchar of max width {}",allocator.getAllocatedMemory() - beforeMem, MAX_VARCHAR_SIZE);
+            logger.trace("HT allocated {} for varchar of max width {}", allocator.getAllocatedMemory() - beforeMem, MAX_VARCHAR_SIZE);
           } else {
             vv.allocateNew();
           }
@@ -164,7 +174,9 @@ public abstract class HashTableTemplate implements HashTable {
       } finally {
         if (!success) {
           htContainer.clear();
-          if (links != null) { links.clear();}
+          if (links != null) {
+            links.clear();
+          }
         }
       }
     }
@@ -190,15 +202,14 @@ public abstract class HashTableTemplate implements HashTable {
     private boolean isKeyMatch(int incomingRowIdx,
         IndexPointer currentIdxHolder,
         boolean isProbe) throws SchemaChangeException {
-
       int currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
       boolean match;
 
-      if (currentIdxWithinBatch >= HashTable.BATCH_SIZE) {
-        logger.debug("Batch size = {}, incomingRowIdx = {}, currentIdxWithinBatch = {}.", HashTable.BATCH_SIZE,
-            incomingRowIdx, currentIdxWithinBatch);
+      if (currentIdxWithinBatch >= batchHolders.get((currentIdxHolder.value >>> 16) & BATCH_MASK).getTargetBatchRowCount()) {
+        logger.debug("Batch size = {}, incomingRowIdx = {}, currentIdxWithinBatch = {}.",
+          batchHolders.get((currentIdxHolder.value >>> 16) & BATCH_MASK).getTargetBatchRowCount(), incomingRowIdx, currentIdxWithinBatch);
       }
-      assert (currentIdxWithinBatch < HashTable.BATCH_SIZE);
+      assert (currentIdxWithinBatch < batchHolders.get((currentIdxHolder.value >>> 16) & BATCH_MASK).getTargetBatchRowCount());
       assert (incomingRowIdx < HashTable.BATCH_SIZE);
 
       if (isProbe) {
@@ -217,7 +228,6 @@ public abstract class HashTableTemplate implements HashTable {
     // container at the specified index
     private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdxWithinBatch) throws SchemaChangeException {
       int currentIdxWithinBatch = currentIdx & BATCH_MASK;
-
       setValue(incomingRowIdx, currentIdxWithinBatch);
       // setValue may OOM when doubling of one of the VarChar Key Value Vectors
       // This would be caught and retried later (setValue() is idempotent)
@@ -280,8 +290,7 @@ public abstract class HashTableTemplate implements HashTable {
           while (true) {
             if (idx != EMPTY_SLOT) {
               idxWithinBatch = idx & BATCH_MASK;
-              int batchIdx = ((idx >>> 16) & BATCH_MASK);
-              bh = batchHolders.get(batchIdx);
+              bh = batchHolders.get((idx >>> 16) & BATCH_MASK);
             }
 
             if (bh == this && newLinks.getAccessor().get(idxWithinBatch) == EMPTY_SLOT) {
@@ -332,7 +341,7 @@ public abstract class HashTableTemplate implements HashTable {
       hashValues = newHashValues;
     }
 
-    private boolean outputKeys(VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords) {
+    private boolean outputKeys(VectorContainer outContainer, int numRecords) {
       // set the value count for htContainer's value vectors before the transfer ..
       setValueCount();
 
@@ -344,18 +353,9 @@ public abstract class HashTableTemplate implements HashTable {
         @SuppressWarnings("resource")
         ValueVector targetVV = outgoingIter.next().getValueVector();
         TransferPair tp = sourceVV.makeTransferPair(targetVV);
-        if ( outStartIndex == 0 && numRecords == numExpectedRecords ) {
-          // The normal case: The whole column key(s) are transfered as is
-          tp.transfer();
-        } else {
-          // Transfer just the required section (does this ever happen ?)
-          // Requires an expensive allocation and copy
-          logger.debug("Performing partial output of keys, from index {}, num {} (out of {})",
-              outStartIndex,numRecords,numExpectedRecords);
-          tp.splitAndTransfer(outStartIndex, numRecords);
-        }
+        // The normal case: The whole column key(s) are transfered as is
+        tp.transfer();
       }
-
       return true;
     }
 
@@ -469,7 +469,7 @@ public abstract class HashTableTemplate implements HashTable {
     this.incomingProbe = incomingProbe;
     this.outgoing = outgoing;
     this.htContainerOrig = htContainerOrig;
-    this.allocationTracker = new HashTableAllocationTracker(htConfig, BATCH_SIZE);
+    this.allocationTracker = new HashTableAllocationTracker(htConfig);
 
     // round up the initial capacity to nearest highest power of 2
     tableSize = roundUpToPowerOf2(initialCap);
@@ -486,9 +486,12 @@ public abstract class HashTableTemplate implements HashTable {
 
     // Create the first batch holder
     batchHolders = new ArrayList<BatchHolder>();
-    totalBatchHoldersSize = 0;
     // First BatchHolder is created when the first put request is received.
 
+    prevIndexSize = 0;
+    currentIndexSize = 0;
+    totalIndexSize = 0;
+
     try {
       doSetup(incomingBuild, incomingProbe);
     } catch (SchemaChangeException e) {
@@ -501,7 +504,7 @@ public abstract class HashTableTemplate implements HashTable {
   @Override
   public void updateInitialCapacity(int initialCapacity) {
     htConfig = htConfig.withInitialCapacity(initialCapacity);
-    allocationTracker = new HashTableAllocationTracker(htConfig, BATCH_SIZE);
+    allocationTracker = new HashTableAllocationTracker(htConfig);
     enlargeEmptyHashTableIfNeeded(initialCapacity);
   }
 
@@ -548,7 +551,9 @@ public abstract class HashTableTemplate implements HashTable {
       }
       batchHolders.clear();
       batchHolders = null;
-      totalBatchHoldersSize = 0;
+      prevIndexSize = 0;
+      currentIndexSize = 0;
+      totalIndexSize = 0;
     }
     startIndices.clear();
     // currentIdxHolder = null; // keep IndexPointer in case HT is reused
@@ -574,10 +579,15 @@ public abstract class HashTableTemplate implements HashTable {
     if ( batchAdded ) {
       logger.trace("OOM - Removing index {} from the batch holders list",batchHolders.size() - 1);
       BatchHolder bh = batchHolders.remove(batchHolders.size() - 1);
-      totalBatchHoldersSize -= BATCH_SIZE;
+      prevIndexSize = batchHolders.size() > 1 ? (batchHolders.size()-1) * BATCH_SIZE : 0;
+      currentIndexSize = prevIndexSize + (batchHolders.size() > 0 ? batchHolders.get(batchHolders.size()-1).getTargetBatchRowCount() : 0);
+      totalIndexSize = batchHolders.size() * BATCH_SIZE;
+      // update freeIndex to point to end of last batch + 1
+      freeIndex = totalIndexSize + 1;
       bh.clear();
+    } else {
+      freeIndex--;
     }
-    freeIndex--;
     throw new RetryAfterSpillException();
   }
 
@@ -619,7 +629,7 @@ public abstract class HashTableTemplate implements HashTable {
    * @return Status - the key(s) was ADDED or was already PRESENT
    */
   @Override
-  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException {
+  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode, int targetBatchRowCount) throws SchemaChangeException, RetryAfterSpillException {
 
     int bucketIndex = getBucketIndex(hashCode, numBuckets());
     int startIdx = startIndices.getAccessor().get(bucketIndex);
@@ -634,7 +644,7 @@ public abstract class HashTableTemplate implements HashTable {
           /* isKeyMatch() below also advances the currentIdxHolder to the next link */) {
 
       // remember the current link, which would be the last when the next link is empty
-      lastEntryBatch = batchHolders.get((currentIdxHolder.value >>> 16) & HashTable.BATCH_MASK);
+      lastEntryBatch = batchHolders.get((currentIdxHolder.value >>> 16) & BATCH_MASK);
       lastEntryIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
 
       if (lastEntryBatch.isKeyMatch(incomingRowIdx, currentIdxHolder, false)) {
@@ -647,14 +657,18 @@ public abstract class HashTableTemplate implements HashTable {
     currentIdx = freeIndex++;
     boolean addedBatch = false;
     try {  // ADD A BATCH
-      addedBatch = addBatchIfNeeded(currentIdx);
+      addedBatch = addBatchIfNeeded(currentIdx, targetBatchRowCount);
+      if (addedBatch) {
+        // If we just added the batch, update the current index to point to beginning of new batch.
+        currentIdx = (batchHolders.size() - 1) * BATCH_SIZE;
+        freeIndex = currentIdx + 1;
+      }
     } catch (OutOfMemoryException OOME) {
-      retryAfterOOM( currentIdx < batchHolders.size() * BATCH_SIZE );
+      retryAfterOOM( currentIdx < totalIndexSize);
     }
 
     try { // INSERT ENTRY
       BatchHolder bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK);
-
       bh.insertEntry(incomingRowIdx, currentIdx, hashCode, lastEntryBatch, lastEntryIdxWithinBatch);
       numEntries++;
     } catch (OutOfMemoryException OOME) { retryAfterOOM( addedBatch ); }
@@ -684,7 +698,7 @@ public abstract class HashTableTemplate implements HashTable {
     }
     htIdxHolder.value = currentIdx;
     return  addedBatch ? PutStatus.NEW_BATCH_ADDED :
-        ( freeIndex + 1 > totalBatchHoldersSize /* batchHolders.size() * BATCH_SIZE */ ) ?
+        (freeIndex + 1 > currentIndexSize) ?
         PutStatus.KEY_ADDED_LAST : // the last key in the batch
         PutStatus.KEY_ADDED;     // otherwise
   }
@@ -716,20 +730,22 @@ public abstract class HashTableTemplate implements HashTable {
   // Add a new BatchHolder to the list of batch holders if needed. This is based on the supplied
   // currentIdx; since each BatchHolder can hold up to BATCH_SIZE entries, if the currentIdx exceeds
   // the capacity, we will add a new BatchHolder. Return true if a new batch was added.
-  private boolean addBatchIfNeeded(int currentIdx) throws SchemaChangeException {
-    // int totalBatchSize = batchHolders.size() * BATCH_SIZE;
-
-    if (currentIdx >= totalBatchHoldersSize) {
-      BatchHolder bh = newBatchHolder(batchHolders.size(), allocationTracker.getNextBatchHolderSize());
+  private boolean addBatchIfNeeded(int currentIdx, int batchRowCount) throws SchemaChangeException {
+     // Add a new batch if this is the first batch or
+     // index is greater than current batch target count i.e. we reached the limit of current batch.
+     if (batchHolders.size() == 0 || (currentIdx >= currentIndexSize)) {
+      final int allocationSize = allocationTracker.getNextBatchHolderSize(batchRowCount);
+      final BatchHolder bh = newBatchHolder(batchHolders.size(), allocationSize);
       batchHolders.add(bh);
+      prevIndexSize = batchHolders.size() > 1 ? (batchHolders.size()-1)*BATCH_SIZE : 0;
+      currentIndexSize = prevIndexSize + batchHolders.get(batchHolders.size()-1).getTargetBatchRowCount();
+      totalIndexSize = batchHolders.size() * BATCH_SIZE;
       bh.setup();
       if (EXTRA_DEBUG) {
         logger.debug("HashTable: Added new batch. Num batches = {}.", batchHolders.size());
       }
 
-      allocationTracker.commit();
-
-      totalBatchHoldersSize += BATCH_SIZE; // total increased by 1 batch
+      allocationTracker.commit(allocationSize);
       return true;
     }
     return false;
@@ -782,10 +798,12 @@ public abstract class HashTableTemplate implements HashTable {
 
     IntVector newStartIndices = allocMetadataVector(tableSize, EMPTY_SLOT);
 
+    int idx = 0;
     for (int i = 0; i < batchHolders.size(); i++) {
       BatchHolder bh = batchHolders.get(i);
-      int batchStartIdx = i * BATCH_SIZE;
+      int batchStartIdx = idx;
       bh.rehash(tableSize, newStartIndices, batchStartIdx);
+      idx += bh.getTargetBatchRowCount();
     }
 
     startIndices.clear();
@@ -796,8 +814,8 @@ public abstract class HashTableTemplate implements HashTable {
       logger.debug("Number of buckets = {}.", startIndices.getAccessor().getValueCount());
       for (int i = 0; i < startIndices.getAccessor().getValueCount(); i++) {
         logger.debug("Bucket: {}, startIdx[ {} ] = {}.", i, i, startIndices.getAccessor().get(i));
-        int idx = startIndices.getAccessor().get(i);
-        BatchHolder bh = batchHolders.get((idx >>> 16) & BATCH_MASK);
+        int startIdx = startIndices.getAccessor().get(i);
+        BatchHolder bh = batchHolders.get((startIdx >>> 16) & BATCH_MASK);
         bh.dump(idx);
       }
     }
@@ -831,7 +849,9 @@ public abstract class HashTableTemplate implements HashTable {
     freeIndex = 0; // all batch holders are gone
     // reallocate batch holders, and the hash table to the original size
     batchHolders = new ArrayList<BatchHolder>();
-    totalBatchHoldersSize = 0;
+    prevIndexSize = 0;
+    currentIndexSize = 0;
+    totalIndexSize = 0;
     startIndices = allocMetadataVector(originalTableSize, EMPTY_SLOT);
   }
   public void updateIncoming(VectorContainer newIncoming, RecordBatch newIncomingProbe) {
@@ -846,9 +866,9 @@ public abstract class HashTableTemplate implements HashTable {
   }
 
   @Override
-  public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords) {
+  public boolean outputKeys(int batchIdx, VectorContainer outContainer, int numRecords) {
     assert batchIdx < batchHolders.size();
-    return batchHolders.get(batchIdx).outputKeys(outContainer, outStartIndex, numRecords, numExpectedRecords);
+    return batchHolders.get(batchIdx).outputKeys(outContainer, numRecords);
   }
 
   private IntVector allocMetadataVector(int size, int initialValue) {
@@ -891,4 +911,14 @@ public abstract class HashTableTemplate implements HashTable {
     return String.format("[numBuckets = %d, numEntries = %d, numBatchHolders = %d, actualSize = %s]",
       numBuckets(), numEntries, batchHolders.size(), HashJoinMemoryCalculator.PartitionStatSet.prettyPrintBytes(getActualSize()));
   }
+
+  @Override
+  public void setTargetBatchRowCount(int batchRowCount) {
+    batchHolders.get(batchHolders.size()-1).targetBatchRowCount = batchRowCount;
+  }
+
+  @Override
+  public int getTargetBatchRowCount() {
+    return batchHolders.get(batchHolders.size()-1).targetBatchRowCount;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
index a270ced..79b28db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
@@ -94,19 +94,19 @@ public class RecordBatchMemoryManager {
   }
 
   public long getNumIncomingBatches() {
-    return inputBatchStats[DEFAULT_INPUT_INDEX].getNumBatches();
+    return inputBatchStats[DEFAULT_INPUT_INDEX] == null ? 0 : inputBatchStats[DEFAULT_INPUT_INDEX].getNumBatches();
   }
 
   public long getAvgInputBatchSize() {
-    return inputBatchStats[DEFAULT_INPUT_INDEX].getAvgBatchSize();
+    return inputBatchStats[DEFAULT_INPUT_INDEX] == null ? 0 : inputBatchStats[DEFAULT_INPUT_INDEX].getAvgBatchSize();
   }
 
   public long getAvgInputRowWidth() {
-    return inputBatchStats[DEFAULT_INPUT_INDEX].getAvgRowWidth();
+    return inputBatchStats[DEFAULT_INPUT_INDEX] == null ? 0 : inputBatchStats[DEFAULT_INPUT_INDEX].getAvgRowWidth();
   }
 
   public long getTotalInputRecords() {
-    return inputBatchStats[DEFAULT_INPUT_INDEX].getTotalRecords();
+    return inputBatchStats[DEFAULT_INPUT_INDEX] == null ? 0 : inputBatchStats[DEFAULT_INPUT_INDEX].getTotalRecords();
   }
 
   public long getNumIncomingBatches(int index) {
@@ -176,6 +176,22 @@ public class RecordBatchMemoryManager {
     return getOutputRowCount();
   }
 
+  public boolean updateIfNeeded(int newOutgoingRowWidth) {
+    // We do not want to keep adjusting batch holders target row count
+    // for small variations in row width.
+    // If row width changes, calculate actual adjusted row count i.e. row count
+    // rounded down to nearest power of two and do nothing if that does not change.
+    if (newOutgoingRowWidth == outgoingRowWidth ||
+      computeOutputRowCount(outputBatchSize, newOutgoingRowWidth) == computeOutputRowCount(outputBatchSize, outgoingRowWidth)) {
+      return false;
+    }
+
+    // Set number of rows in outgoing batch. This number will be used for new batch creation.
+    setOutputRowCount(outputBatchSize, newOutgoingRowWidth);
+    setOutgoingRowWidth(newOutgoingRowWidth);
+    return true;
+  }
+
   public int getOutputRowCount() {
     return outputRowCount;
   }
@@ -201,7 +217,7 @@ public class RecordBatchMemoryManager {
     return (Math.min(MAX_NUM_ROWS, Math.max(Integer.highestOneBit(rowCount) - 1, MIN_NUM_ROWS)));
   }
 
-  public static int computeRowCount(int batchSize, int rowWidth) {
+  public static int computeOutputRowCount(int batchSize, int rowWidth) {
     return adjustOutputRowCount(RecordBatchSizer.safeDivide(batchSize, rowWidth));
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTrackerTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTrackerTest.java
index 131d82f..0140943 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTrackerTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashTableAllocationTrackerTest.java
@@ -21,40 +21,42 @@ import com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_SIZE;
+
 public class HashTableAllocationTrackerTest {
   @Test
   public void testDoubleGetNextCall() {
     final HashTableConfig config = new HashTableConfig(100, true, .5f, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
-    final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config, 30);
+    final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config);
 
     for (int counter = 0; counter < 100; counter++) {
-      Assert.assertEquals(30, tracker.getNextBatchHolderSize());
+      Assert.assertEquals(100, tracker.getNextBatchHolderSize(BATCH_SIZE));
     }
   }
 
   @Test(expected = IllegalStateException.class)
   public void testPrematureCommit() {
     final HashTableConfig config = new HashTableConfig(100, .5f, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
-    final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config, 30);
+    final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config);
 
-    tracker.commit();
+    tracker.commit(30);
   }
 
   @Test(expected = IllegalStateException.class)
   public void testDoubleCommit() {
     final HashTableConfig config = new HashTableConfig(100, .5f, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
-    final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config, 30);
+    final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config);
 
-    tracker.commit();
-    tracker.commit();
+    tracker.commit(30);
+    tracker.commit(30);
   }
 
   @Test
   public void testOverAsking() {
     final HashTableConfig config = new HashTableConfig(100, .5f, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
-    final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config, 30);
+    final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config);
 
-    tracker.getNextBatchHolderSize();
+    tracker.getNextBatchHolderSize(30);
   }
 
   /**
@@ -63,11 +65,11 @@ public class HashTableAllocationTrackerTest {
   @Test
   public void testLifecycle1() {
     final HashTableConfig config = new HashTableConfig(100, .5f, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
-    final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config, 30);
+    final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config);
 
     for (int counter = 0; counter < 100; counter++) {
-      Assert.assertEquals(30, tracker.getNextBatchHolderSize());
-      tracker.commit();
+      Assert.assertEquals(30, tracker.getNextBatchHolderSize(30));
+      tracker.commit(30);
     }
   }
 
@@ -77,21 +79,21 @@ public class HashTableAllocationTrackerTest {
   @Test
   public void testLifecycle() {
     final HashTableConfig config = new HashTableConfig(100, true, .5f, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
-    final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config, 30);
+    final HashTableAllocationTracker tracker = new HashTableAllocationTracker(config);
 
-    Assert.assertEquals(30, tracker.getNextBatchHolderSize());
-    tracker.commit();
-    Assert.assertEquals(30, tracker.getNextBatchHolderSize());
-    tracker.commit();
-    Assert.assertEquals(30, tracker.getNextBatchHolderSize());
-    tracker.commit();
-    Assert.assertEquals(10, tracker.getNextBatchHolderSize());
-    tracker.commit();
+    Assert.assertEquals(30, tracker.getNextBatchHolderSize(30));
+    tracker.commit(30);
+    Assert.assertEquals(30, tracker.getNextBatchHolderSize(30));
+    tracker.commit(30);
+    Assert.assertEquals(30, tracker.getNextBatchHolderSize(30));
+    tracker.commit(30);
+    Assert.assertEquals(10, tracker.getNextBatchHolderSize(30));
+    tracker.commit(30);
 
     boolean caughtException = false;
 
     try {
-      tracker.getNextBatchHolderSize();
+      tracker.getNextBatchHolderSize(BATCH_SIZE);
     } catch (IllegalStateException ex) {
       caughtException = true;
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
index fd0b494..471f1b8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
@@ -26,11 +26,13 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.FlattenPOP;
+import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.config.UnionAll;
 import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
 import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorAccessible;
@@ -2089,6 +2091,203 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
   }
 
   @Test
+  public void testSimpleHashAgg() {
+    HashAggregate aggConf = new HashAggregate(null, AggPrelBase.OperatorPhase.PHASE_1of1, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f);
+    List<String> inputJsonBatches = Lists.newArrayList(
+       "[{\"a\": 5, \"b\" : 1 }]",
+         "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
+
+    opTestBuilder()
+      .physicalOperator(aggConf)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("b_sum", "a")
+      .baselineValues(6l, 5l)
+      .baselineValues(8l, 3l)
+      .go();
+  }
+
+  @Test
+  public void testHashAggSum() throws ExecutionSetupException {
+    HashAggregate hashAgg = new HashAggregate(null, AggPrelBase.OperatorPhase.PHASE_1of1, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f);
+
+    // create input rows like this.
+    // "a" : 1, "b" : 1
+    // "a" : 1, "b" : 1
+    // "a" : 1, "b" : 1
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+    batchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+        batchString.append("{\"a\": " + i + ", \"b\": " + i + "},");
+        batchString.append("{\"a\": " + i + ", \"b\": " + i + "},");
+        batchString.append("{\"a\": " + i + ", \"b\": " + i + "},");
+    }
+    batchString.append("{\"a\": " + numRows + ", \"b\": " + numRows + "}," );
+    batchString.append("{\"a\": " + numRows + ", \"b\": " + numRows + "}," );
+    batchString.append("{\"a\": " + numRows + ", \"b\": " + numRows + "}" );
+
+    batchString.append("]");
+    inputJsonBatches.add(batchString.toString());
+
+    // Figure out what will be approximate total output size out of hash agg for input above
+    // We will use this sizing information to set output batch size so we can produce desired
+    // number of batches that can be verified.
+
+    // output rows will be like this.
+    // "a" : 1, "b" : 3
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a\": " + i + ", \"b\": " + (3*i) + "},");
+    }
+    expectedBatchString.append("{\"a\": " + numRows + ", \"b\": " + numRows + "}" );
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately get 2 batches and max of 4.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(hashAgg)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b_sum")
+      .expectedNumBatches(4)  // verify number of batches
+      .expectedBatchSize(totalSize/2); // verify batch size.
+
+
+    for (int i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues((long)i, (long)3*i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testHashAggAvg() throws ExecutionSetupException {
+    HashAggregate hashAgg = new HashAggregate(null, AggPrelBase.OperatorPhase.PHASE_1of1, parseExprs("a", "a"), parseExprs("avg(b)", "b_avg"), 1.0f);
+
+    // create input rows like this.
+    // "a" : 1, "b" : 1
+    // "a" : 1, "b" : 1
+    // "a" : 1, "b" : 1
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+    batchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      batchString.append("{\"a\": " + i + ", \"b\": " + i + "},");
+      batchString.append("{\"a\": " + i + ", \"b\": " + i + "},");
+      batchString.append("{\"a\": " + i + ", \"b\": " + i + "},");
+    }
+    batchString.append("{\"a\": " + numRows + ", \"b\": " + numRows + "}," );
+    batchString.append("{\"a\": " + numRows + ", \"b\": " + numRows + "}," );
+    batchString.append("{\"a\": " + numRows + ", \"b\": " + numRows + "}" );
+
+    batchString.append("]");
+    inputJsonBatches.add(batchString.toString());
+
+    // Figure out what will be approximate total output size out of hash agg for input above
+    // We will use this sizing information to set output batch size so we can produce desired
+    // number of batches that can be verified.
+
+    // output rows will be like this.
+    // "a" : 1, "b" : 3
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a\": " + i + ", \"b\": " + (3*i) + "},");
+    }
+    expectedBatchString.append("{\"a\": " + numRows + ", \"b\": " + numRows + "}" );
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately get 2 batches and max of 4.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(hashAgg)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b_avg")
+      .expectedNumBatches(4)  // verify number of batches
+      .expectedBatchSize(totalSize/2); // verify batch size.
+
+    for (int i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues((long)i, (double)i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testHashAggMax() throws ExecutionSetupException {
+    HashAggregate hashAgg = new HashAggregate(null, AggPrelBase.OperatorPhase.PHASE_1of1, parseExprs("a", "a"), parseExprs("max(b)", "b_max"), 1.0f);
+
+    // create input rows like this.
+    // "a" : 1, "b" : "a"
+    // "a" : 2, "b" : "aa"
+    // "a" : 3, "b" : "aaa"
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+    batchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      batchString.append("{\"a\": " + i + ", \"b\": " + "\"a\"" + "},");
+      batchString.append("{\"a\": " + i + ", \"b\": " + "\"aa\"" + "},");
+      batchString.append("{\"a\": " + i + ", \"b\": " + "\"aaa\"" + "},");
+    }
+    batchString.append("{\"a\": " + numRows + ", \"b\": " + "\"a\"" + "}," );
+    batchString.append("{\"a\": " + numRows + ", \"b\": " + "\"aa\"" + "}," );
+    batchString.append("{\"a\": " + numRows + ", \"b\": " + "\"aaa\"" + "}" );
+
+    batchString.append("]");
+    inputJsonBatches.add(batchString.toString());
+
+    // Figure out what will be approximate total output size out of hash agg for input above
+    // We will use this sizing information to set output batch size so we can produce desired
+    // number of batches that can be verified.
+
+    // output rows will be like this.
+    // "a" : 1, "b" : "aaa"
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a\": " + i + ", \"b\": " + "\"aaa\"" + "},");
+    }
+    expectedBatchString.append("{\"a\": " + numRows + ", \"b\": " + "\"aaa\"" + "}" );
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately get 2 batches and max of 4.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(hashAgg)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b_max")
+      .expectedNumBatches(2)  // verify number of batches
+      .expectedBatchSize(totalSize); // verify batch size.
+
+    for (int i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues((long)i, "aaa");
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
   public void testSizerRepeatedList() throws Exception {
     List<String> inputJsonBatches = Lists.newArrayList();
     StringBuilder batchString = new StringBuilder();