Posted to commits@drill.apache.org by pr...@apache.org on 2017/06/21 00:19:13 UTC

[2/2] drill git commit: DRILL-5457: Spill implementation for Hash Aggregate

DRILL-5457: Spill implementation for Hash Aggregate

closes #822


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c16e5f80
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c16e5f80
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c16e5f80

Branch: refs/heads/master
Commit: c16e5f8072f3e5d18157767143f9ccc7669c4380
Parents: be43a9e
Author: Boaz Ben-Zvi <bo...@BBenZvi-E754-MBP13.local>
Authored: Mon Jun 19 19:04:30 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Tue Jun 20 17:01:01 2017 -0700

----------------------------------------------------------------------
 .../src/resources/drill-override-example.conf   |   22 +
 .../org/apache/drill/exec/ExecConstants.java    |   22 +
 .../cache/VectorAccessibleSerializable.java     |   56 +
 .../drill/exec/physical/base/AbstractBase.java  |   28 +-
 .../exec/physical/base/PhysicalOperator.java    |   15 +
 .../exec/physical/config/ExternalSort.java      |   17 +-
 .../exec/physical/config/HashAggregate.java     |   25 +-
 .../physical/impl/aggregate/HashAggBatch.java   |   46 +-
 .../impl/aggregate/HashAggTemplate.java         | 1113 +++++++++++++++---
 .../physical/impl/aggregate/HashAggregator.java |   19 +-
 .../impl/aggregate/SpilledRecordbatch.java      |  175 +++
 .../physical/impl/common/ChainedHashTable.java  |   10 +-
 .../exec/physical/impl/common/HashTable.java    |   26 +-
 .../physical/impl/common/HashTableStats.java    |    7 +
 .../physical/impl/common/HashTableTemplate.java |  255 ++--
 .../exec/physical/impl/join/HashJoinBatch.java  |    5 +-
 .../physical/impl/spill/RecordBatchSizer.java   |   78 +-
 .../exec/physical/impl/spill/SpillSet.java      |   59 +-
 .../impl/xsort/managed/ExternalSortBatch.java   |    6 +-
 .../exec/planner/physical/AggPrelBase.java      |    2 +-
 .../exec/planner/physical/AggPruleBase.java     |    3 +
 .../exec/planner/physical/HashAggPrel.java      |    2 +-
 .../exec/planner/physical/PlannerSettings.java  |    5 +
 .../apache/drill/exec/record/RecordBatch.java   |    2 +-
 .../server/options/SystemOptionManager.java     |    4 +
 .../exec/util/MemoryAllocationUtilities.java    |   21 +-
 .../apache/drill/exec/work/foreman/Foreman.java |    2 +-
 .../drill/exec/work/user/PlanSplitter.java      |    2 +-
 .../src/main/resources/drill-module.conf        |   35 +-
 .../java/org/apache/drill/TestBugFixes.java     |    5 +-
 .../drill/TestTpchDistributedConcurrent.java    |    2 +-
 .../physical/impl/agg/TestHashAggrSpill.java    |  141 +++
 .../physical/unit/BasicPhysicalOpUnitTest.java  |    3 +-
 exec/jdbc/pom.xml                               |    1 +
 .../java/org/apache/drill/exec/rpc/RpcBus.java  |    2 +-
 .../templates/VariableLengthVectors.java        |    1 +
 36 files changed, 1800 insertions(+), 417 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/distribution/src/resources/drill-override-example.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf
index b9d09a8..8010f85 100644
--- a/distribution/src/resources/drill-override-example.conf
+++ b/distribution/src/resources/drill-override-example.conf
@@ -142,6 +142,13 @@ drill.exec: {
     }
   },
   cache.hazel.subnets: ["*.*.*.*"],
+  spill: {
+     # These options are common to all spilling operators.
+     # They can be overridden per operator (but this is just for
+     # backward compatibility, and may be deprecated in the future)
+     directories : [ "/tmp/drill/spill" ],
+     fs : "file:///"
+  }
   sort: {
     purge.threshold : 100,
     external: {
@@ -150,11 +157,26 @@ drill.exec: {
         batch.size : 4000,
         group.size : 100,
         threshold : 200,
+        # The two options below override the common ones;
+        # they should be deprecated in the future
         directories : [ "/tmp/drill/spill" ],
         fs : "file:///"
       }
     }
   },
+  hashagg: {
+    # The partitions divide the work inside the hashagg so that
+    # spilling is easier to handle. This initial figure is tuned
+    # down when memory is limited.
+    # Setting this option to 1 disables spilling!
+    num_partitions: 32,
+    spill: {
+        # The two options below override the common ones;
+        # they should be deprecated in the future
+        directories : [ "/tmp/drill/spill" ],
+        fs : "file:///"
+    }
+  },
   memory: {
     top.max: 1000000000000,
     operator: {

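The hashagg spill settings above follow the fallback rule spelled out in the comments: a per-operator value, when present, overrides the common drill.exec.spill one. A minimal lookup sketch in that spirit, using the ExecConstants entries added later in this commit (illustrative only -- this is not the committed SpillSet code, and the DrillConfig instance is assumed to be in scope):

  import java.util.List;
  import org.apache.drill.common.config.DrillConfig;
  import org.apache.drill.exec.ExecConstants;

  // Illustrative sketch of the override-then-fallback lookup for spill settings.
  class SpillConfigExample {
    static String spillFs(DrillConfig config) {
      return config.hasPath(ExecConstants.HASHAGG_SPILL_FILESYSTEM)
          ? config.getString(ExecConstants.HASHAGG_SPILL_FILESYSTEM) // drill.exec.hashagg.spill.fs
          : config.getString(ExecConstants.SPILL_FILESYSTEM);        // drill.exec.spill.fs
    }
    static List<String> spillDirs(DrillConfig config) {
      return config.hasPath(ExecConstants.HASHAGG_SPILL_DIRS)
          ? config.getStringList(ExecConstants.HASHAGG_SPILL_DIRS)   // drill.exec.hashagg.spill.directories
          : config.getStringList(ExecConstants.SPILL_DIRS);          // drill.exec.spill.directories
    }
  }
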
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 18f69d5..537377d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -64,6 +64,12 @@ public interface ExecConstants {
   String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size";
   String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold";
 
+  // Spill boot-time Options common to all spilling operators
+  // (Each individual operator may override the common options)
+
+  String SPILL_FILESYSTEM = "drill.exec.spill.fs";
+  String SPILL_DIRS = "drill.exec.spill.directories";
+
   // External Sort Boot configuration
 
   String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.batch.size";
@@ -86,6 +92,22 @@ public interface ExecConstants {
 
   BooleanValidator EXTERNAL_SORT_DISABLE_MANAGED_OPTION = new BooleanValidator("exec.sort.disable_managed", false);
 
+  // Hash Aggregate Options
+
+  String HASHAGG_NUM_PARTITIONS = "drill.exec.hashagg.num_partitions";
+  String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions";
+  LongValidator HASHAGG_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHAGG_NUM_PARTITIONS_KEY, 1, 128, 32); // 1 means no spilling
+  String HASHAGG_MAX_MEMORY = "drill.exec.hashagg.mem_limit";
+  String HASHAGG_MAX_MEMORY_KEY = "exec.hashagg.mem_limit";
+  LongValidator HASHAGG_MAX_MEMORY_VALIDATOR = new RangeLongValidator(HASHAGG_MAX_MEMORY_KEY, 0, Integer.MAX_VALUE, 0);
+  // min batches per partition is used for tuning: when planning the number of partitions, each
+  // partition is assumed to need at least this many batches; the same number is reserved when
+  // calculating whether the remaining available memory is too small and requires a spill.
+  // A low value may OOM (e.g., when incoming rows become wider); higher values use fewer partitions but are safer
+  String HASHAGG_MIN_BATCHES_PER_PARTITION = "drill.exec.hashagg.min_batches_per_partition";
+  String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = "drill.exec.hashagg.min_batches_per_partition";
+  LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 2, 5, 3);
+  String HASHAGG_SPILL_DIRS = "drill.exec.hashagg.spill.directories";
+  String HASHAGG_SPILL_FILESYSTEM = "drill.exec.hashagg.spill.fs";
 
   String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size";
   String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";

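For illustration, the run-time effect of the num_partitions option (a sketch that mirrors the delayedSetup() logic added to HashAggTemplate further below -- the wrapper class and method here are hypothetical):

  import org.apache.drill.exec.ExecConstants;
  import org.apache.drill.exec.memory.BaseAllocator;
  import org.apache.drill.exec.ops.FragmentContext;

  // Sketch only -- mirrors delayedSetup() in HashAggTemplate below, not new behavior.
  class NumPartitionsExample {
    static int effectiveNumPartitions(FragmentContext context) {
      int numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS);
      // numPartitions == 1 disables spilling (canSpill would be set to false)
      return BaseAllocator.nextPowerOfTwo(numPartitions); // e.g. 24 -> 32
    }
  }
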
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 9d0182f..d569ae5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -35,6 +35,8 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.codahale.metrics.MetricRegistry;
@@ -138,6 +140,60 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
     va = container;
   }
 
+  // Like readFromStream() above, but preserves the original container and list of value vectors
+  public void readFromStreamWithContainer(VectorContainer myContainer, InputStream input) throws IOException {
+    final VectorContainer container = new VectorContainer();
+    final UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
+    recordCount = batchDef.getRecordCount();
+    if (batchDef.hasCarriesTwoByteSelectionVector() && batchDef.getCarriesTwoByteSelectionVector()) {
+
+      if (sv2 == null) {
+        sv2 = new SelectionVector2(allocator);
+      }
+      sv2.allocateNew(recordCount * SelectionVector2.RECORD_SIZE);
+      sv2.getBuffer().setBytes(0, input, recordCount * SelectionVector2.RECORD_SIZE);
+      svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+    }
+    final List<ValueVector> vectorList = Lists.newArrayList();
+    final List<SerializedField> fieldList = batchDef.getFieldList();
+    for (SerializedField metaData : fieldList) {
+      final int dataLength = metaData.getBufferLength();
+      final MaterializedField field = MaterializedField.create(metaData);
+      final DrillBuf buf = allocator.buffer(dataLength);
+      final ValueVector vector;
+      try {
+        buf.writeBytes(input, dataLength);
+        vector = TypeHelper.getNewVector(field, allocator);
+        vector.load(metaData, buf);
+      } finally {
+        buf.release();
+      }
+      vectorList.add(vector);
+    }
+    container.addCollection(vectorList);
+    container.setRecordCount(recordCount);
+    myContainer.transferIn(container); // transfer the vectors
+    myContainer.buildSchema(svMode);
+    myContainer.setRecordCount(recordCount);
+    /*
+    // for debugging -- show values from the first row
+    Object tmp0 = (myContainer).getValueAccessorById(NullableVarCharVector.class, 0).getValueVector();
+    Object tmp1 = (myContainer).getValueAccessorById(NullableVarCharVector.class, 1).getValueVector();
+    Object tmp2 = (myContainer).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector();
+    if (tmp0 != null && tmp1 != null && tmp2 != null) {
+      NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0);
+      NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1);
+      NullableBigIntVector vv2 = ((NullableBigIntVector) tmp2);
+
+      try {
+        logger.info("HASH AGG: Got a row = {} , {} , {}", vv0.getAccessor().get(0), vv1.getAccessor().get(0), vv2.getAccessor().get(0));
+      } catch (Exception e) { logger.info("HASH AGG: Got an exception = {}",e); }
+    }
+    else { logger.info("HASH AGG: got nulls !!!"); }
+    */
+    va = myContainer;
+  }
+
   public void writeToStreamAndRetain(OutputStream output) throws IOException {
     retain = true;
     writeToStream(output);

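The new readFromStreamWithContainer() is the read half of the spill round trip: writeToStream() serializes a batch (schema, optional SV2, then the vector buffers), and the new method re-reads it into a caller-owned container instead of a fresh one. A hedged usage sketch -- not the committed spill code; the streams, batch, and allocator are assumed to exist:

  import java.io.IOException;
  import java.io.InputStream;
  import java.io.OutputStream;
  import org.apache.drill.exec.cache.VectorAccessibleSerializable;
  import org.apache.drill.exec.memory.BufferAllocator;
  import org.apache.drill.exec.record.VectorContainer;
  import org.apache.drill.exec.record.WritableBatch;

  class SpillRoundTripSketch {
    // Write one batch to a spill stream.
    static void spill(WritableBatch batch, BufferAllocator allocator, OutputStream out) throws IOException {
      new VectorAccessibleSerializable(batch, allocator).writeToStream(out);
    }
    // Read one batch back; the vectors are transferred into the caller-owned container.
    static void unspill(BufferAllocator allocator, InputStream in, VectorContainer container) throws IOException {
      new VectorAccessibleSerializable(allocator).readFromStreamWithContainer(container, in);
    }
  }
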
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index a547e26..6f42250 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.physical.base;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.graph.GraphVisitor;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
@@ -102,17 +104,31 @@ public abstract class AbstractBase implements PhysicalOperator{
     this.cost = cost;
   }
 
-  // Not available. Presumably because Drill does not currently use
-  // this value, though it does appear in some test physical plans.
-//  public void setMaxAllocation(long alloc) {
-//    maxAllocation = alloc;
-//  }
-
   @Override
   public long getMaxAllocation() {
     return maxAllocation;
   }
 
+  /**
+   * Any operator that supports spilling should override this method
+   * @param maxAllocation The max memory allocation to be set
+   */
+  @Override
+  public void setMaxAllocation(long maxAllocation) {
+    this.maxAllocation = maxAllocation;
+    /*throw new DrillRuntimeException("Unsupported method: setMaxAllocation()");*/
+  }
+
+  /**
+   * Any operator that supports spilling should override this method (and return true)
+   * @return false
+   */
+  @Override @JsonIgnore
+  public boolean isBufferedOperator() { return false; }
+
+  // @Override
+  // public void setBufferedOperator(boolean bo) {}
+
   @Override
   public String getUserName() {
     return userName;

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index b1954ca..980f32c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -83,6 +83,21 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
    */
   public long getMaxAllocation();
 
+  /**
+   * Set the maximum memory allocation for this operator.
+   * @param maxAllocation The max memory allocation to be set
+   */
+  public void setMaxAllocation(long maxAllocation);
+
+  /**
+   * Whether this operator buffers data and manages its own memory (spilling to disk when needed).
+   * @return True iff this operator manages its memory (including disk spilling)
+   */
+  @JsonIgnore
+  public boolean isBufferedOperator();
+
+  // public void setBufferedOperator(boolean bo);
+
   @JsonProperty("@id")
   public int getOperatorId();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
index 17848d0..cb9679d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
@@ -49,12 +49,19 @@ public class ExternalSort extends Sort {
     return CoreOperatorType.EXTERNAL_SORT_VALUE;
   }
 
-  // Set here, rather than the base class, because this is the only
-  // operator, at present, that makes use of the maximum allocation.
-  // Remove this, in favor of the base class version, when Drill
-  // sets the memory allocation for all operators.
-
+  /**
+   * Set the maximum memory allocation for this operator.
+   * @param maxAllocation The max memory allocation to be set
+   */
+  @Override
   public void setMaxAllocation(long maxAllocation) {
     this.maxAllocation = maxAllocation;
   }
+
+  /**
+   * The External Sort operator supports spilling
+   * @return true
+   */
+  @Override
+  public boolean isBufferedOperator() { return true; }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
index 4dafbe8..0614dc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
@@ -21,6 +21,7 @@ import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.exec.physical.base.AbstractSingle;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -34,6 +35,7 @@ public class HashAggregate extends AbstractSingle {
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregate.class);
 
+  private final AggPrelBase.OperatorPhase aggPhase;
   private final List<NamedExpression> groupByExprs;
   private final List<NamedExpression> aggrExprs;
 
@@ -41,15 +43,19 @@ public class HashAggregate extends AbstractSingle {
 
   @JsonCreator
   public HashAggregate(@JsonProperty("child") PhysicalOperator child,
+                       @JsonProperty("phase") AggPrelBase.OperatorPhase aggPhase,
                        @JsonProperty("keys") List<NamedExpression> groupByExprs,
                        @JsonProperty("exprs") List<NamedExpression> aggrExprs,
                        @JsonProperty("cardinality") float cardinality) {
     super(child);
+    this.aggPhase = aggPhase;
     this.groupByExprs = groupByExprs;
     this.aggrExprs = aggrExprs;
     this.cardinality = cardinality;
   }
 
+  public AggPrelBase.OperatorPhase getAggPhase() { return aggPhase; }
+
   public List<NamedExpression> getGroupByExprs() {
     return groupByExprs;
   }
@@ -69,7 +75,9 @@ public class HashAggregate extends AbstractSingle {
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new HashAggregate(child, groupByExprs, aggrExprs, cardinality);
+    HashAggregate newHAG = new HashAggregate(child, aggPhase, groupByExprs, aggrExprs, cardinality);
+    newHAG.setMaxAllocation(getMaxAllocation());
+    return newHAG;
   }
 
   @Override
@@ -77,5 +85,18 @@ public class HashAggregate extends AbstractSingle {
     return CoreOperatorType.HASH_AGGREGATE_VALUE;
   }
 
-
+  /**
+   * Set the maximum memory allocation for this operator.
+   * @param maxAllocation The max memory allocation to be set
+   */
+  @Override
+  public void setMaxAllocation(long maxAllocation) {
+    this.maxAllocation = maxAllocation;
+  }
+
+  /**
+   * The Hash Aggregate operator supports spilling
+   * @return true
+   */
+  @Override
+  public boolean isBufferedOperator() { return true; }
 }

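With the aggregation phase now carried as the "phase" plan property, constructing the operator programmatically looks roughly as follows (a sketch; the wrapper class is hypothetical, and the child operator and expression lists are assumed to come from the planner):

  import java.util.List;
  import org.apache.drill.common.logical.data.NamedExpression;
  import org.apache.drill.exec.physical.base.PhysicalOperator;
  import org.apache.drill.exec.physical.config.HashAggregate;
  import org.apache.drill.exec.planner.physical.AggPrelBase;

  // Sketch: building a 2nd-phase hash aggregate with the new phase argument.
  class HashAggConfigExample {
    static HashAggregate secondPhase(PhysicalOperator child,
                                     List<NamedExpression> keys,
                                     List<NamedExpression> exprs,
                                     float cardinality) {
      return new HashAggregate(child, AggPrelBase.OperatorPhase.PHASE_2of2, keys, exprs, cardinality);
    }
  }
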
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index dc913b1..97e0599 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
 import java.io.IOException;
 import java.util.List;
 
+import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
@@ -55,7 +56,6 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
-import com.google.common.collect.Lists;
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JVar;
 
@@ -63,12 +63,13 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggBatch.class);
 
   private HashAggregator aggregator;
-  private final RecordBatch incoming;
+  private RecordBatch incoming;
   private LogicalExpression[] aggrExprs;
   private TypedFieldId[] groupByOutFieldIds;
   private TypedFieldId[] aggrOutFieldIds;      // field ids for the outgoing batch
   private final List<Comparator> comparators;
   private BatchSchema incomingSchema;
+  private boolean wasKilled;
 
   private final GeneratorMapping UPDATE_AGGR_INSIDE =
       GeneratorMapping.create("setupInterior" /* setup method */, "updateAggrValuesInternal" /* eval method */,
@@ -87,6 +88,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
   public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentContext context) throws ExecutionSetupException {
     super(popConfig, context);
     this.incoming = incoming;
+    wasKilled = false;
 
     final int numGrpByExprs = popConfig.getGroupByExprs().size();
     comparators = Lists.newArrayListWithExpectedSize(numGrpByExprs);
@@ -136,15 +138,36 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       return IterOutcome.NONE;
     }
 
-    if (aggregator.buildComplete() && !aggregator.allFlushed()) {
-      // aggregation is complete and not all records have been output yet
-      return aggregator.outputCurrentBatch();
+    // if aggregation is complete and not all records have been output yet
+    if (aggregator.buildComplete() ||
+        // or: the 1st phase needs to return (not fully grouped) partial output due to memory pressure
+        aggregator.earlyOutput()) {
+      // then output the next batch downstream
+      HashAggregator.AggIterOutcome aggOut = aggregator.outputCurrentBatch();
+      // if Batch returned, or end of data - then return the appropriate iter outcome
+      if ( aggOut == HashAggregator.AggIterOutcome.AGG_NONE ) { return IterOutcome.NONE; }
+      if ( aggOut == HashAggregator.AggIterOutcome.AGG_OK ) { return IterOutcome.OK; }
+      // if RESTART - continue below with doWork() - read some spilled partition, just like reading incoming
+      incoming = aggregator.getNewIncoming(); // Restart - incoming was just changed
+    }
+
+    if (wasKilled) { // if kill() was called before, then finish up
+      aggregator.cleanup();
+      incoming.kill(false);
+      return IterOutcome.NONE;
     }
 
-    logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount());
+    // Read and aggregate records
+    // ( may need to run again if the spilled partition that was read
+    //   generated new partitions that were all spilled )
+    AggOutcome out;
+    do {
+      //
+      //  Read incoming batches and process their records
+      //
+      out = aggregator.doWork();
+    } while (out == AggOutcome.CALL_WORK_AGAIN);
 
-    AggOutcome out = aggregator.doWork();
-    logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
     switch (out) {
     case CLEANUP_AND_RETURN:
       container.zeroVectors();
@@ -153,6 +176,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       // fall through
     case RETURN_OUTCOME:
       return aggregator.getOutcome();
+
     case UPDATE_AGGREGATOR:
       context.fail(UserException.unsupportedError()
           .message(SchemaChangeException.schemaChanged(
@@ -175,7 +199,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
    * @return true if the aggregator was setup successfully. false if there was a failure.
    */
   private boolean createAggregator() {
-    logger.debug("Creating new aggregator.");
     try {
       stats.startSetup();
       this.aggregator = createAggregatorInternal();
@@ -198,7 +221,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     ClassGenerator<HashAggregator> cgInner = cg.getInnerGenerator("BatchHolder");
     top.plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
-//    top.saveCodeForDebugging(true);
+    // top.saveCodeForDebugging(true);
 
     container.clear();
 
@@ -266,7 +289,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
             HashTable.DEFAULT_LOAD_FACTOR, popConfig.getGroupByExprs(), null /* no probe exprs */, comparators);
 
     agg.setup(popConfig, htConfig, context, this.stats,
-        oContext.getAllocator(), incoming, this,
+        oContext, incoming, this,
         aggrExprs,
         cgInner.getWorkspaceTypes(),
         groupByOutFieldIds,
@@ -314,6 +337,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
 
   @Override
   protected void killIncoming(boolean sendUpstream) {
+    wasKilled = true;
     incoming.kill(sendUpstream);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 1615200..38f0222 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -18,82 +18,155 @@
 package org.apache.drill.exec.physical.impl.aggregate;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import javax.inject.Named;
 
+import com.google.common.base.Stopwatch;
+
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
 import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
+
+import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.physical.impl.common.HashTableStats;
 import org.apache.drill.exec.physical.impl.common.IndexPointer;
+
+import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
+
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
+
+import org.apache.drill.exec.proto.UserBitShared;
+
 import org.apache.drill.exec.record.MaterializedField;
+
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatch.IterOutcome;
-import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.BatchSchema;
+
 import org.apache.drill.exec.record.VectorContainer;
+
+import org.apache.drill.exec.record.TypedFieldId;
+
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+
 import org.apache.drill.exec.vector.AllocationHelper;
+
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ObjectVector;
 import org.apache.drill.exec.vector.ValueVector;
+
 import org.apache.drill.exec.vector.VariableWidthVector;
 
+import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_SIZE;
+
 public abstract class HashAggTemplate implements HashAggregator {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
+  protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
 
-//  private static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
-//  private static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;
-  private static final int VARIABLE_WIDTH_VALUE_SIZE = 50;
+  private static final int VARIABLE_MAX_WIDTH_VALUE_SIZE = 50;
+  private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8;
 
   private static final boolean EXTRA_DEBUG_1 = false;
   private static final boolean EXTRA_DEBUG_2 = false;
-//  private static final String TOO_BIG_ERROR =
-//      "Couldn't add value to an empty batch.  This likely means that a single value is too long for a varlen field.";
-//  private boolean newSchema = false;
+  private static final boolean EXTRA_DEBUG_SPILL = false;
+
+  // Fields needed for partitioning (dividing the groups into partitions)
+  private int numPartitions = 0; // must be 2 to the power of bitsInMask (set in setup())
+  private int partitionMask; // numPartitions - 1
+  private int bitsInMask; // number of bits in the MASK
+  private int nextPartitionToReturn = 0; // which partition to return the next batch from
+  // The following members are used for logging, metrics, etc.
+  private int rowsInPartition = 0; // counts #rows in each partition
+  private int rowsNotSpilled = 0;
+  private int rowsSpilled = 0;
+  private int rowsSpilledReturned = 0;
+  private int rowsReturnedEarly = 0;
+
+  private boolean isTwoPhase = false; // 1 phase or 2 phase aggr?
+  private boolean is2ndPhase = false;
+  private boolean canSpill = true; // set to false when spilling is not possible
+  private ChainedHashTable baseHashTable;
+  private boolean earlyOutput = false; // when 1st phase returns a partition due to no memory
+  private int earlyPartition = 0; // which partition to return early
+
+  private long memoryLimit; // max memory to be used by this operator
+  private long estMaxBatchSize = 0; // used for adjusting #partitions
+  private long estRowWidth = 0;
+  private int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars
+  private long minBatchesPerPartition; // for tuning - num partitions and spill decision
+  private long plannedBatches = 0; // account for planned, but not yet allocated batches
+
   private int underlyingIndex = 0;
   private int currentIndex = 0;
   private IterOutcome outcome;
-//  private int outputCount = 0;
   private int numGroupedRecords = 0;
-  private int outBatchIndex = 0;
+  private int currentBatchRecordCount = 0; // Performance: Avoid repeated calls to getRecordCount()
+
   private int lastBatchOutputCount = 0;
   private RecordBatch incoming;
-//  private BatchSchema schema;
+  private BatchSchema schema;
   private HashAggBatch outgoing;
   private VectorContainer outContainer;
-//  private FragmentContext context;
+
+  private FragmentContext context;
+  private OperatorContext oContext;
   private BufferAllocator allocator;
 
-//  private HashAggregate hashAggrConfig;
-  private HashTable htable;
-  private ArrayList<BatchHolder> batchHolders;
+  private HashTable htables[];
+  private ArrayList<BatchHolder> batchHolders[];
+  private int outBatchIndex[];
+
+  // For handling spilling
+  private SpillSet spillSet;
+  SpilledRecordbatch newIncoming; // when reading a spilled file -- works like an "incoming"
+  private OutputStream outputStream[]; // an output stream for each spilled partition
+  private int spilledBatchesCount[]; // count number of batches spilled, in each partition
+  private String spillFiles[];
+  private int cycleNum = 0; // primary, secondary, tertiary, etc.
+  private int originalPartition = -1; // the partition a secondary reads from
+
+  private static class SpilledPartition { public int spilledBatches; public String spillFile; int cycleNum; int origPartn; int prevOrigPartn; }
+
+  private ArrayList<SpilledPartition> spilledPartitionsList;
+  private int operatorId; // for the spill file name
+
   private IndexPointer htIdxHolder; // holder for the Hashtable's internal index returned by put()
   private IndexPointer outStartIdxHolder;
   private IndexPointer outNumRecordsHolder;
   private int numGroupByOutFields = 0; // Note: this should be <= number of group-by fields
-
-  ErrorCollector collector = new ErrorCollectorImpl();
+  private TypedFieldId[] groupByOutFieldIds;
 
   private MaterializedField[] materializedValueFields;
   private boolean allFlushed = false;
   private boolean buildComplete = false;
+  private boolean handlingSpills = false; // True once starting to process spill files
 
   private OperatorStats stats = null;
   private HashTableStats htStats = new HashTableStats();
@@ -103,7 +176,15 @@ public abstract class HashAggTemplate implements HashAggregator {
     NUM_BUCKETS,
     NUM_ENTRIES,
     NUM_RESIZING,
-    RESIZING_TIME;
+    RESIZING_TIME,
+    NUM_PARTITIONS,
+    SPILLED_PARTITIONS, // number of partitions spilled to disk
+    SPILL_MB,         // Number of MB of data spilled to disk. This amount is first written,
+                      // then later re-read. So, disk I/O is twice this amount.
+                      // For first phase aggr -- this is an estimate of the amount of data
+                      // returned early (analogous to a spill in the 2nd phase).
+    SPILL_CYCLE       // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY
+    ;
 
     // duplicate for hash ag
 
@@ -121,7 +202,6 @@ public abstract class HashAggTemplate implements HashAggregator {
     private int batchOutputCount = 0;
 
     private int capacity = Integer.MAX_VALUE;
-    private boolean allocatedNextBatch = false;
 
     @SuppressWarnings("resource")
     public BatchHolder() {
@@ -145,8 +225,8 @@ public abstract class HashAggTemplate implements HashAggregator {
           if (vector instanceof FixedWidthVector) {
             ((FixedWidthVector) vector).allocateNew(HashTable.BATCH_SIZE);
           } else if (vector instanceof VariableWidthVector) {
-            ((VariableWidthVector) vector).allocateNew(HashTable.VARIABLE_WIDTH_VECTOR_SIZE * HashTable.BATCH_SIZE,
-                HashTable.BATCH_SIZE);
+            // This case is never used: a varchar falls under ObjectVector, which is allocated on the heap!
+            ((VariableWidthVector) vector).allocateNew(maxColumnWidth, HashTable.BATCH_SIZE);
           } else if (vector instanceof ObjectVector) {
             ((ObjectVector) vector).allocateNew(HashTable.BATCH_SIZE);
           } else {
@@ -166,20 +246,23 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
 
     private boolean updateAggrValues(int incomingRowIdx, int idxWithinBatch) {
-      updateAggrValuesInternal(incomingRowIdx, idxWithinBatch);
+      try { updateAggrValuesInternal(incomingRowIdx, idxWithinBatch); }
+      catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc); }
       maxOccupiedIdx = Math.max(maxOccupiedIdx, idxWithinBatch);
       return true;
     }
 
     private void setup() {
-      setupInterior(incoming, outgoing, aggrValuesContainer);
+      try { setupInterior(incoming, outgoing, aggrValuesContainer); }
+      catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc); }
     }
 
     private void outputValues(IndexPointer outStartIdxHolder, IndexPointer outNumRecordsHolder) {
       outStartIdxHolder.value = batchOutputCount;
       outNumRecordsHolder.value = 0;
       for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
-        outputRecordValues(i, batchOutputCount);
+        try { outputRecordValues(i, batchOutputCount); }
+        catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc); }
         if (EXTRA_DEBUG_2) {
           logger.debug("Outputting values to output index: {}", batchOutputCount);
         }
@@ -204,24 +287,23 @@ public abstract class HashAggTemplate implements HashAggregator {
 
     @RuntimeOverridden
     public void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing,
-        @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) {
+        @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) throws SchemaChangeException {
     }
 
     @RuntimeOverridden
-    public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
+    public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException {
     }
 
     @RuntimeOverridden
-    public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
+    public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException {
     }
   }
 
-
   @Override
   public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
-      OperatorStats stats, BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing,
-      LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds,
-      VectorContainer outContainer) throws SchemaChangeException, ClassTransformationException, IOException {
+                    OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
+                    LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds,
+                    VectorContainer outContainer) throws SchemaChangeException, IOException {
 
     if (valueExprs == null || valueFieldIds == null) {
       throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables.");
@@ -230,15 +312,34 @@ public abstract class HashAggTemplate implements HashAggregator {
       throw new IllegalArgumentException("Wrong number of workspace variables.");
     }
 
-//    this.context = context;
+    this.context = context;
     this.stats = stats;
-    this.allocator = allocator;
+    this.allocator = oContext.getAllocator();
+    this.oContext = oContext;
     this.incoming = incoming;
-//    this.schema = incoming.getSchema();
     this.outgoing = outgoing;
     this.outContainer = outContainer;
+    this.operatorId = hashAggrConfig.getOperatorId();
+
+    is2ndPhase = hashAggrConfig.getAggPhase() == AggPrelBase.OperatorPhase.PHASE_2of2;
+    isTwoPhase = hashAggrConfig.getAggPhase() != AggPrelBase.OperatorPhase.PHASE_1of1;
+    canSpill = isTwoPhase; // single phase cannot spill
+
+    // Typically for testing -- force a spill after a partition has more than this many batches
+    minBatchesPerPartition = context.getConfig().getLong(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION);
+
+    // Set the memory limit
+    memoryLimit = allocator.getLimit();
+    // Optional configured memory limit, typically used only for testing.
+    long configLimit = context.getConfig().getLong(ExecConstants.HASHAGG_MAX_MEMORY);
+    if (configLimit > 0) {
+      logger.warn("Memory limit was changed to {}",configLimit);
+      memoryLimit = Math.min(memoryLimit, configLimit);
+      allocator.setLimit(memoryLimit); // enforce at the allocator
+    }
 
-//    this.hashAggrConfig = hashAggrConfig;
+    // All the settings that require the number of partitions were moved into delayedSetup(),
+    // which is called later, after the actual data first arrives
 
     // currently, hash aggregation is only applicable if there are group-by expressions.
     // For non-grouped (a.k.a Plain) aggregations that don't involve DISTINCT, there is no
@@ -266,112 +367,278 @@ public abstract class HashAggTemplate implements HashAggregator {
       }
     }
 
-    ChainedHashTable ht =
+    spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE);
+    baseHashTable =
         new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing);
-    this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);
-
+    this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill)
     numGroupByOutFields = groupByOutFieldIds.length;
-    batchHolders = new ArrayList<BatchHolder>();
-    // First BatchHolder is created when the first put request is received.
 
     doSetup(incoming);
   }
 
+  /**
+   *  Delayed setup performs the parts of setup() that can only run after actual data arrives on incoming.
+   *  This data is used to compute the number of partitions.
+   */
+  private void delayedSetup() {
+
+    // Set the number of partitions from the configuration (raise to a power of two, if needed)
+    numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS);
+    if ( numPartitions == 1 ) {
+      canSpill = false;
+      logger.warn("Spilling was disabled due to configuration setting of num_partitions to 1");
+    }
+    numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
+
+    if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch
+    else {
+      // Estimate the max batch size; should use actual data (e.g. lengths of varchars)
+      updateEstMaxBatchSize(incoming);
+    }
+    long memAvail = memoryLimit - allocator.getAllocatedMemory();
+    if ( !canSpill ) { // single phase, or spill disabled by configuration
+      numPartitions = 1; // single phase should use only a single partition (to save memory)
+    } else { // two phase
+      // Adjust down the number of partitions if needed -- when the memory available can not hold as
+      // many batches (configurable option), plus overhead (e.g., hash table, links, hash values)
+      while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) {
+        numPartitions /= 2;
+        if ( numPartitions < 2) {
+          if ( is2ndPhase ) {
+            canSpill = false;  // 2nd phase needs at least 2 to make progress
+            logger.warn("Spilling was disabled - not enough memory available for internal partitioning");
+          }
+          break;
+        }
+      }
+    }
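+    // Worked example with illustrative numbers (not from this commit): with memAvail = 200 MB,
+    // estMaxBatchSize = 16 MB and minBatchesPerPartition = 3, each partition needs
+    // 16 MB * 3 + 8 MB = 56 MB. Starting from 32 partitions: 32*56 = 1792 MB > 200, so halve:
+    // 16*56 = 896, 8*56 = 448, 4*56 = 224, 2*56 = 112 <= 200 -- leaving 2 partitions.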
+    logger.debug("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",
+        numPartitions, canSpill ? "Can" : "Cannot");
+
+    // The following initial safety check should be revisited once we can lower the number of rows in a batch.
+    // In cases of very tight memory -- need at least enough memory to process one batch, plus overhead (e.g., hash table)
+    if ( numPartitions == 1 ) {
+      // if too little memory - behave like the old code -- no memory limit for hash aggregate
+      allocator.setLimit(AbstractBase.MAX_ALLOCATION);  // 10_000_000_000L
+    }
+    // Based on the number of partitions: Set the mask and bit count
+    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
+    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
+
+    // Create arrays (one entry per partition)
+    htables = new HashTable[numPartitions] ;
+    batchHolders = (ArrayList<BatchHolder>[]) new ArrayList<?>[numPartitions] ;
+    outBatchIndex = new int[numPartitions] ;
+    outputStream = new OutputStream[numPartitions];
+    spilledBatchesCount = new int[numPartitions];
+    spillFiles = new String[numPartitions];
+    spilledPartitionsList = new ArrayList<SpilledPartition>();
+
+    plannedBatches = numPartitions; // each partition should allocate its first batch
+
+    // initialize every (per partition) entry in the arrays
+    for (int i = 0; i < numPartitions; i++ ) {
+      try {
+        this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
+        this.htables[i].setMaxVarcharSize(maxColumnWidth);
+      } catch (ClassTransformationException e) {
+        throw UserException.unsupportedError(e)
+            .message("Code generation error - likely an error in the code.")
+            .build(logger);
+      } catch (IOException e) {
+        throw UserException.resourceError(e)
+            .message("IO Error while creating a hash table.")
+            .build(logger);
+      } catch (SchemaChangeException sce) {
+        throw new IllegalStateException("Unexpected Schema Change while creating a hash table",sce);
+      }
+      this.batchHolders[i] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
+    }
+  }
+
+  /**
+   * Get the new incoming (used when reading spilled files like an "incoming").
+   * @return The (newly replaced) incoming
+   */
+  @Override
+  public RecordBatch getNewIncoming() { return newIncoming; }
+
+  private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeException, IOException {
+    baseHashTable.updateIncoming(newIncoming); // after a spill - a new incoming
+    this.incoming = newIncoming;
+    currentBatchRecordCount = newIncoming.getRecordCount(); // first batch in this spill file
+    nextPartitionToReturn = 0;
+    for (int i = 0; i < numPartitions; i++ ) {
+      htables[i].reinit(newIncoming);
+      if ( batchHolders[i] != null) {
+        for (BatchHolder bh : batchHolders[i]) {
+          bh.clear();
+        }
+        batchHolders[i].clear();
+        batchHolders[i] = new ArrayList<BatchHolder>();
+      }
+      outBatchIndex[i] = 0;
+      outputStream[i] = null;
+      spilledBatchesCount[i] = 0;
+      spillFiles[i] = null;
+    }
+  }
+
+  /**
+   *  Update the estimated max batch size to be used in the Hash Aggr Op,
+   *  using the record batch sizer to get the row width.
+   * @param incoming the batch used for the size estimate
+   */
+  private void updateEstMaxBatchSize(RecordBatch incoming) {
+    if ( estMaxBatchSize > 0 ) { return; }  // no handling of a schema (or varchar) change
+    RecordBatchSizer sizer = new RecordBatchSizer(incoming);
+    logger.trace("Incoming sizer: {}",sizer);
+    // An empty batch only has the schema, so the actual length of varchars can not be told;
+    // else use the actual varchar lengths, each capped at 50 (to match the space allocation)
+    estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
+    estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE;
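+    // e.g. (illustrative numbers, not from this commit): a 100-byte estimated row width
+    // with MAX_BATCH_SIZE = 64K rows gives an estMaxBatchSize of roughly 6.5 MB.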
+
+    // Get approx max (varchar) column width to get better memory allocation
+    maxColumnWidth = Math.max(sizer.maxSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
+    maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE);
+
+    logger.trace("{} phase. Estimated row width: {}  batch size: {}  memory limit: {}  max column width: {}",
+        isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth);
+
+    if ( estMaxBatchSize > memoryLimit ) {
+      logger.warn("HashAggregate: Estimated max batch size {} is larger than the memory limit {}",estMaxBatchSize,memoryLimit);
+    }
+  }
+
+  /**
+   *  Read and process (i.e., insert into the hash table and aggregate) records from the current batch.
+   *  Once complete, get the incoming NEXT batch and process it as well, etc.
+   *  For 1st phase, may return when an early output needs to be performed.
+   *
+   * @return Agg outcome status
+   */
   @Override
   public AggOutcome doWork() {
-    try {
-      // Note: Keeping the outer and inner try blocks here to maintain some similarity with
-      // StreamingAggregate which does somethings conditionally in the outer try block.
-      // In the future HashAggregate may also need to perform some actions conditionally
-      // in the outer try block.
-
-      outside:
-      while (true) {
-        // loop through existing records, aggregating the values as necessary.
-        if (EXTRA_DEBUG_1) {
-          logger.debug("Starting outer loop of doWork()...");
+
+    while (true) {
+
+      // This is called only once -- the first time actual data arrives on incoming
+      if ( schema == null && incoming.getRecordCount() > 0 ) {
+        this.schema = incoming.getSchema();
+        currentBatchRecordCount = incoming.getRecordCount(); // initialize for first non empty batch
+        // Calculate the number of partitions based on actual incoming data
+        delayedSetup();
+      }
+
+      //
+      //  loop through existing records in this batch, aggregating the values as necessary.
+      //
+      if (EXTRA_DEBUG_1) {
+        logger.debug("Starting outer loop of doWork()...");
+      }
+      for (; underlyingIndex < currentBatchRecordCount; incIndex()) {
+        if (EXTRA_DEBUG_2) {
+          logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
         }
-        for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
-          if (EXTRA_DEBUG_2) {
-            logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
-          }
-          checkGroupAndAggrValues(currentIndex);
+        checkGroupAndAggrValues(currentIndex);
+        // If adding a group hit memory pressure during the 1st phase, then start
+        // outputting some partition downstream in order to free memory.
+        if ( earlyOutput ) {
+          outputCurrentBatch();
+          incIndex(); // next time continue with the next incoming row
+          return AggOutcome.RETURN_OUTCOME;
         }
+      }
+
+      if (EXTRA_DEBUG_1) {
+        logger.debug("Processed {} records", underlyingIndex);
+      }
 
-        if (EXTRA_DEBUG_1) {
-          logger.debug("Processed {} records", underlyingIndex);
+      // Cleanup the previous batch since we are done processing it.
+      for (VectorWrapper<?> v : incoming) {
+        v.getValueVector().clear();
+      }
+      //
+      // Get the NEXT input batch, initially from the upstream, later (if there was a spill)
+      // from one of the spill files (The spill case is handled differently here to avoid
+      // collecting stats on the spilled records)
+      //
+      if ( handlingSpills ) {
+        outcome = context.shouldContinue() ? incoming.next() : IterOutcome.STOP;
+      } else {
+        long beforeAlloc = allocator.getAllocatedMemory();
+
+        // Get the next RecordBatch from the incoming (i.e. upstream operator)
+        outcome = outgoing.next(0, incoming);
+
+        // If incoming batch is bigger than our estimate - adjust the estimate to match
+        long afterAlloc = allocator.getAllocatedMemory();
+        long incomingBatchSize = afterAlloc - beforeAlloc;
+        if ( estMaxBatchSize < incomingBatchSize) {
+          logger.trace("Found a bigger incoming batch: {} , prior estimate was: {}", incomingBatchSize, estMaxBatchSize);
+          estMaxBatchSize = incomingBatchSize;
         }
+      }
 
-        try {
+      if (EXTRA_DEBUG_1) {
+        logger.debug("Received IterOutcome of {}", outcome);
+      }
 
-          while (true) {
-            // Cleanup the previous batch since we are done processing it.
-            for (VectorWrapper<?> v : incoming) {
-              v.getValueVector().clear();
-            }
-            IterOutcome out = outgoing.next(0, incoming);
-            if (EXTRA_DEBUG_1) {
-              logger.debug("Received IterOutcome of {}", out);
-            }
-            switch (out) {
-              case OUT_OF_MEMORY:
-              case NOT_YET:
-                this.outcome = out;
-                return AggOutcome.RETURN_OUTCOME;
-
-              case OK_NEW_SCHEMA:
-                if (EXTRA_DEBUG_1) {
-                  logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
-                }
-//                newSchema = true;
-                this.cleanup();
-                // TODO: new schema case needs to be handled appropriately
-                return AggOutcome.UPDATE_AGGREGATOR;
-
-              case OK:
-                resetIndex();
-                if (incoming.getRecordCount() == 0) {
-                  continue;
-                } else {
-                  checkGroupAndAggrValues(currentIndex);
-                  incIndex();
-
-                  if (EXTRA_DEBUG_1) {
-                    logger.debug("Continuing outside loop");
-                  }
-                  continue outside;
-                }
-
-              case NONE:
-                // outcome = out;
-
-                buildComplete = true;
-
-                updateStats(htable);
-
-                // output the first batch; remaining batches will be output
-                // in response to each next() call by a downstream operator
-
-                outputCurrentBatch();
-
-                // return setOkAndReturn();
-                return AggOutcome.RETURN_OUTCOME;
-
-              case STOP:
-              default:
-                outcome = out;
-                return AggOutcome.CLEANUP_AND_RETURN;
-            }
+      // Handle various results from getting the next batch
+      switch (outcome) {
+        case OUT_OF_MEMORY:
+        case NOT_YET:
+          return AggOutcome.RETURN_OUTCOME;
+
+        case OK_NEW_SCHEMA:
+          if (EXTRA_DEBUG_1) {
+            logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
           }
+          this.cleanup();
+          // TODO: new schema case needs to be handled appropriately
+          return AggOutcome.UPDATE_AGGREGATOR;
 
-        } finally {
-          // placeholder...
-        }
+        case OK:
+          currentBatchRecordCount = incoming.getRecordCount(); // size of next batch
+
+          resetIndex(); // initialize index (a new batch needs to be processed)
+
+          if (EXTRA_DEBUG_1) {
+            logger.debug("Continue to start processing the next batch");
+          }
+          break;
+
+        case NONE:
+          resetIndex(); // initialize index (in case spill files need to be processed)
+
+          buildComplete = true;
+
+          updateStats(htables);
+
+          // output the first batch; remaining batches will be output
+          // in response to each next() call by a downstream operator
+          AggIterOutcome aggOutcome = outputCurrentBatch();
+
+          if ( aggOutcome == AggIterOutcome.AGG_RESTART ) {
+            // Output of first batch returned a RESTART (all new partitions were spilled)
+            return AggOutcome.CALL_WORK_AGAIN; // need to read/process the next partition
+          }
+
+          if ( aggOutcome != AggIterOutcome.AGG_NONE ) { outcome = IterOutcome.OK; }
+
+          return AggOutcome.RETURN_OUTCOME;
+
+        case STOP:
+        default:
+          return AggOutcome.CLEANUP_AND_RETURN;
       }
-    } finally {
     }
   }
 
+  /**
+   *   Allocate space for the returned aggregate columns
+   *   (Note DRILL-5588: Maybe can eliminate this allocation (and copy))
+   * @param records the number of records to allocate space for
+   */
   private void allocateOutgoing(int records) {
     // Skip the keys and only allocate for outputting the workspace values
     // (keys will be output through splitAndTransfer)
@@ -382,14 +649,8 @@ public abstract class HashAggTemplate implements HashAggregator {
     while (outgoingIter.hasNext()) {
       @SuppressWarnings("resource")
       ValueVector vv = outgoingIter.next().getValueVector();
-//      MajorType type = vv.getField().getType();
 
-      /*
-       * In build schema we use the allocation model that specifies exact record count
-       * so we need to stick with that allocation model until DRILL-2211 is resolved. Using
-       * 50 as the average bytes per value as is used in HashTable.
-       */
-      AllocationHelper.allocatePrecomputedChildCount(vv, records, VARIABLE_WIDTH_VALUE_SIZE, 0);
+      AllocationHelper.allocatePrecomputedChildCount(vv, records, maxColumnWidth, 0);
     }
   }
 
@@ -400,45 +661,82 @@ public abstract class HashAggTemplate implements HashAggregator {
 
   @Override
   public int getOutputCount() {
-    // return outputCount;
     return lastBatchOutputCount;
   }
 
   @Override
   public void cleanup() {
-    if (htable != null) {
-      htable.clear();
-      htable = null;
+    if ( schema == null ) { return; } // not set up; nothing to clean
+    if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) {
+      stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
+          (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
+    }
+    // clean (and deallocate) each partition
+    for ( int i = 0; i < numPartitions; i++) {
+      if (htables[i] != null) {
+        htables[i].clear();
+        htables[i] = null;
+      }
+      if ( batchHolders[i] != null) {
+        for (BatchHolder bh : batchHolders[i]) {
+          bh.clear();
+        }
+        batchHolders[i].clear();
+        batchHolders[i] = null;
+      }
+
+      // delete any (still active) output spill file
+      if ( outputStream[i] != null && spillFiles[i] != null) {
+        try {
+          outputStream[i].close();
+          outputStream[i] = null;
+          spillSet.delete(spillFiles[i]);
+          spillFiles[i] = null;
+        } catch (IOException e) {
+          logger.warn("Cleanup: Failed to delete spill file {}", spillFiles[i]);
+        }
+      }
     }
+    // delete any spill file left in unread spilled partitions
+    while ( ! spilledPartitionsList.isEmpty() ) {
+      SpilledPartition sp = spilledPartitionsList.remove(0);
+      try {
+        spillSet.delete(sp.spillFile);
+      } catch (IOException e) {
+        logger.warn("Cleanup: Failed to delete spill file {}", sp.spillFile);
+      }
+    }
+    // Delete the currently handled (if any) spilled file
+    if ( newIncoming != null ) { newIncoming.close();  }
+    spillSet.close(); // delete the spill directory(ies)
     htIdxHolder = null;
     materializedValueFields = null;
     outStartIdxHolder = null;
     outNumRecordsHolder = null;
+  }
 
-    if (batchHolders != null) {
-      for (BatchHolder bh : batchHolders) {
+  // First free the memory used by the given (spilled) partition (i.e., hash table plus batches),
+  // then reallocate it in a pristine state so that the partition can continue receiving rows
+  private void reinitPartition(int part) {
+    assert htables[part] != null;
+    htables[part].reset();
+    if ( batchHolders[part] != null) {
+      for (BatchHolder bh : batchHolders[part]) {
         bh.clear();
       }
-      batchHolders.clear();
-      batchHolders = null;
+      batchHolders[part].clear();
     }
+    batchHolders[part] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
   }
 
-//  private final AggOutcome setOkAndReturn() {
-//    this.outcome = IterOutcome.OK;
-//    for (VectorWrapper<?> v : outgoing) {
-//      v.getValueVector().getMutator().setValueCount(outputCount);
-//    }
-//    return AggOutcome.RETURN_OUTCOME;
-//  }
-
   private final void incIndex() {
     underlyingIndex++;
-    if (underlyingIndex >= incoming.getRecordCount()) {
+    if (underlyingIndex >= currentBatchRecordCount) {
       currentIndex = Integer.MAX_VALUE;
       return;
     }
-    currentIndex = getVectorIndex(underlyingIndex);
+    try { currentIndex = getVectorIndex(underlyingIndex); }
+    catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc); }
   }
 
   private final void resetIndex() {
@@ -446,71 +744,337 @@ public abstract class HashAggTemplate implements HashAggregator {
     incIndex();
   }
 
-  private void addBatchHolder() {
+  private boolean isSpilled(int part) {
+    return outputStream[part] != null;
+  }
+
+  /**
+   * Which partition to choose for flushing out (i.e., spill or return)?
+   * - The current partition (to which a new batch holder is being added) has priority,
+   *   because its last batch holder is full.
+   * - The largest previously spilled partition also has some priority, as it is already spilled;
+   *   but spilling too few rows (e.g., a single batch) gains us nothing.
+   * - The largest non-spilled partition has some priority as well, to free more memory.
+   * This method weighs the above three options.
+   *
+   *  @param currPart The partition that hit the memory limit (gets priority)
+   *  @return The partition (number) chosen to be spilled, or -1 if none qualifies
+   */
+  private int chooseAPartitionToFlush(int currPart) {
+    if ( ! is2ndPhase ) { return currPart; } // 1st phase: just use the current partition
+    int currPartSize = batchHolders[currPart].size();
+    if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if size is 1
+    // first find the largest spilled partition
+    int maxSizeSpilled = -1;
+    int indexMaxSpilled = -1;
+    for (int isp = 0; isp < numPartitions; isp++ ) {
+      if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) {
+        maxSizeSpilled = batchHolders[isp].size();
+        indexMaxSpilled = isp;
+      }
+    }
+    // Give the current (if already spilled) some priority
+    if ( isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) {
+      maxSizeSpilled = currPartSize ;
+      indexMaxSpilled = currPart;
+    }
+    // now find the largest non-spilled partition
+    int maxSize = -1;
+    int indexMax = -1;
+    // Use the largest spilled (if found) as a base line, with a factor of 4
+    if ( indexMaxSpilled > -1 && maxSizeSpilled > 1 ) {
+      indexMax = indexMaxSpilled;
+      maxSize = 4 * maxSizeSpilled ;
+    }
+    for ( int insp = 0; insp < numPartitions; insp++) {
+      if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) {
+        indexMax = insp;
+        maxSize = batchHolders[insp].size();
+      }
+    }
+    // again - priority to the current partition
+    if ( ! isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) {
+      return currPart;
+    }
+    if ( maxSize <= 1 ) { // Cannot make progress by spilling a single batch!
+      return -1; // try skipping this spill
+    }
+    return indexMax;
+  }
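+  // Example trace (2nd phase, hypothetical sizes): with 4 partitions holding {1, 6, 3, 2}
+  // batches, partition 1 already spilled, and the limit hit on partition 3 (currPartSize = 2):
+  // the largest spilled partition (1, size 6) sets a non-spilled baseline of 4 * 6 = 24 that
+  // no non-spilled partition exceeds, and currPartSize + 1 = 3 < 24, so partition 1 is chosen
+  // to be spilled again.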
+
+  /**
+   * Iterate through the batches of the given partition, writing them to a file
+   *
+   * @param part The partition (number) to spill
+   */
+  private void spillAPartition(int part) {
+
+    ArrayList<BatchHolder> currPartition = batchHolders[part];
+    rowsInPartition = 0;
+    if ( EXTRA_DEBUG_SPILL ) {
+      logger.debug("HashAggregate: Spilling partition {} current cycle {} part size {}", part, cycleNum, currPartition.size());
+    }
+
+    if ( currPartition.size() == 0 ) { return; } // in case empty - nothing to spill
+
+    // If this is the first spill for this partition, create an output stream
+    if ( ! isSpilled(part) ) {
+
+      spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? Integer.toString(cycleNum) : null);
+
+      try {
+        outputStream[part] = spillSet.openForOutput(spillFiles[part]);
+      } catch (IOException ioe) {
+        throw UserException.resourceError(ioe)
+            .message("Hash Aggregation failed to open spill file: " + spillFiles[part])
+            .build(logger);
+      }
+    }
+
+    for (int currOutBatchIndex = 0; currOutBatchIndex < currPartition.size(); currOutBatchIndex++ ) {
+
+      // get the number of records in the batch holder that are pending output
+      int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput();
+
+      rowsInPartition += numPendingOutput;  // for logging
+      rowsSpilled += numPendingOutput;
+
+      allocateOutgoing(numPendingOutput);
+
+      currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
+      int numOutputRecords = outNumRecordsHolder.value;
+
+      this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
+
+      // set the value count for outgoing batch value vectors
+      /* int i = 0; */
+      for (VectorWrapper<?> v : outgoing) {
+        v.getValueVector().getMutator().setValueCount(numOutputRecords);
+        /*
+        // print out the first row to be spilled ( varchar, varchar, bigint )
+        try {
+          if (i++ < 2) {
+            NullableVarCharVector vv = ((NullableVarCharVector) v.getValueVector());
+            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
+          } else {
+            NullableBigIntVector vv = ((NullableBigIntVector) v.getValueVector());
+            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
+          }
+        } catch (Exception e) { logger.info("While printing the first row - Got an exception = {}",e); }
+        */
+      }
+
+      outContainer.setRecordCount(numPendingOutput);
+      WritableBatch batch = WritableBatch.getBatchNoHVWrap(numPendingOutput, outContainer, false);
+      VectorAccessibleSerializable outputBatch = new VectorAccessibleSerializable(batch, allocator);
+      Stopwatch watch = Stopwatch.createStarted();
+      try {
+        outputBatch.writeToStream(outputStream[part]);
+      } catch (IOException ioe) {
+        throw UserException.dataWriteError(ioe)
+            .message("Hash Aggregation failed to write to output stream: " + outputStream[part].toString())
+            .build(logger);
+      }
+      outContainer.zeroVectors();
+      logger.trace("HASH AGG: Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), numPendingOutput);
+    }
+
+    spilledBatchesCount[part] += currPartition.size(); // update count of spilled batches
+
+    logger.trace("HASH AGG: Spilled {} rows from {} batches of partition {}", rowsInPartition, currPartition.size(), part);
+  }
+
+  private void addBatchHolder(int part) {
+
     BatchHolder bh = newBatchHolder();
-    batchHolders.add(bh);
+    batchHolders[part].add(bh);
 
     if (EXTRA_DEBUG_1) {
-      logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
+      logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders[part].size());
     }
 
     bh.setup();
   }
 
-  // Overridden in the generated class when created as plain Java code.
-
+  // These methods are overridden in the generated class when created as plain Java code.
   protected BatchHolder newBatchHolder() {
     return new BatchHolder();
   }
 
+  /**
+   * Output the next batch from partition "nextPartitionToReturn"
+   *
+   * @return iteration outcome (e.g., OK, NONE ...)
+   */
   @Override
-  public IterOutcome outputCurrentBatch() {
-    if (outBatchIndex >= batchHolders.size()) {
-      this.outcome = IterOutcome.NONE;
-      return outcome;
+  public AggIterOutcome outputCurrentBatch() {
+
+    // when incoming was an empty batch, just finish up
+    if ( schema == null ) {
+      logger.trace("Incoming was empty; output is an empty batch.");
+      this.outcome = IterOutcome.NONE; // no records were read
+      allFlushed = true;
+      return AggIterOutcome.AGG_NONE;
     }
 
-    // get the number of records in the batch holder that are pending output
-    int numPendingOutput = batchHolders.get(outBatchIndex).getNumPendingOutput();
+    // Initialization (covers the case of early output)
+    ArrayList<BatchHolder> currPartition = batchHolders[earlyPartition];
+    int currOutBatchIndex = outBatchIndex[earlyPartition];
+    int partitionToReturn = earlyPartition;
+
+    if ( ! earlyOutput ) {
+      // Update the next partition to return (if needed)
+      // skip fully returned (or spilled) partitions
+      while (nextPartitionToReturn < numPartitions) {
+        //
+        // If this partition was spilled - spill the rest of it and skip it
+        //
+        if ( isSpilled(nextPartitionToReturn) ) {
+          spillAPartition(nextPartitionToReturn); // spill the rest
+          SpilledPartition sp = new SpilledPartition();
+          sp.spillFile = spillFiles[nextPartitionToReturn];
+          sp.spilledBatches = spilledBatchesCount[nextPartitionToReturn];
+          sp.cycleNum = cycleNum; // remember the current cycle
+          sp.origPartn = nextPartitionToReturn; // for debugging / filename
+          sp.prevOrigPartn = originalPartition; // for debugging / filename
+          spilledPartitionsList.add(sp);
+
+          reinitPartition(nextPartitionToReturn); // free the memory
+          long posn = spillSet.getPosition(outputStream[nextPartitionToReturn]);
+          spillSet.tallyWriteBytes(posn); // for the IO stats
+          try {
+            outputStream[nextPartitionToReturn].close();
+          } catch (IOException ioe) {
+            throw UserException.resourceError(ioe)
+                .message("IO Error while closing output stream")
+                .build(logger);
+          }
+          outputStream[nextPartitionToReturn] = null;
+        }
+        else {
+          currPartition = batchHolders[nextPartitionToReturn];
+          currOutBatchIndex = outBatchIndex[nextPartitionToReturn];
+          // If curr batch (partition X index) is not empty - proceed to return it
+          if (currOutBatchIndex < currPartition.size() && 0 != currPartition.get(currOutBatchIndex).getNumPendingOutput()) {
+            break;
+          }
+        }
+        nextPartitionToReturn++; // else check next partition
+      }
+
+      // if passed the last partition - either done or need to restart and read spilled partitions
+      if (nextPartitionToReturn >= numPartitions) {
+        // The following "if" is probably never entered, due to a similar check at the end of this method
+        if ( spilledPartitionsList.isEmpty() ) { // and no spilled partitions
+          allFlushed = true;
+          this.outcome = IterOutcome.NONE;
+          if ( is2ndPhase ) {
+            stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
+                (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
+          }
+          return AggIterOutcome.AGG_NONE;  // then return NONE
+        }
+        // Else - there are still spilled partitions to process - pick one and handle just like a new incoming
+        buildComplete = false; // go back and call doWork() again
+        handlingSpills = true; // beginning to work on the spill files
+        // pick a spilled partition; set a new incoming ...
+        SpilledPartition sp = spilledPartitionsList.remove(0);
+        // Create a new "incoming" out of the spilled partition spill file
+        newIncoming = new SpilledRecordbatch(sp.spillFile, sp.spilledBatches, context, schema, oContext, spillSet);
+        originalPartition = sp.origPartn; // used for the filename
+        logger.trace("Reading back spilled original partition {} as an incoming",originalPartition);
+        // Initialize .... new incoming, new set of partitions
+        try { initializeSetup(newIncoming); } catch (Exception e) { throw new RuntimeException(e); }
+        // update the cycle num if needed
+        // The current cycle num should always be one larger than in the spilled partition
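+        // (e.g., a file spilled while reading the original input carries cycle 0 and is read
+        //  back at cycle 1; a partition re-spilled during that read carries cycle 1 and is
+        //  read back at cycle 2, and so on)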
+        if ( cycleNum == sp.cycleNum ) {
+          cycleNum = 1 + sp.cycleNum;
+          stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // update stats
+          // report first spill or memory stressful situations
+          if ( cycleNum == 1 ) { logger.info("Started reading spilled records "); }
+          if ( cycleNum == 2 ) { logger.info("SECONDARY SPILLING "); }
+          if ( cycleNum == 3 ) { logger.warn("TERTIARY SPILLING "); }
+          if ( cycleNum == 4 ) { logger.warn("QUATERNARY SPILLING "); }
+          if ( cycleNum == 5 ) { logger.warn("QUINARY SPILLING "); }
+        }
+        if ( EXTRA_DEBUG_SPILL ) {
+          logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {} batches). More {} spilled partitions left.",
+              sp.origPartn, sp.prevOrigPartn, sp.cycleNum, sp.spilledBatches, spilledPartitionsList.size());
+        }
+        return AggIterOutcome.AGG_RESTART;
+      }
+
+      partitionToReturn = nextPartitionToReturn ;
 
-    if (numPendingOutput == 0) {
-      this.outcome = IterOutcome.NONE;
-      return outcome;
     }
 
+    // get the number of records in the batch holder that are pending output
+    int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput();
+
+    // The following accounting is for logging, metrics, etc.
+    rowsInPartition += numPendingOutput ;
+    if ( ! handlingSpills ) { rowsNotSpilled += numPendingOutput; }
+    else { rowsSpilledReturned += numPendingOutput; }
+    if ( earlyOutput ) { rowsReturnedEarly += numPendingOutput; }
+
     allocateOutgoing(numPendingOutput);
 
-    batchHolders.get(outBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
+    currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
     int numOutputRecords = outNumRecordsHolder.value;
 
     if (EXTRA_DEBUG_1) {
       logger.debug("After output values: outStartIdx = {}, outNumRecords = {}", outStartIdxHolder.value, outNumRecordsHolder.value);
     }
-    this.htable.outputKeys(outBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
+
+    this.htables[partitionToReturn].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
 
     // set the value count for outgoing batch value vectors
     for (VectorWrapper<?> v : outgoing) {
       v.getValueVector().getMutator().setValueCount(numOutputRecords);
     }
 
-//    outputCount += numOutputRecords;
-
     this.outcome = IterOutcome.OK;
 
-    logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, numOutputRecords);
+    if ( EXTRA_DEBUG_SPILL && is2ndPhase ) {
+      logger.debug("So far returned {} + SpilledReturned {}  total {} (spilled {})",rowsNotSpilled,rowsSpilledReturned,
+        rowsNotSpilled+rowsSpilledReturned,
+        rowsSpilled);
+    }
 
     lastBatchOutputCount = numOutputRecords;
-    outBatchIndex++;
-    if (outBatchIndex == batchHolders.size()) {
-      allFlushed = true;
+    outBatchIndex[partitionToReturn]++;
+    // if just flushed the last batch in the partition
+    if (outBatchIndex[partitionToReturn] == currPartition.size()) {
+
+      if ( EXTRA_DEBUG_SPILL ) {
+        logger.debug("HashAggregate: {} Flushed partition {} with {} batches total {} rows",
+            earlyOutput ? "(Early)" : "",
+            partitionToReturn, outBatchIndex[partitionToReturn], rowsInPartition);
+      }
+      rowsInPartition = 0; // reset to count for the next partition
+
+      // deallocate memory used by this partition, and re-initialize
+      reinitPartition(partitionToReturn);
 
-      logger.debug("HashAggregate: All batches flushed.");
+      if ( earlyOutput ) {
 
-      // cleanup my internal state since there is nothing more to return
-      this.cleanup();
+        if ( EXTRA_DEBUG_SPILL ) {
+          logger.debug("HASH AGG: Finished (early) re-init partition {}, mem allocated: {}", earlyPartition, allocator.getAllocatedMemory());
+        }
+        outBatchIndex[earlyPartition] = 0; // reset, for next time
+        earlyOutput = false ; // done with early output
+      }
+      else if ( (partitionToReturn + 1 == numPartitions) && spilledPartitionsList.isEmpty() ) { // last partition ?
+
+        allFlushed = true; // next next() call will return NONE
+
+        logger.trace("HashAggregate: All batches flushed.");
+
+        // cleanup my internal state since there is nothing more to return
+        this.cleanup();
+      }
     }
 
-    return this.outcome;
+    return AggIterOutcome.AGG_OK;
   }
 
   @Override
@@ -522,11 +1086,33 @@ public abstract class HashAggTemplate implements HashAggregator {
   public boolean buildComplete() {
     return buildComplete;
   }
+
+  @Override
+  public boolean earlyOutput() { return earlyOutput; }
 
   public int numGroupedRecords() {
     return numGroupedRecords;
   }
 
+  /**
+   *  Generate a detailed error message in case of "Out Of Memory"
+   * @return the error message
+   */
+  private String getOOMErrorMsg() {
+    String errmsg;
+    if ( !isTwoPhase ) {
+      errmsg = "Single Phase Hash Aggregate operator cannot spill.";
+    } else if ( ! canSpill ) {  // 2nd phase, with only 1 partition
+      errmsg = "Too little memory available to the operator to facilitate spilling.";
+    } else { // a bug ?
+      errmsg = "OOM at " + (is2ndPhase ? "Second Phase" : "First Phase") + ". Partitions: " + numPartitions +
+      ". Estimated batch size: " + estMaxBatchSize + ". Planned batches: " + plannedBatches;
+      if ( rowsSpilled > 0 ) { errmsg += ". Rows spilled so far: " + rowsSpilled; }
+    }
+    errmsg += " Memory limit: " + allocator.getLimit() + ", so far allocated: " + allocator.getAllocatedMemory() + ". ";
+
+    return errmsg;
+  }
+
   // Check if a group is present in the hash table; if not, insert it in the hash table.
   // The htIdxHolder contains the index of the group in the hash table container; this same
   // index is also used for the aggregation values maintained by the hash aggregate.
@@ -535,6 +1121,8 @@ public abstract class HashAggTemplate implements HashAggregator {
       throw new IllegalArgumentException("Invalid incoming row index.");
     }
 
+    assert ! earlyOutput;
+
     /** for debugging
      Object tmp = (incoming).getValueAccessorById(0, BigIntVector.class).getValueVector();
      BigIntVector vv0 = null;
@@ -546,44 +1134,189 @@ public abstract class HashAggTemplate implements HashAggregator {
      holder.value = vv0.getAccessor().get(incomingRowIdx) ;
      }
      */
+    /*
+    if ( handlingSpills && ( incomingRowIdx == 0 ) ) {
+      // for debugging -- show the first row from a spilled batch
+      Object tmp0 = (incoming).getValueAccessorById(NullableVarCharVector.class, 0).getValueVector();
+      Object tmp1 = (incoming).getValueAccessorById(NullableVarCharVector.class, 1).getValueVector();
+      Object tmp2 = (incoming).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector();
+
+      if (tmp0 != null && tmp1 != null && tmp2 != null) {
+        NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0);
+        NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1);
+        NullableBigIntVector  vv2 = ((NullableBigIntVector) tmp2);
+        logger.debug("The first row = {} , {} , {}", vv0.getAccessor().get(incomingRowIdx), vv1.getAccessor().get(incomingRowIdx), vv2.getAccessor().get(incomingRowIdx));
+      }
+    }
+    */
+    // The hash code is computed once, then its lower bits are used to determine the
+    // partition to use, and the higher bits determine the location in the hash table.
+    int hashCode;
+    try {
+      htables[0].updateBatches();
+      hashCode = htables[0].getHashCode(incomingRowIdx);
+    } catch (SchemaChangeException e) {
+      throw new UnsupportedOperationException("Unexpected schema change", e);
+    }
 
-    htable.put(incomingRowIdx, htIdxHolder, 1 /* retry count */);
+    // right shift hash code for secondary (or tertiary...) spilling
+    for (int i = 0; i < cycleNum; i++) { hashCode >>>= bitsInMask; }
 
-    int currentIdx = htIdxHolder.value;
+    int currentPartition = hashCode & partitionMask ;
+    hashCode >>>= bitsInMask;
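+    // For example (hypothetical values): with 8 partitions, partitionMask is 7 and bitsInMask
+    // is 3, so a hash code ending in binary 101 selects partition 5, and the remaining
+    // (shifted) bits position the key within that partition's hash table. When re-reading a
+    // spilled partition (cycleNum > 0), the loop above discarded the low bits already used,
+    // so the same keys now redistribute across partitions based on fresh bits.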
+    HashTable.PutStatus putStatus = null;
+    long allocatedBefore = allocator.getAllocatedMemory();
 
-    // get the batch index and index within the batch
-    if (currentIdx >= batchHolders.size() * HashTable.BATCH_SIZE) {
-      addBatchHolder();
+    // Insert the key columns into the hash table
+    try {
+      putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode);
+    } catch (OutOfMemoryException exc) {
+      throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when we cannot spill
+    } catch (SchemaChangeException e) {
+      throw new UnsupportedOperationException("Unexpected schema change", e);
     }
-    BatchHolder bh = batchHolders.get((currentIdx >>> 16) & HashTable.BATCH_MASK);
-    int idxWithinBatch = currentIdx & HashTable.BATCH_MASK;
+    int currentIdx = htIdxHolder.value;
 
-    // Check if we have almost filled up the workspace vectors and add a batch if necessary
-    if ((idxWithinBatch == (bh.capacity - 1)) && (bh.allocatedNextBatch == false)) {
-      htable.addNewKeyBatch();
-      addBatchHolder();
-      bh.allocatedNextBatch = true;
+    long addedMem = allocator.getAllocatedMemory() - allocatedBefore;
+    if ( addedMem > 0 ) {
+      logger.trace("MEMORY CHECK HT: allocated {}  added {} partition {}",allocatedBefore,addedMem,currentPartition);
     }
 
+    // Check if put() added a new batch (for the keys) inside the hash table, hence a matching batch
+    // (for the aggregate columns) needs to be created
+    if ( putStatus == HashTable.PutStatus.NEW_BATCH_ADDED ) {
+      try {
+        long allocatedBeforeAggCol = allocator.getAllocatedMemory();
+
+        addBatchHolder(currentPartition);
+
+        if ( plannedBatches > 0 ) { plannedBatches--; } // just allocated a planned batch
+        long totalAddedMem = allocator.getAllocatedMemory() - allocatedBefore;
+        logger.trace("MEMORY CHECK AGG: added {}  total (with HT) added {}",allocator.getAllocatedMemory()-allocatedBeforeAggCol,totalAddedMem);
+        // resize the batch estimate if needed (e.g., varchars may take more memory than estimated)
+        if ( totalAddedMem > estMaxBatchSize ) {
+          logger.trace("Adjusting Batch size estimate from {} to {}",estMaxBatchSize,totalAddedMem);
+          estMaxBatchSize = totalAddedMem;
+        }
+      } catch (OutOfMemoryException exc) {
+        throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when we cannot spill
+      }
+    }
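+    // The index returned by put() encodes the batch number in its upper bits and the offset
+    // within that batch in its lower 16 bits; each keys batch in the hash table is paired
+    // one-to-one with a BatchHolder of aggregate values.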
+    BatchHolder bh = batchHolders[currentPartition].get((currentIdx >>> 16) & HashTable.BATCH_MASK);
+    int idxWithinBatch = currentIdx & HashTable.BATCH_MASK;
 
     if (bh.updateAggrValues(incomingRowIdx, idxWithinBatch)) {
       numGroupedRecords++;
     }
+
+    // ===================================================================================
+    // If the last batch just became full, that is the time to check the memory limits !!
+    // If exceeded, then need to spill (if 2nd phase) or output early (if 1st)
+    // (Skip this check if spilling is not possible; in that case an OOM may be encountered later)
+    // ===================================================================================
+    if ( putStatus == HashTable.PutStatus.KEY_ADDED_LAST && canSpill ) {
+
+      plannedBatches++; // planning to allocate one more batch
+
+      // calculate the (max) new memory needed now
+      long hashTableDoublingSizeNeeded = 0; // in case the hash table(s) would resize
+      for ( HashTable ht : htables ) {
+        hashTableDoublingSizeNeeded += ht.extraMemoryNeededForResize();
+      }
+
+      // Plan ahead for at least MIN batches, to account for size changing, and some overhead
+      long maxMemoryNeeded = minBatchesPerPartition * plannedBatches *
+          ( estMaxBatchSize + MAX_BATCH_SIZE * ( 4 + 4 /* links + hash-values */) ) +
+          hashTableDoublingSizeNeeded;
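+      // For example (hypothetical values): minBatchesPerPartition = 2, plannedBatches = 1,
+      // estMaxBatchSize = 8 MB and MAX_BATCH_SIZE = 64K rows gives
+      // 2 * 1 * (8 MB + 64K * 8 bytes) = ~17 MB, plus any hash-table doubling space.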
+
+      // log a detailed debug message explaining why a spill may be needed
+      logger.trace("MEMORY CHECK: Allocated mem: {}, agg phase: {}, trying to add to partition {} with {} batches. " +
+          "Memory needed {}, Est batch size {}, mem limit {}",
+          allocator.getAllocatedMemory(), isTwoPhase?(is2ndPhase?"2ND":"1ST"):"Single", currentPartition,
+          batchHolders[currentPartition].size(), maxMemoryNeeded, estMaxBatchSize, memoryLimit);
+      //
+      //   Spill if the allocated memory plus the memory needed exceeds the memory limit.
+      //
+      if ( allocator.getAllocatedMemory() + maxMemoryNeeded > memoryLimit ) {
+
+        // Pick a "victim" partition to spill or return
+        int victimPartition = chooseAPartitionToFlush(currentPartition);
+
+        // In case no partition has more than one batch -- try to "push the limits"; maybe next
+        // time the spill could work.
+        if ( victimPartition < 0 ) { return; }
+
+        if ( is2ndPhase ) {
+          long before = allocator.getAllocatedMemory();
+
+          spillAPartition(victimPartition);
+          logger.trace("RAN OUT OF MEMORY: Spilled partition {}",victimPartition);
+
+          // Re-initialize (free memory, then recreate) the partition just spilled/returned
+          reinitPartition(victimPartition);
+
+          // in some "edge" cases (e.g. testing), spilling one partition may not be enough
+          if ( allocator.getAllocatedMemory() + maxMemoryNeeded > memoryLimit ) {
+              int victimPartition2 = chooseAPartitionToFlush(victimPartition);
+              if ( victimPartition2 < 0 ) { return; }
+              long after = allocator.getAllocatedMemory();
+              spillAPartition(victimPartition2);
+              reinitPartition(victimPartition2);
+              logger.warn("A Second Spill was Needed: allocated before {}, after first spill {}, after second {}, memory needed {}",
+                  before, after, allocator.getAllocatedMemory(), maxMemoryNeeded);
+              logger.trace("Second Partition Spilled: {}",victimPartition2);
+          }
+        }
+        else {
+          // 1st phase need to return a partition early in order to free some memory
+          earlyOutput = true;
+          earlyPartition = victimPartition;
+
+          if ( EXTRA_DEBUG_SPILL ) {
+            logger.debug("picked partition {} for early output", victimPartition);
+          }
+        }
+      }
+    }
   }
 
-  private void updateStats(HashTable htable) {
-    htable.getStats(htStats);
+  /**
+   * Updates the stats after all the input has been read.
+   * Note: For spilled partitions, their hash-table stats from before the spill are lost.
+   * Also SPILLED_PARTITIONS counts only the partitions spilled during the primary cycle, not SECONDARY etc.
+   * @param htables The hash tables (one per partition) whose stats are summed
+   */
+  private void updateStats(HashTable[] htables) {
+    if ( cycleNum > 0 ) { return; } // These stats are only for before processing spilled files
+    long numSpilled = 0;
+    HashTableStats newStats = new HashTableStats();
+    // sum the stats from all the partitions
+    for (int ind = 0; ind < numPartitions; ind++) {
+      htables[ind].getStats(newStats);
+      htStats.addStats(newStats);
+      if (isSpilled(ind)) {
+        numSpilled++;
+      }
+    }
     this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
     this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
     this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
     this.stats.setLongStat(Metric.RESIZING_TIME, htStats.resizingTime);
+    this.stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
+    if ( is2ndPhase ) {
+      this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
+    }
+    if ( rowsReturnedEarly > 0 ) {
+      stats.setLongStat(Metric.SPILL_MB, // update stats - est. total MB returned early
+          (int) Math.round( rowsReturnedEarly * estRowWidth / 1024.0D / 1024.0));
+    }
   }
 
   // Code-generated methods (implemented in HashAggBatch)
-  public abstract void doSetup(@Named("incoming") RecordBatch incoming);
+  public abstract void doSetup(@Named("incoming") RecordBatch incoming) throws SchemaChangeException;
 
-  public abstract int getVectorIndex(@Named("recordIndex") int recordIndex);
+  public abstract int getVectorIndex(@Named("recordIndex") int recordIndex) throws SchemaChangeException;
 
-  public abstract boolean resetValues();
+  public abstract boolean resetValues() throws SchemaChangeException;
 
 }